2016-08-01 96 views
1

我试图建立一个命令行聊天室,服务器正在处理连接并将来自一个客户端的输入重复回到所有其他客户端。 目前服务器能够接受来自多个客户端的输入,但只能将信息单独发回给这些客户端。我认为我的问题是,每个连接正在一个单独的线程上处理。我将如何允许线程彼此进行通信或能够将数据发送到每个线程?使用多线程通过NetworkStream发送数据

Server代码:

namespace ConsoleApplication 
{ 


    class TcpHelper 
    { 


     private static object _lock = new object(); 
     private static List<Task> _connections = new List<Task>(); 


     private static TcpListener listener { get; set; } 
     private static bool accept { get; set; } = false; 

     private static Task StartListener() 
     { 
      return Task.Run(async() => 
      { 
       IPAddress address = IPAddress.Parse("127.0.0.1"); 
       int port = 5678; 
       listener = new TcpListener(address, port); 

       listener.Start(); 

       Console.WriteLine($"Server started. Listening to TCP clients at 127.0.0.1:{port}"); 

       while (true) 
       { 
        var tcpClient = await listener.AcceptTcpClientAsync(); 
        Console.WriteLine("Client has connected"); 
        var task = StartHandleConnectionAsync(tcpClient); 
        if (task.IsFaulted) 
         task.Wait(); 
       } 
      }); 
     } 

     // Register and handle the connection 
     private static async Task StartHandleConnectionAsync(TcpClient tcpClient) 
     { 
      // start the new connection task 
      var connectionTask = HandleConnectionAsync(tcpClient); 



      // add it to the list of pending task 
      lock (_lock) 
       _connections.Add(connectionTask); 

      // catch all errors of HandleConnectionAsync 
      try 
      { 
       await connectionTask; 

      } 
      catch (Exception ex) 
      { 
       // log the error 
       Console.WriteLine(ex.ToString()); 
      } 
      finally 
      { 
       // remove pending task 
       lock (_lock) 
        _connections.Remove(connectionTask); 
      } 
     } 






     private static async Task HandleConnectionAsync(TcpClient client) 
     { 

      await Task.Yield(); 


      { 
       using (var networkStream = client.GetStream()) 
       { 

        if (client != null) 
        { 
         Console.WriteLine("Client connected. Waiting for data."); 



         StreamReader streamreader = new StreamReader(networkStream); 
         StreamWriter streamwriter = new StreamWriter(networkStream); 

         string clientmessage = ""; 
         string servermessage = ""; 


         while (clientmessage != null && clientmessage != "quit") 
         { 
          clientmessage = await streamreader.ReadLineAsync(); 
          Console.WriteLine(clientmessage); 
          servermessage = clientmessage; 
          streamwriter.WriteLine(servermessage); 
          streamwriter.Flush(); 


         } 
         Console.WriteLine("Closing connection."); 
         networkStream.Dispose(); 
        } 
       } 

      } 

     } 
     public static void Main(string[] args) 
     { 
      // Start the server 

      Console.WriteLine("Hit Ctrl-C to close the chat server"); 
      TcpHelper.StartListener().Wait(); 

     } 

    } 

} 

客户端代码:

namespace Client2 
{ 
    public class Program 
    { 

     private static void clientConnect() 
     { 
      TcpClient socketForServer = new TcpClient(); 
      bool status = true; 
      string userName; 
      Console.Write("Input Username: "); 
      userName = Console.ReadLine(); 

      try 
      { 
       IPAddress address = IPAddress.Parse("127.0.0.1"); 
       socketForServer.ConnectAsync(address, 5678); 
       Console.WriteLine("Connected to Server"); 
      } 
      catch 
      { 
       Console.WriteLine("Failed to Connect to server{0}:999", "localhost"); 
       return; 
      } 
      NetworkStream networkStream = socketForServer.GetStream(); 
      StreamReader streamreader = new StreamReader(networkStream); 
      StreamWriter streamwriter = new StreamWriter(networkStream); 
      try 
      { 
       string clientmessage = ""; 
       string servermessage = ""; 
       while (status) 
       { 
        Console.Write(userName + ": "); 
        clientmessage = Console.ReadLine(); 
        if ((clientmessage == "quit") || (clientmessage == "QUIT")) 
        { 
         status = false; 
         streamwriter.WriteLine("quit"); 
         streamwriter.WriteLine(userName + " has left the conversation"); 
         streamwriter.Flush(); 

        } 
        if ((clientmessage != "quit") && (clientmessage != "quit")) 
        { 
         streamwriter.WriteLine(userName + ": " + clientmessage); 
         streamwriter.Flush(); 
         servermessage = streamreader.ReadLine(); 
         Console.WriteLine("Server:" + servermessage); 
        } 
       } 
      } 
      catch 
      { 
       Console.WriteLine("Exception reading from the server"); 
      } 
      streamreader.Dispose(); 
      networkStream.Dispose(); 
      streamwriter.Dispose(); 
     } 
     public static void Main(string[] args) 
     { 
      clientConnect(); 
     } 
    } 
} 

