2015-02-06 56 views
0

我有关于PipedInputStreamPipedOutputStream一个问题,我不知道如果我误解了这些类的设计是否有在Java代码中PipedInputStream.javaPipedInputStream.java的设计中是否存在错误,或者我误解了它的设计?

一个错误,据我了解PipedInputStreamPipedOutputStream实现了一种可用于在两个不同线程之间创建流的机制。生产者线程在PipedOutputStream中写入内容,并且消费者线程在连接的PipedInputStream中读取它。有一个内部缓冲区允许缓冲通信。默认情况下,这个缓冲区的大小是1024字节。

如果消费者线程读取PipedInputStream并且缓冲区为空,则线程将等待。如果生产者线程写入PipedOutputStream并且缓冲区已满,则该线程也等待。

PipedInputStream维护内部缓冲区,PipedOutputStream只使用在PipedInputStream中声明的函数。

所有在PipedInputStream与内部(圆形)缓冲相关(在byte [] bufferint inint out -as你可以看到PipedInputStream.java)场均宣布protectedPipedInputStream使用2个不同的PipedInputStream.receive函数来注入数据。

所有InputStreams都有两个读取版本:read()read(byte [], int, int)。所有OutputStreams都有两个写入版本write(byte b)write(byte [], int, int)。全部都有单字节版本和多字节版本。 PipedInputStreamPipedOutputStream有这些功能。

PipedOutputStream.write(byte b)使用PipedInputStream.receive(int b)函数在连接的PipedInputStream中注入字节。这个接收函数被声明为protected,所以你可以重载这个函数并截取从PipedOutputStream到连接的PipedInputStream的任何字节注入。

PipedOutputStream.write(byte b[], int offset, int len)使用PipedInputStream.receive(byte [] b, int offset, int len)在连接的PipedInputStream中注入字节数组。

而我的问题是:PipedInputStream.receive(byte [], int, int)receive(int)的多字节副本,没有声明为受保护,因为receive(int)是,它具有默认的可见性(包可见性)。所以你不能重载这个函数,并且拦截从PipedOutputStream到相连的PipedInputStream的多字节注入。

PipedInputStream.write(byte b[], int offset, int len)不援引PipedInputStream.write(int b)。因此,使用receive(byte [],int, int)时,超载receive(int)无效。

据我所知,PipedInputStream.receive(byte[], int, int)应该是protected,因为PipedInputStream.receive(int)是。它的声明:

