2016-10-05 51 views
1

尝试简单订阅,而不会断开与Mosquitto代理的连接,以便从发布特定主题数据的设备获取所有消息,将它们保存在BD中并将其发布到执行“员工”的PHP。MQTT Paho Python客户端订户,如何订阅永久?

这里是我的subscribe.py

import paho.mqtt.client as mqtt 
from mqtt_myapp import * 

topic="topic/#"     # MQTT broker topic 
myclient="my-paho-client"   # MQTT broker My Client 
user="user"      # MQTT broker user 
pw="pass"      # MQTT broker password 
host="localhost"    # MQTT broker host 
port=1883      # MQTT broker port 
value="123"      # somethin i need for myapp 

def on_connect(mqttc, userdata, rc): 
    print('connected...rc=' + str(rc)) 
    mqttc.subscribe(topic, qos=0) 

def on_disconnect(mqttc, userdata, rc): 
    print('disconnected...rc=' + str(rc)) 

def on_message(mqttc, userdata, msg): 
    print('message received...') 
    print('topic: ' + msg.topic + ', qos: ' + 
      str(msg.qos) + ', message: ' + str(msg.payload)) 
    save_to_db(msg) 
    post_data(msg.payload,value) 

def on_subscribe(mqttc, userdata, mid, granted_qos): 
    print('subscribed (qos=' + str(granted_qos) + ')') 

def on_unsubscribe(mqttc, userdata, mid, granted_qos): 
    print('unsubscribed (qos=' + str(granted_qos) + ')') 

mqttc = mqtt.Client(myclient) 
mqttc.on_connect = on_connect 
mqttc.on_disconnect = on_disconnect 
mqttc.on_message = on_message 
mqttc.on_subscribe = on_subscribe 
mqttc.on_unsubscribe = on_unsubscribe 
mqttc.username_pw_set(user,pw) 
mqttc.connect(host, port, 60) 
mqttc.loop_forever() 

这里是我的mqtt_myapp.py:

import MySQLdb 
import requests # pip install requests 

url = "mydomain/data_from_broker.php" 

def save_to_db(msg): 
    with db: 
     cursor = db.cursor() 
     try: 
      cursor.execute("INSERT INTO MQTT_LOGS (topic, payload) VALUES (%s,%s)", (msg.topic, msg.payload)) 
     except (MySQLdb.Error, MySQLdb.Warning) as e: 
      print('excepttion BD ' + e) 
      return None 

def post_data(payload,value): 
    datos = {'VALUE': value,'data-from-broker': payload} 
    r = requests.post(url, datos) 
    r.status_code 
    print('response POST' + str(r.status_code)) 

db = MySQLdb.connect("localhost","user_db","pass_db","db") 

当我运行在后台我的Python脚本python -t mqtt_subscribe.py &我得到的消息给其他客户端发布,但运行几个小时后,我的subscribe.py脚本就会发生套接字错误。

Mosquito.log:

... 
    1475614815: Received PINGREQ from my-paho-client 
    1475614815: Sending PINGRESP to my-paho-client 
    1475614872: New connection from xxx.xxx.xxx.xxx on port 1883. 
    1475614872: Client device1 disconnected. 
    1475614872: New client connected from xxx.xxx.xxx.xxx as device1(c0, k0, u'user1'). 
    1475614872: Sending CONNACK to device1(0, 0) 
    1475614873: Received PUBLISH from device1(d0, q1, r0, m1, 'topic/data', ... (33 bytes)) 
    1475614873: Sending PUBACK to device1 (Mid: 1) 
    1475614873: Sending PUBLISH to my-paho-client (d0, q0, r0, m0, 'topic/data', ... (33 bytes)) 
    1475614874: Received DISCONNECT from device1 
    1475614874: Client device1 disconnected. 
... 
    1475625566: Received PINGREQ from my-paho-client 
    1475625566: Sending PINGRESP to my-paho-client 
    1475625626: Received PINGREQ from my-paho-client 
    1475625626: Sending PINGRESP to my-paho-client 
    1475625675: New connection from xxx.xxx.xxx.xxx on port 1883. 
    1475625675: Client device1 disconnected. 
    1475625675: New client connected from xxx.xxx.xxx.xxx as device1 (c0, k0, u'user1'). 
    1475625675: Sending CONNACK to device1 (0, 0) 
    1475625677: Received PUBLISH from device1 (d0, q1, r0, m1, 'topic/data', ... (33 bytes)) 
    1475625677: Sending PUBACK to device1 (Mid: 1) 
    1475625677: Sending PUBLISH to my-paho-client (d0, q0, r0, m0, 'topic/data', ... (33 bytes)) 
    1475625677: Socket error on client my-paho-client, disconnecting. 
    1475625677: Received DISCONNECT from device1 
... 

可能是什么问题呢?任何想法或建议?

在此先感谢

回答

1

如果你的方法“ON_MESSAGE”代码抛出一个异常,你不抓住它,你将被断开。 尝试取消打印语句以外的所有语句的注释。可能下列其中一种说法是抛出异常。

save_to_db(msg) 
post_data(msg.payload,value) 
+0

如果我想要捕获“on_message”事件,我应该在哪里编码“save_to_db”? – smontoya

+0

你可以在那里编写代码,只要确保你在那里捕获所有异常,如果你不想断开连接。 – SebastianK

+0

试着理解,所以只是因为我不捕捉异常,脚本断开连接?即使我发现异常并且什么也不做脚本不停止? SebastianK,你能否给我一个参考来研究这个问题?非常感谢。 – smontoya