2014-10-11 155 views
4
package main 

import (
"fmt" 
"github.com/streadway/amqp" 
"time" 
) 

// Every connection should declare the topology they expect 
func setup(url, queue string) (*amqp.Connection, *amqp.Channel, error) { 
    //setup connection 
    conn, err := amqp.Dial(url) 
    if err != nil { 
     return nil, nil, err 
    } 
    //build channel in the connection 
    ch, err := conn.Channel() 
     if err != nil { 
     return nil, nil, err 
    } 
    //queue declare 
    if _, err := ch.QueueDeclare(queue, false, true, false, false, nil); err != nil { 
     return nil, nil, err 
    } 

    return conn, ch, nil 
} 

func main() { 
    //amqp url 
    url := "amqp://guest:[email protected]:5672"; 
    for i := 1; i <= 2; i++ { 
     fmt.Println("connect ", i) 
     //two goroutine 
     go func() { 
      //queue name 
      queue := fmt.Sprintf("example.reconnect.%d", i) 
      //setup channel in the tcp connection 
      _, pub, err := setup(url, queue) 
      if err != nil { 
       fmt.Println("err publisher setup:", err) 
       return 
      } 
      // Purge the queue from the publisher side to establish initial state 
      if _, err := pub.QueuePurge(queue, false); err != nil { 
       fmt.Println("err purge:", err) 
       return 
      } 
      //publish msg 
      if err := pub.Publish("", queue, false, false, amqp.Publishing{ 
       Body: []byte(fmt.Sprintf("%d", i)), 
      }); err != nil { 
       fmt.Println("err publish:", err) 
       return 
      } 
      //keep running 
      for{ 
       time.Sleep(time.Second * 20) 
      } 
     }() 
    } 
    //keep running 
    for { 
     time.Sleep(time.Second * 20) 
    } 
} 

我想只有一个程序和MQ-服​​务器之间的连接,一个连接可以在rabbitmq的go api中支持多个通道吗?

但有两个方面,一个连接只能支持一个通道,为什么呢?

不能这两个goroutine共享相同的tcp连接吗?

套接字描述符可以共享理论中的所有进程线程。

为什么两个goroutine不共享一个套接字但有自己的通道?

手工模型:

model1

在RabbitMQ的真实模型: model2

回答

4

看着它看起来好像你可以调用conn.Channel()多次为source for the library你喜欢它并通过同一连接创建一个新的通信流。

好吧,我试了一下,这里的工作的例子......一个够程,一个连接,两个通道 我安装的接收器,然后发送消息,然后从接收通道

,如果你想多读队列绑定在一个goroutine中,您可以调用rec.Consume两次,然后在队列中进行选择。

package main 

import (
    "fmt" 
    "github.com/streadway/amqp" 
    "os" 
) 

func main() { 
    conn, err := amqp.Dial("amqp://localhost") 
    e(err) 
    defer conn.Close() 
    fmt.Println("Connected") 
    rec, err := conn.Channel() 
    e(err) 

    fmt.Println("Setup receiver") 
    rq, err := rec.QueueDeclare("go-test", false, false, false, false, nil) 
    e(err) 
    msgs, err := rec.Consume(rq.Name, "", true, false, false, false, nil) 
    e(err) 

    fmt.Println("Setup sender") 
    send, err := conn.Channel() 
    e(err) 
    sq, err := send.QueueDeclare("go-test", false, false, false, false, nil) 
    e(err) 

    fmt.Println("Send message") 
    err = send.Publish("", sq.Name, false, false, amqp.Publishing{ 
     ContentType: "text/plain", 
     Body:  []byte("This is a test"), 
    }) 
    e(err) 

    msg := <-msgs 
    fmt.Println("Received from:", rq, "msg:", string(msg.Body)) 
} 

func e(err error) { 
    if err != nil { 
     fmt.Println(err) 
     os.Exit(1) 
    } 
} 

输出在我的盒子:在相同的goroutine如果

$ go run rmq.go 
Connected 
Setup receiver 
Setup sender 
Send message 
Received from: {go-test 0 0} msg: This is a test 
+0

一个连接。两个连接,如果在两个goroutine.I不知道为什么? @DavidB – 2014-10-15 19:30:21

+0

它看起来像连接和通道是线程安全的,你确定你不能使用来自多个goroutines的连接? (再一次,我没有亲自尝试过) – 2014-10-16 01:33:36

+0

我想表达的是,有多少个goroutines调用了amqp.Dial()来决定tcp连接的数量,主要我想知道的是为什么这样设计?例如只有主线程调用amqp.Dial(),所以有1个tcp连接。我的例子中有2个线程调用amqp.Dial(),所以有2个tcp连接。很抱歉,我没有表达我的问题清楚吗?但是您的示例显示频道可以共享一个连接。 @DavidB – 2014-10-16 11:25:42

相关问题