我一直在玩MQTT的一个项目,遇到了一个奇怪的问题。我使用paho
作为我的MQTT客户端,并使用VerneMQ
作为代理。
VerneMQ代理服务已启动并正在运行,我可以通过runnnig netstat
来确认,我可以看到127.0.0.1:1883
条目处于LISTENING
模式。
这是我的客户端代码:
public class Producer implements MqttCallback {
private String brokerUri;
private String clientId;
public Producer(String brokerUri, String clientId){
this.brokerUri = brokerUri;
this.clientId = clientId;
}
public void doProduce(String topic, String payload){
MemoryPersistence memoryPersistence = new MemoryPersistence();
try {
MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(brokerUri, clientId, memoryPersistence);
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setCleanSession(true);
mqttAsyncClient.setCallback(this);
mqttAsyncClient.connect(mqttConnectOptions);
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(payload.getBytes());
mqttAsyncClient.publish(topic, mqttMessage);
} catch (MqttException e) {
e.printStackTrace();
}
}
public void connectionLost(Throwable throwable) {
}
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
}
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("Message delivered!");
}
}
以下是我的主类
public class Main {
public static void main(String[] args) {
Producer producer = new Producer("tcp://127.0.0.1:1883", "producer1");
producer.doProduce("dummyTopic", "dummyMessage");
}
}
问题
当我跑我的申请,我看到Client is not connected (32104)
异常的输出。
如果我改变线mqttAsyncClient.connect(mqttConnectOptions);
到mqttAsyncClient.connect(mqttConnectOptions).waitForCompletion();
在Producer
类,我可以成功连接到经纪人,我可以在输出中看到Message delivered!
。
如果我没有弄错waitForCompletion()
会阻止呼叫,直到收到回复。通过添加这一行,我有效地改变了我的AsyncClient连接到阻塞连接,这对我来说不是我想要的。
问题
所以泛美卫生组织MQTT客户端连接在非阻塞的方式促成我怎样才能解决这个问题?我在路上错过了什么?
大,感谢您指出正确的方向。 – raidensan