2017-07-19 54 views
0

我必须在JBoss中构建一个JAVA NIO服务器应用程序以从10-200个传感器盒中读取数据。他们打开一个流,并始终向我发送数据。沟通是双向的。现在,有时会发生这些Box(或服务器)出现内部错误。为了检测这种问题,观察者线程每5秒检查一次数据块是否自上次检查后进入。如果我的盒子都没有发送数据,那么发生了一些不好的事情,我想重新启动整个套接字通信。JAVA NIO服务器:如何重置所有连接

现在,很好地阐述了如何建立与NIO的套接字连接,但很难找到复杂的例子来清除重置它们。这是我的问题:当我的看门狗检测到最近5秒没有数据时,它调用close()然后startEngine()。但之后,仍然没有数据到达。有些东西似乎被阻止,一些资源仍然相关或类似。如果我重新启动我的JBoss,数据再次到达。有人可以给我一个提示吗?

谢谢你的时间! 斯特凡

public class TestServer 
{ 
    private NIOServer server; 
    private HashMap<String, SocketChannel> clientsList = new HashMap<String, SocketChannel>(); 

    class NIOServer extends Thread 
    { 
     class MessageBuffer 
     { 
       int [] msgAsByte = new int[msgSize]; 
       int pos = 0; 
       int lastSign = 0;          
       int bytesRead = 0; 
     } 
     private ByteBuffer readBuffer = ByteBuffer.allocate(256); 
     private Selector selector; 
     private boolean stop = false; 
     private int[] ports; 
     private int msgSize = 48; 
     private HashMap<String,MessageBuffer> buffer = new HashMap<String, MessageBuffer>(); 

     private List<ServerSocketChannel> channels; 
     // Maps a SocketChannel to a list of ByteBuffer instances 
     private Map<SocketChannel, List<ByteBuffer>> pendingDataToWrite = new HashMap<SocketChannel, List<ByteBuffer>>(); 

     public NIOServer(int[] ports) { 
       this.ports = ports; 
     } 

     private void stopAll() 
     { 
       stop = true; 

       try 
       { 
        server.interrupt(); 
        server.join(3000); 
       } 
       catch (InterruptedException e) { 
        Thread.currentThread().interrupt(); 
       } 
       closeConnections(); 
     } 

     public void sendData(SocketChannel socket, byte[] data) 
     { 
       // And queue the data we want written 
       synchronized (this.pendingDataToWrite) { 
        List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingDataToWrite.get(socket); 
        if (queue == null) { 
          queue = new ArrayList<ByteBuffer>(); 
          this.pendingDataToWrite.put(socket, queue); 
        } 
        queue.add(ByteBuffer.wrap(data)); 
       } 

       SelectionKey key = socket.keyFor(this.selector); 
       if(key != null) 
        key.interestOps(SelectionKey.OP_WRITE); 
       // Finally, wake up our selecting thread so it can make the required changes 
       this.selector.wakeup(); 
     } 

     public void run() 
     { 
       try 
       { 
        stop = false; 
        selector = Selector.open(); 
        channels = new ArrayList<ServerSocketChannel>(); 
        ServerSocketChannel serverchannel; 
        for (int port : ports) 
        { 
          try 
          { 
           serverchannel = ServerSocketChannel.open(); 
           serverchannel.configureBlocking(false); 
           try 
           { 
             serverchannel.socket().setReuseAddress(true); 
           } 
           catch(SocketException se) 
           { 
             // 
           } 
           serverchannel.socket().bind(new InetSocketAddress(port)); 
           serverchannel.register(selector, SelectionKey.OP_ACCEPT); 
           channels.add(serverchannel); 
          } 
          catch(Exception e) 
          { 
           // 
          } 
        } 
        while (!stop) 
        { 

          SelectionKey key = null; 
          try 
          { 
           selector.select(); 
           Iterator<SelectionKey> keysIterator = selector.selectedKeys() 
              .iterator(); 
           while (keysIterator.hasNext()) 
           { 
             key = keysIterator.next(); 

             if(key.isValid()) 
             { 
              if (key.isAcceptable()) 
              { 
                accept(key); 
              } 
              else if (key.isReadable()) 
              { 
                readData(key); 
              } 
              else if (key.isWritable()) 
              { 
                writeData(key); 
              } 
             } 
             else 
             { 
              SocketChannel sc = (SocketChannel) key.channel(); 
             } 
             keysIterator.remove(); 
           } 
          } 
          catch (Exception e) 
          { 
           if(e instanceof IOException || e instanceof ClosedSelectorException) 
           { 
             try 
             { 
              ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); 
              channels.remove(ssc); 
              ssc.close(); 
              key.cancel(); 
             } 
             catch(Exception ex) 
             { 
              // 
             } 

           } 
           else 
           { 
             // 
           } 
          } 
        } 
       } 
       catch(Exception e1) 
       { 
        // 
       } 

