我在WebSocket(WS)内发出Redis订阅。当我接收到WS打开时,我将请求线程化,然后实例化Redis客户端。在公开之内,我为Redis提供线程并发布订阅。WebSocket和Redis导致从pubsub和/或brpop挂起连接
这一切正常,直到我收到一个意想不到的WS关闭。此时,运行Redis订阅的线程消失了。如果我发出取消订阅,我会得到一个挂起。如果我不退订,我已经离开了一个幻影订阅,导致我接下来发生麻烦。
发布它的线程终止后,是否有某种方法可以删除订阅?我已经注意到,Redis实例对于该终止的线程有一个mon变量。示例Ruby代码是:
class Backend
include MInit
def initialize(app)
setup
@app = app
end
def run!(env)
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, [], ping: KEEPALIVE_TIME)
ws_thread = Thread.fork(env) do
credis = Redis.new(host: @redis_uri.host, port: @redis_uri.port, password: @redis_uri.password)
ws.on :open do |event|
channel = URI.parse(event.target.url).path[1..URI.parse(event.target.url).path.length]
redis_thread = Thread.fork do
credis.subscribe(channel) do |on|
on.message do |message_channel, message|
sent = ws.send(message)
end
on.unsubscribe do |message_channel|
puts "Unsubscribe on channel:#{channel};"
end
end
end
end
ws.on :message do |event|
handoff(ws: ws, event: event)
end
ws.on :close do |event|
# Hang occurs here
unsubscribed = credis.unsubscribe(channel)
end
ws.on :error do |event|
ws.close
end
# Return async Rack response
ws.rack_response
end
end
else
@app.call(env)
end
private
def handoff(ws: nil, event: nil, source: nil, message: nil)
# processing
end
end
您可能会尝试的另一种方法是让Redis定期向主题发布一些无害消息,强制所有订阅都执行_write_。任何“幻像”订阅应该随后被清除/关闭,因为Redis检测到写入死/关对方的错误。 – Castaglia
@Castaglia有趣。就我而言,知道以前的订阅没有清楚地知道如何清除它,但在恢复时,我发出了另一个订阅并继续处理。这会将消息发布到相同频道,包括幻像订阅和同名的新直播频道。你希望能够清除幻影吗?我问,因为随着时间的推移,我遇到了这种情况下的资源限制。 –
啊,我想我明白了。订阅绑定到TCP连接,_that_ TCP连接仍然存在;它是客户端上的_thread_,它从应用程序的其余部分的POV中去世,从而“失去”订阅。是?我想知道,如果从同一个TCP连接(清除任何幻像/丢失/滞留的订阅)执行'UNSUBSCRIBE'(指定所有频道,或者不指定所有频道),然后重新订阅,可能会工作。 – Castaglia