所以这不像人们想象的那么直截了当。
基本上我使用了一个调度程序线程,它可以像所有连接客户端的控制中心那样工作。因此,无论何时客户端收到一条消息,都会将其发送给调度员,然后将消息分发给每个连接的客户端。
我还必须在另一个线程中接收消息,因为在rust-websocket中没有非阻塞方式接收消息。然后我可以使用永久循环来检查从websocket和调度程序接收到的新消息。
这里是我的代码是如何模样到底:
extern crate websocket;
use std::str;
use std::sync::{Arc, Mutex};
use std::sync::mpsc;
use std::thread;
use websocket::{Server, Message, Sender, Receiver};
use websocket::header::WebSocketProtocol;
use websocket::message::Type;
fn main() {
let server = Server::bind("0.0.0.0:2794").unwrap();
let (dispatcher_tx, dispatcher_rx) = mpsc::channel::<String>();
let client_senders: Arc<Mutex<Vec<mpsc::Sender<String>>>> = Arc::new(Mutex::new(vec![]));
// dispatcher thread
{
let client_senders = client_senders.clone();
thread::spawn(move || {
while let Ok(msg) = dispatcher_rx.recv() {
for sender in client_senders.lock().unwrap().iter() {
sender.send(msg.clone()).unwrap();
}
}
});
}
// client threads
for connection in server {
let dispatcher = dispatcher_tx.clone();
let (client_tx, client_rx) = mpsc::channel();
client_senders.lock().unwrap().push(client_tx);
// Spawn a new thread for each connection.
thread::spawn(move || {
let request = connection.unwrap().read_request().unwrap(); // Get the request
let headers = request.headers.clone(); // Keep the headers so we can check them
request.validate().unwrap(); // Validate the request
let mut response = request.accept(); // Form a response
if let Some(&WebSocketProtocol(ref protocols)) = headers.get() {
if protocols.contains(&("rust-websocket".to_string())) {
// We have a protocol we want to use
response.headers.set(WebSocketProtocol(vec!["rust-websocket".to_string()]));
}
}
let mut client = response.send().unwrap(); // Send the response
let ip = client.get_mut_sender()
.get_mut()
.peer_addr()
.unwrap();
println!("Connection from {}", ip);
let message: Message = Message::text("SERVER: Connected.".to_string());
client.send_message(&message).unwrap();
let (mut sender, mut receiver) = client.split();
let(tx, rx) = mpsc::channel::<Message>();
thread::spawn(move || {
for message in receiver.incoming_messages() {
tx.send(message.unwrap()).unwrap();
}
});
loop {
if let Ok(message) = rx.try_recv() {
match message.opcode {
Type::Close => {
let message = Message::close();
sender.send_message(&message).unwrap();
println!("Client {} disconnected", ip);
return;
},
Type::Ping => {
let message = Message::pong(message.payload);
sender.send_message(&message).unwrap();
},
_ => {
let payload_bytes = &message.payload;
let payload_string = match str::from_utf8(payload_bytes) {
Ok(v) => v,
Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
};
let msg_string = format!("MESSAGE: {}: ", payload_string);
dispatcher.send(msg_string).unwrap();
}
}
}
if let Ok(message) = client_rx.try_recv() {
let message: Message = Message::text(message);
sender.send_message(&message).unwrap();
}
}
});
}
}
http://pastebin.com/H9McWLrH
我想类似的东西,放弃了。 AFAICS的问题是接收器对象没有名为'try_recv_message'的方法。这意味着,当您开始查找邮件时,服务器将会阻止,直到您实际收到邮件。 https://github.com/cyderize/rust-websocket/blob/2f80d4c4889602d63cc745aa18e3fdd4ae71eb8a/src/ws/receiver.rs 该项目命名为铁锈聊天使用不同的WebSocket库,以及他们的工作围绕它通过使用多播。 https://github.com/nbaksalyar/rust-chat/blob/master/src/main.rs – nielsle
谢谢!我会研究它。但我不能相信这对于rust-websocket来说会很难,因为即使他们的例子是使用网络聊天。它看起来像他们打算让用户扩展他们的例子,因为单一客户端聊天对我来说没有多大意义。 – DropOfBlood
我很可能错过了一些东西。如果您找到解决方案,请发布。 – nielsle