2017-10-05 1027 views
0

背景如何解决paho mqtt客户端的异步连接问题?

我一直在玩MQTT的一个项目,遇到了一个奇怪的问题。我使用paho作为我的MQTT客户端,并使用VerneMQ作为代理。

VerneMQ代理服务已启动并正在运行,我可以通过runnnig netstat来确认,我可以看到127.0.0.1:1883条目处于LISTENING模式。

这是我的客户端代码:

public class Producer implements MqttCallback { 

    private String brokerUri; 
    private String clientId; 

    public Producer(String brokerUri, String clientId){ 
     this.brokerUri = brokerUri; 
     this.clientId = clientId; 
    } 

    public void doProduce(String topic, String payload){ 
     MemoryPersistence memoryPersistence = new MemoryPersistence(); 

     try { 
      MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(brokerUri, clientId, memoryPersistence); 
      MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); 
      mqttConnectOptions.setCleanSession(true); 
      mqttAsyncClient.setCallback(this); 
      mqttAsyncClient.connect(mqttConnectOptions); 
      MqttMessage mqttMessage = new MqttMessage(); 
      mqttMessage.setPayload(payload.getBytes()); 
      mqttAsyncClient.publish(topic, mqttMessage); 
     } catch (MqttException e) { 
      e.printStackTrace(); 
     } 
    } 

    public void connectionLost(Throwable throwable) { 

    } 


    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { 

    } 


    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { 
     System.out.println("Message delivered!"); 
    } 
} 

以下是我的主类

public class Main { 
    public static void main(String[] args) { 
     Producer producer = new Producer("tcp://127.0.0.1:1883", "producer1"); 
     producer.doProduce("dummyTopic", "dummyMessage"); 
    } 
} 

问题

当我跑我的申请,我看到Client is not connected (32104)异常的输出。

如果我改变线mqttAsyncClient.connect(mqttConnectOptions);mqttAsyncClient.connect(mqttConnectOptions).waitForCompletion();Producer类,我可以成功连接到经纪人,我可以在输出中看到Message delivered!

如果我没有弄错waitForCompletion()会阻止呼叫,直到收到回复。通过添加这一行,我有效地改变了我的AsyncClient连接到阻塞连接,这对我来说不是我想要的。

问题

所以泛美卫生组织MQTT客户端连接在非阻塞的方式促成我怎样才能解决这个问题?我在路上错过了什么?

回答

2

此涵盖的文档中为IMqttAsyncClient

IMqttToken token method(parms, Object userContext, IMqttActionListener callback) 

在这种形式的回调与方法注册。当动作成功或失败时,将通知回调 。在由MQTT客户端管理的线程上调用的回调为 ,所以重要的是 处理在回调中被最小化。如果不是 的操作,MQTT客户端将被禁止。对于被通知的例子(称为 背面)当一个连接完成:

IMqttToken conToken; 
    conToken = asyncClient.connect("some context", new MqttAsyncActionListener() { 
     public void onSuccess(IMqttToken asyncActionToken) { 
     log("Connected"); 
     } 

     public void onFailure(IMqttToken asyncActionToken, Throwable exception) { 
     log ("connect failed" +exception); 
     } 
}); 

一个可选的上下文对象可以被传递到这将 然后在回调来提供该方法。该上下文由 MQTT客户端存储在令牌中,然后返回给调用者。将 令牌提供给回调方法,然后可以访问上下文 。

所以你try/catch块应该是这样的:

try { 
    MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(brokerUri, clientId, memoryPersistence); 
    MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); 
    mqttConnectOptions.setCleanSession(true); 
    mqttAsyncClient.setCallback(this); 
    mqttAsyncClient.connect(mqttConnectOptions, null, new MqttAsyncActionListener() { 
    public void onSuccess(IMqttToken asyncActionToken) { 
     MqttMessage mqttMessage = new MqttMessage(); 
     mqttMessage.setPayload(payload.getBytes()); 
     mqttAsyncClient.publish(topic, mqttMessage); 
    } 

    public void onFailure(IMqttToken asyncActionToken, Throwable exception) { 
     exception.printStackTrace(); 
    } 
}); 

} catch (MqttException e) { 
    e.printStackTrace(); 
} 
+0

大,感谢您指出正确的方向。 – raidensan