2015-09-13 41 views
19

我有一个Java类型的actor,负责可能暂时不可用的外部资源上的过滤器/重试逻辑。演员的领域,常用的方法有:无限期地等待可能永远不会到达的消息

public class MyActorImpl implements MyActor { 
    private static final long MINWAIT = 50; 
    private static final long MAXWAIT = 1000; 
    private static final long DEFAULTWAIT = 0; 
    private static final double BACKOFFMULTIPLIER = 1.5; 

    private long updateWait(long currentWait) { 
     return Math.min(Math.max((long) (currentWait * BACKOFFMULTIPLIER), MINWAIT), MAXWAIT); 
    } 

    // mutable 
    private long opWait = DEFAULTWAIT; 
    private final Queue<OpInput> opBuffer = new ArrayDeque<>(); 

    // called from external actor 
    public void operation(OpInput opInput) { 
     operation(opInput, DEFAULTWAIT); 
    } 

    // called internally 
    public void operation(OpInput opInput, long currentWait); 
} 

演员有几个操作所有或多或少相同的重试/缓冲逻辑;每个操作都有自己的[op]Wait[op]Buffer字段。

  1. 父演员调用void operation(OpInput opInput)
  2. void operation(OpInput opInput, long currentWait)使用DEFAULTWAIT用于第二参数前述方法调用
  3. 如果currentWait参数不等于opWait然后输入被存储在opBuffer,否则该输入被发送到外部资源。
  4. 如果外部资源返回成功,则opWait设置为DEFAULTWAIT,并且opBuffer的内容通过operation(opInput)方法发回。如果外部资源(或更可能是网络)返回错误,则我使用opWait毫秒的延迟更新opWait = updateWait(opWait)和调度参与者系统调度器上的operation(opInput, opWait)

I.e.我使用actor系统调度器来实现指数退避;我正在使用currentWait参数来标识正在重试的消息,并正在缓冲其他消息,直到主消息被外部资源成功处理。

问题是,如果预定的operation(opInput, currentWait)消息丢失,那么我将永远缓冲消息,因为currentWait == opWait防护将对所有其他消息失败。我可以使用像spring-retry这样的实现指数回退,但我没有看到合并操作的重试循环的方式,这意味着我可以在每个重试循环中使用一个线程(而使用actor系统的调度器不会放得太多系统上的应变)。

我正在寻找一种更容错的方法来实现角色和外部资源之间的接口上的缓冲和指数退避,而不必为任务分配太多资源。

回答

8

如果我正确理解你,如果唯一的问题是失去了计划的消息,你为什么不只是使用类似的Reliable Proxy Pattern针对特定信息,然后如果失败opWait = DEFAULTWAIT;

有关于你的代码的一些事情,我得到的,当你说public void operation(OpInput opInput)被称为外部时,我不明白你的意思。你的意思是说这种方法与网络相互作用,网络使用的资源有时不可用?

如果我可以,我建议一个替代方案。根据我的理解,你的主要问题是你有一个不可用的资源,所以你有某种你使用某种等待逻辑实现的que/buffer,这样消息一旦被再次使用就会被处理,这很不幸涉及一些可能会丢失并导致无限等待的消息。我认为你可以通过超时使用期货来实现你想要的。然后重试,如果未来在一段时间内没有完成,就说3次重复。您甚至可以根据服务器负载和完成消息所需的时间来调整此时间。希望有所帮助。

+0

“调用外部”我的意思是该方法被另一个actor调用,而“内部调用”方法只能通过'self.operation(...)调用。并感谢您使用未来的建议,而不是预定的消息,这大大简化了代码 –

相关问题