回答

2

最主要的错在你的代码是,你不尝试发送从一个客户机接收到所连接的其他客户端的数据。您的服务器上有_connections列表,但存储在列表中的唯一东西是连接的Task对象,您甚至不会对这些对象执行任何操作。

相反,您应该自己维护一个连接列表,以便当您从一个客户端收到消息时,可以将该消息转发给其他客户端。

至少应该是List<TcpClient>,但是因为您使用的是StreamReaderStreamWriter,所以您还需要初始化并将这些对象存储在列表中。另外,你应该包含一个客户端标识符。一个明显的选择是客户端名称(即用户输入的名称),但是您的示例在聊天协议中没有提供任何机制来传输该标识作为连接初始化的一部分,所以在我的示例(下面)我只是使用一个简单的整数值。

有你发布的代码一些其他的违规行为,如:

  • 开始在一个全新的线程任务,只是为了执行该让你启动异步操作的点了几个语句。在我的例子中,我只是省略了代码的Task.Run()部分,因为它不是必需的。
  • 当它返回IsFaulted时检查连接特定的任务。由于这个Task对象返回时实际上不可能发生任何I/O,因此该逻辑几乎没有用处。对Wait()的调用将抛出异常,该异常将传播到主线程的Wait()调用,终止服务器。但是,如果出现任何其他错误,您不会终止服务器,所以目前尚不清楚您为什么要这么做。
  • 有一个虚假的电话Task.Yield()。我不知道你想在那里完成什么,但不管它是什么,这个声明没有用。我只是删除它。
  • 在您的客户端代码中,您只会在发送数据时尝试从服务器接收数据。这是非常错误的;您希望客户能够及时响应,并在收到数据后立即收到数据。在我的版本中,我包含了一个简单的小匿名方法,立即调用该方法来启动一个单独的消息接收循环,该循环将与主用户输入循环异步并行执行。
  • 同样在客户端代码中,您正在发送“&hellip; has left&hellip;”消息将导致服务器关闭连接的“退出”消息。这意味着服务器永远不会收到“......剩下的......”信息。我颠倒了消息的顺序,以便“退出”始终是客户端发送的最后一件事情。

我的版本是这样的:

服务器:

class TcpHelper 
{ 
    class ClientData : IDisposable 
    { 
     private static int _nextId; 

     public int ID { get; private set; } 
     public TcpClient Client { get; private set; } 
     public TextReader Reader { get; private set; } 
     public TextWriter Writer { get; private set; } 

     public ClientData(TcpClient client) 
     { 
      ID = _nextId++; 
      Client = client; 

      NetworkStream stream = client.GetStream(); 

      Reader = new StreamReader(stream); 
      Writer = new StreamWriter(stream); 
     } 

     public void Dispose() 
     { 
      Writer.Close(); 
      Reader.Close(); 
      Client.Close(); 
     } 
    } 

    private static readonly object _lock = new object(); 
    private static readonly List<ClientData> _connections = new List<ClientData>(); 

    private static TcpListener listener { get; set; } 
    private static bool accept { get; set; } 

    public static async Task StartListener() 
    { 
     IPAddress address = IPAddress.Any; 
     int port = 5678; 
     listener = new TcpListener(address, port); 

     listener.Start(); 

     Console.WriteLine("Server started. Listening to TCP clients on port {0}", port); 

     while (true) 
     { 
      var tcpClient = await listener.AcceptTcpClientAsync(); 
      Console.WriteLine("Client has connected"); 
      var task = StartHandleConnectionAsync(tcpClient); 
      if (task.IsFaulted) 
       task.Wait(); 
     } 
    } 

    // Register and handle the connection 
    private static async Task StartHandleConnectionAsync(TcpClient tcpClient) 
    { 
     ClientData clientData = new ClientData(tcpClient); 

     lock (_lock) _connections.Add(clientData); 

     // catch all errors of HandleConnectionAsync 
     try 
     { 
      await HandleConnectionAsync(clientData); 
     } 
     catch (Exception ex) 
     { 
      // log the error 
      Console.WriteLine(ex.ToString()); 
     } 
     finally 
     { 
      lock (_lock) _connections.Remove(clientData); 
      clientData.Dispose(); 
     } 
    } 

