2012-02-24 138 views
2

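Deserializing messages using System.Reactive

I currently have a program that listens on a network stream and raises an event whenever a new message has been deserialized.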

while (true)
{
    // The first 10 bytes hold the message length as text.
    // Note: Read may return fewer bytes than requested; a robust version would loop here too.
    byte[] lengthBytes = new byte[10];
    networkStream.Read(lengthBytes, 0, 10);
    int messageLength = Int32.Parse(Encoding.UTF8.GetString(lengthBytes));
    var messageBytes = new byte[messageLength + 10];
    Array.Copy(lengthBytes, messageBytes, 10);
    int bytesReadTotal = 10;
    // Keep reading until the whole body has arrived.
    while (bytesReadTotal < 10 + messageLength)
    {
        bytesReadTotal += networkStream.Read(messageBytes, bytesReadTotal, messageLength - bytesReadTotal + 10);
    }
    OnNewMessage(new MessageEventArgs(messageFactory.GetMessage(messageBytes)));
}

I would like to rewrite this to use Reactive Extensions instead of events, so that I end up with an IObservable<Message>. This can be done using

Observable.FromEvent<EventHandler<MessageEventArgs>, MessageEventArgs>(
    (h) => NewMessage += h, 
    (h) => NewMessage -= h) 
    .Select( (e) => { return e.Message; }); 

However, I would rather rewrite the listening process itself using System.Reactive. My starting point (from here) is

Func<byte[], int, int, IObservable<int>> read;
read = Observable.FromAsyncPattern<byte[], int, int, int>(
    networkStream.BeginRead,
    networkStream.EndRead);

which allows

byte[] lengthBytes = new byte[10];
read(lengthBytes, 0, lengthBytes.Length).Subscribe(bytesRead =>
{
    // What should happen here?
});

I am struggling to see how to continue from here, though. Does anyone have an implementation?

Answers

1

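I came up with the following, but I feel it should be possible to do this without creating a class and using Subject<T> (for example, by projecting the header packet to the body packet and then to a message object; the problem is that EndRead() does not return the byte array, only the number of bytes read, so you need an object or at least a closure somewhere).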

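using System;
using System.IO;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;

class Message
{
    public string Text { get; set; }
}

class MessageStream : IObservable<Message>
{
    private readonly Subject<Message> messages = new Subject<Message>();

    public void Start()
    {
        // Get your real network stream here.
        var stream = Console.OpenStandardInput();
        GetNextMessage(stream);
    }

    private void GetNextMessage(Stream stream)
    {
        var header = new byte[10];
        var read = Observable.FromAsyncPattern<byte[], int, int, int>(stream.BeginRead, stream.EndRead);
        // Note: a single read may return fewer bytes than requested; this sketch ignores that.
        read(header, 0, 10).Subscribe(b =>
        {
            // Zero bytes read means the stream has ended.
            if (b == 0) { messages.OnCompleted(); return; }
            // Assumes the header starts with a 4-byte binary length. For the text header
            // in the question, use Int32.Parse(Encoding.UTF8.GetString(header)) instead.
            var bodyLength = BitConverter.ToInt32(header, 0);
            var body = new byte[bodyLength];
            read(body, 0, bodyLength).Subscribe(b2 =>
            {
                var message = new Message() { Text = Encoding.UTF8.GetString(body) };
                messages.OnNext(message);
                // Start reading the next message.
                GetNextMessage(stream);
            }, messages.OnError);
        }, messages.OnError);
    }

    public IDisposable Subscribe(IObserver<Message> observer)
    {
        return messages.Subscribe(observer);
    }
}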
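
One thing the class above glosses over is that a single read can return fewer bytes than requested, so both the header and the body may need several reads. Below is a minimal sketch of one way to handle that. It assumes a newer Rx release (2.0 or later) and .NET 4.5 async/await instead of FromAsyncPattern, and it takes the header to be the 10-character text length from the question. FramedMessages, ReadExactlyAsync and ReadFrames are hypothetical names, not part of Rx.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper class; the names are illustrative only.
static class FramedMessages
{
    const int HeaderLength = 10;

    // Reads exactly `count` bytes into `buffer`; returns false if the stream ends first.
    static async Task<bool> ReadExactlyAsync(
        Stream stream, byte[] buffer, int offset, int count, CancellationToken ct)
    {
        int total = 0;
        while (total < count)
        {
            int n = await stream.ReadAsync(buffer, offset + total, count - total, ct);
            if (n == 0) return false; // end of stream
            total += n;
        }
        return true;
    }

    // Emits one byte array per frame (header + body), as in the original loop.
    public static IObservable<byte[]> ReadFrames(Stream stream)
    {
        return Observable.Create<byte[]>(async (observer, ct) =>
        {
            while (!ct.IsCancellationRequested)
            {
                var header = new byte[HeaderLength];
                if (!await ReadExactlyAsync(stream, header, 0, HeaderLength, ct)) break;

                // A malformed header throws here and surfaces as OnError.
                int bodyLength = int.Parse(Encoding.UTF8.GetString(header));

                var frame = new byte[HeaderLength + bodyLength];
                Array.Copy(header, frame, HeaderLength);
                if (!await ReadExactlyAsync(stream, frame, HeaderLength, bodyLength, ct)) break;

                observer.OnNext(frame);
            }
            // When this task finishes, Rx signals OnCompleted to the observer.
        });
    }
}
```

Each emitted array could then be turned into a message with the question's messageFactory, for example FramedMessages.ReadFrames(networkStream).Select(messageFactory.GetMessage). Disposing the subscription cancels the token, which stops the loop on a best-effort basis.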
0

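Since Observable.FromAsyncPattern only makes the asynchronous call once, you will need a function that calls it repeatedly instead. This should get you started, but there is probably plenty of room for improvement. It assumes that you can repeat the asynchronous call with the same arguments, and that the selector will handle any issues arising from that.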

Function FromRepeatedAsyncPattern(Of T1, T2, T3, TCallResult, TResult)(
        begin As Func(Of T1, T2, T3, AsyncCallback, Object, IAsyncResult),
        [end] As Func(Of IAsyncResult, TCallResult),
        selector As Func(Of TCallResult, TResult),
        isComplete As Func(Of TCallResult, Boolean)
        ) As Func(Of T1, T2, T3, IObservable(Of TResult))
    Return Function(a1, a2, a3) Observable.Create(Of TResult)(
        Function(obs)
            Dim serial As New SerialDisposable()
            Dim fac = Observable.FromAsyncPattern(begin, [end])
            Dim onNext As Action(Of TCallResult) = Nothing
            'this function will restart the subscription and will be
            'called every time a value is found
            Dim subscribe As Func(Of IDisposable) =
                Function()
                    'note that we are REUSING the arguments, the
                    'selector should handle this appropriately
                    Return fac(a1, a2, a3).Subscribe(onNext,
                        Sub(ex)
                            obs.OnError(ex)
                            serial.Dispose()
                        End Sub)
                End Function
            'set up the OnNext handler to restart the observer
            'every time it completes
            onNext = Sub(v)
                         obs.OnNext(selector(v))
                         'subscriber disposed, do not check for completion
                         'or resubscribe
                         If serial.IsDisposed Then Exit Sub
                         If isComplete(v) Then
                             obs.OnCompleted()
                             serial.Dispose()
                         Else
                             'using the scheduler lets the OnNext complete before
                             'making the next async call.
                             'you could parameterize the scheduler, but it may not be
                             'helpful, and it won't work if Immediate is passed.
                             Scheduler.CurrentThread.Schedule(Sub() serial.Disposable = subscribe())
                         End If
                     End Sub
            'start the first subscription
            serial.Disposable = subscribe()
            Return serial
        End Function)
End Function

From here, you can get an IObservable(Of Byte()) like this:

Dim buffer(4096 - 1) As Byte
Dim obsFac = FromRepeatedAsyncPattern(Of Byte(), Integer, Integer, Integer, Byte())(
    AddressOf stream.BeginRead, AddressOf stream.EndRead,
    Function(numRead)
        If numRead < 0 Then Throw New ArgumentException("Invalid number read")
        'debug output only; remove it for a NetworkStream, which does not support Position
        Console.WriteLine("Position after read: " & stream.Position.ToString())
        'copy the bytes out, because the shared buffer is reused by the next read
        Dim ret(numRead - 1) As Byte
        Array.Copy(buffer, ret, numRead)
        Return ret
    End Function,
    Function(numRead) numRead <= 0)
'this will be an observable of the chunk size you specify
Dim obs = obsFac(buffer, 0, buffer.Length)

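From there, you will need some kind of accumulator function that takes the byte arrays as they arrive and outputs complete messages. A skeleton of such a function might look like this: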

Public Function Accumulate(source As IObservable(Of Byte())) As IObservable(Of Message)
    Return Observable.Create(Of Message)(
        Function(obs)
            Dim accumulator As New List(Of Byte)
            Return source.Subscribe(
                Sub(buffer)
                    'do some logic to build a packet here
                    accumulator.AddRange(buffer)
                    'placeholder condition: replace with a check for a complete message
                    If True Then
                        obs.OnNext(New Message())
                        'reset accumulator
                    End If
                End Sub,
                AddressOf obs.OnError,
                AddressOf obs.OnCompleted)
        End Function)
End Function
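
To make the "build a packet" step more concrete, here is a rough C# sketch of the buffering logic. It assumes the framing from the question (a 10-byte text header holding the body length, followed by the body). FrameAccumulator and Append are hypothetical names, not part of Rx.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper: buffers incoming chunks and returns every complete frame.
sealed class FrameAccumulator
{
    private const int HeaderLength = 10;
    private readonly List<byte> pending = new List<byte>();

    // Appends a chunk and returns the frames (header + body) completed by it.
    public List<byte[]> Append(byte[] chunk)
    {
        var frames = new List<byte[]>();
        pending.AddRange(chunk);

        while (pending.Count >= HeaderLength)
        {
            // Assumption: the header is the body length written as text.
            string header = Encoding.UTF8.GetString(pending.GetRange(0, HeaderLength).ToArray());
            int frameLength = HeaderLength + int.Parse(header);

            // Not enough data yet; wait for the next chunk.
            if (pending.Count < frameLength) break;

            frames.Add(pending.GetRange(0, frameLength).ToArray());
            pending.RemoveRange(0, frameLength);
        }
        return frames;
    }
}
```

With Rx, this could be plugged into the chunk observable with something like obs.SelectMany(chunk => accumulator.Append(chunk)). Since the accumulator holds state, it should be created once per subscription rather than shared.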