自然的方法是执行完成程序并使用其重叠模式调用ReadDirectoryChangesW
。下面的示例示出了这样做的方式:
RDCW_CALLBACK_F = ctypes.WINFUNCTYPE(None, ctypes.wintypes.DWORD, ctypes.wintypes.DWORD, ctypes.POINTER(OVERLAPPED))
首先,创建一个WINFUNCTYPE工厂将被用来生成(从Windows API调用)C等从Python方法的功能。在这种情况下,没有返回值和3个参数相对应
VOID CALLBACK FileIOCompletionRoutine(
_In_ DWORD dwErrorCode,
_In_ DWORD dwNumberOfBytesTransfered,
_Inout_ LPOVERLAPPED lpOverlapped
);
FileIOCompletionRoutine header。
回调参考以及重叠的结构需要被添加到ReadDirectoryChangesW
参数列表:
ReadDirectoryChangesW = ctypes.windll.kernel32.ReadDirectoryChangesW
ReadDirectoryChangesW.restype = ctypes.wintypes.BOOL
ReadDirectoryChangesW.errcheck = _errcheck_bool
ReadDirectoryChangesW.argtypes = (
ctypes.wintypes.HANDLE, # hDirectory
LPVOID, # lpBuffer
ctypes.wintypes.DWORD, # nBufferLength
ctypes.wintypes.BOOL, # bWatchSubtree
ctypes.wintypes.DWORD, # dwNotifyFilter
ctypes.POINTER(ctypes.wintypes.DWORD), # lpBytesReturned
ctypes.POINTER(OVERLAPPED), # lpOverlapped
RDCW_CALLBACK_F # FileIOCompletionRoutine # lpCompletionRoutine
)
从这里,我们已准备好执行重叠的系统调用。 这是一个简单的通话BACL只是usefult来测试一切正常:
def dir_change_callback(dwErrorCode,dwNumberOfBytesTransfered,p):
print("dir_change_callback! PID:" + str(os.getpid()))
print("CALLBACK THREAD: " + str(threading.currentThread()))
准备和执行的呼叫:
event_buffer = ctypes.create_string_buffer(BUFFER_SIZE)
nbytes = ctypes.wintypes.DWORD()
overlapped_read_dir = OVERLAPPED()
call2pass = RDCW_CALLBACK_F(dir_change_callback)
hand = get_directory_handle(os.path.abspath("/test/"))
def docall():
ReadDirectoryChangesW(hand, ctypes.byref(event_buffer),
len(event_buffer), False,
WATCHDOG_FILE_NOTIFY_FLAGS,
ctypes.byref(nbytes),
ctypes.byref(overlapped_read_dir), call2pass)
print("Waiting!")
docall()
如果加载并执行所有这些代码放到一个DreamPie交互shell你可以检查系统调用是否完成,并执行回调,从而在c:\test
目录下完成第一次更改后打印线程和PID号。此外,您会注意到这些内容与主线程和进程相同:尽管事件是由单独的线程引发的,但回调与我们的主程序在相同的进程和线程中运行,因此提供了不希望的行为:
lck = threading.Lock()
def dir_change_callback(dwErrorCode,dwNumberOfBytesTransfered,p):
print("dir_change_callback! PID:" + str(os.getpid()))
print("CALLBACK THREAD: " + str(threading.currentThread()))
...
...
...
lck.acquire()
print("Waiting!")
docall()
lck.acquire()
该程序将锁定主线程,回调将永远不会执行。 我试过很多的同步工具,甚至是Windows API信号灯总是得到同样的行为,因此,最后,我决定使用同步配置来实现ansynchronous呼吁ReadDirectoryChangesW
管理一个单独的进程中,并使用multiprocessing
Python库同步:
电话到get_directory_handle
不会返回由Windows API,但一个由winapi
库管理给出的句柄号,为我实现了一个手柄发生器:
class FakeHandleFactory():
_hl = threading.Lock()
_next = 0
@staticmethod
def next():
FakeHandleFactory._hl.acquire()
ret = FakeHandleFactory._next
FakeHandleFactory._next += 1
FakeHandleFactory._hl.release()
return ret
每产生手柄具有与文件系统路径全局相关:
handle2file = {}
到read_directory_changes
每个呼叫现在将产生ReadDirectoryRequest
(来自multiprocessing.Process
派生)对象:
class ReadDirectoryRequest(multiprocessing.Process):
def _perform_and_wait4request(self, path, recursive, event_buffer, nbytes):
hdl = CreateFileW(path, FILE_LIST_DIRECTORY, WATCHDOG_FILE_SHARE_FLAGS,
None, OPEN_EXISTING, WATCHDOG_FILE_FLAGS, None)
#print("path: " + path)
aux_buffer = ctypes.create_string_buffer(BUFFER_SIZE)
aux_n = ctypes.wintypes.DWORD()
#print("_perform_and_wait4request! PID:" + str(os.getpid()))
#print("CALLBACK THREAD: " + str(threading.currentThread()) + "\n----------")
try:
ReadDirectoryChangesW(hdl, ctypes.byref(aux_buffer),
len(event_buffer), recursive,
WATCHDOG_FILE_NOTIFY_FLAGS,
ctypes.byref(aux_n), None, None)
except WindowsError as e:
print("!" + str(e))
if e.winerror == ERROR_OPERATION_ABORTED:
nbytes = 0
event_buffer = []
else:
nbytes = 0
event_buffer = []
# Python 2/3 compat
nbytes.value = aux_n.value
for i in xrange(self.int_class(aux_n.value)):
event_buffer[i] = aux_buffer[i]
CloseHandle(hdl)
try:
self.lck.release()
except:
pass
def __init__(self, handle, recursive):
buffer = ctypes.create_string_buffer(BUFFER_SIZE)
self.event_buffer = multiprocessing.Array(ctypes.c_char, buffer)
self.nbytes = multiprocessing.Value(ctypes.wintypes.DWORD, 0)
targetPath = handle2file.get(handle, None)
super(ReadDirectoryRequest, self).__init__(target=self._perform_and_wait4request, args=(targetPath, recursive, self.event_buffer, self.nbytes))
self.daemon = True
self.lck = multiprocessing.Lock()
self.result = None
try:
self.int_class = long
except NameError:
self.int_class = int
if targetPath is None:
self.result = ([], -1)
def CancelIo(self):
try:
self.result = ([], 0)
self.lck.release()
except:
pass
def read_changes(self):
#print("read_changes! PID:" + str(os.getpid()))
#print("CALLBACK THREAD: " + str(threading.currentThread()) + "\n----------")
if self.result is not None:
raise Exception("ReadDirectoryRequest object can be used only once!")
self.lck.acquire()
self.start()
self.lck.acquire()
self.result = (self.event_buffer, self.int_class(self.nbytes.value))
return self.result
此类指定Process
提供一种执行该系统调用,并等待直到处理(或):
- 变更事件已经发生。
- 主线程通过调用
ReadDirectoryRequest
对象CancelIo
方法取消请求。
需要注意的是:
- get_directory_handle
- close_directory_handle
- read_directory_changes
角色现在管理的要求。为此,螺纹锁和辅助数据结构需要:
rqIndexLck = threading.Lock() # Protects the access to `rqIndex`
rqIndex = {} # Maps handles to request objects sets.
get_directory_handle
def get_directory_handle(path):
rqIndexLck.acquire()
ret = FakeHandleFactory.next()
handle2file[ret] = path
rqIndexLck.release()
return ret
close_directory_handle
def close_directory_handle(handle):
rqIndexLck.acquire()
rqset4handle = rqIndex.get(handle, None)
if rqset4handle is not None:
for rq in rqset4handle:
rq.CancelIo()
del rqIndex[handle]
if handle in handle2file:
del handle2file[handle]
rqIndexLck.release()
最后但并非最不重要的:read_directory_changes
def read_directory_changes(handle, recursive):
rqIndexLck.acquire()
rq = ReadDirectoryRequest(handle, recursive)
set4handle = None
if handle in rqIndex:
set4handle = rqIndex[handle]
else:
set4handle = set()
rqIndex[handle] = set4handle
set4handle.add(rq)
rqIndexLck.release()
ret = rq.read_changes()
rqIndexLck.acquire()
if rq in set4handle:
set4handle.remove(rq)
rqIndexLck.release()
return ret
我是该行的作者。我相信CancelIo仅用于取消异步IO操作。就好像ReadDirectoryChangesW被设计为不可解释,直到后来添加了CancelIoEx。尽管有一个“简单”解决方案:实现ReadDirectoryChangesW的异步(重叠)版本。然后,如果它按我认为的那样工作,应该可以通过在调用ReadDirectoryChangesW的同一个线程中调用CancelIo来停止它。如果你想完成这项任务,补丁将受到欢迎。 – takoi 2014-09-05 11:10:39
@takoi非常感谢您的评论。我认为(我可能是错误的)'read_directory_changes'可以使用'ReadDirectoryChangesW'的异步模式来实现,如http://msdn.microsoft.com/en-us/library/windows/desktop/aa365465(v=vs .85).aspx通过完成例程检索其结果。每个线程的锁都可以在异步调用完成之后立即获取,并且可以通过完成例程逐个释放,或者同时由'close_directory_handle'获取。 – 2014-09-05 12:55:38