How to pause ThreadPool, run function, and resume?

Time:09-17

The code below is intended to trigger when a file is added to its folder. If the file type is .csv or .txt, it needs to read the file. At first, I was getting this message

Message=[Errno 13] Permission denied: 'TestDataFile2021-09-11_15-54.csv'

when trying to read the file. I believe it is because the folder is in use by the worker thread. I have been trying to figure out a way to pause the thread, run the function, and resume monitoring the folder.

Error:

# Loop the data lines
with open(data_file, 'r+') as temp_f:

Where I need to pause:

                      if fType == ".csv" or fType == ".txt":
                          data_file = a_string
                          thread_pool_executor.submit(self.process_csv) # Pause thread, run process_csv, and resume
                          self.process_csv()    

Full code:

import os
import win32file
import win32event
import win32con

from win32com import client
import ctypes
import pandas as pd
import csv
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from openpyxl import load_workbook
import wx

import glob
from os.path import splitext
from concurrent import futures
import sys
import time

thread_pool_executor = futures.ThreadPoolExecutor(max_workers=3)

class MainFrame(wx.Frame):
    def __init__(self, parent, title):
        super(MainFrame, self).__init__(parent, title=title,size=(600,400))

        self.panel = wx.Panel(self)
        self.panel.SetBackgroundColour("light gray")
        #Create sizers
        vbox = wx.BoxSizer(wx.VERTICAL)
        #Create widgets
        st1 = wx.StaticText(self.panel, label='Script is not running.')
        tc = wx.TextCtrl(self.panel)
        btn_start = wx.Button(self.panel, label='Run Script', size=(100, 30))
        btn_start.SetBackgroundColour(wx.Colour(198, 89, 17))
        #self.btn_login.SetFont(self.b_font)
        btn_start.Bind(wx.EVT_BUTTON, self.onStart)

        vbox.Add(st1,-1 ,  wx.ALIGN_CENTRE | wx.ALL, 5)
        vbox.Add(btn_start, 0, wx.ALIGN_CENTRE | wx.ALL, 5)
        vbox.Add(tc,2, wx.EXPAND| wx.ALL, 10)

        #Layout
        self.panel.SetSizer(vbox)
        self.Centre()
        self.Show()

    def onStart(self,event):
        print('Listening')
        self._quit = False
        thread_pool_executor.submit(self.monitor_folder)
        thread_pool_executor.submit(self.active_listening)

    def process_csv(self):
        data_file = "TestDataFile2021-09-11_15-54.csv"
        data_file_delimiter = ','
        largest_column_count = 0

        # Loop the data lines
        with open(data_file, 'r+') as temp_f:
            lines = temp_f.readlines()
            for l in lines:
                column_count = len(l.split(data_file_delimiter)) + 1
                largest_column_count = column_count if largest_column_count < column_count else largest_column_count
        column_names = [i for i in range(0, largest_column_count)]

        # Read csv
        df = pd.read_csv(data_file,  delimiter=data_file_delimiter, names=column_names) #header=None,
        df.fillna("", inplace=True)
        print(df)

    def active_listening(self):
        m = 'Listening'
        i = 1
        while self._quit == False:
            time.sleep(2)
            if i <= 3:
              m = m + "."
              print(m)
              i = i + 1
            else:
              i = 1
              m = 'Listening'

    def monitor_folder(self):
        #wx.CallAfter(self.print1)
        while self._quit == False:
            path_to_watch = os.path.abspath (".")

            # FindFirstChangeNotification sets up a handle for watching
            #  file changes. The first parameter is the path to be
            #  watched; the second is a boolean indicating whether the
            #  directories underneath the one specified are to be watched;
            #  the third is a list of flags as to what kind of changes to
            #  watch for. We're just looking at file additions / deletions.
            #
            change_handle = win32file.FindFirstChangeNotification (
              path_to_watch,
              0,
              win32con.FILE_NOTIFY_CHANGE_FILE_NAME
            )
            #
            # Loop forever, listing any file changes. The WaitFor... will
            #  time out every half a second allowing for keyboard interrupts
            #  to terminate the loop.
            try:
              old_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
              while 1:
                result = win32event.WaitForSingleObject (change_handle, 500)
                # If the WaitFor... returned because of a notification (as
                #  opposed to timing out or some error) then look for the
                #  changes in the directory contents.
                if result == win32con.WAIT_OBJECT_0:
                  new_path_contents = dict ([(f, None) for f in os.listdir (path_to_watch)])
                  added = [f for f in new_path_contents if not f in old_path_contents]
                  deleted = [f for f in old_path_contents if not f in new_path_contents]
                  if added:
                      print ("Added: ", ", ".join (added))

                      #Get file type
                      a_string = ", ".join (added)
                      length = len(a_string)
                      fType = a_string[length - 4:]

                      if fType == ".csv" or fType == ".txt":
                          data_file = a_string
                          thread_pool_executor.submit(self.process_csv) # Pause thread, run process_csv, and resume
                          self.process_csv()

                      else:
                          print('Not what we want')

                  if deleted: print ("Deleted: ", ", ".join (deleted))

                  old_path_contents = new_path_contents
                  win32file.FindNextChangeNotification (change_handle)

            finally:
              win32file.FindCloseChangeNotification (change_handle)

def main():
    app = wx.App()
    ex = MainFrame(None, title='Border')
    ex.Show()
    app.MainLoop()

if __name__ == '__main__':
    main()

CodePudding user response:

It is not possible to run your program, so this is a bit of speculation.

If it is indeed a race condition within your program to access the file, you can solve it by using locks.

from threading import Lock
....

foo = Lock()    #this is a global level variable

Every time you access the file / directory / other protected resource you will encapsulate the operation with the lock:

with foo:
    with open(data_file, 'r+') as temp_f:
       ...

The important part is to make sure every location in your code that accesses the resource is wrapped like this. This ensures only one thread can access the resource at any particular time. If a thread attempts to acquire the lock while another thread holds it, the attempt blocks and only proceeds once the other thread releases the lock.
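To make the locking idea concrete, here is a minimal, self-contained sketch. It is not taken from the question's program: the names file_lock, append_line, count_lines and demo are made up for illustration, and a temporary file stands in for the real data file. Every function that touches the file takes the same lock first.

```python
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor
from threading import Lock

# Hypothetical name: a single module-level lock shared by all threads.
file_lock = Lock()


def append_line(path, text):
    """Append one line to the file while holding the shared lock."""
    with file_lock:
        with open(path, "a") as f:
            f.write(text + "\n")


def count_lines(path):
    """Read the file while holding the same lock and return its line count."""
    with file_lock:
        with open(path, "r") as f:
            return len(f.readlines())


def demo():
    """Write 20 rows from a 3-worker pool, then count them."""
    fd, path = tempfile.mkstemp(suffix=".csv")
    os.close(fd)  # close the OS-level handle; we reopen by path below
    try:
        with ThreadPoolExecutor(max_workers=3) as pool:
            pending = [pool.submit(append_line, path, f"row,{i}") for i in range(20)]
            for fut in pending:
                fut.result()  # re-raise any exception from a worker
        return count_lines(path)
    finally:
        os.remove(path)
```

Note that a threading.Lock only coordinates threads inside one Python process; it does nothing about other programs that have the file open.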

If the problem is a race condition with an external source, then this does not work. If an external program creates those files your program attempts to read, your operating system may protect the file while being held open for writing by the other process and not allow you to access it. In this case, the correct way is to catch the exception, wait a short while and try again, and keep trying until you succeed.
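A rough sketch of that retry loop, assuming the failure shows up as PermissionError (which is what [Errno 13] maps to in Python 3). The function name read_with_retry and its parameters attempts, delay and opener are invented for this example; opener exists only so that the retry behaviour can be exercised without a real locked file.

```python
import time


def read_with_retry(path, attempts=10, delay=0.5, opener=open):
    """Return the text of `path`, retrying while the OS denies access.

    `attempts`, `delay` and `opener` are illustrative parameters, not part
    of any library API. The last PermissionError is re-raised if every
    attempt fails.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for _ in range(attempts):
        try:
            # Read-only mode is enough here; the file is never modified.
            with opener(path, "r") as f:
                return f.read()
        except PermissionError as err:
            last_error = err
            time.sleep(delay)  # give the other process time to finish writing
    raise last_error
```

In the question's program this would be called from process_csv in place of the bare open(); the values for attempts and delay are arbitrary and would need tuning to how long the writer takes to release the file.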

CodePudding user response:

I was able to figure out a workaround. It seems you can't interact with a file while another thread is interacting with the same folder through Win32.

This:

        data_file = "TestDataFile2021-09-11_15-54.csv"
        data_file_delimiter = ','
        largest_column_count = 0

        #with FileLock('TestDataFile2021-09-11_15-54.csv'):
        #with thread_lock:
        #Loop the data lines
        with open(data_file, 'r+') as temp_f:
            lines = temp_f.readlines()
            for l in lines:
                column_count = len(l.split(data_file_delimiter)) + 1
                largest_column_count = column_count if largest_column_count < column_count else largest_column_count

        column_names = [i for i in range(0, largest_column_count-1)]

        # Read csv
        df = pd.read_csv(data_file,  delimiter=data_file_delimiter, names=column_names) #header=None, # , names=column_names
        df.fillna("", inplace=True)
        df.set_index(0, inplace = True)

Was replaced with:

        df = pd.read_fwf('TestDataFile2021-09-11_15-54.csv', header=None)
        df = df[0].str.split(',', expand=True)
        df.set_index(0, inplace = True)
        df.fillna("", inplace=True)