0

此注册观察员是从here截取的IEventProcessor实现的一部分:与IEventProcessor

public class SimpleEventProcessor : IEventProcessor 
{ 
    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events) 
    { 
     foreach (EventData eventData in events) 
     { 

     } 
    } 
} 

作为新事件被添加到EventHub,所述ProcessEventsAsync方法被调用和foreach循环可被用于处理该事件。现在我想使用例如ObserverRegistry将观察者添加到SimpleEventProcessor中,如here所述。建议的ObserverRegistry看起来像这样:

public class ObserverRegistry : IObserverRegistry<IProjectionWriterFactory> 
{ 
    IEnumerable<object> GetObservers(IProjectionWriterFactory factory) 
    { 
     yield return new LoanApplicationObserver(); 
     yield return new OfferObserver(); 
     // more observers... 
    } 
} 

不幸的是,有几件事缺失。我如何向SimpleEventProcessor注册几个观察者,以便将事件从ProcessEventsAsync传递给所有观察者,最终他们的When方法?

回答

1

完整的源代码是here。故事梗概如下:

您可以定义在SimpleEventProcessor一个静态事件:

public class SimpleEventProcessor : IEventProcessor 
{ 
    public static event EventHandler<MessageReceivedEventArgs> OnMessageReceived;   

    public SimpleEventProcessor() 
    { } 
} 

然后在ProcessEventsAsync提高OnMessageReceived事件:

public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages) 
{ 
    foreach (EventData message in messages) 
    { 
     OnMessageReceived(this, new MessageReceivedEventArgs() { ReceivedOn = DateTimeOffset.UtcNow, Message = message }); 
    } 
} 

非常重要:确保所有用户都在处理器关闭时删除。这是非常重要的,因为缺少退订可能导致内存静态事件泄漏article explaining this

public async Task CloseAsync(PartitionContext context, CloseReason reason) 
{ 
    if (OnMessageReceived != null) 
    { 
     foreach (EventHandler<MessageReceivedEventArgs> subscriber in OnMessageReceived.GetInvocationList()) 
     { 
      OnMessageReceived -= subscriber; 
     } 
    } 
} 

最后,你可以挂钩观察员的初始化逻辑的一部分:从控制台

ObserverRegistry registry = new ObserverRegistry(); 
foreach (IObserver observer in registry.GetObservers()) 
{ 
    SimpleEventProcessor.OnMessageReceived += new EventHandler<MessageReceivedEventArgs>(
    (sender, e) => observer.When(e)); 
} 

输出示例应用:

SimpleEventProcessor: bc84f0b9-6e2d-4c4e-9169-c1bd277d3c18 
Observer1: bc84f0b9-6e2d-4c4e-9169-c1bd277d3c18 
Observer2: bc84f0b9-6e2d-4c4e-9169-c1bd277d3c18 
SimpleEventProcessor: a29d5875-7c53-4a7c-8113-ef7c24c2851f 
Observer1: a29d5875-7c53-4a7c-8113-ef7c24c2851f 
Observer2: a29d5875-7c53-4a7c-8113-ef7c24c2851f 

我想强调以下几点:

  1. 在您的使用案例中,注册IEventProcessorFactory可能更有效,因为您可以更好地控制处理器实例化和处置。
  2. 建议尽可能使ProcessEventsAsync方法尽可能快速。在你的情况下,创建单独的消费者群体可能是更好的选择吗?

希望以上回答你的问题。