这是我的情景:如何将写入流1的内容流式传输到流2?
producer.WriteStream(stream);
consumer.ReadStream(stream);
我想要的东西,允许由producer
生成的字节将逐步转移到consumer
。
我可以写一切到MemoryStream
,然后倒带它并在consumer
上读取它,但这会导致巨大的内存消耗。
我该如何做到这一点?
这是我的情景:如何将写入流1的内容流式传输到流2?
producer.WriteStream(stream);
consumer.ReadStream(stream);
我想要的东西,允许由producer
生成的字节将逐步转移到consumer
。
我可以写一切到MemoryStream
,然后倒带它并在consumer
上读取它,但这会导致巨大的内存消耗。
我该如何做到这一点?
使用管道作为数据的底层传输,可以有一个“写入流”(服务器)和一个允许这种通信机制的“读取流”(客户端)。
使用匿名管道或命名管道(如果需要进程间通信)很简单。要创建管道流:
AnonymousPipeServerStream pipeServer = new AnonymousPipeServerStream();
AnonymousPipeClientStream pipeClient =
new AnonymousPipeClientStream(pipeServer.GetClientHandleAsString());
现在,您可以用这些来写&读:
producer.WriteStream(pipeServer);
// somewhere else...
consumer.ReadStream(pipeClient);
这比我的解决方案容易得多。 –
工程就像一个魅力 –
我只是把这个共同的乐趣,这是未经测试,可能有一些错误。您只需将ReaderStream
传递给读者,并将WriterStream
传递给作者。的[PipeStream]
public class LoopbackStream
{
public Stream ReaderStream { get; }
public Stream WriterStream { get;}
private readonly BlockingCollection<byte[]> _buffer;
public LoopbackStream()
{
_buffer = new BlockingCollection<byte[]>();
ReaderStream = new ReaderStreamInternal(_buffer);
WriterStream = new WriterStreamInternal(_buffer);
}
private class WriterStreamInternal : Stream
{
private readonly BlockingCollection<byte[]> _buffer;
public WriterStreamInternal(BlockingCollection<byte[]> buffer)
{
_buffer = buffer;
CanRead = false;
CanWrite = false;
CanSeek = false;
}
public override void Close()
{
_buffer.CompleteAdding();
}
public override int Read(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
var newData = new byte[count];
Array.Copy(buffer, offset, newData, 0, count);
_buffer.Add(newData);
}
public override void Flush()
{
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override bool CanRead { get; }
public override bool CanSeek { get; }
public override bool CanWrite { get; }
public override long Length
{
get { throw new NotSupportedException(); }
}
public override long Position
{
get { throw new NotSupportedException(); }
set { throw new NotSupportedException(); }
}
}
private class ReaderStreamInternal : Stream
{
private readonly BlockingCollection<byte[]> _buffer;
private readonly IEnumerator<byte[]> _readerEnumerator;
private byte[] _currentBuffer;
private int _currentBufferIndex = 0;
public ReaderStreamInternal(BlockingCollection<byte[]> buffer)
{
_buffer = buffer;
CanRead = true;
CanWrite = false;
CanSeek = false;
_readerEnumerator = _buffer.GetConsumingEnumerable().GetEnumerator();
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
_readerEnumerator.Dispose();
}
base.Dispose(disposing);
}
public override int Read(byte[] buffer, int offset, int count)
{
if (_currentBuffer == null)
{
bool read = _readerEnumerator.MoveNext();
if (!read)
return 0;
_currentBuffer = _readerEnumerator.Current;
}
var remainingBytes = _currentBuffer.Length - _currentBufferIndex;
var readBytes = Math.Min(remainingBytes, count);
Array.Copy(_currentBuffer, _currentBufferIndex, buffer, offset, readBytes);
_currentBufferIndex += readBytes;
if (_currentBufferIndex == _currentBuffer.Length)
_currentBuffer = null;
return readBytes;
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}
public override void Flush()
{
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override bool CanRead { get; }
public override bool CanSeek { get; }
public override bool CanWrite { get; }
public override long Length
{
get { throw new NotSupportedException(); }
}
public override long Position
{
get { throw new NotSupportedException(); }
set { throw new NotSupportedException(); }
}
}
}
使用2实例(https://msdn.microsoft.com/en-us/library/system.io.pipes.pipestream(V = vs.110)的.aspx),1至读(客户端)和1写(服务器)。 – Amit
谢谢@Amit,你能否详细说明如何将这些流“绑定”在一起..这对我来说并不清楚。 –
如果您需要将数据从一个数据流传输到另一个数据流,通常通过从数据源读取数据块(例如1K或4K)并将数据放入目标,直到源数据流为空。 – Oliver