您应该考虑使用 Reactive Extensions(Rx)来解决这个问题。它专门用于处理值(事件)流,并且可以消除对锁的需求。
首先,我要为 Add、Complete 和 RequestView 这三种动作分别定义动作类。这样做是为了模拟 F# 等语言中的可区分联合(discriminated union):
/// <summary>
/// Base type for the control messages sent to the aggregator. Its static
/// members hand out the three message cases, so call sites read much like
/// the constructors of an F#-style discriminated union.
/// </summary>
public class EventAction
{
    /// <summary>
    /// Shared instance of the "complete" message.
    /// </summary>
    public static readonly EventAction Complete = new CompleteAction();

    /// <summary>
    /// Shared instance of the "request view" message.
    /// </summary>
    public static readonly RequestViewAction RequestView = new RequestViewAction();

    /// <summary>
    /// Creates an "add" message carrying <paramref name="value"/>.
    /// </summary>
    /// <param name="value">The integer to wrap in the message.</param>
    /// <returns>A new <see cref="AddAction"/> typed as the base class.</returns>
    public static EventAction Add(int value)
    {
        return new AddAction(value);
    }
}
/// <summary>
/// Message case that carries a single integer payload.
/// </summary>
public class AddAction : EventAction
{
    /// <summary>
    /// The integer supplied when the message was constructed.
    /// </summary>
    public readonly int Value;

    /// <summary>
    /// Creates the message and stores <paramref name="value"/> in <see cref="Value"/>.
    /// </summary>
    /// <param name="value">The payload to carry.</param>
    public AddAction(int value)
    {
        Value = value;
    }
}
/// <summary>
/// Payload-free message case; only its runtime type carries meaning.
/// </summary>
public class CompleteAction : EventAction { }
/// <summary>
/// Payload-free message case; only its runtime type carries meaning.
/// </summary>
public class RequestViewAction : EventAction { }
接下来,我会创建一个名为 AggregateView 的类型,它持有三个 Rx Subject 值(aggregator、events 和 view)。
下面是这个类以及其中各个流的代码:
using System;
using LanguageExt;
using static LanguageExt.Prelude;
using System.Reactive.Linq;
using System.Reactive.Subjects;
/// <summary>
/// Keeps a running, immutable list of the integers pushed into it and exposes
/// two observable streams: every integer as it arrives (<see cref="Events"/>)
/// and snapshots of the list on demand (<see cref="Views"/>).
/// All state changes are funnelled through a single subject of
/// <see cref="EventAction"/> messages and folded over by one accumulator.
/// </summary>
public class AggregateView : IDisposable
{
    // Inbound control messages (add / complete / request view).
    readonly Subject<EventAction> aggregator = new Subject<EventAction>();

    // Outbound: each integer as it is added.
    readonly Subject<int> events = new Subject<int>();

    // Outbound: list snapshots.
    readonly Subject<Lst<int>> view = new Subject<Lst<int>>();

    // Keeps the fold over `aggregator` alive until Dispose.
    readonly IDisposable subscription;

    // Guards Dispose against running twice.
    bool disposed;

    /// <summary>
    /// Wires the message subject to the accumulator that maintains the list.
    /// </summary>
    public AggregateView()
    {
        // The accumulator is invoked once per message. The folded result itself
        // is never consumed (the subscriber below ignores it); the useful work
        // is the publishing to `events` and `view` done inside the accumulator.
        subscription = aggregator.Aggregate(
            Lst<int>.Empty,
            (list, action) =>
            {
                switch (action)
                {
                    // Publish the value, then carry forward the list with it appended.
                    case AddAction add:
                        events.OnNext(add.Value);
                        return list.Add(add.Value);

                    // Publish an empty list and reset the carried state to empty.
                    case CompleteAction _:
                        view.OnNext(Lst<int>.Empty);
                        return Lst<int>.Empty;

                    // Publish the current list; the carried state is unchanged.
                    case RequestViewAction _:
                        view.OnNext(list);
                        return list;

                    // Unknown message types leave the state untouched.
                    default:
                        return list;
                }
            })
            .Subscribe(_ => { });
    }

    /// <summary>
    /// Observable stream of integer events
    /// </summary>
    public IObservable<int> Events =>
        events;

    /// <summary>
    /// Observable stream of list views
    /// </summary>
    public IObservable<Lst<int>> Views =>
        view;

    /// <summary>
    /// Listener for plugging into an event
    /// </summary>
    /// <param name="value">The integer to add to the aggregate list.</param>
    public void Listener(int value) =>
        aggregator.OnNext(EventAction.Add(value));

    /// <summary>
    /// Clears the aggregate view and posts the empty list to Views
    /// </summary>
    public void Complete() =>
        aggregator.OnNext(EventAction.Complete);

    /// <summary>
    /// Requests the current aggregate view to be pushed through to
    /// the Views subscribers
    /// </summary>
    public void RequestView() =>
        aggregator.OnNext(EventAction.RequestView);

