2015-11-04 95 views
0

这是我的情景:如何将写入流1的内容流式传输到流2?

producer.WriteStream(stream); 
consumer.ReadStream(stream); 

我想要的东西,允许由producer生成的字节将逐步转移到consumer

我可以写一切到MemoryStream,然后倒带它并在consumer上读取它,但这会导致巨大的内存消耗。

我该如何做到这一点?

+0

使用2实例(https://msdn.microsoft.com/en-us/library/system.io.pipes.pipestream(V = vs.110)的.aspx),1至读(客户端)和1写(服务器)。 – Amit

+0

谢谢@Amit,你能否详细说明如何将这些流“绑定”在一起..这对我来说并不清楚。 –

+0

如果您需要将数据从一个数据流传输到另一个数据流,通常通过从数据源读取数据块(例如1K或4K)并将数据放入目标,直到源数据流为空。 – Oliver

回答

2

使用管道作为数据的底层传输,可以有一个“写入流”(服务器)和一个允许这种通信机制的“读取流”(客户端)。

使用匿名管道或命名管道(如果需要进程间通信)很简单。要创建管道流:

AnonymousPipeServerStream pipeServer = new AnonymousPipeServerStream(); 
AnonymousPipeClientStream pipeClient = 
    new AnonymousPipeClientStream(pipeServer.GetClientHandleAsString()); 

现在,您可以用这些来写&读:

producer.WriteStream(pipeServer); 
// somewhere else... 
consumer.ReadStream(pipeClient); 
+0

这比我的解决方案容易得多。 –

+0

工程就像一个魅力 –

1

我只是把这个共同的乐趣,这是未经测试,可能有一些错误。您只需将ReaderStream传递给读者,并将WriterStream传递给作者。的[PipeStream]

public class LoopbackStream 
{ 
    public Stream ReaderStream { get; } 
    public Stream WriterStream { get;} 

    private readonly BlockingCollection<byte[]> _buffer; 

    public LoopbackStream() 
    { 
     _buffer = new BlockingCollection<byte[]>(); 
     ReaderStream = new ReaderStreamInternal(_buffer); 
     WriterStream = new WriterStreamInternal(_buffer); 
    } 

    private class WriterStreamInternal : Stream 
    { 
     private readonly BlockingCollection<byte[]> _buffer; 

     public WriterStreamInternal(BlockingCollection<byte[]> buffer) 
     { 
      _buffer = buffer; 
      CanRead = false; 
      CanWrite = false; 
      CanSeek = false; 
     } 

     public override void Close() 
     { 
      _buffer.CompleteAdding(); 
     } 

     public override int Read(byte[] buffer, int offset, int count) 
     { 
      throw new NotSupportedException(); 
     } 

     public override void Write(byte[] buffer, int offset, int count) 
     { 
      var newData = new byte[count]; 
      Array.Copy(buffer, offset, newData, 0, count); 
      _buffer.Add(newData); 
     } 

     public override void Flush() 
     { 
     } 

     public override long Seek(long offset, SeekOrigin origin) 
     { 
      throw new NotSupportedException(); 
     } 

     public override void SetLength(long value) 
     { 
      throw new NotSupportedException(); 
     } 

     public override bool CanRead { get; } 
     public override bool CanSeek { get; } 
     public override bool CanWrite { get; } 

     public override long Length 
     { 
      get { throw new NotSupportedException(); } 
     } 

     public override long Position 
     { 
      get { throw new NotSupportedException(); } 
      set { throw new NotSupportedException(); } 
     } 
    } 
    private class ReaderStreamInternal : Stream 
    { 
     private readonly BlockingCollection<byte[]> _buffer; 
     private readonly IEnumerator<byte[]> _readerEnumerator; 
     private byte[] _currentBuffer; 
     private int _currentBufferIndex = 0; 

     public ReaderStreamInternal(BlockingCollection<byte[]> buffer) 
     { 
      _buffer = buffer; 
      CanRead = true; 
      CanWrite = false; 
      CanSeek = false; 
      _readerEnumerator = _buffer.GetConsumingEnumerable().GetEnumerator(); 
     } 

     protected override void Dispose(bool disposing) 
     { 
      if (disposing) 
      { 
       _readerEnumerator.Dispose(); 
      } 
      base.Dispose(disposing); 
     } 

     public override int Read(byte[] buffer, int offset, int count) 
     { 
      if (_currentBuffer == null) 
      { 
       bool read = _readerEnumerator.MoveNext(); 
       if (!read) 
        return 0; 
       _currentBuffer = _readerEnumerator.Current; 
      } 

      var remainingBytes = _currentBuffer.Length - _currentBufferIndex; 
      var readBytes = Math.Min(remainingBytes, count); 
      Array.Copy(_currentBuffer, _currentBufferIndex, buffer, offset, readBytes); 
      _currentBufferIndex += readBytes; 

      if (_currentBufferIndex == _currentBuffer.Length) 
       _currentBuffer = null; 

      return readBytes; 
     } 

     public override void Write(byte[] buffer, int offset, int count) 
     { 
      throw new NotSupportedException(); 
     } 

     public override void Flush() 
     { 
     } 

     public override long Seek(long offset, SeekOrigin origin) 
     { 
      throw new NotSupportedException(); 
     } 

     public override void SetLength(long value) 
     { 
      throw new NotSupportedException(); 
     } 

     public override bool CanRead { get; } 
     public override bool CanSeek { get; } 
     public override bool CanWrite { get; } 

     public override long Length 
     { 
      get { throw new NotSupportedException(); } 
     } 

     public override long Position 
     { 
      get { throw new NotSupportedException(); } 
      set { throw new NotSupportedException(); } 
     } 
    } 
}