2017-10-20 1840 views
2

如果客户端将因网络错误而断开连接,则服务器必须在我的pub/sub连接中关闭。我知道ctx.Done()函数,但不知道如何在我的情况下正确使用它。请有人解释一下吗?如何在客户端断开连接时正确使用ctx.Done()?

GRPC-GO:1.7.0

去版本go1.8.4

func (a *API) Notifications(in *empty.Empty, stream pb.Service_NotificationsServer) error { 
    ctx := stream.Context() 
    _, ok := user.FromContext(ctx) 
    if !ok { 
     return grpc.Errorf(codes.Unauthenticated, "user not found") 
    } 

    pubsub := a.redisClient.Subscribe("notifications") 
    defer pubsub.Close() 

    for { 
     msg, err := pubsub.ReceiveMessage() 
     if err != nil { 
      grpclog.Warningf("Notifications: pubsub error: %v", err) 
      return grpc.Errorf(codes.Internal, "pubsub error %v", err) 
     } 

     notification := &pb.Notification{} 
     err = json.Unmarshal([]byte(msg.Payload), notification) 
     if err != nil { 
      grpclog.Warningf("Notifications: parse error: %v", err) 
      continue 
     } 
     if err := stream.Send(notification); err != nil { 
      grpclog.Warningf("Notifications: %v", err) 
      return err 
     } 
     grpclog.Infof("Notifications: send msg %v", notification) 
    } 
} 

回答

0

您应该取消从调用函数(或其它地方的情况下,可以访问)的情况,以及做适当的行动Done()检查select声明。

做的是提供一种用于select语句

做恢复时,代表此背景下所做的工作应取消已结束营业的通道。如果此上下文无法取消,则完成可能返回nil。连续调用完成返回相同的值。

而且

WithCancel返回父母的副本,新的渠道完成。返回的上下文的完成通道在返回的取消函数被调用时或父上下文的完成通道关闭时关闭,无论哪个先发生。

取消此上下文会释放与其关联的资源,因此只要在此上下文中运行的操作完成,代码就应该立即调用取消。

go func() { 
    for { 
     select { 
     case <-ctx.Done(): 
      return // returning not to leak the goroutine 
     case dst <- n: 
      n++ 
     } 
    } 
}() 
1

您可以使用select。而不是从一个函数正常获取数据,使用一个通道来获取数据和一个去例行程序来处理它。 一些这样的事:

func (a *API) Notifications(in *empty.Empty, stream 
    pb.Service_NotificationsServer) error { 
    ctx := stream.Context() 
    _, ok := user.FromContext(ctx) 
    if !ok { 
     return grpc.Errorf(codes.Unauthenticated, "user not found") 
    } 

    pubsub := a.redisClient.Subscribe("notifications") 
    defer pubsub.Close() 

    // I can not build the code, so I assume the msg in your code Message struct 
    c := make(chan Message) 
    go func() { 
     for { 
      msg, err := pubsub.ReceiveMessage() 
      if err != nil { 
       grpclog.Warningf("Notifications: pubsub error: %v", err) 
       close(c) 
       return grpc.Errorf(codes.Internal, "pubsub error %v", err) 
      } 
      c<- msg 
     } 
    }() 

    for { 
     select { 
      case msg, ok := <-c: 
       if !ok { 
        // channel is closed handle it 
       } 
       notification := &pb.Notification{} 
       err = json.Unmarshal([]byte(msg.Payload), notification) 
       if err != nil { 
        grpclog.Warningf("Notifications: parse error: %v", err) 
        continue 
       } 
       if err := stream.Send(notification); err != nil { 
        grpclog.Warningf("Notifications: %v", err) 
        return err 
       } 
       grpclog.Infof("Notifications: send msg %v", notification) 
      case <- ctx.Done(): 
       // do exit logic. some how close the pubsub, so next 
       // ReceiveMessage() return an error 
       // if forget to do that the go routine runs for ever 
       // until the end of main(), which I think its not what you wanted 
       pubsub.Close() // Its just pseudo code 
       return 
     } 
    } 
} 

读取消息(I假设的类型是消息)从信道,并使用select功率。在这种情况下

其他两个相关的事情:

  1. 确保去年底例行完成这项功能之后。我猜不出,因为我不知道代码,但我认为有一个Close()方法关闭pubsub,所以下一个ReceiveMessage返回一个错误。 (我看到推迟,做我希望的工作)

  2. 如果ReceiveMessage之前有一个错误ctx.Done您可以关闭通道,然后打破循环。

相关问题