2016-08-02 128 views
1

我试图使用基本的Eclipse Paho MQTT客户端1.1.0版连接到CloudAMQP RabbitMQ实例,订阅主题并接收消息(我通过网络发送消息管理控制台)。Paho Android客户端在显示消息时断开连接

如果应用程序将所有消息载荷发送到日志输出,它会很好地工作。

如果应用程序将消息添加到TextView中,则会显示该消息,但该连接会立即丢弃,并且不会收到更多消息。

完整的项目可在GitHub。下面是一个简单的例子。

有一个基于服务的MQTT Paho客户端,但我认为对于非常简单的应用程序,基本客户端应该能够在Android应用程序UI中接收和显示消息。

... 

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 
import org.eclipse.paho.client.mqttv3.MqttCallback; 
import org.eclipse.paho.client.mqttv3.MqttClient; 
import org.eclipse.paho.client.mqttv3.MqttClientPersistence; 
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 
import org.eclipse.paho.client.mqttv3.MqttException; 
import org.eclipse.paho.client.mqttv3.MqttMessage; 
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; 

public class MainActivity extends AppCompatActivity implements MqttCallback { 

    private static final String TAG = "main"; 
    private Connection connection; 

    @Override 
    protected void onCreate(Bundle savedInstanceState) { 
     super.onCreate(savedInstanceState); 
     setContentView(R.layout.activity_main); 
     Toolbar toolbar = (Toolbar) findViewById(R.id.toolbar); 
     setSupportActionBar(toolbar); 

     configureUI(); 
    } 

    private Button buttonConnect; 
    private TextView messageWindow; 


    private void configureUI() { 
     buttonConnect = (Button) findViewById(R.id.buttonConnect); 
     messageWindow = (TextView) findViewById(R.id.messageWindow); 

     buttonConnect.setOnClickListener(new View.OnClickListener() { 
      @Override 
      public void onClick(View v) { 
       String s = "***"; 
       String d = "test"; 
       String u = "***"; 
       String p = "***"; 

       if (connection != null && connection.isConnected()) { 
        connection.disconnect(); 
        connection = null; 
        messageWindow.setText(String.format("Disconnected from server %s", 
          new Object[]{s})); 
        return; 
       } 

       messageWindow.setText(String.format("Connecting to server %s as user %s", 
         new Object[]{s, u})); 

       connection = new Connection(MainActivity.this, MainActivity.this, s, u, p); 
       connection.connect(); 

       if (connection.isConnected()) { 
        messageWindow.append("\n\n"); 
        messageWindow.append(String.format("Connected, listening for messages from topic %s", 
          new Object[]{d})); 
        connection.subscribe(d); 
       } 
      } 
     }); 
    } 

    @Override 
    public void connectionLost(Throwable cause) { 
     Log.e(TAG, "connectionLost" + cause.getMessage()); 
    } 

    @Override 
    public void messageArrived(String topic, MqttMessage message) throws Exception { 
     String msg = new String(message.getPayload()); 
     Log.i(TAG, "Message Arrived: " + msg); 
     // messageWindow.append(msg); 
    } 

    @Override 
    public void deliveryComplete(IMqttDeliveryToken token) { 
     Log.i(TAG, "Delivery Complete!"); 
    } 

    class Connection { 
     private static final String TAG = "conn"; 
     private static final String protocol = "tcp://"; 
     private static final int port = 1883; 
     private static final int version = MqttConnectOptions.MQTT_VERSION_3_1_1; 
     private static final int keepAliveSeconds = 20 * 60; 

     private final Context context; 
     private MqttClient client; 

     private final String server; 
     private final String user; 
     private final String pass; 

     private final MqttConnectOptions options = new MqttConnectOptions(); 

     public Connection(Context ctx, MqttCallback mqttCallback, String server, String user, String pass) { 
      this.context = ctx; 
      this.server = server; 
      this.user = user; 
      this.pass = pass; 

      MqttClientPersistence memPer = new MemoryPersistence(); 
      try { 
       String url = protocol + server + ":" + port; 
       client = new MqttClient(url, MqttClient.generateClientId(), memPer); 
       client.setCallback(mqttCallback); 
      } catch (MqttException e) { 
       e.printStackTrace(); 
      } 

      options.setUserName(user + ":" + user); 
      options.setPassword(pass.toCharArray()); 
      options.setMqttVersion(version); 
      options.setKeepAliveInterval(keepAliveSeconds); 
     } 

     void connect() { 
      Log.i(TAG, "buttonConnect"); 
      try { 
       client.connect(options); 
      } catch (MqttException ex) { 
       Log.e(TAG, "Connection attempt failed with reason code = " + ex.getReasonCode() + ":" + ex.getCause()); 
      } 
     } 

     public boolean isConnected() { 
      return client.isConnected(); 
     } 

     public void disconnect() { 
      try { 
       client.disconnect(); 
      } catch (MqttException e) { 
       Log.e(TAG, "Disconnect failed with reason code = " + e.getReasonCode()); 
      } 
     } 

     void subscribe(String dest) { 
      try { 
       client.subscribe(dest); 
      } catch (MqttException e) { 
       Log.e(TAG, "Subscribe failed with reason code = " + e.getReasonCode()); 
      } 
     } 
    } 
} 
+0

你的意思是“连接被立即被显示的消息后,收到”“连接被立即丢弃......” – hardillb

+0

@hardillb我固定它,非常感谢注意到! – mjn

回答

2

我想这是因为您正在尝试从无UI线程更新TextView。

尝试在runOnUiThread呼叫中包装messageWindow.append(msg);

public void messageArrived(String topic, MqttMessage message) throws Exception { 
    String msg = new String(message.getPayload()); 
    Log.i(TAG, "Message Arrived: " + msg); 
    runOnUiThread(new Runnable(){ 
     public void run() { 
      messageWindow.append(msg); 
     } 
    }); 
} 
+1

这解决了它,非常感谢! (我认为将整个连接移到一个AsyncTask可能会有所帮助) – mjn

+0

它的工作原理,感谢你们两位 –

相关问题