2017-02-14 232 views
0

试图将HiveMQ的两个功能:共享订阅和永久会话结合起来。HiveMQ与永久会话共享订阅

如果创建了一个非常简单的消息生产者。和一个非常简单的消费者。 运行多个使用者时,所有使用者都会收到所有消息。

为消费者设置clearSession为'false'后,当运行消费者并重新启动消费者时,消费者也会在未连接时收到消息。优秀。

现在将其与共享订阅功能相结合。 仅使用共享订阅时,clearSession为'true'。运行多个消费者时,消息只能由单个消费者接收。它应该是循环法,情况也是如此,但是一旦停止消费者,消息就不再循环,但其中一个消费者获得的消息明显多于其他消息。

如果我现在再次启用持久性会话,clearSession为'false',并启动共享订阅使用者,消费者开始再次接收所有消息,而不是仅将消息发送到一个客户端。

这里有什么问题? 这是HiveMQ中的错误吗? 永久会话和共享订阅是否可以一起使用?那真是一个无赖。

UPDATE 15/2/2017 作为@fraschbi建议我清除了所有数据,并再次重新测试与持续性会话消费者共享订阅。它似乎工作!

奇怪的是,只有第一位消费者重新连接后才会收到错过的消息。 所有消费者都有相同的代码,它们只是以不同的clientId参数开始。见下面的代码。 我的测试序列:

  • 开始消费者1:所有消息都是这个消费者。
  • 开始消费者2:每个消费者收到其他所有消息。
  • 开始消费者3:每个消费者获得3比3的消息。
  • 停止消费者1:现在消费者2和3收到其他所有消息。 (不知道为什么我昨天看到了这个不均匀的分布,但可能是@fraschbi提到的,​​因为我正在重新使用clientId,并且没有取消订阅或正确断开连接)
  • 现在停止consumer2:现在消费者3接收的所有消息。
  • 停止consumer3:不再收到消息。
  • 重新启动consumer3:它继续发送生产者发送的第一条消息。 它没有收到丢失的消息
  • 重新启动consumer2:消息再次均匀分布。
  • 重新启动消费者1:这一个现在收到所有丢失的消息,然后继续接收每3个消息中的1。

所以我的新问题是:为什么只有第一位消费者收到丢失的消息?

注意:这里的技巧仍然是在停止客户端时取消订阅,因为订阅/持久性设置已丢失!

Producer.scala

object Producer extends App { 

    val topic = args(0) 
    val brokerUrl = "tcp://localhost:1883" 

    val clientId = UUID.randomUUID().toString 

    val client = new MqttClient(brokerUrl, clientId) 
    client.connect() 
    val theTopic = client.getTopic(topic) 

    var count = 0 

    sys.addShutdownHook { 
    println("Disconnecting client...") 
    client.disconnect() 
    println("Disconnected.") 
    } 

    while(true) { 
    val msg = new MqttMessage(s"Message: $count".getBytes()) 
    theTopic.publish(msg) 
    println(s"Published: $msg") 

    Thread.sleep(1000) 

    count = count + 1 
    } 
} 

Consumer.scala

object Consumer extends App { 

    val topic = args(0) 
    val brokerUrl = "tcp://localhost:1883" 

    val clientId = args(1) 
// val clientId = UUID.randomUUID().toString 

    val client = new MqttClient(brokerUrl, clientId) 
    client.setCallback(new MqttCallback { 
    override def deliveryComplete(token: IMqttDeliveryToken) =() 

    override def messageArrived(topic: String, message: MqttMessage) = println(s"received on topic '$topic': ${new String(message.getPayload)}") 

    override def connectionLost(cause: Throwable) = println("Connection lost") 
    }) 

    println(s"Start $clientId consuming from topic: $topic") 
    val options = new MqttConnectOptions() 
    options.setCleanSession(false); 

    client.connect(options) 
    client.subscribe(topic) 

    sys.addShutdownHook { 
    println("Disconnecting client...") 
// client.unsubscribe(topic) 
    client.disconnect() 
    println("Disconnected.") 
    } 


    while(true) { 

    } 

} 

回答

1

我会尽量回答你单独遇到的两个问题。

它应该是循环法,也是这种情况,但只要您停止消费者,消息不再循环,但其中一个消费者获得明显更多的消息, 。

在为共享订阅分发邮件时,HiveMQ更喜欢在线客户端。

如果我现在再次启用持久性会话,clearSession为'false',并启动共享订阅使用者,消费者开始接收所有消息而不是消息只是传递给一个客户端。

在问题开始时,您表示您将客户端与cleanSession=false连接到经纪商并订阅该主题。 (听起来好像您只使用一个主题。) 是否有可能在重新连接cleanSession=false和共享订阅之前取消订阅这些客户端?在这种情况下,来自场景第一步的订阅仍将保留给这些客户端,当然他们每个都会收到这些消息。

编辑:

所以我的新问题是:为什么只有第一个消费者收到丢失的消息?

从HiveMQ用户指南:

当用户离线队列已满,该客户端的消息将不会被丢弃,但排队等待在共享订阅组的下一个脱机客户端。

当所有客户端都处于脱机状态时,分配不再循环。所以你描述的场景是在预期的行为之内。

消息队列的默认值为1000.因此,您可以发送超过1000条消息,而客户端处于脱机状态,或者减少消息队列大小。

... 
<persistence> 
    <queued-messages> 

     <max-queued-messages>50</max-queued-messages> 

    </queued-messages> 
    ... 
</persistence> 
... 

将此添加到您的​​3210用于减少消息队列大小。

+0

是的,客户端不会退订。刚刚停下来。明天再试。 –

+0

重新测试并更新了问题 –