2014-03-12 152 views
0

我写了一个mqtt的客户端代码,它应该连接(现在只需连接到服务器)到活动的mq服务器。以下是代码:MQTT连接activeMQ

#include "stdafx.h" 
#include<iostream> 
#include<winsock2.h> 
#include "mqtt.h" 

#pragma comment(lib,"ws2_32.lib") //Winsock Library 
#include <stdlib.h> 
#include <winsock2.h> 


void mqtt_init(mqtt_broker_handle_t* broker, const char* clientid) { 
    // Connection options 
    broker->alive = 300; // 300 seconds = 5 minutes 
    broker->seq = 1; // Sequence for message identifiers 

    // Client options 
    memset(broker->clientid, 0, sizeof(broker->clientid)); 
    memset(broker->username, 0, sizeof(broker->username)); 
    memset(broker->password, 0, sizeof(broker->password)); 
    if(clientid) { 
     strncpy_s(broker->clientid, clientid, sizeof(broker->clientid)); 
    } else { 
     strcpy_s(broker->clientid, "emqtt"); 
    } 
    // Will topic 
    broker->clean_session = 1; 
} 

void mqtt_init_auth(mqtt_broker_handle_t* broker, const char* username, const char* password) 
{ 
    if(username && username[0] != '\0') 
     strncpy_s(broker->username, username, sizeof(broker->username)-1); 
    if(password && password[0] != '\0') 
     strncpy_s(broker->password, password, sizeof(broker->password)-1); 
} 

using namespace std; 

int main(int argc , char *argv[]) 
{ 
    WSADATA wsa; 
    SOCKET s; 
    struct sockaddr_in server; 
    char message[1000] , server_reply[2000]; 
    int recv_size; 
    int packet_length; 
    uint16_t msg_id, msg_id_rcv; 
    mqtt_broker_handle_t broker; 


    mqtt_init(&broker, "localhost"); 
    mqtt_init_auth(&broker, "cid", "campeador"); 

    printf("\nInitialising Winsock..."); 

    if (WSAStartup(MAKEWORD(2,2),&wsa) != 0) { 
     cout<<"Failed. Error Code : "<<WSAGetLastError(); 
     return 1; 
    } 

    cout<<"Initialised.\n"; 

    //Create a socket 
    if((s = socket(AF_INET , SOCK_STREAM , 0)) == INVALID_SOCKET) 
    { 
     cout<<"Could not create socket : " << WSAGetLastError(); 
    } 

    cout<<"Socket created.\n"; 


    server.sin_addr.s_addr = inet_addr("127.0.0.1"); 
    server.sin_family = AF_INET; 
    server.sin_port = htons(1993); 

    //Connect to remote server 
    if (connect(s , (struct sockaddr *)&server , sizeof(server)) < 0) 
    { 
     puts("connect error"); 
     return 1; 
    } 
    puts("Connected"); 
    mqtt_connect(&broker); 

    recv_size = recv(s, server_reply, 2000, 0); 
    server_reply[recv_size] = '\0'; 
    while (server_reply != "end"){ 
     cout<<server_reply<<endl; 
     recv_size = recv(s, server_reply, 2000, 0); 
     server_reply[recv_size] = '\0'; 
    } 

    closesocket(s); 
    WSACleanup(); 

    return 0; 
} 

而且还有以下是mqtt.h

#ifndef __LIBEMQTT_H__ 
#define __LIBEMQTT_H__ 
#endif 
#include <stdint.h> 
#include <string.h> 

#ifndef MQTT_CONF_USERNAME_LENGTH 
#define MQTT_CONF_USERNAME_LENGTH 13 // Recommended by MQTT Specification (12 + '\0') 
#endif 

#ifndef MQTT_CONF_PASSWORD_LENGTH 
#define MQTT_CONF_PASSWORD_LENGTH 13 // Recommended by MQTT Specification (12 + '\0') 
#endif 

#define MQTT_MSG_CONNECT  1<<4 
#define MQTT_MSG_CONNACK  2<<4 
#define MQTT_MSG_PUBLISH  3<<4 
#define MQTT_MSG_PUBACK  4<<4 
#define MQTT_MSG_PUBREC  5<<4 
#define MQTT_MSG_PUBREL  6<<4 
#define MQTT_MSG_PUBCOMP  7<<4 
#define MQTT_MSG_SUBSCRIBE  8<<4 
#define MQTT_MSG_SUBACK  9<<4 
#define MQTT_MSG_UNSUBSCRIBE 10<<4 
#define MQTT_MSG_UNSUBACK  11<<4 
#define MQTT_MSG_PINGREQ  12<<4 
#define MQTT_MSG_PINGRESP  13<<4 
#define MQTT_MSG_DISCONNECT 14<<4 

