Mocking an I/O event in Python

Time:12-06

My code listens for file changes in a folder, in class A. When a change occurs, it triggers a method of class B, an instance of which is stored as a field of class A.

class A:
    def __init__(self, b):
        ...
        self.handler = b

    def run(self):
        # listen for changes in a folder using watchdog.observers.Observer
        self.observer.schedule(self.handler, self.input_directory, recursive=True)
        self.observer.start()
        try:
            while not self.stopped:
                time.sleep(self.scanning_frequency)
        except:
            self.observer.stop()

        self.observer.join()

class B(FileSystemEventHandler):
    ...
    def on_any_event(self, event):
        # a change occurred and is handled here
        ...

What I want to test is that on_any_event is triggered when a file is copied to this folder. This is how I tried to do that:

    def test_file_watcher(self):
        # arrange
        b = B()
        a = A(b)
        a.handler.on_any_event = MagicMock()

        shutil.copy(...)  # copy a file to the watched folder
        p1 = Process(target=a.run)
        p1.start()

        time.sleep(a.scanning_frequency + 1)  # folder scanning frequency + 1 second
        a.stop()  # stops watching the folder

        assert a.handler.on_any_event.called
        p1.join()

However, this assertion is always false. What exactly am I doing wrong? Also, would it be possible to achieve this by mocking B completely?


Edit: I think the reason could be that I am using a separate process, so a.handler.on_any_event.called is always False. But I couldn't figure out how to solve this.

CodePudding user response:

Workaround for your test in a multiprocessing context

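I agree that multiprocessing causes the test to fail: the MagicMock is assigned in the parent process, but on_any_event runs in the child process on its own copy of a, so the parent's mock never records the call. I have found a workaround that tests this in a somewhat unusual way, but you can adapt it to your needs.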

The workaround shares state between the two processes through multiprocessing.Value (see the documentation).

To do this I defined two shared variables and two functions, as below:

from multiprocessing import Value

shared_value = Value('i', 0)
stopped = Value('i',0)

# this function is used to substitute method on_any_event() of class B
def on_any_event_spy(event):
    shared_value.value += 1

# this is the target for Process p1
def f(a: A):
    # substitution of on_any_event method with on_any_event_spy()
    a.handler.on_any_event = on_any_event_spy
    a.run()
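
The module-level globals above reach the child process only when it inherits the parent's memory (the fork start method). Under the spawn start method the module is imported again in the child, which creates fresh Value objects there. Below is a minimal sketch of a variant that passes the counter to the child explicitly, so it does not depend on the start method. The names record_event and child are hypothetical, and child only stands in for f(a):

```python
from multiprocessing import Process, Value


def record_event(counter, event=None):
    """Hypothetical spy: count one handled event in a process-safe way."""
    with counter.get_lock():
        counter.value += 1


def child(counter):
    # Stand-in for f(a): the real test would install the spy on
    # a.handler and call a.run() here (assumption, not shown).
    record_event(counter, event="created")


if __name__ == "__main__":
    counter = Value("i", 0)  # shared integer, visible to both processes
    p = Process(target=child, args=(counter,))
    p.start()
    p.join()
    print(counter.value)  # number of events counted in the child
```

Taking the lock around the increment matters because += on a Value is a read followed by a write, not a single atomic step.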

I also had to modify the run() and stop() methods of class A.

New method stop() of class A:

def stop(self):
    stopped.value = 1

Method run() of class A (the while condition changes; the observer is also stopped in a finally block so that join() can return after a normal stop):

def run(self):
    # listen for changes in a folder using watchdog.observers.Observer
    self.observer.schedule(self.handler, self.input_directory, recursive=True)
    self.observer.start()
    try:
        # while not self.stopped:   # <------ comment out this instruction
        while stopped.value == 0:   # <----- add this instruction
            time.sleep(self.scanning_frequency)
    finally:
        # stop the observer on a normal exit as well, otherwise join() blocks
        self.observer.stop()

    self.observer.join()

The test method becomes:

import shutil
import time
import unittest
from multiprocessing import Process

class MyTestCase(unittest.TestCase):

    def test_file_watcher(self):

        # arrange
        b = B()
        a = A(b)

        shutil.copy(...)  # copy a file to the watched folder
        # I have changed your target and added args
        p1 = Process(target=f, args=(a, ))
        p1.start()

        time.sleep(a.scanning_frequency + 1)
        a.stop()  # stops watching the folder

        # the shared_value value MUST BE > 0
        self.assertGreater(shared_value.value, 0)

        p1.join()

if __name__ == '__main__':
    unittest.main()
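
If the watcher does not need its own process, running it in a thread keeps the mock in the same memory as the test, so the call is recorded directly. This swaps Process for threading.Thread and is only a minimal sketch under that assumption. StandInWatcher is a hypothetical stand-in for A that reports one fake event instead of using watchdog:

```python
import threading
import time
from unittest.mock import MagicMock


class StandInWatcher:
    """Hypothetical stand-in for A: reports one fake event, then polls until stopped."""

    def __init__(self, handler, scanning_frequency=0.01):
        self.handler = handler
        self.scanning_frequency = scanning_frequency
        self._stop_requested = threading.Event()

    def run(self):
        self.handler.on_any_event("created")  # pretend a file arrived
        while not self._stop_requested.is_set():
            time.sleep(self.scanning_frequency)

    def stop(self):
        self._stop_requested.set()


def run_watcher_in_thread():
    handler = MagicMock()
    watcher = StandInWatcher(handler)
    worker = threading.Thread(target=watcher.run)
    worker.start()
    time.sleep(watcher.scanning_frequency * 5)
    watcher.stop()
    worker.join(timeout=2)
    return handler, worker


if __name__ == "__main__":
    handler, worker = run_watcher_in_thread()
    handler.on_any_event.assert_called_once_with("created")
```

The assertion runs after join(), so the thread has finished and the recorded call is visible to the test.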

How to mock B completely

The previous section shows that the real problem with this test is multiprocessing. If you still want to mock B completely, as you ask in your question, try changing your test_file_watcher() as follows:

def test_file_watcher(self):
    # arrange
    # b = B()   # <----------------------------- comment out this instruction
    b = Mock(wraps=B())  # <-------------------- add this instruction
    a = A(b)
    # a.handler.on_any_event = MagicMock()  # <- comment out this instruction

    shutil.copy(...)  # copy a file to the watched folder
    p1 = Process(target=a.run)
    p1.start()

    time.sleep(a.scanning_frequency + 1)  # folder scanning frequency + 1 second
    a.stop()  # stops watching the folder

    # assert a.handler.on_any_event.called  # <- comment out this instruction
    assert b.on_any_event.called  # <----------- add this instruction
    p1.join()

I hope that with the instruction:

b = Mock(wraps=B())

you will wrap B completely, as you ask in your question, and that this can be useful for future, more traditional tests.
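
A small sketch of what Mock(wraps=...) records, using only unittest.mock. StandInHandler is a hypothetical stand-in for B. Its dispatch() method is assumed to call on_any_event() internally, which is, as far as I know, how watchdog's FileSystemEventHandler.dispatch behaves:

```python
from unittest.mock import MagicMock, Mock


class StandInHandler:
    """Hypothetical stand-in for B: dispatch() calls on_any_event() on itself."""

    def __init__(self):
        self.seen = []

    def dispatch(self, event):
        self.on_any_event(event)

    def on_any_event(self, event):
        self.seen.append(event)


# Case 1: a call made on the wrapping mock is recorded and forwarded.
real = StandInHandler()
wrapped = Mock(wraps=real)
wrapped.on_any_event("created")
assert wrapped.on_any_event.called
assert real.seen == ["created"]

# Case 2: a call the real object makes on itself bypasses the mock.
real2 = StandInHandler()
wrapped2 = Mock(wraps=real2)
wrapped2.dispatch("modified")
assert wrapped2.dispatch.called
assert not wrapped2.on_any_event.called
assert real2.seen == ["modified"]

# Case 3: replacing the method on the instance records internal calls too.
real3 = StandInHandler()
real3.on_any_event = MagicMock(wraps=real3.on_any_event)
real3.dispatch("deleted")
real3.on_any_event.assert_called_once_with("deleted")
assert real3.seen == ["deleted"]
```

In case 2 the wrapped object calls its own on_any_event, so the wrapping mock never sees that call. Replacing the method on the instance, as in case 3, records the call while still running the real code. In every case the mock only records calls made in the process where it lives.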
