2016-02-27 1110 views
0

我试图使用Rust-Websocket创建一个简单的聊天室,其中多个人可以互相交谈。使用rust-websocket聊天

我看了一下例子,'server.rs'和'websockets.html'对我来说看起来像一个体面的起点。所以我只是尝试启动并从网络连接。一切正常,但我只能与自己沟通,而不能与其他连接进行沟通(因为它直接将消息发送回sender而不是每个连接)。

所以我试图得到一个载体,所有senders/clients所以我可以遍历它们并发送消息给每一个,但这似乎是有问题的。我无法沟通senderclient,因为它不是线程安全的,我也不能复制其中的任何一个。

我不确定我是否不明白整个借款是100%还是不打算做这样的交叉连接通信。

server.rs:
https://github.com/cyderize/rust-websocket/blob/master/examples/server.rs

websockets.html:
https://github.com/cyderize/rust-websocket/blob/master/examples/websockets.html

我可能会从错误的方向接近这一点。与所有其他线程共享收到的消息可能会更容易。我想到了这一点,但我能想到的唯一事情就是使用channels从线程内部向外部发送消息。有什么方法可以在线程之间直接传播消息吗?我所需要做的就是从一个线程发送一个字符串到另一个线程。

+0

我想类似的东西,放弃了。 AFAICS的问题是接收器对象没有名为'try_recv_message'的方法。这意味着,当您开始查找邮件时,服务器将会阻止,直到您实际收到邮件。 https://github.com/cyderize/rust-websocket/blob/2f80d4c4889602d63cc745aa18e3fdd4ae71eb8a/src/ws/receiver.rs 该项目命名为铁锈聊天使用不同的WebSocket库,以及他们的工作围绕它通过使用多播。 https://github.com/nbaksalyar/rust-chat/blob/master/src/main.rs – nielsle

+0

谢谢!我会研究它。但我不能相信这对于rust-websocket来说会很难,因为即使他们的例子是使用网络聊天。它看起来像他们打算让用户扩展他们的例子,因为单一客户端聊天对我来说没有多大意义。 – DropOfBlood

+0

我很可能错过了一些东西。如果您找到解决方案,请发布。 – nielsle

回答

0

所以这不像人们想象的那么直截了当。

基本上我使用了一个调度程序线程,它可以像所有连接客户端的控制中心那样工作。因此,无论何时客户端收到一条消息,都会将其发送给调度员,然后将消息分发给每个连接的客户端。

我还必须在另一个线程中接收消息,因为在rust-websocket中没有非阻塞方式接收消息。然后我可以使用永久循环来检查从websocket和调度程序接收到的新消息。

这里是我的代码是如何模样到底:

extern crate websocket; 

use std::str; 
use std::sync::{Arc, Mutex}; 
use std::sync::mpsc; 
use std::thread; 

use websocket::{Server, Message, Sender, Receiver}; 
use websocket::header::WebSocketProtocol; 
use websocket::message::Type; 


fn main() { 
    let server = Server::bind("0.0.0.0:2794").unwrap(); 

    let (dispatcher_tx, dispatcher_rx) = mpsc::channel::<String>(); 
    let client_senders: Arc<Mutex<Vec<mpsc::Sender<String>>>> = Arc::new(Mutex::new(vec![])); 

    // dispatcher thread 
    { 
     let client_senders = client_senders.clone(); 
     thread::spawn(move || { 
      while let Ok(msg) = dispatcher_rx.recv() { 
       for sender in client_senders.lock().unwrap().iter() { 
        sender.send(msg.clone()).unwrap(); 
       } 
      } 
     }); 
    } 

    // client threads 
    for connection in server { 
     let dispatcher = dispatcher_tx.clone(); 
     let (client_tx, client_rx) = mpsc::channel(); 
     client_senders.lock().unwrap().push(client_tx); 

     // Spawn a new thread for each connection. 
     thread::spawn(move || { 
      let request = connection.unwrap().read_request().unwrap(); // Get the request 
      let headers = request.headers.clone(); // Keep the headers so we can check them 

      request.validate().unwrap(); // Validate the request 

      let mut response = request.accept(); // Form a response 

      if let Some(&WebSocketProtocol(ref protocols)) = headers.get() { 
       if protocols.contains(&("rust-websocket".to_string())) { 
        // We have a protocol we want to use 
        response.headers.set(WebSocketProtocol(vec!["rust-websocket".to_string()])); 
       } 
      } 

      let mut client = response.send().unwrap(); // Send the response 

      let ip = client.get_mut_sender() 
       .get_mut() 
       .peer_addr() 
       .unwrap(); 

      println!("Connection from {}", ip); 

      let message: Message = Message::text("SERVER: Connected.".to_string()); 
      client.send_message(&message).unwrap(); 

      let (mut sender, mut receiver) = client.split(); 

      let(tx, rx) = mpsc::channel::<Message>(); 
      thread::spawn(move || { 
       for message in receiver.incoming_messages() { 
        tx.send(message.unwrap()).unwrap(); 
       } 
      }); 

      loop { 
       if let Ok(message) = rx.try_recv() { 
        match message.opcode { 
         Type::Close => { 
          let message = Message::close(); 
          sender.send_message(&message).unwrap(); 
          println!("Client {} disconnected", ip); 
          return; 
         }, 
         Type::Ping => { 
          let message = Message::pong(message.payload); 
          sender.send_message(&message).unwrap(); 
         }, 
         _ => { 
          let payload_bytes = &message.payload; 
          let payload_string = match str::from_utf8(payload_bytes) { 
           Ok(v) => v, 
           Err(e) => panic!("Invalid UTF-8 sequence: {}", e), 
          }; 
          let msg_string = format!("MESSAGE: {}: ", payload_string); 
          dispatcher.send(msg_string).unwrap(); 
         } 
        } 
       } 
       if let Ok(message) = client_rx.try_recv() { 
        let message: Message = Message::text(message); 
        sender.send_message(&message).unwrap(); 
       } 
      } 
     }); 
    } 
} 

http://pastebin.com/H9McWLrH