synchronized void receive(byte [] b, int off, int len) throws IOException {

应该是:

protected synchronized void receive(byte [] b, int off, int len) throws IOException {

PipeReaderPipeWriter(的PipedInputStreamPipedOutputStream字符版本)申报缓冲领域,并与包可见接收方法(不受保护!)。 Java中的Reader/Writer(自JDK1.1开始)比InputStream/OutputStream(从JDK1.0开始)更新。

是它的PipedInputStream设计一个真正的错误?,是protected visibity在PipedInputStream设计事故从早期的Java版本继承?或者还是我完全失去了?

在此先感谢。

PD:下面是出现此问题的示例。该程序不能编译(通过提到的接收可见性问题)。在这个例子中,我尝试创建一个PipedInputStream子类,允许在需要时自动扩展缓冲区。所以,如果缓冲区是空的,有人试图读取线程等待。但是如果缓冲区已满并有人试图写入(使用连接的PipedOutputStream),线程不会等待,但缓冲区会扩展以存储更多字节。消费者等待,但生产者没有。

我有我自己的这个例子的功能实现,但我想知道它是否不能作为PipedInputStream子类实现。

import java.io.IOException; 
import java.io.PipedInputStream; 
import java.io.PipedOutputStream; 

public class ExtensiblePipedInputStream extends PipedInputStream { 

/** 
* Default extensions' size 
*/ 
private static final int DEFAULT_EXTENSION = 1024; 

/** 
* The current extensions' size 
*/ 
protected int extension = DEFAULT_EXTENSION; 


// the same constructors than the super class (PipedInputStream)... 

public ExtensiblePipedInputStream() { 
    super(); 
} 

public ExtensiblePipedInputStream(PipedOutputStream src) throws IOException { 
    super(src); 
} 

public ExtensiblePipedInputStream(int pipeSize) { 
    super(pipeSize); 
} 

public ExtensiblePipedInputStream(PipedOutputStream src, int pipeSize) throws IOException { 
    super(src, pipeSize); 
} 

/** 
* This function ensures the specified capacity in the internal buffer. If 
* the specified capacity is less or equals than the current internal buffer 
* capacity it does nothing. If the specified capacity is greater than the 
* current one, then the buffer is extended to: at least allocate the new 
* capacity. This function extends the buffer using multiple factors of 
* extension size. 
* 
* @param capacity The capacity 
* @throws IOException if an IO error occurs 
* @throws IllegalArgumentException if capacity is negative 
*/ 
public synchronized void ensureCapacity(int capacity) throws IOException, IllegalArgumentException { 
    if (capacity < 0) { 
     throw new IllegalArgumentException("capacity < 0"); 
    } 

    if (capacity > buffer.length) { 
     int additionalSpace = capacity - buffer.length; 
     final int modExtension = additionalSpace % extension; 
     additionalSpace += (modExtension == 0) ? 0 : extension - modExtension; 

     setCapacity(buffer.length + additionalSpace); 
    } 
} 

/** 
* Returns capacity of the internal buffer (the buffer's size). 
* 
* @return The capacity or the internal buffer 
*/ 
public synchronized int getCapacity() { 
    return buffer.length; 
} 

/** 
* Returns the size of the next buffer's extensions. 
* 
* @return The size of the next buffer's extensions. 
*/ 
public synchronized int getExtension() { 
    return extension; 
} 

/** 
* This function extends and invokes PipedInputStream.receive. It only avoid 
* writers block by extending the internal buffer when needed. 
* 
* @param b The byte to be received 
* @throws IOException if an IO error occurs 
*/ 
@Override 
protected synchronized void receive(int b) throws IOException { 
    ensureCapacity(available() + 1); 
    super.receive(b); 
} 

/** 
* MY PROBLEM!!!! 
* 
* this function is not posible! 
* 
* PipedInputStream.receive(byte[], int, int) 
* has not protected visibility, it has package visibility!!!!! 
* 
* Why? 
* 
* @param b The array of bytes to be received 
* @param off The offset in the array of bytes. 
* @param len The number of bytes to be received. 
* @throws IOException If an IO error occurs 
*/ 
@Override 
protected synchronized void receive(byte b[], int off, int len) throws IOException { 
    ensureCapacity(available() + len); 
    super.receive(b, off, len); 
} 

/** 
* Changes the size of the internal buffer. The new size must be greater or 
* equals than the number of bytes stored in the internal buffer 
* (available()) 
* 
* @param capacity The new size of the internal buffer. 
* @throws IOException If an IO error occurs. 
* @throws IllegalArgumentException If capacity < available() 
*/ 
public synchronized void setCapacity(int capacity) throws IOException, IllegalArgumentException { 
    final int available = available(); 

    if (capacity < available) { 
     throw new IllegalArgumentException("capacity < available"); 
    } 

    final byte[] nbuf = new byte[capacity]; 
    if (available > 0) { 
     final int firstTransferAmount = Math.min(available, buffer.length - out); 
     System.arraycopy(buffer, out, nbuf, 0, firstTransferAmount); 
     if (in > 0) { 
      System.arraycopy(buffer, 0, nbuf, firstTransferAmount, in); 
     } 
     out = 0; 
     in = (available == capacity) ? 0 : available; 
    } 

    buffer = nbuf; 
} 

/** 
* Set the size of future extensions. It must be a value greater than 0. 
* 
* @param extension The size of future extensions. 
* @throws IllegalArgumentException If extension <= 0 
*/ 
public synchronized void setExtension(int extension) throws IllegalArgumentException { 
    if (extension <= 0) { 
     throw new IllegalArgumentException("extension <= 0"); 
    } 

    this.extension = extension; 
} 

} 
+0

你为什么试图扩展管道流?为什么不把它做得足够大以至于你永远不需要种植它呢? – 2015-02-06 11:25:15

+0

假设你花了2个小时,最低工资约为20美元。如果你决定在一个并不真正需要它的缓冲区中“浪费”128KB。这将浪费0.2美分的内存。请告诉我你正在计划使用上百万个这样的代码;) – 2015-02-06 11:30:35

+1

这些类不是为了扩展而设计的,在Java的早期阶段,设计者并没有对这些问题敏感,所以一个不需要的'protected'访问级别可能会被。您应该避免尝试扩展任何JDK类,除了那些明确记录其子类合同的类。 – 2015-02-06 11:37:11

回答

2

我想你已经误解了管道的目的。管道为阻塞两个不同线程之间的通信。预计作者的速度将受限于读者的速度,这意味着管道在内存使用方面效率很高,但将处理速度限制在最慢组件的速度。

如果你想异步写入,你应该看看使用队列 - java.util.concurrent包中的一个版本应该适合。

+0

PipedInputStream在读取/写入操作的解耦中提到了限制。但它没有提到限制应该保持不变。 PipedInputStream.java代码表示允许在“int protected int final int PIPE_SIZE”声明中更改管道大小,但有一条注释表明:在允许更改管道大小之前,这是一个常量。该字段将继续保持向后兼容。 – acesargl 2015-02-06 12:21:54

0

不支持特定用例的API设计(尤其是涉及子类化的用例)可以被描述为“受限制”或“受限制”......但将其称为一个错误是一种延伸。

如果我们接受可疑的命题您试图实施的是值得的,那么我认为您的分析是正确的。使用可扩展缓冲区创建管道输入/输出流的版本比修改器不同的情况更有效。

但是......这并非不可能。您可以通过更大规模的覆盖方法(例如readwrite方法)或通过简单地从头开始来实现所需的功能。


另一件需要注意的是,这(StackOverflow)不是报告Java“bug”或提出增强请求的正确位置。如果您真的对此感到强烈,请尝试开发并向OpenJDK团队提交您提出的更改的修补程序。但请记住,任何破坏兼容性的内容(包括与PipedInputStreamPipedOutputStream的现有客户子类的兼容性)可能会被拒绝。这可能可以解释为什么他们没有“修复”这已经!


1 - 正如Barry指出的那样,管道流阻塞了设计。非阻塞版本有潜在危险。

+0

感谢Stephen的回应,但PipedInputStream的子类怎样才能拦截由PipedOutputStream完成的receive(byte [],int,int)调用?您必须注意写操作不在PipedInputStream中,因此它不能由PipedInputStream的子类重载。如果我需要创建一个PipedOutputStream的子类X来处理PipedInputStream的子类Y,那么Y不是PipedInputStream(因为它不能被任何PipedOutputStream使用)。据我所知,设计中的问题是无法将其作为PipedInputStream子类来执行。 – acesargl 2015-02-06 19:02:47

+0

我认为可以创建一个PipedInputStream的子类,它使用Java平台调试器体系结构[JPDA]拦截其隐藏/禁止接收方法的调用(新子类将在JVM TI中声明为调试器,并且它可以拦截接收调用;类将自行调试!)。但我认为这不是一种“合理的方式”(这也是一件非常非常丑陋的事情)。 – acesargl 2015-02-06 19:25:02

+0

*“PipedInputStream的子类如何拦截...”*。它不能。 JPDA也不起作用 - 正在运行的应用程序不能对自己使用JPDA(AFAIK)。做一个不同的方式。覆盖'read'和'write'等方法。 – 2015-02-07 01:11:51