    private static async Task HandleConnectionAsync(ClientData clientData) 
    { 
     Console.WriteLine("Client connected. Waiting for data."); 

     string clientmessage; 

     while ((clientmessage = await clientData.Reader.ReadLineAsync()) != null && clientmessage != "quit") 
     { 
      string message = "From " + clientData.ID + ": " + clientmessage; 

      Console.WriteLine(message); 

      lock (_lock) 
      { 
       // Locking the entire operation ensures that a) none of the client objects 
       // are disposed before we can write to them, and b) all of the chat messages 
       // are received in the same order by all clients. 
       foreach (ClientData recipient in _connections.Where(r => r.ID != clientData.ID)) 
       { 
        recipient.Writer.WriteLine(message); 
        recipient.Writer.Flush(); 
       } 
      } 
     } 
     Console.WriteLine("Closing connection."); 
    } 
} 

客户:

class Program 
{ 
    private const int _kport = 5678; 

    private static async Task clientConnect() 
    { 
     IPAddress address = IPAddress.Loopback; 
     TcpClient socketForServer = new TcpClient(); 
     string userName; 
     Console.Write("Input Username: "); 
     userName = Console.ReadLine(); 

     try 
     { 
      await socketForServer.ConnectAsync(address, _kport); 
      Console.WriteLine("Connected to Server"); 
     } 
     catch (Exception e) 
     { 
      Console.WriteLine("Failed to Connect to server {0}:{1}", address, _kport); 
      return; 
     } 


     using (NetworkStream networkStream = socketForServer.GetStream()) 
     { 
      var readTask = ((Func<Task>)(async() => 
      { 
       using (StreamReader reader = new StreamReader(networkStream)) 
       { 
        string receivedText; 

        while ((receivedText = await reader.ReadLineAsync()) != null) 
        { 
         Console.WriteLine("Server:" + receivedText); 
        } 
       } 
      }))(); 

      using (StreamWriter streamwriter = new StreamWriter(networkStream)) 
      { 
       try 
       { 
        while (true) 
        { 
         Console.Write(userName + ": "); 
         string clientmessage = Console.ReadLine(); 
         if ((clientmessage == "quit") || (clientmessage == "QUIT")) 
         { 
          streamwriter.WriteLine(userName + " has left the conversation"); 
          streamwriter.WriteLine("quit"); 
          streamwriter.Flush(); 
          break; 
         } 
         else 
         { 
          streamwriter.WriteLine(userName + ": " + clientmessage); 
          streamwriter.Flush(); 
         } 
        } 

        await readTask; 
       } 
       catch (Exception e) 
       { 
        Console.WriteLine("Exception writing to server: " + e); 
        throw; 
       } 
      } 
     } 
    } 

    public static void Main(string[] args) 
    { 
     clientConnect().Wait(); 
    } 
} 

还有很多你需要去努力。您可能想要在服务器端实现适当的聊天用户名初始化。至少,对于真实世界的代码,您希望进行更多的错误检查,并确保客户端ID可靠地生成(如果您只需要正面的ID值,则不能超过2^31-1在它回滚到0之前的连接)。

我还做了其他一些非必要的小改动,例如使用IPAddress.AnyIPAddress.Loopback的值而不是解析字符串,并且通常简化并清理代码。此外,我目前还没有使用C#6编译器,所以我更改了使用C#6功能的代码,以便它可以使用C#5进行编译。

要做一个完整的聊天服务器,你仍然有你的工作切断你。但我希望上述让你回到正确的轨道上。

+0

WOW。很有帮助。是的,我大部分时间都是用Google搜索,把我发现的东西拼凑在一起,并排除任何错误。你真的超越了我,我非常感激。我曾以为我需要存储客户端,但我不确定这是否多余。非常感谢你清理这个! – hereswilson

+0

@hereswilson:高兴地帮忙。请注意,以上只是您如何做事的一个例子。这不是你如何做的最后一句话。例如,您可以选择甚至将消息发送回发送它的客户端(简化客户端枚举)。另外请注意,我已经更改了中继部分的同步,同步整个操作,而不是只获取要发送给客户端的部分。我决定我更喜欢这样(因为代码中的评论中描述的原因)。 –

相关问题