我正在写一个服务结构中承载的有状态的服务。该服务的工作是消费来自外部队列的消息,转换它们并将它们放置到我们自己的消息传递系统中。根据供应商文档,吞吐量可以达到6k信息/秒。System.Fabric.FabricNotPrimaryException当从定时器保存状态
我已经配置的服务成多个分区扩展消息负荷,并且每个分区有分2 /最大3个副本。为了从故障中恢复,我可以订阅供应商队列,并传递一个时间戳,从这个时间点我希望收到消息。要做到这一点,我要存储处于服务状态的最后一条消息的时间戳。由于消息量,我决定做这个计时器“保存”(并允许消息的潜在的DUP下游)
这是由时间调用的代码:每次
private async void _timer_Elapsed(object sender, ElapsedEventArgs e)
{
var saveRetryPolicy = Policy
.Handle<Exception>()
.WaitAndRetryAsync(5, retryAttempt =>
TimeSpan.FromSeconds(Math.Pow(2, retryAttempt))
);
await saveRetryPolicy.ExecuteAsync(async() =>
{
using (var tx = _stateManager.CreateTransaction())
{
var state = await _stateManager.TryGetAsync<IReliableDictionary<string, long>>(TimestampStateName);
if (state.HasValue)
{
await state.Value.AddOrUpdateAsync(tx, TimestampStateName, _lastTXTimestamp,
(s, l) => _lastTXTimestamp);
await tx.CommitAsync();
}
else
{
var s =
await _stateManager.GetOrAddAsync<IReliableDictionary<string, long>>(tx, TimestampStateName);
await tx.CommitAsync();
_timer_Elapsed(this, null);
}
}
});
}
试图坚持这一点,我得到一个'System.Fabric.FabricNotPrimaryException'错误,在每个分区上。
我已经包含一个重试策略(由Polly Retry提供),因为对于建议这样做的类似问题发表了评论。这并没有影响,因为延长错误报告前的时间。
我误解的东西与SF应该如何使用基本的?这对我来说似乎是一个简单的用例。从评论
你开始对所有副本的计时器?还是只在主副本上? – LoekD
很棒的地方(我现在看起来不笨)。谢谢 –