2011-09-29 57 views
3

我有一个场景,这是非常接近这个示例:的Node.js + socket.io +节点AMQP和队列binginds当“再”连接直通socket.io

一个主屏幕:

  • 这个屏幕(客户端)将通过服务器连接到socket.io服务器:9090/scope(io.connect(“http:// server:9090/scope))并将发送一个事件”userBindOk“ .emit(“userBindOk”,message))到socket.io服务器;

  • 服务器接收到连接并且“userBindOk ”。此时,服务器应该获得到rabbitmq服务器的活动连接,并将队列绑定到通过socket.io连接到应用程序的相应用户。样品:

    socket.on( “连接”,函数(客户端){// 客户端ID为1234 //绑定RabbitMQ的交换,队列和: queue.subscribe(//接收回调); } )

  • 到目前为止,没问题 - 我可以通过socket.io发送/接收消息,没有任何问题。

  • 但是,如果我刷新页面,所有这些步骤将再次完成。因此,绑定到队列将发生,但这次与另一个socket.io客户端会话相关。这意味着如果我向与第一个socket.io会话相关的队列发送消息(在页面刷新之前),该绑定应该(我认为)接收消息并将其发送给无效的socket.io客户端(页面在socket.io上下文中刷新= new client.id)。我可以证明这种行为,因为每次刷新页面时,我都需要发送更多的消息x次。例如:我已经第一次连接了: - 所以,1条消息 - 一次屏幕更新;刷新页面:我需要发送2条消息到队列,只有第二条消息将从“实际”socket.io客户端会话接收 - 这种行为将发生多达我刷新页面(20页刷新,20条消息被发送到一个队列,并且服务器socket.io“last”客户端将把该消息发送到客户端socket.io以呈现到屏幕中)。

我认为解决方案是:

  • 找到一个方法来“解除绑定”从socket.io服务器断开连接时的队列 - 我didn`t看到在节点AMQP此选项API尚未(等待它:D)

  • 找到一种方法来使用相同的client.id重新连接socket.io客户端。通过这种方式,我可以识别即将到来的客户端并应用一些逻辑来缓存套接字。

任何想法?我想很清楚......但是,你知道,它`不是那么eaey试图澄清的东西,是非常具体的一些方面,当暴露你的问题......

TKS

回答

1

我解决它这样的:

我用来声明rabbitMq队列为持久= true,autoDelete = false,exclusive = false,并在我的应用程序有1个队列/用户和1交换(type =直接)与routing_key名称= queueName,我的应用程序也使用队列为其他客户端不同浏览器,如android应用程序或iPhone应用程序作为推后备,所以我用crear 1队列为earch用户。

此问题的解决方案是更改我的rabbitMQ队列和交换声明。现在,我将exchange/user声明为fanout和autoDelete = True,并且用户将拥有持久= true,autoDelete = true,exclusive = true(队列号=客户端)的N个队列,并且所有队列都绑定到用户交换(多播)。

注意:我的应用程序在django中很流行,并且我使用node + socket + amqp能够使用web.scokets与浏览器通信,因此我使用node-restler查询我的应用程序api以获取用户 - 队列信息。

多数民众赞成在RabbitMQ的一侧,用于节点+ AMQP +插座我这样做:

服务器端:

  • 的onConnect:用户交流的扇出,自动删除,耐用的声明。然后将队列声明为持久的,自动删除和排他性的,然后将queue.bind传递给用户交换并最终queue.subscribe和socket.disconnect将销毁队列,以便在客户端连接应用程序时存在队列,这解决了刷新的问题,并允许用户已经超过1窗口标签与应用:

服务器端:

  /* 
      * unCaught exception handler 
      */ 

      process.on('uncaughtException', function (err) { 
       sys.p('Caught exception: ' + err); 
       global.connection.end(); 
      }); 


      /* 
      * Requiere libraries 
      */ 

      global.sys = require('sys'); 
      global.amqp = require('amqp'); 
      var rest = require('restler'); 
      var io = require('socket.io').listen(8080); 

      /* 
      * Module global variables 
      */ 
      global.amqpReady = 0; 


      /* 
      * RabbitMQ connection 
      */ 

      global.connection = global.amqp.createConnection({ 
          host: host, 
          login: adminuser, 
          password: adminpassword, 
          vhost: vhost 
          }); 

      global.connection.addListener('ready', 
         function() { 
          sys.p("RabbitMQ connection stablished"); 
          global.amqpReady = 1; 
         } 
      ); 


      /* 
      * Web-Socket declaration 
      */ 

      io.sockets.on('connection', function (socket) { 
       socket.on('message', function (data) { 
        sys.p(data); 
        try{ 
         var message = JSON.parse(data);     
        }catch(error){ 
         socket.emit("message", JSON.stringify({"error": "invalid_params", "code": 400})); 
         var message = {}; 
        }   
        var message = JSON.parse(data); 
        if(message.token != undefined) { 

         rest.get("http://dev.kinkajougames.com/api/push", 
           {headers: 
            { 
             "x-geochat-auth-token": message.token 
            } 
           }).on('complete', 
            function(data) { 
             a = data; 
           }).on('success', 
            function (data){ 
             sys.p(data); 
             try{         
              sys.p("---- creating exchange"); 
              socket.exchange = global.connection.exchange(data.data.bind, {type: 'fanout', durable: true, autoDelete: true}); 
              sys.p("---- declarando queue"); 
              socket.q = global.connection.queue(data.data.queue, {durable: true, autoDelete: true, exclusive: false}, 
               function(){ 
                sys.p("---- bind queue to exchange"); 
                //socket.q.bind(socket.exchange, "*"); 
                socket.q.bind(socket.exchange, "*"); 
                sys.p("---- subscribing queue exchange"); 
                socket.q.subscribe(function (message) { 
                 socket.emit("message", message.data.toString()); 
                });  
               } 
              ); 
             }catch(err){ 
              sys.p("Imposible to connection to rabbitMQ-server"); 
             }         

           }).on('error', function (data){ 
            a = { 
             data: data, 
            }; 
           }).on('400', function() { 
            socket.emit("message", JSON.stringify({"error": "connection_error", "code": 400})); 
           }).on('401', function() { 
            socket.emit("message", JSON.stringify({"error": "invalid_token", "code": 401})); 
           });    
        } 
        else { 
         socket.emit("message", JSON.stringify({"error": "invalid_token", "code": 401})); 
        } 

       }); 
       socket.on('disconnect', function() { 
        socket.q.destroy(); 
        sys.p("closing socket"); 
       }); 
      }); 

客户端:

  • 插座intance与选项的力量新连接'= true并且'在卸载时同步断开连接'=假。
  • 客户端使用onbeforeunload和onunload窗口对象事件发送socket.disconnect
  • socket.connect事件上的客户端发送用户令牌给节点。从插座

     var socket; 
         function webSocket(){ 
          //var socket = new io.Socket(); 
          socket = io.connect("ws.dev.kinkajougames.com", {'force new connection':true, 'sync disconnect on unload': false}); 
          //socket.connect(); 
    
          onSocketConnect = function(){ 
           alert('Connected'); 
           socket.send(JSON.stringify({ 
            token: Get_Cookie('liveScoopToken') 
           })); 
          }; 
    
          socket.on('connect', onSocketConnect); 
          socket.on('message', function(data){ 
           message = JSON.parse(data); 
           if (message.action == "chat") { 
            if (idList[message.data.sender] != undefined) { 
             chatboxManager.dispatch(message.data.sender, { 
              first_name: message.data.sender 
             }, message.data.message); 
            } 
            else { 
             var username = message.data.sender; 
             Data.Collections.Chats.add({ 
              id: username, 
              title: username, 
              user: username, 
              desc: "Chat", 
              first_name: username, 
              last_name: "" 
             }); 
             idList[message.data.sender] = message.data.sender; 
             chatboxManager.addBox(message.data.sender, { 
              title: username, 
              user: username, 
              desc: "Chat", 
              first_name: username, 
              last_name: "", 
              boxClosed: function(id){ 
               alert("closing"); 
              } 
             }); 
             chatboxManager.dispatch(message.data.sender, { 
              first_name: message.data.sender 
             }, message.data.message); 
            } 
           } 
          }); 
         }       
    
         webSocket(); 
    
         window.onbeforeunload = function() { 
          return "You have made unsaved changes. Would you still like to leave this page?"; 
         } 
    
         window.onunload = function(){ 
          socket.disconnect(); 
         } 
    

  • 理线的消息就是这样,该消息的所以没有更多的圆形robing。