    /// <summary>
    /// Stops processing messages, completes both output streams and releases
    /// the subjects. Calling it more than once is a no-op.
    /// </summary>
    public void Dispose()
    {
        // Subject.OnCompleted throws ObjectDisposedException on a disposed
        // subject, so a second pass through the code below would throw.
        if (disposed)
        {
            return;
        }
        disposed = true;

        subscription.Dispose();

        // Terminate the inbound subject instead of disposing it: a handler that
        // is still attached to an event may call Listener after Dispose, and
        // OnNext on a completed subject is ignored whereas on a disposed one
        // it throws.
        aggregator.OnCompleted();

        view.OnCompleted();
        events.OnCompleted();
        view.Dispose();
        events.Dispose();
    }
}
它有两个 IObservable 属性:
- Views —— 允许您订阅聚合列表
- Events —— 允许您订阅整数事件
另外还有一些实用的方法:
- Listener —— 用来挂接到您的 event 上
- Complete —— 清空聚合列表,并向 Views 可观察对象发送一个空列表
- RequestView —— 将当前的聚合列表发送给 Views 可观察对象的所有订阅者。
最后进行测试:
/// <summary>
/// Console demo: raises a static event ten times through an
/// <see cref="AggregateView"/> and prints what its two streams emit.
/// </summary>
class Program
{
    static event Action<int> eventTest;

    /// <summary>
    /// Pushes 1..5, requests a view, clears, pushes 6..10 and requests a view again.
    /// </summary>
    static void Main(string[] args)
    {
        // AggregateView is IDisposable; the using block guarantees its
        // subscription and subjects are released even if a handler throws.
        using (var aggregate = new AggregateView())
        {
            eventTest += aggregate.Listener;
            try
            {
                aggregate.Views.Subscribe(ReceiveList);
                aggregate.Events.Subscribe(ReceiveValue);

                for (var i = 1; i <= 5; i++)
                {
                    eventTest(i);
                }

                aggregate.RequestView();
                aggregate.Complete();

                for (var i = 6; i <= 10; i++)
                {
                    eventTest(i);
                }

                aggregate.RequestView();
            }
            finally
            {
                // A static event would otherwise keep the aggregate reachable
                // (and callable) after it has been disposed.
                eventTest -= aggregate.Listener;
            }
        }
    }

    /// <summary>Prints the size and contents of a list snapshot.</summary>
    static void ReceiveList(Lst<int> list) =>
        Console.WriteLine($"Got list of {list.Count} items: {ListShow(list)}");

    /// <summary>Prints a single value on its own line.</summary>
    static void ReceiveValue(int x) =>
        Console.WriteLine(x);

    /// <summary>Formats the list items as a comma-separated string.</summary>
    static string ListShow(Lst<int> list) =>
        String.Join(", ", list);
}
这是我能想到的、在处理事件时最符合函数式风格的做法。对于任何想以函数式方式编程的人来说,Action<int> 都应该被视为一个危险信号,因为它默认就带有副作用,是不纯的。所以您需要尽可能地把副作用封装起来,让其余的一切保持纯粹。
顺便说一句,您可以把整个实现泛化,使其适用于任意类型,这样它会更有用:
/// <summary>
/// Discriminator stored on each action object to say which message case it is.
/// The numeric values are the ones the compiler would assign implicitly.
/// </summary>
public enum EventActionTag
{
    /// <summary>The action carries a value to add.</summary>
    Add = 0,

    /// <summary>The "complete" action.</summary>
    Complete = 1,

    /// <summary>The "request view" action.</summary>
    RequestView = 2
}
/// <summary>
/// Generic base type for the control messages. Each instance records its
/// case in <see cref="Tag"/>; the static members hand out the three cases.
/// </summary>
/// <typeparam name="T">Type of the value carried by "add" messages.</typeparam>
public class EventAction<T>
{
    /// <summary>
    /// Shared instance of the "complete" message for this <typeparamref name="T"/>.
    /// </summary>
    public static readonly EventAction<T> Complete = new CompleteAction<T>();

    /// <summary>
    /// Shared instance of the "request view" message for this <typeparamref name="T"/>.
    /// </summary>
    public static readonly EventAction<T> RequestView = new RequestViewAction<T>();

    /// <summary>
    /// Identifies which message case this instance is.
    /// </summary>
    public readonly EventActionTag Tag;

    /// <summary>
    /// Stores <paramref name="tag"/> in <see cref="Tag"/>.
    /// </summary>
    /// <param name="tag">The case this instance represents.</param>
    public EventAction(EventActionTag tag)
    {
        Tag = tag;
    }

    /// <summary>
    /// Creates an "add" message carrying <paramref name="value"/>.
    /// </summary>
    /// <param name="value">The value to wrap in the message.</param>
    /// <returns>A new <see cref="AddAction{T}"/> typed as the base class.</returns>
    public static EventAction<T> Add(T value)
    {
        return new AddAction<T>(value);
    }
}
/// <summary>
/// Message case tagged <see cref="EventActionTag.Add"/> that carries one value.
/// </summary>
/// <typeparam name="T">Type of the carried value.</typeparam>
public class AddAction<T> : EventAction<T>
{
    /// <summary>
    /// The value supplied when the message was constructed.
    /// </summary>
    public readonly T Value;

