2016-04-23 54 views
1

更新MQTT连接到ActiveMQ的主题需要大量的时间在事件接收器WSO2 CEP

在碳分析常见项目的MQTTAdapterListener的源代码,因为这link

run()进入休眠状态我相信这就是为什么MQTT连接需要这么长时间的原因。

@Override 
public void run() { 
    while (!connectionSucceeded) { 
     try { 
      MQTTEventAdapterConstants.initialReconnectDuration = MQTTEventAdapterConstants.initialReconnectDuration 
        * MQTTEventAdapterConstants.reconnectionProgressionFactor; 
      Thread.sleep(MQTTEventAdapterConstants.initialReconnectDuration); 
      startListener(); 
      connectionSucceeded = true; 
      log.info("MQTT Connection successful"); 
     } catch (InterruptedException e) { 
      log.error("Interruption occurred while waiting for reconnection", e); 
     } catch (MqttException e) { 
      log.error("MQTT Exception occurred when starting listener", e); 

     } 

    } 
} 

的initialReconnectDuration和reconnectionProgressionFactor在MQTTEventAdapterConstants

public static int initialReconnectDuration = 10000; 
public static final int reconnectionProgressionFactor = 2; 

如下如果我有12接收机与MQTT,第12一个将睡眠40960秒。似乎没有办法修改这两个常量?有什么办法可以解决这个问题吗?为什么MQTT连接将以这种方式设置为睡眠线程?


我们使用WSO2 CEP版本4.0.0和ActiveMQ版本5.13.0。部署的事件接收器是如下

<?xml version="1.0" encoding="UTF-8"?> 
<eventReceiver name="5718a6b1851cb3474c6f03c2_PROBE_DATA_RECEIVER" 
    statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver"> 
    <from eventAdapterType="mqtt"> 
     <property name="clientId">5718a6b1851cb3474c6f03c2</property> 
     <property name="topic">PROBE_DATA_571844f5851cb3474c6f0391_57184581851cb3474c6f0394_Topic</property> 
     <property name="cleanSession">false</property> 
     <property name="url">tcp://127.0.0.1:1883</property> 
    </from> 
    <mapping customMapping="enable" type="json"> 
     <property> 
      <from jsonPath="$.event.payloadData.dyna.Speed3"/> 
      <to name="speed" type="double"/> 
     </property> 
    </mapping> 
    <to streamName="5718a6b1851cb3474c6f03c2_PROBE_DATA_IS" version="1.0.0"/> 
</eventReceiver> 

ClientID的是在我们的数据库内部此事件执行计划的随机对象ID。这个话题存在于ActiveMQ中,我相信这条消息已经入队了。对于持久订阅,干净会话属性已设置为false。

但是,无论我们重新启动WSO2 CEP还是部署新接收器,接收器都需要大量时间连接到ActiveMQ主题。

[2016-04-24 01:07:59,018] INFO {org.wso2.carbon.event.processor.manager.core.internal.CarbonEventManagementService} - Starting polling event receivers 
[2016-04-24 01:07:59,019] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 56f66bcc7a22442328cd229a_PROBE_DATA_RECEIVER 
[2016-04-24 01:07:59,020] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 5718a6b1851cb3474c6f03c2_PROBE_DATA_RECEIVER 
[2016-04-24 01:07:59,021] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 57144fa0851cb33590672418_X_DATA_RECEIVER 
[2016-04-24 01:07:59,022] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 5714b5f4851cb3338c26a1cc_PROBE_DATA_RECEIVER 
[2016-04-24 01:07:59,022] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 571b9ef4851cb356e04ec90b_PROBE_DATA_RECEIVER 
[2016-04-24 01:07:59,023] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 5714a305851cb3338c26a1c4_DELIVERY_ORDER_RECEI 
VER 
[2016-04-24 01:07:59,023] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 56f6b03e7a22440c30855015_PROBE_DATA_RECEIVER 
[2016-04-24 01:07:59,024] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 56f665f27a22442328cd2299_PROBE_DATA_RECEIVER 
[2016-04-24 01:07:59,024] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 5718a5d7851cb3474c6f03bf_PROBE_DATA_RECEIVER 
[2016-04-24 01:07:59,025] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 56ef98137a22442e28360c0b_DELIVERY_ORDER_RECEI 
VER 
[2016-04-24 01:07:59,026] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 571747f2851cb3338c26a1da_PROBE_DATA_RECEIVER 
[2016-04-24 01:07:59,026] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 56f66c657a22442328cd229c_PROBE_DATA_RECEIVER 
[2016-04-24 01:08:19,044] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful 
[2016-04-24 01:08:39,041] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful 
[2016-04-24 01:09:19,034] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful 
[2016-04-24 01:10:39,037] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful 
[2016-04-24 01:13:19,046] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful 
[2016-04-24 01:18:39,035] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful 
[2016-04-24 01:29:19,038] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful 
[2016-04-24 01:50:39,036] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful 
[2016-04-24 02:33:19,039] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful 
[2016-04-24 03:58:39,043] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful 
[2016-04-24 06:49:19,038] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful 
[2016-04-24 12:30:39,040] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful 

有12个接收器,WSO2在01:07:59开始轮询。大约10分钟后,12台接收机中只有6台连接成功,其他接收机大约需要10个小时以上。

有谁知道为什么WSO2 CEP接收器的MQTT会连接这么慢吗?

回答

1

无法提供开箱即用的配置重新连接持续时间和渐进系数值。但是,由于您已经计算出代码,因此可以修补该组件(从配置文件中读取这些值或通过适配器ui或仅使用硬编码值对其进行配置),并将修补程序应用为described here以克服此限制。

+0

嗨Rajeev,我真的很感激你的美妙想法和答案。我检查了我使用的版本是4.0.0的WSO2CEP,并且其repository \ components \ plugins中的MQTT是org.wso2.carbon.event.input.adapter.mqtt_5.0.3.jar。所以我查了一下carbon-analytics-common项目的标签v5.0.3的源代码,修改了代码,maven构建了org.wso2.carbon.event.input.adapter.mqtt模块。最后,我有一个org.wso2.carbon.event.input.adapter.mqtt-5.0.3.jar! – Bruce

+0

但是,我仍然无法弄清楚如何修补WSO2CEP。 WSO2补丁只是一个简单的.jar文件,并将其放入/repository/components/patches /中。我已经做到了这一点,并重新启动WSO2CEP服务器,但无法使用-DapplyPatches arugment。 – Bruce

+0

顺便说一下,由于我建立的文件名是org.wso2.carbon.event.input.adapter.mqtt-5.0.3.jar,而WSO2CEP插件中的一个是org.wso2.carbon.event.input.adapter .mqtt_5.0.3.jar。所以我将我的内置jar重命名为org.wso2.carbon.event.input.adapter.mqtt_5.0.3.jar,并将其放入修补程序文件夹。 – Bruce