2012-02-15 61 views
2

Rrom C#,通过SOCKET读/写到JAVA并产生一些并发/套接字问题。C#通过读写SOCKET到JAVA并发生一些并发/套接字问题

我想实现一个服务器客户端应用程序,其中服务器是Java和客户端是C#。他们通过TCP/IP进行通信并在它们之间交换一些二进制数据。

特别是我有一个在Java和C#中定义的数据包类。它有一个标题,键和值。 Java和C#都以完全相同的方式写入和读取数据包到套接字。通过这种方式,我可以从C#发送请求数据包,在Java服务器上处理它并将响应作为数据包发回。

原来的问题是更复杂的方式,但我可以把它煮到这个“简单”的版本。

我已经实现了服务器和客户端,如下所述。代码也可在底部找到。

对我说出你不得不继续阅读:)

服务器(Java)的侧

在服务器端的问题,我有一个很虚的ServerSocket使用。它读取传入的数据包并发回与响应几乎相同的数据包。

客户端(C#)端 客户端有点复杂。客户端启动N(可配置)线程数(我将称它们为用户线程)。一个在线程和一个在线程。所有用户线程都使用虚拟请求数据包和唯一ID创建一个Call对象。然后将该呼叫添加到本地BlockingCollection中。

的输出线程连续读取本地BlockingCollection和服务器也连续发送所有请求数据包到服务器

将在线程读取响应数据包,并将其匹配到呼叫对象(还记得唯一的呼叫ID)。

如果在5秒的时间间隔内没有响应特定的Call对象,用户线程将通过打印到控制台来投诉它。

还有一个定时器,间隔10秒,打印每秒执行多少事务。

如果您到达目前为止,谢谢:)。

现在的问题:

下面的代码,这是一个什么样上述我在Mac上使用Mono正常工作的落实。在Windows上,它也不会立即以较低的用户线程数(< 10)失败。随着我突然增加线程数量,客户端收到的响应数据包以某种方式被破坏。在这个应用程序方面,所有用户线程都被阻塞,因为他们的请求的答案没有收到。 问题是为什么他们被损坏?正如你看到的touche socket的线程是In和Out线程。但不知何故,用户线程的数量会影响客户端并使其刹车。

它看起来像一些并发或套接字问题,但我可以找到它。

