2016-12-24 101 views
2

所以这里是我的websocket服务器实现。Akka websocket - 如何通过服务器关闭连接?

val route = get { 
    pathEndOrSingleSlash { 
    handleWebSocketMessages(websocketFlow) 
    } 
} 

def websocketFlow: Flow[Message, Message, Any] = 
    Flow[Message] 
    .collect { case TextMessage.Strict(textMessage) => protocol.hydrate(textMessage) } 
    .via(chatActorFlow(UUID.randomUUID())) 
    .map(event => TextMessage.Strict(protocol.serialize(event))) 


def chatActorFlow(connectionId: UUID) : Flow[Protocol.Message, Protocol.Event, Any] = { 

    val sink = Flow[Protocol.Message] 
    .map(msg => Protocol.SignedMessage(connectionId, msg)) 
    .to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId))) 

    val source = Source 
     .mapMaterializedValue { 
     actor : ActorRef => { 
      chatRef ! Protocol.OpenConnection(actor, connectionId) 
     } 
     } 

    Flow.fromSinkAndSource(sink, source) 
} 

我不知道是否有什么办法一旦ConnectionClosed类型的消息是由chatRef发送关闭连接?

+0

从代码看来,'chatRef'实际上*会接收一个'ConnectionClosed'消息(作为'Sink.actorRef'的onComplete消息)。你能否澄清你想达到的目标? –

+0

我想手动关闭服务器端的连接。让我们假设我有一个“禁止用户”列表,每当被禁用的用户打开一个连接时,我想通过服务器关闭它。 –

回答

4

下面的解决方案允许通过终止由Source.actorRef阶段实现的Actor来从服务器端删除连接。这只需发送PoisonPill即可完成。

现在,我还不清楚你想在连接时识别一个“被禁止”的客户端,所以这个例子 - 故意 - 非常简单:服务器在最大数量的客户连接。如果您想要随时使用任何其他策略来启动客户端,则仍然可以应用相同的逻辑并将PoisonPill发送给其自己的源演员。

object ChatApp extends App { 

    implicit val system = ActorSystem("chat") 
    implicit val executor: ExecutionContextExecutor = system.dispatcher 
    implicit val materializer = ActorMaterializer() 

    val route = get { 
    pathEndOrSingleSlash { 
     handleWebSocketMessages(websocketFlow) 
    } 
    } 

    val maximumClients = 1 

    class ChatRef extends Actor { 
    override def receive: Receive = withClients(Map.empty[UUID, ActorRef]) 

    def withClients(clients: Map[UUID, ActorRef]): Receive = { 
     case SignedMessage(uuid, msg) => clients.collect{ 
     case (id, ar) if id != uuid => ar ! msg 
     } 
     case OpenConnection(ar, uuid) if clients.size == maximumClients => ar ! PoisonPill 
     case OpenConnection(ar, uuid) => context.become(withClients(clients.updated(uuid, ar))) 
     case CloseConnection(uuid) => context.become(withClients(clients - uuid)) 
    } 
    } 

    object Protocol { 
    case class SignedMessage(uuid: UUID, msg: String) 
    case class OpenConnection(actor: ActorRef, uuid: UUID) 
    case class CloseConnection(uuid: UUID) 
    } 

    val chatRef = system.actorOf(Props[ChatRef]) 

    def websocketFlow: Flow[Message, Message, Any] = 
    Flow[Message] 
     .mapAsync(1) { 
     case TextMessage.Strict(s) => Future.successful(s) 
     case TextMessage.Streamed(s) => s.runFold("")(_ + _) 
     case b: BinaryMessage => throw new Exception("Binary message cannot be handled") 
     }.via(chatActorFlow(UUID.randomUUID())) 
     .map(TextMessage(_)) 

    def chatActorFlow(connectionId: UUID) : Flow[String, String, Any] = { 

    val sink = Flow[String] 
     .map(msg => Protocol.SignedMessage(connectionId, msg)) 
     .to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId))) 

    val source = Source.actorRef(16, OverflowStrategy.fail) 
     .mapMaterializedValue { 
     actor : ActorRef => { 
      chatRef ! Protocol.OpenConnection(actor, connectionId) 
     } 
     } 

    Flow.fromSinkAndSource(sink, source) 
    } 

    Http().bindAndHandle(route, "0.0.0.0", 8080) 
    .map(_ => println(s"Started server...")) 

} 
+0

嘿,这是我正在寻找的答案!在我的情况下,客户端必须在连接打开后的30秒内进行身份验证,否则服务器应该断开连接。 'PoisonPill'做的很好,谢谢! –