2016-05-31 74 views
6

我有一个应用程序,它使用BufferedReaderPrintStream包装java.net.Socket对象的InputStreamOutputStream同步读取和写入文本行。所以,我可以使用方法BufferedReader.readLine()PrintStream.println(),并让Java库将输入拆分成行并为我输出格式。Java:用于读取和写入行的异步I/O通道

现在我想用异步IO替换这个同步IO。所以我一直在研究AsynchronousSocketChannel,它允许异步读取和写入字节。现在,我想要包装类,以便可以使用字符串异步读/写行。

我在Java库中找不到这样的包装类。在我编写自己的实现之前,我想问问是否有其他库允许包装AsynchronousSocketChannel并提供异步文本IO。

+0

为什么?你想解决什么问题?你可以用'BufferedReader'每秒读取数百万行。这还不够吗? – EJP

+0

@EJP:我想读取异步。我不想阻止等待通过套接字接收的一行文本。我希望在接收完整行文本时调用我的代码。 –

+1

@ giorgio-b如果你没有从套接字中读取数据,那么接收完整行会发生什么? –

回答

0

你可以做这样的事情

public void nioAsyncParse(AsynchronousSocketChannel channel, final int bufferSize) throws IOException, ParseException, InterruptedException { 
    ByteBuffer byteBuffer = ByteBuffer.allocate(bufferSize); 
    BufferConsumer consumer = new BufferConsumer(byteBuffer, bufferSize); 
    channel.read(consumer.buffer(), 0l, channel, consumer); 
} 


class BufferConsumer implements CompletionHandler<Integer, AsynchronousSocketChannel> { 

     private ByteBuffer bytes; 
     private StringBuffer chars; 
     private int limit; 
     private long position; 
     private DateFormat frmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 

     public BufferConsumer(ByteBuffer byteBuffer, int bufferSize) { 
      bytes = byteBuffer; 
      chars = new StringBuffer(bufferSize); 
      frmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 
      limit = bufferSize; 
      position = 0l; 
     } 

     public ByteBuffer buffer() { 
      return bytes; 
     } 

     @Override 
     public synchronized void completed(Integer result, AsynchronousSocketChannel channel) { 

      if (result!=-1) { 
       bytes.flip(); 
       final int len = bytes.limit(); 
       int i = 0; 
       try { 
        for (i = 0; i < len; i++) { 
         byte by = bytes.get(); 
         if (by=='\n') { 
          // *** 
          // The code used to process the line goes here 
          // *** 
          chars.setLength(0); 
         } 
         else { 
          chars.append((char) by); 
         } 
        } 
       } 
       catch (Exception x) { 
        System.out.println("Caught exception " + x.getClass().getName() + " " + x.getMessage() + " i=" + String.valueOf(i) + ", limit=" + String.valueOf(len) + ", position="+String.valueOf(position)); 
       } 

       if (len==limit) { 
        bytes.clear(); 
        position += len; 
        channel.read(bytes, position, channel, this); 
       } 
       else { 
        try { 
         channel.close(); 
        } 
        catch (IOException e) { } 
        bytes.clear(); 
        buffers.add(bytes); 
       } 
      } 
      else { 
       try { 
        channel.close(); 
       } 
       catch (IOException e) { } 
       bytes.clear(); 
       buffers.add(bytes); 
      } 
     } 

     @Override 
     public void failed(Throwable e, AsynchronousSocketChannel channel) { 
     } 
};