2011-09-23 57 views
4

我正在尝试编写一段解除阻塞代码以从PipedInputStream中读取。它主要检查是否有任何调用阻塞前阅读阅读API:Java PipedInputStream available()方法返回值

int n = 0; 
if ((n = pipedInputStream_.available()) > 0) { 
    pipedInputStream_.read(...) 
} 

通过java API doc我不能肯定地告诉该检查应该是什么阅读,因为可能的值为零(意味着没有数据,或关闭/破碎流)或大于零。那么来电者怎么知道是否有什么可读的东西呢? “

”返回可以从此输入流中读取的无阻塞字节数,或者如果通过调用close()方法关闭了此输入流,或者管道未连接或断开,则返回0。 “

查看来源,它似乎是唯一的值是零或大于零。

public synchronized int available() throws IOException { 
    if(in < 0) 
     return 0; 
    else if(in == out) 
     return buffer.length; 
    else if (in > out) 
     return in - out; 
    else 
     return in + buffer.length - out; 
} 
+1

零并不意味着封闭或断开的流。每个规格仅允许零或大于零。 – EJP

+0

你能澄清为什么你说零不暗示关闭或破流?请看看我同意的那个解释。 Java文档还指出: “返回: 可以从此输入流读取的无阻塞字节数,如果此输入流已通过调用close()方法关闭,或者管道是未连接或断开。“ – USQ

+0

你误会了。零意味着*要么*没有数据要被读取而没有阻塞*或*流被关闭*或*管道未连接*或*管道损坏。这并不意味着只有这些条件中的一个。 – EJP

回答

2

如果available()返回零,则目前没有字节可供读取。根据您引用的文档,可能有以下几种原因:

  • 管道已关闭。
  • 管道破裂。
  • 所有先前可用的输入(如果有)已被使用。

available()威力返回值为零,意味着已经发生了错误,这意味着你将永远无法通过将来管道读取任何数据,但你不能肯定地告诉这里,因为零可能表示上面的第三个条件,其中在InputStream#read()上的阻塞可能最终产生更多数据,相应的OutputStream侧将通过管道推进。

我看不出有可能轮询PipedInputStreamavailable(),直到有更多数据可用为止,因为您永远无法区分上面的终端情况(第一个和第二个)与读者是否更加饥饿比作家。像很多流接口一样,这里也必须尝试使用​​并准备好失败。那是陷阱; InputStream#read()将会阻止,但直到您承诺阻止尝试阅读时,您才能够辨别出没有更多输入即将到来。

将您的消费行为建立在available()上并不可行。如果它返回一个正数,则有需要阅读,但当然,即使现在有什么可能不足以满足您的消费者。如果您提交一个线程以阻塞方式使用InputStream并跳过available()的轮询,则会发现您的应用程序更易于管理。让InputStream#read()成为你在这里的唯一预言。

+0

谢谢。是的,在查看PipedInputStream代码的其余部分,尤其是循环缓冲区的实现/使用之后,我看到第三个终端条件也是零的原因之一。 – USQ

0

我需要一个过滤器来拦截速度较慢的连接,因为我需要尽快关闭数据库连接,所以我最初使用Java管道,但是当它们的实现看起来更接近时,它们都是同步的,因此我最终使用一个小缓冲区创建了自己的QueueInputStream和阻塞队列将缓冲区放入队列中一旦被填满,除非在LinkedBlockingQueue中使用的锁定条件,在小缓冲区的帮助下它应该便宜,否则它是无锁的,该类仅用于单生产者和消费者每个实例:

import java.io.IOException; 
import java.io.OutputStream; 
import java.util.concurrent.*; 

public class QueueOutputStream extends OutputStream 
{ 
    private static final int DEFAULT_BUFFER_SIZE=1024; 
    private static final byte[] END_SIGNAL=new byte[]{-1}; 

    private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>(); 
    private final byte[] buffer; 

    private boolean closed=false; 
    private int count=0; 

    public QueueOutputStream() 
    { 
    this(DEFAULT_BUFFER_SIZE); 
    } 

    public QueueOutputStream(final int bufferSize) 
    { 
    if(bufferSize<=0){ 
     throw new IllegalArgumentException("Buffer size <= 0"); 
    } 
    this.buffer=new byte[bufferSize]; 
    } 

    private synchronized void flushBuffer() 
    { 
    if(count>0){ 
     final byte[] copy=new byte[count]; 
     System.arraycopy(buffer,0,copy,0,count); 
     queue.offer(copy); 
     count=0; 
    } 
    } 

    @Override 
    public synchronized void write(final int b) throws IOException 
    { 
    if(closed){ 
     throw new IllegalStateException("Stream is closed"); 
    } 
    if(count>=buffer.length){ 
     flushBuffer(); 
    } 
    buffer[count++]=(byte)b; 
    } 

    @Override 
    public synchronized void close() throws IOException 
    { 
    flushBuffer(); 
    queue.offer(END_SIGNAL); 
    closed=true; 
    } 

    public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream) 
    { 
    return executor.submit(
      new Callable<Void>() 
      { 
       @Override 
       public Void call() throws Exception 
       { 
       try{ 
        byte[] buffer=queue.take(); 
        while(buffer!=END_SIGNAL){ 
        outputStream.write(buffer); 
        buffer=queue.take(); 
        } 
        outputStream.flush(); 
       } catch(Exception e){ 
        close(); 
        throw e; 
       } finally{ 
        outputStream.close(); 
       } 
       return null; 
       } 
      } 
    ); 
    } 

}