2017-10-17 159 views
2

我有一个在Docker容器中运行的python“Device”。它连接到Crossbar路由器,在订阅的频道上接收高速公路/ WAMP事件消息。处理来自高速通道的消息异步,非阻塞订阅

当某个事件发布时,我的设备正在调用几秒钟内完成的方法。 现在,我希望它跳过或处理收到的同一事件的任何消息,而该方法仍在运行。我试图通过使用Twisted的@inlinecallback修饰器并在设备上设置“self.busy”标志来实现此目的。

但是它并没有立即返回,而是像正常的阻塞方法那样工作,以便传入的消息被一个接一个地处理。

这里是我的代码:

from autobahn.twisted.wamp import ApplicationSession 
from twisted.internet.defer import inlineCallbacks 

class Pixel(ApplicationSession): 

@inlineCallbacks 
def onJoin(self, details): 
    yield self.subscribe(self.handler_no_access, 'com.event.no_access') 

@inlineCallbacks 
def handler_no_access(self, direction): 
    entries = len(self.handlers['no_access'][direction]) 

    if entries == 0: 
     self.handlers['no_access'][direction].append(direction) 
     result = yield self._handler_no_access() 
     return result 

    else: 
     yield print('handler_no_access: entries not 0: ', self.handlers['no_access']) 

@inlineCallbacks 
def _handler_no_access(self): 
    for direction in self.handlers['no_access']: 

     for message in self.handlers['no_access'][direction]: 
      yield self._timed_switch(self.direction_leds[direction], 'red', 0.2, 5) 
      self.handlers['no_access'][direction] = [] 

我已经迈出了哈克路径与self.handler字典,顺便说一句。

EDIT

阻塞的方法是:

yield self._timed_switch(self.direction_leds[direction], 'red', 0.2, 5) 

它控制在树莓派的一个的GPIO Neopixel,让它闪烁和关闭1秒。任何进一步的方法调用

def handler_no_access(self, direction) 

而_timed_switch尚未完成,应该被跳过,所以他们不堆叠。

SOLUTION

@inlineCallbacks 
def handler_no_access(self, direction): 
    direction = str(direction) 

    if self.busy[direction] is False: 

     self.busy[direction] = True 

     # non-blocking now 
     yield deferToThread(self._handler_no_access, direction) 

    else: 
     yield print('handler_no_access: direction {} busy '.format(direction)) 

def _handler_no_access(self, direction): 

    # this takes 1s to execute 
    self._timed_switch(self.direction_leds[direction], 'red', 0.2, 5) 

    self.busy[direction] = False 

回答

0

inlineCallbacks不会使阻塞的代码到非阻塞代码。这只是一个使用Deferreds的替代API。延期只是一种管理回调的方法。

你需要重写你的阻塞代码,以非阻塞的方式。你实际上没有说过你的代码的哪一部分被阻塞,也没有说明它阻塞了什么,因此很难建议你如何做到这一点。将阻塞代码变为非阻塞的唯一两​​种通用工具是线程和进程。所以,你可以在一个单独的线程或进程中运行该函数。该函数可能会或可能不会在这样的执行环境中工作(再次,没有办法知道它确实知道它是什么)。

+0

恩,谢谢你的提醒,显然我错误地理解了Twisted中的Deferred-mechanics。 – stk

+0

[链接](http://twistedmatrix.com/documents/current/core/howto/gendefer.html#what-deferreds-don-t-do-make-your-code-asynchronous) 我试图现在使用deferToThread(f)并将报告回来,这将解决我的问题。 – stk