#define MQTT_DUP_FLAG  1<<3 
#define MQTT_QOS0_FLAG 0<<1 
#define MQTT_QOS1_FLAG 1<<1 
#define MQTT_QOS2_FLAG 2<<1 

#define MQTT_RETAIN_FLAG 1 
#define MQTT_CLEAN_SESSION 1<<1 
#define MQTT_WILL_FLAG  1<<2 
#define MQTT_WILL_RETAIN 1<<5 
#define MQTT_USERNAME_FLAG 1<<7 
#define MQTT_PASSWORD_FLAG 1<<6 

typedef struct { 
    void* socket_info; 
    int (*send)(void* socket_info, const void* buf, unsigned int count); 
    // Connection info 
    char clientid[50]; 
    // Auth fields 
    char username[MQTT_CONF_USERNAME_LENGTH]; 
    char password[MQTT_CONF_PASSWORD_LENGTH]; 
    // Will topic 
    uint8_t will_retain; 
    uint8_t will_qos; 
    uint8_t clean_session; 
    // Management fields 
    uint16_t seq; 
    uint16_t alive; 
} mqtt_broker_handle_t; 

int mqtt_connect(mqtt_broker_handle_t* broker) 
{ 
    uint8_t flags = 0x00; 

    uint16_t clientidlen = strlen(broker->clientid); 
    uint16_t usernamelen = strlen(broker->username); 
    uint16_t passwordlen = strlen(broker->password); 
    uint16_t payload_len = clientidlen + 2; 

    // Preparing the flags 
    if(usernamelen) { 
     payload_len += usernamelen + 2; 
     flags |= MQTT_USERNAME_FLAG; 
    } 
if(passwordlen) { 
     payload_len += passwordlen + 2; 
     flags |= MQTT_PASSWORD_FLAG; 
    } 
    if(broker->clean_session) { 
     flags |= MQTT_CLEAN_SESSION; 
    } 

    // Variable header 
    uint8_t var_header[] = { 
     0x00,0x06,0x4d,0x51,0x49,0x73,0x64,0x70, // Protocol name: MQTT 
     0x03, // Protocol version 
     flags, // Connect flags 
     broker->alive>>8, broker->alive&0xFF, // Keep alive 
    }; 


    // Fixed header 
    uint8_t fixedHeaderSize = 2; // Default size = one byte Message Type + one byte Remaining Length 
    uint8_t remainLen = sizeof(var_header)+payload_len; 
    if (remainLen > 127) { 
     fixedHeaderSize++;   // add an additional byte for Remaining Length 
    } 
    uint8_t fixed_header[2]; 

    // Message Type 
    fixed_header[0] = MQTT_MSG_CONNECT; 

    // Remaining Length 
    if (remainLen <= 127) { 
     fixed_header[1] = remainLen; 
    } else { 
     // first byte is remainder (mod) of 128, then set the MSB to indicate more bytes 
     fixed_header[1] = remainLen % 128; 
     fixed_header[1] = fixed_header[1] | 0x80; 
     // second byte is number of 128s 
     fixed_header[2] = remainLen/128; 
    } 

    uint16_t offset = 0; 
    uint8_t packet[sizeof(fixed_header)+sizeof(var_header)+ sizeof(payload_len)]; 
    memset(packet, 0, sizeof(packet)); 
    memcpy(packet, fixed_header, sizeof(fixed_header)); 
    offset += sizeof(fixed_header); 
    memcpy(packet+offset, var_header, sizeof(var_header)); 
    offset += sizeof(var_header); 
    // Client ID - UTF encoded 
    packet[offset++] = clientidlen>>8; 
    packet[offset++] = clientidlen&0xFF; 
    memcpy(packet+offset, broker->clientid, clientidlen); 
    offset += clientidlen; 

    if(usernamelen) { 
     // Username - UTF encoded 
     packet[offset++] = usernamelen>>8; 
     packet[offset++] = usernamelen&0xFF; 
     memcpy(packet+offset, broker->username, usernamelen); 
     offset += usernamelen; 
    } 

    if(passwordlen) { 
     // Password - UTF encoded 
     packet[offset++] = passwordlen>>8; 
     packet[offset++] = passwordlen&0xFF; 
     memcpy(packet+offset, broker->password, passwordlen); 
     offset += passwordlen; 
    } 

    return 1; 
} 

代码当我调试的代码有没有错误,但是当我运行代码,我得到错误,指出“变量'数据包'周围的堆栈已损坏”。我对套接字编程和mqtt一般都很陌生,所以任何帮助都会很棒。

回答

0

你显示的是从libemqtt(这是GPL许可的方式,不要忘了:),你有没有尝试从他们提供的例子开始?

libemqtt没有积极开发,并有一个通知说“不要使用”。除非您对解决自己使用MQTT的问题特别感兴趣,否则我建议您查看不同的东西,例如Pahomosquitto客户端库。