2012-02-07 69 views
1

我正在尝试使用管道输入流写入数据。但是从线程转储看起来好像管道输入流上存在锁定。管道输入流被锁定

PipedOutputStream pos = new PipedOutputStream(); 
PipedInputStream pis = new PipedInputStream(pos); 
FileInputStream fis = null; 
GZIPOutputStream gos = null; 
byte[] buffer = new byte[1024]; 
try { 
    fis = new FileInputStream(file); 
    gos = new GZIPOutputStream(pos); 
    int length; 
    while ((length = fis.read(buffer, 0, 1024)) != -1) 
     gos.write(buffer, 0, length); 
    } catch(Exception e){ 
     print("Could not read the file"); 
    } 
    finally { 
     try { 
      fis.close(); 
      gos.close(); 
     }catch (Exception ie){ 
      printException(ie); 
     } 
    } 
writeObject(pis); 
pos.close(); 

writeobj方法将简单地从流中读取,但read方法被锁定。 线程转储指示一些等待管道输入流。

main" prio=10 tid=0x08066000 nid=0x48d2 in Object.wait() [0xb7fd2000..0xb7fd31e8] 
    java.lang.Thread.State: TIMED_WAITING (on object monitor) 
    at java.lang.Object.wait(Native Method) 
    - waiting on <0xa5c28be8> (a java.io.PipedInputStream) 
    at java.io.PipedInputStream.awaitSpace(PipedInputStream.java:257) 
    at java.io.PipedInputStream.receive(PipedInputStream.java:215) 
    - locked <0xa5c28be8> (a java.io.PipedInputStream) 
    at java.io.PipedOutputStream.write(PipedOutputStream.java:132) 
    at java.util.zip.GZIPOutputStream.finish(GZIPOutputStream.java:95) 
    at java.util.zip.DeflaterOutputStream.close(DeflaterOutputStream.java:146) 

    Locked ownable synchronizers: 
    - None 

我不确定是谁锁定了它。阅读文档以确定锁定呼叫。但无法弄清楚什么是错误的以及如何克服它。

回答

4

使用PipedInputStream和PipedOutputStream必须位于不同的线程中。

仔细阅读的Javadoc: http://docs.oracle.com/javase/6/docs/api/java/io/PipedInputStream.html

典型地,数据被从一个对象的PipedInputStream由一个线程读取,并且数据被从其他线程写入相应的PipedOutputStream。不建议尝试从单个线程使用这两个对象,因为它可能使线程死锁。

0

我需要一个过滤器拦截,我需要尽快关闭数据库连接,所以我最初使用Java的管道,但是当他们实现更紧密看着连接速度慢,这是所有同步,所以我结束了使用创造我自己的QueueInputStream小缓冲区和阻塞队列将缓冲区放入队列中,一旦被填满,除非在LinkedBlockingQueue中使用的锁定条件,在小缓冲区的帮助下它应该便宜,否则它是无锁的,该类仅用于使用单个生产者和消费者的每个实例:

import java.io.IOException; 
import java.io.OutputStream; 
import java.util.concurrent.*; 

public class QueueOutputStream extends OutputStream 
{ 
    private static final int DEFAULT_BUFFER_SIZE=1024; 
    private static final byte[] END_SIGNAL=new byte[]{}; 

    private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>(); 
    private final byte[] buffer; 

    private boolean closed=false; 
    private int count=0; 

    public QueueOutputStream() 
    { 
    this(DEFAULT_BUFFER_SIZE); 
    } 

    public QueueOutputStream(final int bufferSize) 
    { 
    if(bufferSize<=0){ 
     throw new IllegalArgumentException("Buffer size <= 0"); 
    } 
    this.buffer=new byte[bufferSize]; 
    } 

    private synchronized void flushBuffer() 
    { 
    if(count>0){ 
     final byte[] copy=new byte[count]; 
     System.arraycopy(buffer,0,copy,0,count); 
     queue.offer(copy); 
     count=0; 
    } 
    } 

    @Override 
    public synchronized void write(final int b) throws IOException 
    { 
    if(closed){ 
     throw new IllegalStateException("Stream is closed"); 
    } 
    if(count>=buffer.length){ 
     flushBuffer(); 
    } 
    buffer[count++]=(byte)b; 
    } 

    @Override 
    public synchronized void write(final byte[] b, final int off, final int len) throws IOException 
    { 
    super.write(b,off,len); 
    } 

    @Override 
    public synchronized void close() throws IOException 
    { 
    flushBuffer(); 
    queue.offer(END_SIGNAL); 
    closed=true; 
    } 

    public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream) 
    { 
    return executor.submit(
      new Callable<Void>() 
      { 
       @Override 
       public Void call() throws Exception 
       { 
       try{ 
        byte[] buffer=queue.take(); 
        while(buffer!=END_SIGNAL){ 
        outputStream.write(buffer); 
        buffer=queue.take(); 
        } 
        outputStream.flush(); 
       } catch(Exception e){ 
        close(); 
        throw e; 
       } finally{ 
        outputStream.close(); 
       } 
       return null; 
       } 
      } 
    ); 
    } 
3

的PipedInputStream有一个小的非膨胀缓冲。一旦缓冲区满了,写入PipedOutputStream块,直到被另一个线程读取缓冲输入。您不能在同一个线程中使用这两个线程,因为写入操作将等待不会发生的读取。

在你的情况,你是不是读的任何数据,直到你已经写了这一切,因此该解决方案是使用ByteArrayOutputStreamByteArrayInputStream代替:

  1. 所有数据都写入到一个ByteArrayOutputStream。
  2. 完成后,调用流上的ByteArray()以检索字节数据。
  3. (可选)使用字节数据创建一个ByteArrayInputStream,将其作为InputStream读取。