2017-04-05 67 views
1

我试图做一些感觉应该是直截了当的事情,但证明出人意料的困难。RxJS使用异步订户功能的Observable

我有一个订阅RabbitMQ队列的函数。具体来说,这里是Channel.consume函数:http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume

它返回一个承诺,该承诺用订阅ID解析 - 稍后需要取消订阅 - 还有一个回调参数,用于在消息从队列中拉出时调用。

当我想取消订阅队列时,我需要使用Channel.cancel函数在这里取消使用者:http://www.squaremobius.net/amqp.node/channel_api.html#channel_cancel。这需要先前返回的订阅ID。

我想将所有这些东西包装在观察者订阅时订阅队列的Observable中,并在observable退订时取消订阅。然而,由于调用的“双重异步”性质(我的意思是说它们同时具有回调和返回承诺),这证明有点困难。

理想情况下,我想能够编写的代码是:

return new Rx.Observable(async (subscriber) => { 
    var consumeResult = await channel.consume(queueName, (message) => subscriber.next(message)); 
    return async() => { 
    await channel.cancel(consumeResult.consumerTag); 
    }; 
}); 

然而,这是不可能的,因为此构造不支持异步订户功能或拆除逻辑。

我一直无法找出这一个。我在这里错过了什么吗?为什么这么难?

干杯, 亚历

回答

1

创建的可观测并不需要等待channel.consume承诺来解决,作为观察员(这是一个的传递观察者,而不是一个用户)只从函数中调用你提供。

但是,您返回的取消订阅功能必须等待该承诺才能解决。它可以这样做内部,如下所示:

return new Rx.Observable((observer) => { 
    var consumeResult = channel.consume(queueName, (message) => observer.next(message)); 
    return() => { 
    consumeResult.then(() => channel.cancel(consumeResult.consumerTag)); 
    }; 
}); 
+0

感谢您的回应。在提出问题之前,我也考虑了你的建议,但我的问题是没有任何东西等待渠道的许诺。所以,假设对channel.cancel的调用仅在3秒钟后解析。有可能在该频道上收到新消息,但Rx观察者已经被取消订阅,因此这些消息将丢失到以太网中。这是我想要避免的。你有什么建议来解决这个问题吗? – AlexC

+0

我不明白这是一个问题。根据[observable contract]中的*订阅和取消订阅*部分(http://reactivex.io/documentation/contract.html):*当观察者向Observable发出取消订阅通知时,Observable将尝试停止发布通知观察员。然而,观察者在发出退订通知后,Observable将不会向观察者发出通知。* – cartant

+0

因此,如果通道持续抽出消息直到取消解析,观察者应该接收它们。也就是说,观察者的实现不应该期待没有进一步的消息被发布。 – cartant