2017-04-02 92 views
0

我在WebSocket(WS)内发出Redis订阅。当我接收到WS打开时,我将请求线程化,然后实例化Redis客户端。在公开之内,我为Redis提供线程并发布订阅。WebSocket和Redis导致从pubsub和/或brpop挂起连接

这一切正常,直到我收到一个意想不到的WS关闭。此时,运行Redis订阅的线程消失了。如果我发出取消订阅,我会得到一个挂起。如果我不退订,我已经离开了一个幻影订阅,导致我接下来发生麻烦。

发布它的线程终止后,是否有某种方法可以删除订阅?我已经注意到,Redis实例对于该终止的线程有一个mon变量。示例Ruby代码是:

class Backend 
    include MInit 

    def initialize(app) 
    setup 
    @app = app 
    end 

    def run!(env) 
    if Faye::WebSocket.websocket?(env) 
     ws = Faye::WebSocket.new(env, [], ping: KEEPALIVE_TIME) 
     ws_thread = Thread.fork(env) do 
     credis = Redis.new(host: @redis_uri.host, port: @redis_uri.port, password: @redis_uri.password) 

     ws.on :open do |event| 
      channel = URI.parse(event.target.url).path[1..URI.parse(event.target.url).path.length] 
      redis_thread = Thread.fork do 
      credis.subscribe(channel) do |on| 
       on.message do |message_channel, message| 
       sent = ws.send(message) 
       end 
       on.unsubscribe do |message_channel| 
       puts "Unsubscribe on channel:#{channel};" 
       end 
      end 
      end 
     end 

     ws.on :message do |event| 
      handoff(ws: ws, event: event) 
     end 

     ws.on :close do |event| 
      # Hang occurs here 
      unsubscribed = credis.unsubscribe(channel) 
     end 

     ws.on :error do |event| 
      ws.close 
     end 

     # Return async Rack response 
     ws.rack_response 

     end 
    end 
    else 
    @app.call(env) 
    end 

    private 
    def handoff(ws: nil, event: nil, source: nil, message: nil) 
    # processing 
    end 
end 

回答

1

一旦我真正理解了问题,该修复就相当简单。 Redis线程实际上依然存在。但是,Redis仍然挂着,因为它正在等待线程获得控制权。要做到这一点,在WS.close代码需要通过如下WS.close内使用EM.next_tick割让控制:

ws.on :close do |event| 
    EM.next_tick do 
    # Hang occurs here 
    unsubscribed = credis.unsubscribe(channel) 
    end 
end 
+0

您可能会尝试的另一种方法是让Redis定期向主题发布一些无害消息,强制所有订阅都执行_write_。任何“幻像”订阅应该随后被清除/关闭,因为Redis检测到写入死/关对方的错误。 – Castaglia

+0

@Castaglia有趣。就我而言,知道以前的订阅没有清楚地知道如何清除它,但在恢复时,我发出了另一个订阅并继续处理。这会将消息发布到相同频道,包括幻像订阅和同名的新直播频道。你希望能够清除幻影吗?我问,因为随着时间的推移,我遇到了这种情况下的资源限制。 –

+0

啊,我想我明白了。订阅绑定到TCP连接,_that_ TCP连接仍然存在;它是客户端上的_thread_,它从应用程序的其余部分的POV中去世,从而“失去”订阅。是?我想知道,如果从同一个TCP连接(清除任何幻像/丢失/滞留的订阅)执行'UNSUBSCRIBE'(指定所有频道,或者不指定所有频道),然后重新订阅,可能会工作。 – Castaglia

1

这更多的是一种长的评论提供了一个解决办法,而不是一个解决方案。

如果是我的申请,我会重新考虑设计。

为每个websocket客户端打开一个新的Redis连接和一个新线程是一个相当大的资源承诺。

只是为了说明,与Redis的每个连接都需要TCP/IP套接字(这是一种有限的资源)和内存。线程的保留堆栈内存应该为每个线程花费大约2Mib ...因此,1K Redis连接和线程会在内存中产生大约2Gib的成本。

除了Redis服务器本身具有有限数量的连接,它通常可以接受这一事实之外(尽管这通常是价格而不是硬性限制的问题,因为它们是按比例设计的) 。

重新调整设计应该非常简单,以便单个线程和连接为所有websocket客户端提供服务,这也将使管理更容易。

这可以使用内部每进程广播系统(例如由plezi.io实现)或使用Redis subscribe/punsubscribe命令执行。

+0

我首先看了plezi.io。事实上,我最近审查它,看看它是否有退订。它根本不符合我的需求。是的,我了解Redis连接的负担,因为我监控它们。这是我专注于清理孤儿的原因之一。实际上,在这个过程中,我从PUBSUB转换到LPUSH/BRPOP,虽然在这里遇到同样的问题,但它更容易管理。我的最终用例只需要我选择的设计。但是,感谢您的建议。 –

+0

@R_G,我想有时候设计不在我们手中......我只能祝你好运! ... P.S. ......我不是故意推plezi,我很抱歉,如果它出现这种方式。 – Myst

+0

当然不是!我来这里寻求建议,并感谢大家的努力! –