我已经把服务器(Java)和客户端(C#)的代码。他们没有任何依赖关系,只是编译和运行两个主要方法(第一台服务器)显示问题。

我很欣赏你读到目前为止。

服务器代码

import java.io.*; 
import java.net.*; 
import java.nio.ByteBuffer; 

public class DummyServer { 

public static void main(String[] args) throws IOException { 
    ServerSocket server = new ServerSocket(9900); 
    System.out.println("Server started"); 
    for(;;){ 
     final Socket socket = server.accept(); 
     System.out.println("Accepting a connection"); 
     new Thread(new Runnable(){ 
      public void run() { 
       try { 
        System.out.println("Thread started to handle the connection"); 
        DataInputStream dis = new DataInputStream(socket.getInputStream()); 
        DataOutputStream dos = new DataOutputStream(socket.getOutputStream()); 
        for(int i=0; ; i++){ 
         Packet packet = new Packet(); 
         packet.readFrom(dis); 
         packet.key = null; 
         packet.value = new byte[1000]; 
         packet.writeTo(dos); 
        } 
       } catch (IOException e) { 
        e.printStackTrace(); 
       } 
      } 
     }).start(); 
    } 
} 
public static class Packet { 
    byte[] key; 
    byte[] value; 
    long callId = -1; 
    private int valueHash = -1; 

    public void writeTo(DataOutputStream outputStream) throws IOException { 
     final ByteBuffer writeHeaderBuffer = ByteBuffer.allocate(1 << 10); // 1k 
     writeHeaderBuffer.clear(); 
     writeHeaderBuffer.position(12); 
     writeHeaderBuffer.putLong(callId); 
     writeHeaderBuffer.putInt(valueHash); 
     int size = writeHeaderBuffer.position(); 
     int headerSize = size - 12; 
     writeHeaderBuffer.position(0); 
     writeHeaderBuffer.putInt(headerSize); 
     writeHeaderBuffer.putInt((key == null) ? 0 : key.length); 
     writeHeaderBuffer.putInt((value == null) ? 0 : value.length); 
     outputStream.write(writeHeaderBuffer.array(), 0, size); 
     if (key != null)outputStream.write(key); 
     if (value != null)outputStream.write(value); 
    } 

    public void readFrom(DataInputStream dis) throws IOException { 
     final ByteBuffer readHeaderBuffer = ByteBuffer.allocate(1 << 10); 
     final int headerSize = dis.readInt(); 
     int keySize = dis.readInt(); 
     int valueSize = dis.readInt(); 
     readHeaderBuffer.clear(); 
     readHeaderBuffer.limit(headerSize); 
     dis.readFully(readHeaderBuffer.array(), 0, headerSize); 
     this.callId = readHeaderBuffer.getLong(); 
     valueHash = readHeaderBuffer.getInt(); 
     key = new byte[keySize]; 
     dis.readFully(key); 
     value = new byte[valueSize]; 
     dis.readFully(value); 
    } 
} 

}

C#的客户端代码:

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Net.Sockets; 
using System.Net; 
using System.IO; 
using System.Collections.Concurrent; 
using System.Threading; 

namespace Client 
{ 
public class Program 
{ 
    readonly ConcurrentDictionary<long, Call> calls = new ConcurrentDictionary<long, Call>(); 
    readonly BlockingCollection<Call> outThreadQueue = new BlockingCollection<Call>(1000); 
    readonly TcpClient tcpClient = new TcpClient("localhost", 9900); 
    readonly private int THREAD_COUNT; 
    static int ops; 

    public static void Main(string[] args) { 
     new Program(args.Length > 0 ? int.Parse(args[0]) : 100).Start(); 
    } 
    public Program(int threadCount) { 
     this.THREAD_COUNT = threadCount; 
     new Thread(new ThreadStart(this.InThreadRun)).Start();//start the InThread 
     new Thread(new ThreadStart(this.OutThreadRun)).Start();//start the OutThread 
    } 
    public void Start(){ 
     for (int i = 0; i < THREAD_COUNT; i++) 
      new Thread(new ThreadStart(this.Call)).Start(); 
     Console.WriteLine(THREAD_COUNT + " User Threads started to perform server call"); 
     System.Timers.Timer aTimer = new System.Timers.Timer(10000); 
     aTimer.Elapsed += new System.Timers.ElapsedEventHandler(this.Stats); 
     aTimer.Enabled = true; 
    } 
    public void Stats(object source, System.Timers.ElapsedEventArgs e){ 
     Console.WriteLine("Ops per second: " + Interlocked.Exchange(ref ops, 0)/10); 
    } 
    public void Call() { 
     for (; ;){ 
      Call call = new Call(new Packet()); 
      call.request.key = new byte[10]; 
      call.request.value = new byte[1000]; 
      outThreadQueue.Add(call); 
      Packet result = null; 
      for (int i = 1;result==null ; i++){ 
       result = call.getResult(5000); 
       if(result==null) Console.WriteLine("Call" + call.id + " didn't get answer within "+ 5000*i/1000 + " seconds"); 
      } 
      Interlocked.Increment(ref ops); 
     } 
    } 
    public void InThreadRun(){ 
     for (; ;){ 
      Packet packet = new Packet(); 
      packet.Read(tcpClient.GetStream()); 
      Call call; 
      if (calls.TryGetValue(packet.callId, out call)) 
       call.inbQ.Add(packet); 
      else 
       Console.WriteLine("Unkown call result: " + packet.callId); 
     } 
    } 
    public void OutThreadRun() { 
     for (; ;){ 
      Call call = outThreadQueue.Take(); 
      calls.TryAdd(call.id, call); 
      Packet packet = call.request; 
      if (packet != null) packet.write(tcpClient.GetStream()); 
     } 
    } 
} 
public class Call 
{ 
    readonly public long id; 
    readonly public Packet request; 
    static long callIdGen = 0; 
    readonly public BlockingCollection<Packet> inbQ = new BlockingCollection<Packet>(1); 
    public Call(Packet request) 
    { 
     this.id = incrementCallId(); 
     this.request = request; 
     this.request.callId = id; 
    } 
    public Packet getResult(int timeout) 
    { 
     Packet response = null; 
     inbQ.TryTake(out response, timeout); 
     return response; 
    } 
    private static long incrementCallId() 
    { 
     long initialValue, computedValue; 
     do 
     { 
      initialValue = callIdGen; 
      computedValue = initialValue + 1; 
     } while (initialValue != Interlocked.CompareExchange(ref callIdGen, computedValue, initialValue)); 
     return computedValue; 
    } 
} 

public class Packet 
{ 
    public byte[] key; 
    public byte[] value; 
    public long callId = 0; 
    public void write(Stream stream) 
    { 
     MemoryStream header = new MemoryStream(); 
     using (BinaryWriter writer = new BinaryWriter(header)) 
     { 
      writer.Write(System.Net.IPAddress.HostToNetworkOrder((long)callId)); 
      writer.Write(System.Net.IPAddress.HostToNetworkOrder((int)-1)); 
     } 
     byte[] headerInBytes = header.ToArray(); 
     MemoryStream body = new MemoryStream(); 
     using (BinaryWriter writer = new BinaryWriter(body)) 
     { 
      writer.Write(System.Net.IPAddress.HostToNetworkOrder(headerInBytes.Length)); 
      writer.Write(System.Net.IPAddress.HostToNetworkOrder(key == null ? 0 : key.Length)); 
      writer.Write(System.Net.IPAddress.HostToNetworkOrder(value == null ? 0 : value.Length)); 
      writer.Write(headerInBytes); 
      if (key != null) writer.Write(key); 
      if (value != null) writer.Write(value); 
      byte[] packetInBytes = body.ToArray(); 
      stream.Write(packetInBytes, 0, packetInBytes.Length); 
     } 
    } 
    public void Read(Stream stream) 
    { 
     BinaryReader reader = new BinaryReader(stream); 
     int headerSize = IPAddress.NetworkToHostOrder(reader.ReadInt32()); 
     int keySize = IPAddress.NetworkToHostOrder(reader.ReadInt32()); 
     int valueSize = IPAddress.NetworkToHostOrder(reader.ReadInt32()); 
     this.callId = IPAddress.NetworkToHostOrder(reader.ReadInt64()); 
     int valuePartitionHash = IPAddress.NetworkToHostOrder(reader.ReadInt32()); 
     this.key = new byte[keySize]; 
     this.value = new byte[valueSize]; 
     if (keySize > 0) reader.Read(this.key, 0, keySize); 
     if (valueSize > 0) reader.Read(this.value, 0, valueSize); 
    } 
} 

}

回答

2

这是一个非常常见的错误:任何Read调用套接字可能实际上不会读取尽可能多的字节数,如果它们当前不可用。 Read将返回每个调用读取的字节数。如果您希望读取n个字节的数据,则需要多次调用读取,直到读取的字节数加起来为n。

+0

你是对的,我改变了Read()方法到我从Java DataInputStream中作弊的readFully方法,它工作。非常感谢。高度赞赏! – 2012-02-15 23:45:32