       closeConnections(); 

     } 

     private void closeConnections() 
     { 
       //if thread is stopped, close all 
       try 
       { 
        try 
        { 
          if(this.selector == null || this.selector.keys() == null) 
          { 
           log.debug("No selectors or keys found to close"); 
          } 
          else 
          { 
           Iterator<SelectionKey> keys = this.selector.keys().iterator(); 
           while(keys.hasNext()) 
           { 
             SelectionKey key = keys.next(); 
             key.cancel(); 
           } 
          } 
        } 
        catch(Exception ex) { 
          // 
        } 
        if(selector != null) 
          selector.close(); 
        if(channels != null) 
        { 
          for(ServerSocketChannel channel:channels) 
          { 
           channel.socket().close(); 
           channel.close(); 
          } 
        } 

        if(clientsList != null) 
        { 
          Iterator<Map.Entry<String, SocketChannel>> hfm = clientsList.entrySet().iterator(); 
          while(hfm.hasNext()) 
          { 
           Map.Entry<String, SocketChannel> s = hfm.next(); 
           s.getValue().close(); 
          } 
        } 
        clientsList=null; 

        selector = null; 
        channels = null; 
        pendingDataToWrite = null; 
       } 
       catch(Exception e) 
       { 
        // 
       } 

     } 

     private void accept(SelectionKey key) throws IOException 
     { 

       ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); 
       SocketChannel sc = ssc.accept(); 
       sc.configureBlocking(false); 
       sc.register(selector, SelectionKey.OP_READ); 

       String ip = sc.socket().getRemoteSocketAddress().toString(); 
       if(!buffer.containsKey(ip)) 
        buffer.put(ip, new MessageBuffer()); 
     } 

     private void readData(SelectionKey key) throws Exception 
     { 

       SocketChannel sc = (SocketChannel) key.channel();  

       MessageBuffer buf = buffer.get(sc.socket().getRemoteSocketAddress().toString()); 
       try 
       { 
        buf.bytesRead = sc.read(readBuffer); //read into buffer. 
       } 
       catch(Exception e2) 
       { 
        sc.close(); 
        buffer.remove(sc); 
       } 

       //close connection 
       if (buf.bytesRead == -1) 
       { 
        sc.close(); 
        key.cancel(); 
        return; 
       } 

       readBuffer.flip();  //make buffer ready for read 

       while(readBuffer.hasRemaining()) 
       { 
        //Read the data and forward it to another Process... 
       } 

       readBuffer.compact(); //make buffer ready for writing 

     } 

     private void writeData(SelectionKey key) throws Exception 
     { 
       SocketChannel socketChannel = (SocketChannel) key.channel(); 
       synchronized (this.pendingDataToWrite) { 
        List queue = (List) this.pendingDataToWrite.get(socketChannel); 

        // Write until there's not more data ... 
        while (!queue.isEmpty()) { 
          ByteBuffer buf = (ByteBuffer) queue.get(0); 
          try 
          { 
           socketChannel.write(buf); 
          } 
          catch(Exception e) 
          { 
           // 
          } 
          finally 
          { 
           queue.remove(0); 
          } 
          if (buf.remaining() > 0) { 
           // ... or the socket's buffer fills up 
           break; 
          } 
        } 

        key.interestOps(SelectionKey.OP_READ); 
       } 
     } 
    } 



    public void close() { 

     if (server != null && server.isAlive()) 
     {  
        server.stopAll(); 
     } 
     if(clientsList != null) 
     { 
       clientsList.clear(); 
     } 
     server = null; 

    } 

    public void startEngine(int[] ports) { 
     if (ports != null) { 
       for (int port : ports) 
        log.info("Listening on port " + port); 
       server= new NIOServer(ports); 
       server.start(); 
     } 
    } 

} 

回答

1

使用select()超时。

如果超时发生,关闭所有注册的SocketChannels

如果您希望获得更精细的信息,请记录每个通道上的最后I/O时间,并关闭每个select()循环底部已过期的那些时间。

注意您的OP_WRITE技术不正确。这里有很多答案显示如何正确使用它。

+0

感谢您的意见,我想你的意思是你的意见,像这样的线程https://stackoverflow.com/questions/17556901/java-high-load-nio-tcp-server?这意味着我应该在写入时写入,并且只有当此操作返回0时,才会注册OP_WRITE。正确? select + timeout帮助我识别没有新数据到达时,但它没有解决我调用close()和restartEngine()后仍然没有得到新数据的问题... – user3354754

+0

您不会得到任何东西,直到客户重新连接。 – EJP

+0

好吧,当我重新启动我的JBoss,他们重新连接,所有工作正常。但是,如果我只关闭套接字,就不要这样做。有没有办法强制他们重新连接 – user3354754