2017-06-23 232 views
0

我在django中使用paho-mqtt来接收消息。一切正常。但是on_message()函数被执行两次。Paho-Mqtt django,on_message()函数运行两次

我试过调试,但好像函数调用了一次,但数据库插入发生两次,消息的打印发生两次,on_message()函数内的所有内容都发生两次,并且我的数据被插入每次发布两次。

我怀疑它发生在一个并行线程中,并安装了一个芹菜redis后端队列插入,并避免重复插入。但数据仍被插入两次。

我也尝试锁定变量,以避免并行线程问题,但仍然插入两次数据。

我正在使用Postgres DB

如何解决此问题?我想ON_MESSAGE()函数只有一次每个发布

初始化的.py

from . import mqtt 
mqtt.client.loop_start() 

我mqtt.py

import ast 
import json 

import paho.mqtt.client as mqtt 


# Broker CONNACK response 
from datetime import datetime 

from raven.utils import logger 

from kctsmarttransport import settings 


def on_connect(client, userdata, flags, rc): 
    # Subcribing to topic and recoonect for 
    client.subscribe("data/gpsdata/server/#") 
    print 'subscribed to data/gpsdata/server/#' 


# Receive message 

def on_message(client, userdata, msg): 
    # from kctsmarttransport.celery import bus_position_insert_task 
    # bus_position_insert_task.delay(msg.payload) 
    from Transport.models import BusPosition 
    from Transport.models import Student, SpeedWarningLog, Bus 
    from Transport.models import Location 
    from Transport.models import IdleTimeLog 
    from pytz import timezone 
    try: 
     dumpData = json.dumps(msg.payload) 
     rawGpsData = json.loads(dumpData) 
     jsonGps = ast.literal_eval(rawGpsData) 
     bus = Bus.objects.get(bus_no=jsonGps['Busno']) 
     student = None 
     stop = None 
     if jsonGps['card'] is not False: 
      try: 
       student = Student.objects.get(rfid_value=jsonGps['UID']) 
      except Student.DoesNotExist: 
       student = None 
     if 'stop_id' in jsonGps: 
      stop = Location.objects.get(pk=jsonGps['stop_id']) 

     dates = datetime.strptime(jsonGps['Date&Time'], '%Y-%m-%d %H:%M:%S') 
     tz = timezone('Asia/Kolkata') 
     dates = tz.localize(dates) 
     lat = float(jsonGps['Latitude']) 
     lng = float(jsonGps['Longitude']) 
     speed = float(jsonGps['speed']) 

     # print msg.topic + " " + str(msg.payload) 
     busPosition = BusPosition.objects.filter(bus=bus, created_at=dates, 
               lat=lat, 
               lng=lng, 
               speed=speed, 
               geofence=stop, 
               student=student) 
     if busPosition.count() == 0: 
      busPosition = BusPosition.objects.create(bus=bus, created_at=dates, 
                lat=lat, 
                lng=lng, 
                speed=speed, 
                geofence=stop, 
                student=student) 
      if speed > 60: 
       SpeedWarningLog.objects.create(bus=busPosition.bus, speed=busPosition.speed, 
               lat=lat, lng=lng, created_at=dates) 
       sendSMS(settings.TRANSPORT_OFFICER_NUMBER, jsonGps['Busno'], jsonGps['speed']) 
      if speed <= 2: 
       try: 
        old_entry_query = IdleTimeLog.objects.filter(bus=bus, done=False).order_by('idle_start_time') 
        if old_entry_query.count() > 0: 
         old_entry = old_entry_query.reverse()[0] 
         old_entry.idle_end_time = dates 
         old_entry.save() 
        else: 
         new_entry = IdleTimeLog.objects.create(bus=bus, idle_start_time=dates, lat=lat, lng=lng) 
       except IdleTimeLog.DoesNotExist: 
        new_entry = IdleTimeLog.objects.create(bus=bus, idle_start_time=dates, lat=lat, lng=lng) 
      else: 
       try: 
        old_entry_query = IdleTimeLog.objects.filter(bus=bus, done=False).order_by('idle_start_time') 
        if old_entry_query.count() > 0: 
         old_entry = old_entry_query.reverse()[0] 
         old_entry.idle_end_time = dates 
         old_entry.done = True 
         old_entry.save() 
       except IdleTimeLog.DoesNotExist: 
        pass 
    except Exception, e: 
     logger.error(e.message, exc_info=True) 


client = mqtt.Client() 
client.on_connect = on_connect 
client.on_message = on_message 
client.connect("10.1.75.106", 1883, 60) 
+0

我知道的唯一的事情是,你正在运行2倍的脚本,而不是funcion,你真的找到一个解决办法2呢?我有同样的问题 –

+0

它看起来像init被调用两次。有人说Django有两个进程,主进程和用于实时更新的自动更新进程。如果你得到进程ID(导入OS os.getpid),你会看到两个不同的进程。请参阅https://stackoverflow.com/questions/2110545/why-is-init-module-in-django-project-loaded-twice。使用--noreload标志只加载一个进程。 – Ben987654

回答

0

我有同样的问题执行!

尝试使用:

def on_disconnect(client, userdata, rc): 
    client.loop_stop(force=False) 
    if rc != 0: 
     print("Unexpected disconnection.") 
    else: 
     print("Disconnected") 
+0

Warnign:如果你的经纪人倒闭,你的客户不要重新连接! – heltonitba