2010-05-06 124 views
12

我有一个Stream对象,偶尔会在它上面获取一些数据,但时间间隔不可预知。 Stream上出现的消息已定义良好,并预先声明了其有效负载的大小(大小是每个消息的前两个字节中包含的一个16位整数)。从流中连续读取?

我想有一个StreamWatcher类来检测Stream何时有一些数据。一旦它发生,我想要引发一个事件,以便订阅的StreamProcessor实例可以处理新消息。

这可以用C#事件完成,而不直接使用线程?它看起来应该是直截了当的,但我无法用正确的方式来设计它。

回答

0

是的,这可以做到。使用非阻塞Stream.BeginRead方法与AsyncCallback。当数据可用时,该回调将被异步调用。在回调中,请拨打Stream.EndRead以获取数据,并再次拨打Stream.BeginRead以获取下一个数据块。在一个足够容纳消息的字节数组中缓冲传入的数据。一旦字节数组已满(可能需要多个回调调用),请提出事件。然后阅读下一个消息大小,创建一个新的缓冲区,重复,完成。

+0

他说没有使用线程! =) – Nayan 2010-05-06 22:29:26

+0

@Nayan:异步*是*多线程。这个问题本质上是多线程的问题之一。我怀疑OP的意思是他不想自己明确地创建线程。 – 2010-05-06 22:32:04

+0

@Nayan:我假设OP正在寻找一种解决方案,它不会显式创建一个新的线程,并使用阻塞的Read方法,但是对于非阻塞的异步解决方案。没有线程的情况下不能有异步性。 – dtb 2010-05-06 22:33:17

1

正常的做法是使用Stream公开的.NET asynchronous programming pattern。本质上,您通过调用Stream.BeginRead来异步开始读取,并向其传递一个byte[]缓冲区和一个回调方法,该方法将在从流中读取数据时调用。在回调方法中,您调用Stream.EndRead,并将其传递给您的回调的参数IAsncResultEndRead的返回值告诉你有多少个字节被读入缓冲区。

一旦以这种方式收到了前几个字节,就可以通过再次调用BeginRead来等待消息的其余部分(如果第一次没有获得足够的数据)。一旦你得到了整个信息,你就可以举办活动。

11

当你说不能使用线程直接,我想你还是想通过异步调用使用它们间接,否则这不会是非常有用的。

您只需要将Stream的异步方法换行并将结果存储在缓冲区中。首先,让我们定义的规范的情况下,部分:

public delegate void MessageAvailableEventHandler(object sender, 
    MessageAvailableEventArgs e); 

public class MessageAvailableEventArgs : EventArgs 
{ 
    public MessageAvailableEventArgs(int messageSize) : base() 
    { 
     this.MessageSize = messageSize; 
    } 

    public int MessageSize { get; private set; } 
} 

现在,从流中读取一个16位整数异步和报到时,它的准备:

public class StreamWatcher 
{ 
    private readonly Stream stream; 

    private byte[] sizeBuffer = new byte[2]; 

    public StreamWatcher(Stream stream) 
    { 
     if (stream == null) 
      throw new ArgumentNullException("stream"); 
     this.stream = stream; 
     WatchNext(); 
    } 

    protected void OnMessageAvailable(MessageAvailableEventArgs e) 
    { 
     var handler = MessageAvailable; 
     if (handler != null) 
      handler(this, e); 
    } 

    protected void WatchNext() 
    { 
     stream.BeginRead(sizeBuffer, 0, 2, new AsyncCallback(ReadCallback), 
      null); 
    } 

    private void ReadCallback(IAsyncResult ar) 
    { 
     int bytesRead = stream.EndRead(ar); 
     if (bytesRead != 2) 
      throw new InvalidOperationException("Invalid message header."); 
     int messageSize = sizeBuffer[1] << 8 + sizeBuffer[0]; 
     OnMessageAvailable(new MessageAvailableEventArgs(messageSize)); 
     WatchNext(); 
    } 

    public event MessageAvailableEventHandler MessageAvailable; 
} 

我想这就是它。这假设无论哪个类正在处理消息也可以访问Stream,并准备根据事件中的消息大小同步或异步地读取它。如果你想让观察者类实际阅读整个消息,那么你将不得不添加更多的代码来做到这一点。

+0

+1的实现证明了我的BeginRead理论是不正确的解决方案。你的实施清除了这种怀疑。 – Nayan 2010-05-06 22:39:15

+0

光荣!这看起来好像有用,明天我会试试看。 – 2010-05-06 22:42:15

+0

+1。 Nitpicking:AFAIK只要返回至少一个字节(如果未达到结尾),则允许Stream.Read返回少于计数字节的字节数。所以如果'(bytesRead!= 2)'你不应该抛出一个异常,而是再次读入BeginRead,直到两个字节被读取。 – dtb 2010-05-06 22:54:43

1

是不是使用Stream.BeginRead()与在独立线程中使用同步Stream.Read()方法相同?