2014-09-21 114 views
0

我想将服务器上的消息(ZMQ_ROUTER套接字,处理多个客户端)传输到redis服务器以用于存储目的。我听说,redis不会说ZMQ。所以如果不搭桥,就不可能实现。我接受你的建议。在哪里看?将zeromq套接字连接到redis服务器以进行数据传输?

//负载平衡的多线程版服务器:

#include "zhelpers.hpp" 
#include <queue> 
#include "zmq.hpp" 
#include <stdio.h> 
#include <string> 
#include <vector> 
#include "datamsg.pb.h" 
using namespace google::protobuf::io; 
    bool verify(std::string str, std::vector<std::string> &s) 
    { 
    for(int q=0;q<s.size();q++) 
    {  
    if(s.at(q)==str.substr(0,4)){ 
    s.push_back(str.substr(4,str.length()-1)); 

    return true; 
    } 
} 
return false; 
} 
// Basic request-reply client using REQ socket 
static void * worker_thread(void *arg) { 
    zmq::context_t context(1); 
    zmq::message_t worker_receive; 
    datamsg worker_parsed; 
    zmq::socket_t worker(context, ZMQ_REQ); 
    s_set_id(worker); // Makes tracing easier 
    worker.connect("ipc://backend.ipc"); 
    // Tell backend we're ready for work 
    s_send(worker, "READY"); 

    while (1) { 
    // Read and save all frames until we get an empty frame 
    worker.recv(&worker_receive); 
    worker_parsed.ParseFromArray(worker_receive.data(), worker_receive.size()); 
    // printing after parsing......... 
    s_sendmore (worker, worker_parsed.destination()); 
    s_sendmore (worker, ""); 
    worker.send(worker_receive);// Here I sent the same structure back 
} 
return (NULL); 
} 

int main (int argc, char *argv[]) { 

// Prepare our context and sockets 
zmq::context_t context(1); 
zmq::socket_t frontend (context, ZMQ_ROUTER); 
zmq::socket_t backend (context, ZMQ_ROUTER); 
zmq::socket_t verification (context, ZMQ_REP); 
verification.bind("tcp://*:5557"); 
std::vector<std::string> s; 
s.reserve(10); 
s.push_back("cli4"); 
frontend.bind("tcp://*:5559"); 
backend.bind("ipc://backend.ipc"); 
zmq::message_t frontend_received; 
zmq::message_t front_get; 
int worker_nbr; 
for(worker_nbr = 0; worker_nbr < 3; worker_nbr++) { 
pthread_t worker; 
pthread_create(&worker, NULL, worker_thread, NULL); 
} 
std::queue<std::string> worker_queue; 

while (1) { 
    // Initialize poll set 
    zmq::pollitem_t items[] = { 
    // Always poll for worker activity on backend 
    { backend, 0, ZMQ_POLLIN, 0 }, 
    // Poll front-end only if we have available workers 
    { frontend, 0, ZMQ_POLLIN, 0 }, 
    //Poll for new customer for verification of client refrence ID 
    {verification,0,ZMQ_POLLIN,0 } 
}; zmq::poll (items, 3, -1); 

if (items [0].revents & ZMQ_POLLIN) { // Handle worker activity on backend 
// Queue worker address for LoadBalanced routing 
    worker_queue.push(s_recv (backend)); 

// Second frame is empty 
std::string empty = s_recv (backend); 
assert (empty.size() == 0); 

// Third frame is READY or else a client reply address 
std::string client_addr = s_recv (backend); 
// If client reply, send rest back to frontend 
if(client_addr.compare("READY") != 0) { 
    std::string empty = s_recv (backend); 
    assert (empty.size() == 0); 
    backend.recv(&frontend_received); 
    s_sendmore (frontend, client_addr); 
    s_sendmore (frontend, ""); 
    frontend.send(frontend_received); 
    //frontend.close(); 
    } 
} 
if (items [1].revents & ZMQ_POLLIN) { 
// Client request is [address][request] 

std::string client_addr = s_recv (frontend); 

frontend.recv(&front_get); 
std::string worker_addr = worker_queue.front(); 
worker_queue.pop(); 

    s_sendmore (backend, worker_addr); 
    s_sendmore (backend, ""); 
    backend.send(front_get); 

} 
if (items [2].revents & ZMQ_POLLIN) { 
std::string refrence=s_recv(verification); 
    if(verify(refrence,s)){ 
    s_send(verification,"OK"); 
    std::cout<<"ID:"<<refrence.substr(4,(refrence.length()-1))<<" Has been Registered" <<std::endl; 
} 
else s_send(verification,"Verification Failed!"); 
    } 

} 
sleep (1); 
return 0; 
} 
+0

Apache Camel是一个在不同协议之间构建总线的好项目。 – zenbeni 2014-09-22 08:29:58

+0

为什么您的应用程序需要使用ZMQ,而不是使用redis模块/库/等直接与redis对话?如果redis服务器位于另一台主机上,您需要通过某种方式首先将数据发送给该主机,那么通常您会为该主机编写一个客户机应用程序,然后该应用程序会原生地与Redis进行通信。 – Jason 2014-09-22 13:45:17

+0

@Jason我需要通过redis服务器记录所有的消息。 zmq和redis都本地运行(localhost)但是我找不到从zmq套接字与redis进行通信的方式。我是否需要编写一个单独的客户端? – monsterrrrr 2014-09-22 18:53:14

回答

2

所以,你必须运行一个ZMQ ROUTER插座的应用程序,并且要存档这些消息的Redis?除非你有一些你没有提到的限制,否则你应该直接从你的应用程序连接到redis,而不是试图通过ZMQ传递所有的通信。 ZMQ插槽只有有史以来与其他ZMQ套接字(没有或多或少逆向工程ZMQ协议,但这相当于建立一个桥梁,你说你不想要)。

Redis没有本机ZMQ连接选项。

或多或少你需要完成看起来像这样的内容:

-------Application-------  ------------ 
|      |  | External | 
|   ZMQ socket-(|<----(| Source | 
|   v   |  ------------ 
|   V   | 
|  (Process Data) |  ---------- 
|   V   |  | Redis | 
|  Redis connector--|)---->| Server | 
|      |  ---------- 
------------------------- 

希望这是有道理的。如果您在原始问题中提供代码,那么我们可以直接解决该问题。

+0

+1为杀手ascii艺术! – 2014-09-23 14:15:07

+0

@Jason其实我的应用程序完全基于zeromq。它是一个拥有多个套接字的聊天服务器。客户端连接到它即时通讯。我想将邮件从zmq套接字传输到redis。所以绕过zeromq不是一个选项(至少我不知道)。 Redis连接器的含义是什么?这是什么类型的连接器?种类的插座?如果你想查看,我已经粘贴了所有的代码,请高兴地做,并且非常感谢你的帮助,通过可视化:) +1 – monsterrrrr 2014-09-23 17:47:40

+0

你的应用程序是用某种语言编写的(我猜Python,但你没有指定)。该语言具有绑定,允许您创建ZMQ套接字并将其连接到远程ZMQ套接字。该语言还具有*独立*绑定,可让您连接并发送数据到redis。 – Jason 2014-09-23 17:53:37

相关问题