2017-03-31 119 views
1

我正在编写一个作为供应商软件客户端将连接到的Windows服务运行的TcpListener服务。我无法改变客户的实施,并且必须遵守他们的规范。TcpListener每隔5秒发送一次心跳并从客户端读取消息异步

我有接收消息并返回响应工作的部分。我遇到的问题是客户希望每5秒钟发送一次心跳消息(S0000E),并使用相同的消息进行回复。我不知道如何在async/await中处理从客户端接收的真实消息的功能。

的OnStart

_serverListenerTask = Task.Run(() => AcceptClientsAsync(_listener, _cancellationToken.Token)); 

AcceptClientsAsync

static async Task AcceptClientsAsync(TcpListener listener, CancellationToken ct) 
{ 
    var clientCounter = 0; 
    while (!ct.IsCancellationRequested) 
    { 
     TcpClient client = await listener.AcceptTcpClientAsync() 
             .ConfigureAwait(false); 
     clientCounter++; 
     await ReceiveMessageAsync(client, clientCounter, ct); 
    } 
} 

ReceiveMessageAsync

static async Task ReceiveMessageAsync(TcpClient client, int clientIndex, CancellationToken ct) 
{ 
    Log.Info("New client ({0}) connected", clientIndex); 
    using (client) 
    { 
     var buffer = new byte[4096]; 
     var stream = client.GetStream(); 
     while (!ct.IsCancellationRequested) 
     { 
      var timeoutTask = Task.Delay(TimeSpan.FromSeconds(15)); 
      var amountReadTask = stream.ReadAsync(buffer, 0, buffer.Length, ct); 

      var completedTask = await Task.WhenAny(timeoutTask, amountReadTask) 
              .ConfigureAwait(false); 

      if (completedTask == timeoutTask) 
      { 
       var msg = Encoding.ASCII.GetBytes("Client timed out"); 
       await stream.WriteAsync(msg, 0, msg.Length); 
       break; 
      } 

      var bytesRead = amountReadTask.Result; 
      if (bytesRead == 0) 
      { 
       // Nothing was read 
       break; 
      } 

      // Snip... Handle message from buffer here 

      await stream.WriteAsync(responseBuffer, 0, responseBuffer.Length, ct) 
         .ConfigureAwait(false); 
     } 
    } 
    Log.Info("Client ({0}) disconnected", clientIndex); 
} 

我以为我可以心跳任务添加到Task.WhenAny,但造成的心跳始终火灾和我永远不会读取回应。我也尝试在超时之前发送心跳,并读取任务,这些任务可用于发送,但之后我正在读取心跳响应,而不是读取下一条消息,或者超时任务将完成并断开客户端连接。实质上,如果心跳交换成功,那么在15秒延迟之后客户端不应该断开连接。

+0

你如何将另一条消息与另一条消息分开 – john

+0

我读到这个权利,通常协议是客户端请求 - >服务器响应,但在这个特定的情况下,它的服务器心跳 - >客户端心跳?协议中是否有特定的规则,例如,在响应未完成时,您是否允许发送心跳消息?这里涉及的(类型)消息的明确定义和围绕它们的规则(即“协议”)将会有所帮助。 –

+0

如果服务器选择在客户端选择发送请求的同一时刻选择发送心跳,它似乎本质上是活泼的。 –

回答

1

实现TCP服务器客户端不是一项简单的任务。然而,实施以下方式,如果你提高它在资源的更高效,可以是一个实用的解决方案:

服务器:

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Net; 
using System.Net.Sockets; 
using System.Text; 
using System.Threading; 
using System.Threading.Tasks; 

namespace Server 
{ 
    public class Program 
    { 
     static List<SocketBuffer> clients = new List<SocketBuffer>(); 
     public static void Main(string[] args) 
     { 
      //Receive from any IP, listen on port 65000 in this machine 
      var listener = new TcpListener(IPAddress.Any, 65000); 
      var t = Task.Run(() => 
      { 
       while (true) 
       { 
        listener.Start(); 
        var task = listener.AcceptTcpClientAsync(); 
        task.Wait(); 
        clients.Add(new SocketBuffer(task.Result, new byte[4096])); 
       } 
      }); 
      t.Wait(); //It will remain here, do in a better way if you like ! 
     } 
    } 

    /// <summary> 
    /// We need this class because each TcpClient will have its own buffer 
    /// </summary> 
    class SocketBuffer 
    { 
     public SocketBuffer(TcpClient client, byte[] buffer) 
     { 
      this.client = client; 
      stream = client.GetStream(); 
      this.buffer = buffer; 

      receiveData(null); 
     } 

     private TcpClient client; 
     private NetworkStream stream; 
     private byte[] buffer; 

     private object _lock = new object(); 
     private async void receiveData(Task<int> result) 
     { 
      if (result != null) 
      { 
       lock (_lock) 
       { 
        int numberOfBytesRead = result.Result; 
        //If no data read, it means we are here to be notified that the tcp client has been disconnected 
        if (numberOfBytesRead == 0) 
        { 
         onDisconnected(); 
         return; 
        } 
        //We need a part of this array, you can do it in more efficient way if you like 
        var segmentedArr = new ArraySegment<byte>(buffer, 0, numberOfBytesRead).ToArray(); 
        OnDataReceived(segmentedArr); 
       } 

      } 
      var task = stream.ReadAsync(buffer, 0, buffer.Length); 
      //This is not recursion in any sense because the current 
      //thread will be free and the call to receiveData will be from a new thread 
      await task.ContinueWith(receiveData);  
     } 

     private void onDisconnected() 
     { 
      //Add your code here if you want this event 
     } 

     private void OnDataReceived(byte[] dat) 
     { 
      //Do anything with the data, you can reply here. I will just pring the received data from the demo client 
      string receivedTxt = Encoding.ASCII.GetString(dat); 
      Console.WriteLine(receivedTxt); 
     } 
    } 
} 

演示客户端:

using System; 
using System.Net.Sockets; 
using System.Text; 
using System.Threading; 

namespace Client 
{ 
    public class Program 
    { 
     public static void Main(string[] args) 
     { 
      TcpClient client = new TcpClient(); 
      var task = client.ConnectAsync("localhost", 65000); 
      task.Wait(); 
      if(client.Connected) 
      { 
       Console.WriteLine("Client connected"); 
       var stream = client.GetStream(); 
       var data = Encoding.ASCII.GetBytes("test"); 
       stream.Write(data, 0, data.Length); 
      } 
      else 
      { 
       Console.WriteLine("Client NOT connected"); 
      } 
      Thread.Sleep(60000); 
     } 
    } 
} 
相关问题