0
所有消息应发布到消息总线:如何单元测试Rx.Net的foreach订阅
upstream.Get().ForEachAsync(async e => await _bus.Publish(e, cancellationToken));
我想单元测试来验证publish方法被调用正确:
[Theory, AutoMoqData]
public void Publish_ShouldPublishAllDataSources(
[Frozen]Mock<IBus> bus,
[Frozen]Mock<IUpstream> upstream,
SUT sut,
TestScheduler scheduler,
CancellationToken cancellationToken
)
{
{
Arrange();
Act();
Assert();
}
ITestableObservable<Event> inputObservable;
void Arrange()
{
inputObservable =
scheduler.CreateColdObservable(
OnNext(1L,new Event("e1")),
OnNext(1L,new Event("e2")),
OnNext(1L,new Event("e3"))
);
upstream
.Setup(c => c.Get())
.Returns(inputObservable);
}
void Act()
{
sut.Exercise(cancellationToken);
//Do sth with scheduller to fire all events
//I've tested these:
//scheduler.Start();
//scheduler.AdvanceTo(30000L);
}
void Assert()
{
inputObservable.ForEach(e =>
bus.Verify(b =>
b.Publish(e, cancellationToken)
));
}
}
但无论我尝试测试永不停止。
更新:
下面是完整的生产代码:
class SUT
{
IUpstream _upstream;
public SUT(IUpstream upstream)
{
_upstream = upstream;
}
void Exercise(CancellationToken cancellationToken)
{
_upstream .Get()
.ForEachAsync(async
e => await _bus.Publish(e, cancellationToken));
}
}
interface IUpstream
{
IObservable<string> Get();
}
interface IBus
{
void Publish<T>(T,System.Threading.CancellationToken)
}
你真的需要为此提供[MCVE。我们可以运行一些代码来查看行为。最好只是一个简单的控制台应用。 – Enigmativity
感谢您的更新,但是您是否尝试制作[mcve],以便在控制台应用中运行? – Enigmativity
我不知道你的意思是“在控制台应用程序中运行代码的测试工具需要更多仪式”。这对我没有意义。 – Enigmativity