2011-03-29 51 views
2

我试图使用zeroMQ作为在多个线程之间实现消息传递系统的一种方式。我尝试了下面的代码,但它不起作用;在具体的每个线程中调用zmq_recv不会等待/阻止任何消息被执行。使用ZeroMQ消息的线程间通信

你能帮我这段代码吗?

我使用Linux操作系统和gcc

问候

AFG

static void * 
    worker_routine (void *context) { 
     // Socket to talk to dispatcher 
     void *receiver = zmq_socket (context, ZMQ_REP); 
     zmq_connect (receiver, "inproc://workers"); 
     while (1) { 

      zmq_msg_t request; 
      zmq_msg_init(&request); 
      zmq_recv(receiver, &request, 0); 
      printf ("Received request\n"); 
      // Do some 'work' 
      usleep (1000); 
      // Send reply back to client 
      zmq_send (receiver, &request, 0); 
     } 
     zmq_close (receiver); 
     return NULL; 
    } 

    int main (void) { 

    void *context = zmq_init (1); 
    void *clients = zmq_socket (context, ZMQ_REP); 
    zmq_bind (clients, "inproc://workers"); 

    int thread_nbr; 
    for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) { 
     pthread_t worker; 
     pthread_create (&worker, NULL, worker_routine, context); 
    } 

    zmq_close (clients); 
    zmq_term (context); 
    return 0; 
    } 
+0

我再次阅读指南ZeroMQ。有谁知道我是否需要为我的目的创建一个像QUEUE这样的zmq_device?我还注意到,有样本使用“ipc”作为协议..我总是认为对于MT我必须使用“inproc”..任何人都知道这是否会产生影响? – 2011-03-29 21:44:58

回答

3

你关闭插座,ZeroMQ要创建的线程之后。他们可能没有时间达到阻止状态,如果他们这样做了,只要您销毁zmq上下文,他们就会失败。从zmq_term man page

上下文终止通过以下步骤进行:

任何阻塞目前在插座范围内开放正在进行的操作应立即返回与ETERM的错误代码。

+0

我可能需要添加也尝试添加一些“睡眠”;无论如何,我仍然不知道我预期的行为是否满足该设置..请参阅我的意见下面的问题。 PS。谢谢你的帮助! – 2011-03-29 21:46:29

6

两个套接字都是REP。你想要的是REQ + REP。

0

首先,由于@sustrik指出你需要使用REQREP,主线程和工作线程都不能是REP

其次,您需要提供一些种类的阻塞循环在你的主线程:

int main (int argc, char **argv) 
{ 
    void *context = zmq_init (1); 
    void *clients = zmq_socket (context, ZMQ_REP); // use ZMQ_REQ on the clients 
    zmq_bind (clients, "inproc://workers"); 

    int thread_nbr; 
    for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) { 
     pthread_t worker; 
     pthread_create (&worker, NULL, worker_routine, context); 
    } 

    while (TRUE) 
    { 
        // worker thread connected asking for work 
        zmq_msg_t request; 
        zmq_msg_init (&request); 
     zmq_recv (clients, &request, 0); 
     zmq_msg_close (&request); 

     // do whatever you need to do with the clients' request here 

     // send work to clients 
     zmq_msg_t reply; 
     zmq_msg_init_data (&reply, "Reply", 5, NULL, NULL); 
     zmq_send (clients, &reply, 0); 
     zmq_msg_close (&reply); 
    } 

    zmq_close (clients); 
    zmq_term (context); 
    return 0; 
}