2017-04-08 125 views
2

我的Java程序实现了一个服务器,该服务器应该从客户端通过websockets获取一个使用gzip压缩的非常大的文件,并检查文件内容中的某些字节模式。Java按顺序解压缩GZIP流

客户端发送嵌入专有协议内的文件块,以便在客户端收到消息后解析消息并提取gzip文件内容。

我无法在程序存储器中保存整个文件,所以我试图解压每个块,处理数据并继续到下一个块。

我用下面的代码:

public static String gzipDecompress(byte[] compressed) throws IOException { 
    String uncompressed; 
    try (
     ByteArrayInputStream bis = new ByteArrayInputStream(compressed); 
     GZIPInputStream gis = new GZIPInputStream(bis); 
     Reader reader = new InputStreamReader(gis); 
     Writer writer = new StringWriter() 
    ) { 

     char[] buffer = new char[10240]; 
     for (int length = 0; (length = reader.read(buffer)) > 0;) { 
     writer.write(buffer, 0, length); 
     } 
     uncompressed = writer.toString(); 
    } 

    return uncompressed; 
    } 

但调用与第一压缩块的功能时,我发现了以下异常:

java.io.EOFException: Unexpected end of ZLIB input stream 
    at java.util.zip.InflaterInputStream.fill(InflaterInputStream.java:240) 
    at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:158) 
    at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:117) 
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284) 
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326) 
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) 
    at java.io.InputStreamReader.read(InputStreamReader.java:184) 
    at java.io.Reader.read(Reader.java:140) 

重要的是要提的是我很重要不会跳过任何块并尝试按顺序解压块。

我错过了什么?

+1

目前尚不清楚这些数据是从哪里开始的。您应该创建一个读取* all *数据的流,并将其包装在GZipInputStream中。它不需要在内存中拥有所有数据*,但它应该是单个流。 –

回答

2

问题是你用这些块手动玩。

正确的方法是获得一些InputStream,用GZIPInputStream包装它,然后读取数据。

InputStream is = // obtain the original gzip stream 

    GZIPInputStream gis = new GZIPInputStream(is); 
    Reader reader = new InputStreamReader(gis); 

    //... proceed reading and so on 

GZIPInputStream作品流的方式,所以如果你一次只能从reader问10KB,整体内存占用会很低,无论最初的GZIP文件的大小。这个问题是更新

更新后,您的情况可能的解决方案是写一个InputStream实现,流正在由客户端协议处理程序向它提出的块字节。

这里是一个原型:

public class ProtocolDataInputStream extends InputStream { 
    private BlockingQueue<byte[]> nextChunks = new ArrayBlockingQueue<byte[]>(100); 
    private byte[] currentChunk = null; 
    private int currentChunkOffset = 0; 
    private boolean noMoreChunks = false; 

    @Override 
    public synchronized int read() throws IOException { 
     boolean takeNextChunk = currentChunk == null || currentChunkOffset >= currentChunk.length; 
     if (takeNextChunk) { 
      if (noMoreChunks) { 
       // stream is exhausted 
       return -1; 
      } else { 
       currentChunk = nextChunks.take(); 
       currentChunkOffset = 0; 
      } 
     } 
     return currentChunk[currentChunkOffset++]; 
    } 

    @Override 
    public synchronized int available() throws IOException { 
     if (currentChunk == null) { 
      return 0; 
     } else { 
      return currentChunk.length - currentChunkOffset; 
     } 
    } 

    public synchronized void addChunk(byte[] chunk, boolean chunkIsLast) { 
     nextChunks.add(chunk); 
     if (chunkIsLast) { 
      noMoreChunks = true; 
     } 
    } 
} 

您的客户端协议处理程序使用addChunk(),而你的解压码翻出此流的数据增加了字节块(通过Reader)。

请注意,此代码有一些问题:

  1. 正在使用的队列的大小有限。如果太频繁地呼叫addChunk(),则可能会填充队列,这将阻止addChunk()。这可能是合意的或不合适的。
  2. 只有read()方法用于说明目的。为了性能,最好以相同的方式实施read(byte[])
  3. 在读者(解压缩程序)和写入程序(协议处理程序调用addChunk())是不同线程的假设下使用保守同步。
  4. InterruptedException未在take()上处理,以避免太多细节。

如果你的解压缩和addChunk()在同一个线程(在同一回路)执行,那么你可以尝试使用InputStream.available()方法使用InputStreamReader.ready()拉着一个Reader拉动时,时。

+1

无法使用ByteArrayInputStream或其他InputStream将字节数组作为InputStream传递给GZIPInputStream?在我的情况下,我无法真正使用从服务器获取数据的原始InputSteam。 – Eldad

+0

为什么不能使用原始的'InputStream'?用我知道的字节来提供'GZIPInputStream'的唯一安全方法是首先将所有字节读入内存,这不是您想要的大文件。 –

+0

我添加了详细信息以更好地描述情况,我得到嵌入专有协议内的文件块,以便我的InputStream获取完整的协议消息,解析它,然后从中提取文件块,然后才能解压缩块,I不要控制客户端,也不知道下一个包含下一个文件块的消息何时到达。感谢和抱歉的描述不好。 – Eldad

0

来自gzipped流的任意字节序列不是有效的独立gzip数据。不管怎样,你必须连接所有的字节块。

最简单的方法是为了积累他们都用一个简单的管道:

import java.io.PipedOutputStream; 
import java.io.IOException; 
import java.util.zip.GZIPInputStream; 

public class ChunkInflater { 
    private final PipedOutputStream pipe; 

    private final InputStream stream; 

    public ChunkInflater() 
    throws IOException { 
     pipe = new PipedOutputStream(); 
     stream = new GZIPInputStream(new PipedInputStream(pipe)); 
    } 

    public InputStream getInputStream() { 
     return stream; 
    } 

    public void addChunk(byte[] compressedChunk) 
    throws IOException { 
     pipe.write(compressedChunk); 
    } 
} 

现在你有,你可以在你想要的任何单位读取的InputStream。例如:

ChunkInflater inflater = new ChunkInflater(); 

Callable<Void> chunkReader = new Callable<Void>() { 
    @Override 
    public Void call() 
    throws IOException { 
     byte[] chunk; 
     while ((chunk = readChunkFromSource()) != null) { 
      inflater.addChunk(chunk); 
     } 

     return null; 
    } 
}; 
ExecutorService executor = Executors.newSingleThreadExecutor(); 
executor.submit(chunkReader); 
executor.shutdown(); 

Reader reader = new InputStreamReader(inflater.getInputStream()); 
// read text here