2011-10-12 179 views
3

我想在Java中使用nio实现一个TCP服务器。 它只是使用选择器的select方法来获得就绪键。然后处理这些密钥,如果它们是可接受的,可读的等等。服务器工作得很好,直到即时通讯使用单线程。但是当我尝试使用更多的线程来处理密钥时,服务器的响应会变慢并最终停止响应,例如在4-5个请求之后。 这是所有什么即时做:(伪)多线程与非阻塞套接字

Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator(); 
while (keyIterator.hasNext()) { 
       SelectionKey readyKey = keyIterator.next(); 
       if (readyKey.isAcceptable()) { 
        //A new connection attempt, registering socket channel with selector 

       } else { 
        Worker.add(readyKey); 
       } 

工人是执行从信道输入/输出的线程类。 这是我的Worker类的代码:

private static List<SelectionKey> keyPool = Collections.synchronizedList(new LinkedList()); 

public static void add(SelectionKey key) { 
    synchronized (keyPool) { 
     keyPool.add(key); 
     keyPool.notifyAll(); 
    } 
} 


public void run() { 
    while (true) { 

     SelectionKey myKey = null; 
     synchronized (keyPool) { 
      try { 
       while (keyPool.isEmpty()) { 
        keyPool.wait(); 
       } 
      } catch (InterruptedException ex) {      
      } 
      myKey = keyPool.remove(0); 
      keyPool.notifyAll(); 
     } 

     if (myKey != null && myKey.isValid()) { 

      if (myKey.isReadable()) { 
       //Performing reading 
      } else if (myKey.isWritable()) { 
       //performing writing 
       myKey.cancel(); 
      } 
     } 
    } 

我的基本思路是增加的关键keyPool从不同的线程可以一键搞定,一次一个。 我的BaseServer类本身作为一个线程运行。它在事件循环开始之前创建10个工作线程。我也尝试增加BaseServer线程的优先级,以便它有更多机会接受可接受的密钥。尽管如此,在大约8次请求后它仍然停止响应。请帮助,如果我错了。提前致谢。 :)

+0

在这里寻找一些有关生产者/消费者问题和一些可能有助于解决您的问题的良好数据结构的想法 - http://stackoverflow.com/questions/1212386/concurrent-and-blocking-queue-in-java。 – Perception

回答

1

首先,由于java.util.concurrent中存在良好的标准Java(1.5+)包装类,如BlockingQueue,因此不应再真正使用wait()和notify()调用。

其次,建议在选择线程本身执行IO操作,而不是在工作线程中执行IO操作。工作线程应该将读/写入队列并写入选择器线程。

本页面说明它非常好,甚至还提供了一个简单的TCP/IP服务器的工作代码示例:http://rox-xmlrpc.sourceforge.net/niotut/

对不起,我还没有时间看你的具体的例子。

2

第三,您并未从选定密钥集中删除任何内容。每次在循环周围都必须这样做,例如调用next()后调用keyIterator.remove()。

您需要阅读NIO教程。

1

尝试使用xsocket库。它为我节省了很多时间阅读论坛。

下载:http://xsocket.org/

教程:http://xsocket.sourceforge.net/core/tutorial/V2/TutorialCore.htm

服务器代码:

import org.xsocket.connection.*; 

/** 
* 
* @author wsserver 
*/ 
public class XServer { 

    protected static IServer server; 

    public static void main(String[] args) { 
     try { 
      server = new Server(9905, new XServerHandler()); 
      server.start(); 
     } catch (Exception ex) { 
      System.out.println(ex.getMessage()); 
     } 
    } 
    protected static void shutdownServer(){ 
     try{ 
      server.close(); 
     }catch(Exception ex){ 
      System.out.println(ex.getMessage()); 
     }   
    } 
} 

服务器处理:

import java.io.IOException; 
import java.nio.BufferUnderflowException; 
import java.nio.ByteBuffer; 
import java.nio.channels.ClosedChannelException; 
import java.nio.charset.Charset; 
import java.nio.charset.CharsetDecoder; 
import java.nio.charset.CharsetEncoder; 
import java.util.*; 
import org.xsocket.*; 
import org.xsocket.connection.*; 

public class XServerHandler implements IConnectHandler, IDisconnectHandler, IDataHandler { 

    private Set<ConnectedClients> sessions = Collections.synchronizedSet(new HashSet<ConnectedClients>()); 

    Charset charset = Charset.forName("ISO-8859-1"); 
    CharsetEncoder encoder = charset.newEncoder(); 
    CharsetDecoder decoder = charset.newDecoder(); 
    ByteBuffer buffer = ByteBuffer.allocate(1024); 

    @Override 
    public boolean onConnect(INonBlockingConnection inbc) throws IOException, BufferUnderflowException, MaxReadSizeExceededException { 
     try { 
      synchronized (sessions) { 
       sessions.add(new ConnectedClients(inbc, inbc.getRemoteAddress())); 
      } 
      System.out.println("onConnect"+" IP:"+inbc.getRemoteAddress().getHostAddress()+" Port:"+inbc.getRemotePort()); 
     } catch (Exception ex) { 
      System.out.println("onConnect: " + ex.getMessage()); 
     } 
     return true; 
    } 

    @Override 
    public boolean onDisconnect(INonBlockingConnection inbc) throws IOException { 
     try { 
      synchronized (sessions) { 
       sessions.remove(inbc); 
      } 
      System.out.println("onDisconnect"); 
     } catch (Exception ex) { 
      System.out.println("onDisconnect: " + ex.getMessage()); 
     } 
     return true; 
    } 

    @Override 
    public boolean onData(INonBlockingConnection inbc) throws IOException, BufferUnderflowException, ClosedChannelException, MaxReadSizeExceededException { 
     inbc.read(buffer); 
     buffer.flip(); 
     String request = decoder.decode(buffer).toString(); 
     System.out.println("request:"+request); 
     buffer.clear(); 
     return true; 
    } 
} 

连接的客户端:

import java.net.InetAddress; 
import org.xsocket.connection.INonBlockingConnection; 

/** 
* 
* @author wsserver 
*/ 
public class ConnectedClients { 

    private INonBlockingConnection inbc; 
    private InetAddress address; 

    //CONSTRUCTOR 
    public ConnectedClients(INonBlockingConnection inbc, InetAddress address) { 
     this.inbc = inbc; 
     this.address = address; 
    } 

    //GETERS AND SETTERS 
    public INonBlockingConnection getInbc() { 
     return inbc; 
    } 

    public void setInbc(INonBlockingConnection inbc) { 
     this.inbc = inbc; 
    } 

    public InetAddress getAddress() { 
     return address; 
    } 

    public void setAddress(InetAddress address) { 
     this.address = address; 
    } 
} 

客户端代码:

import java.net.InetAddress; 
import org.xsocket.connection.INonBlockingConnection; 
import org.xsocket.connection.NonBlockingConnection; 

/** 
* 
* @author wsserver 
*/ 
public class XClient { 

    protected static INonBlockingConnection inbc; 
    public static void main(String[] args) { 
     try { 
      inbc = new NonBlockingConnection(InetAddress.getByName("localhost"), 9905, new XClientHandler()); 

      while(true){ 

      } 
     } catch (Exception ex) { 
      System.out.println(ex.getMessage()); 
     } 
    } 
} 

客户端处理程序:

import java.io.IOException; 
import java.nio.BufferUnderflowException; 
import java.nio.ByteBuffer; 
import java.nio.channels.ClosedChannelException; 
import java.nio.charset.Charset; 
import java.nio.charset.CharsetDecoder; 
import java.nio.charset.CharsetEncoder; 
import org.xsocket.MaxReadSizeExceededException; 
import org.xsocket.connection.IConnectExceptionHandler; 
import org.xsocket.connection.IConnectHandler; 
import org.xsocket.connection.IDataHandler; 
import org.xsocket.connection.IDisconnectHandler; 
import org.xsocket.connection.INonBlockingConnection; 

/** 
* 
* @author wsserver 
*/ 
public class XClientHandler implements IConnectHandler, IDataHandler,IDisconnectHandler, IConnectExceptionHandler { 

    Charset charset = Charset.forName("ISO-8859-1"); 
    CharsetEncoder encoder = charset.newEncoder(); 
    CharsetDecoder decoder = charset.newDecoder(); 
    ByteBuffer buffer = ByteBuffer.allocate(1024); 

    @Override 
    public boolean onConnect(INonBlockingConnection nbc) throws IOException { 
     System.out.println("Connected to server"); 
     nbc.write("hello server\r\n"); 
     return true; 
    } 

    @Override 
    public boolean onConnectException(INonBlockingConnection nbc, IOException ioe) throws IOException { 

     System.out.println("On connect exception:"+ioe.getMessage()); 
     return true; 
    } 

    @Override 
    public boolean onDisconnect(INonBlockingConnection nbc) throws IOException { 

     System.out.println("Dissconected from server"); 
     return true; 
    } 

    @Override 
    public boolean onData(INonBlockingConnection inbc) throws IOException, BufferUnderflowException, ClosedChannelException, MaxReadSizeExceededException { 

     inbc.read(buffer); 
     buffer.flip(); 
     String request = decoder.decode(buffer).toString(); 
     System.out.println(request); 
     buffer.clear(); 
     return true; 
    } 
} 

我花了很多时间阅读关于这个论坛,我希望我能帮助ü我的代码。