    /// <summary>
    /// Creates the message and stores <paramref name="value"/> in <see cref="Value"/>.
    /// </summary>
    /// <param name="value">The payload to carry.</param>
    public AddAction(T value)
        : base(EventActionTag.Add)
    {
        Value = value;
    }
}
/// <summary>
/// Payload-free message case tagged <see cref="EventActionTag.Complete"/>.
/// </summary>
/// <typeparam name="T">Value type of the message family this case belongs to.</typeparam>
public class CompleteAction<T> : EventAction<T>
{
    /// <summary>
    /// Creates the message with the <see cref="EventActionTag.Complete"/> tag.
    /// </summary>
    public CompleteAction()
        : base(EventActionTag.Complete)
    {
    }
}
/// <summary>
/// Payload-free message case tagged <see cref="EventActionTag.RequestView"/>.
/// </summary>
/// <typeparam name="T">Value type of the message family this case belongs to.</typeparam>
public class RequestViewAction<T> : EventAction<T>
{
    /// <summary>
    /// Creates the message with the <see cref="EventActionTag.RequestView"/> tag.
    /// </summary>
    public RequestViewAction()
        : base(EventActionTag.RequestView)
    {
    }
}
/// <summary>
/// Keeps a running, immutable list of the values pushed into it and exposes
/// two observable streams: every value as it arrives (<see cref="Events"/>)
/// and snapshots of the list on demand (<see cref="Views"/>).
/// All state changes are funnelled through a single subject of
/// <see cref="EventAction{T}"/> messages and folded over by one accumulator.
/// </summary>
/// <typeparam name="T">Type of the values being aggregated.</typeparam>
public class AggregateView<T> : IDisposable
{
    // Inbound control messages (add / complete / request view).
    readonly Subject<EventAction<T>> aggregator = new Subject<EventAction<T>>();

    // Outbound: each value as it is added.
    readonly Subject<T> events = new Subject<T>();

    // Outbound: list snapshots.
    readonly Subject<Lst<T>> view = new Subject<Lst<T>>();

    // Keeps the fold over `aggregator` alive until Dispose.
    readonly IDisposable subscription;

    // Guards Dispose against running twice.
    bool disposed;

    /// <summary>
    /// Wires the message subject to the accumulator that maintains the list.
    /// </summary>
    public AggregateView()
    {
        // The accumulator is invoked once per message. The folded result itself
        // is never consumed (the subscriber below ignores it); the useful work
        // is the publishing to `events` and `view` done inside the accumulator.
        subscription = aggregator.Aggregate(
            Lst<T>.Empty,
            (list, action) =>
            {
                switch (action.Tag)
                {
                    // Publish the value, then carry forward the list with it appended.
                    case EventActionTag.Add:
                        // The Add tag is only ever set by AddAction<T>'s constructor.
                        var add = (AddAction<T>)action;
                        events.OnNext(add.Value);
                        return list.Add(add.Value);

                    // Publish an empty list and reset the carried state to empty.
                    case EventActionTag.Complete:
                        view.OnNext(Lst<T>.Empty);
                        return Lst<T>.Empty;

                    // Publish the current list; the carried state is unchanged.
                    case EventActionTag.RequestView:
                        view.OnNext(list);
                        return list;

                    // Unknown tags leave the state untouched.
                    default:
                        return list;
                }
            })
            .Subscribe(_ => { });
    }

    /// <summary>
    /// Observable stream of the individual values as they are added
    /// </summary>
    public IObservable<T> Events =>
        events;

    /// <summary>
    /// Observable stream of list views
    /// </summary>
    public IObservable<Lst<T>> Views =>
        view;

    /// <summary>
    /// Listener for plugging into an event
    /// </summary>
    /// <param name="value">The value to add to the aggregate list.</param>
    public void Listener(T value) =>
        aggregator.OnNext(EventAction<T>.Add(value));

    /// <summary>
    /// Clears the aggregate view and posts the empty list to Views
    /// </summary>
    public void Complete() =>
        aggregator.OnNext(EventAction<T>.Complete);

    /// <summary>
    /// Requests the current aggregate view to be pushed through to
    /// the Views subscribers
    /// </summary>
    public void RequestView() =>
        aggregator.OnNext(EventAction<T>.RequestView);

    /// <summary>
    /// Stops processing messages, completes both output streams and releases
    /// the subjects. Calling it more than once is a no-op.
    /// </summary>
    public void Dispose()
    {
        // Subject.OnCompleted throws ObjectDisposedException on a disposed
        // subject, so a second pass through the code below would throw.
        if (disposed)
        {
            return;
        }
        disposed = true;

        subscription.Dispose();

        // Terminate the inbound subject instead of disposing it: a handler that
        // is still attached to an event may call Listener after Dispose, and
        // OnNext on a completed subject is ignored whereas on a disposed one
        // it throws.
        aggregator.OnCompleted();

        view.OnCompleted();
        events.OnCompleted();
        view.Dispose();
        events.Dispose();
    }
}
试试ConcurrentBag? https://www.dotnetperls.com/concurrentbag – Trey