2014-09-04

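Watchdog compatibility: a workaround for "CancelIoEx"

Using the Python watchdog file system event monitoring library, I noticed that under Windows Server 2003 it falls back to "polling mode". It stops using asynchronous OS notifications, which severely degrades performance when a large number of files change.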

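I traced the problem to the watchdog/observers/winapi.py file, where the CancelIoEx system call is used to unblock the pending ReadDirectoryChangesW call when the user wants to stop monitoring the watched directory or file: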

(winapi.py)

CancelIoEx = ctypes.windll.kernel32.CancelIoEx 
CancelIoEx.restype = ctypes.wintypes.BOOL 
CancelIoEx.errcheck = _errcheck_bool 
CancelIoEx.argtypes = (
    ctypes.wintypes.HANDLE, # hObject 
    ctypes.POINTER(OVERLAPPED) # lpOverlapped 
) 

... 
... 
... 

def close_directory_handle(handle):
    try:
        CancelIoEx(handle, None)  # force ReadDirectoryChangesW to return
    except WindowsError:
        return

The problem with the CancelIoEx call is that it is not available until Windows Server 2008: http://msdn.microsoft.com/en-us/library/windows/desktop/aa363792(v=vs.85).aspx
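
Because the availability of CancelIoEx depends on the Windows version, one option is to probe for it at import time instead of assuming it exists. The following is a minimal sketch, not watchdog's actual code: the helper name `kernel32_has` is hypothetical, and on non-Windows platforms it simply reports False.

```python
import ctypes
import sys


def kernel32_has(function_name):
    """Return True if kernel32.dll exports `function_name` (Windows only)."""
    if sys.platform != "win32":
        # ctypes.windll does not exist outside Windows.
        return False
    try:
        # Attribute lookup on a ctypes DLL object resolves the export and
        # raises AttributeError when the symbol is missing.
        getattr(ctypes.windll.kernel32, function_name)
    except AttributeError:
        return False
    return True


# Hypothetical module-level flag that the rest of the code could branch on.
HAS_CANCEL_IO_EX = kernel32_has("CancelIoEx")
print("CancelIoEx available:", HAS_CANCEL_IO_EX)
```

With such a flag, the code could choose between the CancelIoEx path and a fallback without raising at import time.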

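One possible alternative is to change close_directory_handle so that it creates a dummy file inside the monitored directory, thereby unblocking the thread that is waiting for ReadDirectoryChangesW to return.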
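
The dummy-file idea could be sketched roughly as follows. This is only an illustration under assumptions: `poke_directory` is a hypothetical helper, it needs write permission on the directory, and the observer would still have to recognize and ignore the spurious events it causes.

```python
import os
import tempfile


def poke_directory(directory):
    """Create and immediately delete a throwaway file inside `directory`.

    Both operations are file system changes, so a thread blocked on a
    change notification for this directory should be woken up.
    """
    fd, path = tempfile.mkstemp(prefix=".watchdog-wakeup-", dir=directory)
    os.close(fd)
    os.remove(path)
    return path


# Usage on a scratch directory.
scratch = tempfile.mkdtemp()
removed_path = poke_directory(scratch)
print("poked", scratch, "leftovers:", os.listdir(scratch))
```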

However, I noticed that the CancelIo system call is in fact available on Windows Server 2003:

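Cancels all pending input and output (I/O) operations that are issued by the calling thread for the specified file. The function does not cancel I/O operations that other threads issue for a file handle. To cancel I/O operations from another thread, use the CancelIoEx function.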

But calling CancelIo does not affect the waiting thread.

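Do you have any idea how to solve this problem? Perhaps threading.enumerate() could be used to send a signal to each thread, so that CancelIo is called from each thread's own signal handler?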

+1

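I'm the author of that line. I believe CancelIo only cancels asynchronous I/O operations. It is as if ReadDirectoryChangesW was designed to be uninterruptible until CancelIoEx was added later. There is a "simple" solution, though: implement an asynchronous (overlapped) version of ReadDirectoryChangesW. Then, if it works the way I think it does, it should be possible to stop it by calling CancelIo from the same thread that called ReadDirectoryChangesW. If you want to take on this task, a patch would be welcome. – takoi 2014-09-05 11:10:39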

+0

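@takoi Thank you very much for your comment. I think (I may be wrong) that 'read_directory_changes' could be implemented using the asynchronous mode of 'ReadDirectoryChangesW', as described at http://msdn.microsoft.com/en-us/library/windows/desktop/aa365465(v=vs.85).aspx, retrieving its results through a completion routine. A lock for each thread could be acquired right after the asynchronous call is issued, and then released either one by one by the completion routines or all at once by 'close_directory_handle'. – 2014-09-05 12:55:38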

Answer

1

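The natural approach is to implement a completion routine and call ReadDirectoryChangesW in its overlapped mode. The following example shows one way to do that: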

RDCW_CALLBACK_F = ctypes.WINFUNCTYPE(None, ctypes.wintypes.DWORD, ctypes.wintypes.DWORD, ctypes.POINTER(OVERLAPPED)) 

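First, create a WINFUNCTYPE factory, which will be used to generate C-like functions (callable from the Windows API) from Python methods. In this case there is no return value, and the three parameters correspond to the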

VOID CALLBACK FileIOCompletionRoutine(
    _In_  DWORD dwErrorCode, 
    _In_  DWORD dwNumberOfBytesTransfered, 
    _Inout_ LPOVERLAPPED lpOverlapped 
); 

FileIOCompletionRoutine header.
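
To see how such a prototype factory turns a Python function into a C-callable object, here is a small portable sketch. It is an illustration only: it uses `ctypes.CFUNCTYPE` as a stand-in for `WINFUNCTYPE` (which exists only on Windows), `ctypes.c_uint32` in place of DWORD, and a plain `c_void_p` in place of the OVERLAPPED pointer. The names `CALLBACK_F` and `on_complete` are hypothetical.

```python
import ctypes

# Prototype: no return value, two 32-bit unsigned integers and one pointer,
# mirroring the shape of the FileIOCompletionRoutine signature.
CALLBACK_F = ctypes.CFUNCTYPE(None, ctypes.c_uint32, ctypes.c_uint32,
                              ctypes.c_void_p)

calls = []


def on_complete(error_code, bytes_transferred, overlapped_ptr):
    # Record the arguments so the effect of the call can be inspected.
    calls.append((error_code, bytes_transferred, overlapped_ptr))


# Wrapping the Python function yields an object that C code can call.
# Keep a reference to it for as long as C code may still use it.
c_callback = CALLBACK_F(on_complete)

# The wrapper is also callable from Python, which is handy for a smoke test.
c_callback(0, 16, None)
print(calls)
```

Keeping `c_callback` referenced matters in the real case too: if the wrapper object is garbage collected while Windows still holds the function pointer, the later callback can crash the process.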

A reference to the callback, as well as the OVERLAPPED structure, needs to be added to the ReadDirectoryChangesW argument list:

ReadDirectoryChangesW = ctypes.windll.kernel32.ReadDirectoryChangesW 

ReadDirectoryChangesW.restype = ctypes.wintypes.BOOL 
ReadDirectoryChangesW.errcheck = _errcheck_bool 
ReadDirectoryChangesW.argtypes = (
    ctypes.wintypes.HANDLE, # hDirectory 
    LPVOID, # lpBuffer 
    ctypes.wintypes.DWORD, # nBufferLength 
    ctypes.wintypes.BOOL, # bWatchSubtree 
    ctypes.wintypes.DWORD, # dwNotifyFilter 
    ctypes.POINTER(ctypes.wintypes.DWORD), # lpBytesReturned 
    ctypes.POINTER(OVERLAPPED), # lpOverlapped 
    RDCW_CALLBACK_F # FileIOCompletionRoutine # lpCompletionRoutine 
) 

From here, we are ready to perform the overlapped system call. This is a simple callback, only useful for testing that everything works:

def dir_change_callback(dwErrorCode,dwNumberOfBytesTransfered,p): 
    print("dir_change_callback! PID:" + str(os.getpid())) 
    print("CALLBACK THREAD: " + str(threading.currentThread())) 

Preparing and performing the call:

event_buffer = ctypes.create_string_buffer(BUFFER_SIZE) 
nbytes = ctypes.wintypes.DWORD() 
overlapped_read_dir = OVERLAPPED() 
call2pass = RDCW_CALLBACK_F(dir_change_callback) 

hand = get_directory_handle(os.path.abspath("/test/")) 

def docall(): 
    ReadDirectoryChangesW(hand, ctypes.byref(event_buffer), 
          len(event_buffer), False, 
          WATCHDOG_FILE_NOTIFY_FLAGS, 
          ctypes.byref(nbytes), 
          ctypes.byref(overlapped_read_dir), call2pass) 

print("Waiting!") 
docall() 

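If you load and execute all this code in a DreamPie interactive shell, you can check that the system call completes and that the callback is executed, printing the thread and PID after the first change made under the c:\test directory. You will also notice that these are the same as those of the main thread and process: although the event is raised by a separate thread, the callback runs in the same process and thread as our main program, which leads to undesired behavior: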

lck = threading.Lock() 

def dir_change_callback(dwErrorCode,dwNumberOfBytesTransfered,p): 
    print("dir_change_callback! PID:" + str(os.getpid())) 
    print("CALLBACK THREAD: " + str(threading.currentThread())) 

... 
... 
... 

lck.acquire() 
print("Waiting!") 
docall() 
lck.acquire() 

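This program will lock the main thread, and the callback will never be executed. I tried many synchronization tools, even Windows API semaphores, and always got the same behavior. So, finally, I decided to emulate the asynchronous call with a synchronous call to ReadDirectoryChangesW that is managed in a separate process and synchronized using the multiprocessing Python library.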
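
One possible explanation for the hang described above, offered as an assumption rather than something verified against this code: the Windows documentation states that the completion routine is called when the operation completes and the calling thread is in an alertable wait state. Blocking on a Python lock may not be an alertable wait, in which case the routine would stay queued. A hedged sketch of an alertable wait using the SleepEx API follows; `alertable_sleep` is a hypothetical helper and does nothing outside Windows.

```python
import ctypes
import sys

# Value SleepEx returns when it woke up because a completion routine ran.
WAIT_IO_COMPLETION = 0x000000C0


def alertable_sleep(milliseconds):
    """Sleep in an alertable state so queued completion routines can run.

    Returns True if the sleep ended because a completion routine was called,
    False if the timeout elapsed, and None on non-Windows platforms.
    """
    if sys.platform != "win32":
        return None
    sleep_ex = ctypes.windll.kernel32.SleepEx
    sleep_ex.restype = ctypes.c_uint32
    # dwMilliseconds, bAlertable
    sleep_ex.argtypes = (ctypes.c_uint32, ctypes.c_int)
    return sleep_ex(milliseconds, True) == WAIT_IO_COMPLETION


print("alertable_sleep(1) ->", alertable_sleep(1))
```

Under that assumption, the thread that issued the overlapped ReadDirectoryChangesW would loop on `alertable_sleep` instead of blocking on a lock. Whether this fits watchdog's threading model has not been tested here.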

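Calls to get_directory_handle no longer return the handle number given by the Windows API, but one managed by the winapi library. For that, I implemented a handle generator: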

class FakeHandleFactory():
    _hl = threading.Lock()
    _next = 0

    @staticmethod
    def next():
        # Hand out consecutive integers; the lock keeps this thread-safe.
        with FakeHandleFactory._hl:
            ret = FakeHandleFactory._next
            FakeHandleFactory._next += 1
        return ret
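
As a side note, the same idea of a thread-safe handle generator can be expressed with `itertools.count` guarded by a lock. This is a sketch of an equivalent technique, not the code used in the answer, and `HandleCounter` is a hypothetical name.

```python
import itertools
import threading


class HandleCounter(object):
    """Hands out unique, increasing integers from any thread."""

    def __init__(self, start=0):
        self._lock = threading.Lock()
        self._counter = itertools.count(start)

    def next_handle(self):
        # The lock makes the "advance and return" step explicit and atomic.
        with self._lock:
            return next(self._counter)


# Four threads drawing 250 handles each should never see a duplicate.
factory = HandleCounter()
handles = []


def worker():
    for _ in range(250):
        handles.append(factory.next_handle())


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(handles), "handles,", len(set(handles)), "unique")
```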

Each generated handle has to be globally associated with a file system path:

handle2file = {} 

Each call to read_directory_changes will now create a ReadDirectoryRequest object (derived from multiprocessing.Process):

class ReadDirectoryRequest(multiprocessing.Process):

    def _perform_and_wait4request(self, path, recursive, event_buffer, nbytes):
        hdl = CreateFileW(path, FILE_LIST_DIRECTORY, WATCHDOG_FILE_SHARE_FLAGS,
                          None, OPEN_EXISTING, WATCHDOG_FILE_FLAGS, None)
        #print("path: " + path)
        aux_buffer = ctypes.create_string_buffer(BUFFER_SIZE)
        aux_n = ctypes.wintypes.DWORD()
        #print("_perform_and_wait4request! PID:" + str(os.getpid()))
        #print("CALLBACK THREAD: " + str(threading.currentThread()) + "\n----------")
        try:
            # Synchronous call: blocks this child process until a change occurs.
            ReadDirectoryChangesW(hdl, ctypes.byref(aux_buffer),
                                  len(event_buffer), recursive,
                                  WATCHDOG_FILE_NOTIFY_FLAGS,
                                  ctypes.byref(aux_n), None, None)
        except WindowsError as e:
            print("!" + str(e))
            # Aborted or failed: report zero bytes. The shared nbytes and
            # event_buffer objects must not be rebound, they are written below.
            aux_n.value = 0
        # Copy the result into the shared objects (range works on Python 2 and 3).
        nbytes.value = aux_n.value
        for i in range(self.int_class(aux_n.value)):
            event_buffer[i] = aux_buffer[i]
        CloseHandle(hdl)
        try:
            self.lck.release()
        except:
            pass



    def __init__(self, handle, recursive):
        buffer = ctypes.create_string_buffer(BUFFER_SIZE)
        self.event_buffer = multiprocessing.Array(ctypes.c_char, buffer)
        self.nbytes = multiprocessing.Value(ctypes.wintypes.DWORD, 0)
        targetPath = handle2file.get(handle, None)
        super(ReadDirectoryRequest, self).__init__(target=self._perform_and_wait4request, args=(targetPath, recursive, self.event_buffer, self.nbytes))
        self.daemon = True
        self.lck = multiprocessing.Lock()
        self.result = None
        try:
            self.int_class = long
        except NameError:
            self.int_class = int
        if targetPath is None:
            self.result = ([], -1)

    def CancelIo(self):
        try:
            self.result = ([], 0)
            self.lck.release()
        except:
            pass

    def read_changes(self):
        #print("read_changes! PID:" + str(os.getpid()))
        #print("CALLBACK THREAD: " + str(threading.currentThread()) + "\n----------")
        if self.result is not None:
            raise Exception("ReadDirectoryRequest object can be used only once!")
        self.lck.acquire()
        self.start()
        self.lck.acquire()
        self.result = (self.event_buffer, self.int_class(self.nbytes.value))
        return self.result

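This class specializes Process to provide a way to perform the system call and wait until either:

  • A change event has been raised.
  • The main thread cancels the request by calling the ReadDirectoryRequest object's CancelIo method.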
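
The "wait until the call completes or somebody cancels it" pattern described above can be illustrated in a portable way. The sketch below is not the answer's implementation: it uses a thread as a stand-in for the separate process and a `threading.Event` instead of the double lock acquisition, and `CancellableRequest` is a hypothetical name. As in the answer, cancelling only unblocks the waiting caller; it does not stop the blocked worker.

```python
import threading


class CancellableRequest(object):
    """Runs a blocking call in a worker and lets another thread cancel the wait."""

    def __init__(self, blocking_call):
        self._blocking_call = blocking_call
        self._done = threading.Event()
        self._lock = threading.Lock()
        self._result = None

    def _finish(self, result):
        # First finisher wins: either the worker's result or the cancel marker.
        with self._lock:
            if not self._done.is_set():
                self._result = result
                self._done.set()

    def _run(self):
        self._finish(self._blocking_call())

    def cancel(self):
        # Same convention as the answer: an empty buffer and zero bytes.
        self._finish(([], 0))

    def read(self):
        worker = threading.Thread(target=self._run)
        worker.daemon = True  # do not keep the interpreter alive
        worker.start()
        self._done.wait()
        return self._result


# Usage: a call that completes on its own.
request = CancellableRequest(lambda: (b"abc", 3))
print(request.read())
```

A request that never completes would be released by calling `cancel()` from another thread, after which `read()` returns `([], 0)`.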

Note that the roles of:

  • get_directory_handle
  • close_directory_handle
  • read_directory_changes

are now to manage requests. For that, a thread lock and auxiliary data structures are needed:

rqIndexLck = threading.Lock() # Protects the access to `rqIndex` 
rqIndex = {} # Maps handles to request objects sets. 

get_directory_handle

def get_directory_handle(path): 
    rqIndexLck.acquire() 
    ret = FakeHandleFactory.next() 
    handle2file[ret] = path 
    rqIndexLck.release() 
    return ret 

close_directory_handle

def close_directory_handle(handle):
    rqIndexLck.acquire()
    rqset4handle = rqIndex.get(handle, None)
    if rqset4handle is not None:
        for rq in rqset4handle:
            rq.CancelIo()
        del rqIndex[handle]
    if handle in handle2file:
        del handle2file[handle]
    rqIndexLck.release()

Last but not least: read_directory_changes

def read_directory_changes(handle, recursive):
    rqIndexLck.acquire()
    rq = ReadDirectoryRequest(handle, recursive)
    set4handle = None
    if handle in rqIndex:
        set4handle = rqIndex[handle]
    else:
        set4handle = set()
        rqIndex[handle] = set4handle
    set4handle.add(rq)
    rqIndexLck.release()
    ret = rq.read_changes()
    rqIndexLck.acquire()
    if rq in set4handle:
        set4handle.remove(rq)
    rqIndexLck.release()
    return ret
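
The bookkeeping done by the three functions above (a lock-protected map from handles to sets of pending requests) can be isolated into a small helper. This is a sketch under assumptions, not part of the answer: `RequestIndex` and its method names are hypothetical, and it uses `with` blocks so the lock is released even if an exception is raised.

```python
import threading


class RequestIndex(object):
    """Lock-protected map from a handle to its set of pending requests."""

    def __init__(self):
        self._lock = threading.Lock()
        self._by_handle = {}

    def add(self, handle, request):
        with self._lock:
            self._by_handle.setdefault(handle, set()).add(request)

    def discard(self, handle, request):
        with self._lock:
            requests = self._by_handle.get(handle)
            if requests is not None:
                requests.discard(request)

    def pop_all(self, handle):
        # Remove the handle and return whatever was still pending for it.
        with self._lock:
            return self._by_handle.pop(handle, set())


# Usage with plain strings standing in for request objects.
index = RequestIndex()
index.add(1, "rq-a")
index.add(1, "rq-b")
index.discard(1, "rq-a")
print(index.pop_all(1))
```

In this arrangement, close_directory_handle would cancel every request returned by `pop_all`, and read_directory_changes would call `add` before waiting and `discard` afterwards.
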
+0

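Could you post the rest of your first, callback-based implementation? I can't get your example to work. For one thing, you have to pass FILE_FLAG_OVERLAPPED, otherwise it won't be asynchronous even if you pass the overlapped structure and the callback to ReadDirectoryChangesW. – takoi 2014-09-16 11:51:45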

+0

Regarding FILE_FLAG_OVERLAPPED, I changed one flag constant definition: WATCHDOG_FILE_FLAGS = FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED. If you want my set of examples, I can send it to you rather than posting so much code here. – 2014-09-21 09:25:39

+0

So much work, and XP support is still missing. Did you finish this? Did @takoi receive it? – user 2015-11-11 04:59:19