2016-12-27 83 views
3

我该如何确定走频道的哪一边在另一边等待?如何确定走频道的哪一边正在等待?

我想知道这个,所以我可以找出我的处理正在受到限制,并通过分配更多资源进行响应。

一些选项

2种方法我认为双方都需要的东西所以测量不太吵做记录值的移动平均值,但是这不是一个bigproblem


  1. 使用一个定时器来检查的时间%的消费者

等待在单个消费者的情况下,我可以从通道消耗后,我停止计时器之前启动一个定时器得到一个记录。我可以跟踪每个获取周期中等待和响应的时间百分比。

缓冲信道的
  • 试样长度
  • 如果信道经常是0,这意味着我们正在消耗比发送速度更快。同样,如果缓冲区已满,我们发送速度比我们可以接收的速度快。我们可以随时检查我们频道的长度,以确定运行缓慢的情况。


    出于性能原因或其他原因,是否有充分的理由选择其中一个到另一个?有一个更简单的解决方案来解决这个问题吗?

    我正在执行N HTTP请求抓住在同一时间在多达W够程内容,并在单个的goroutine运行发送所有内容的信道到一个processor,服务,其在将数据反馈给客户端。

    每个工作任务将导致通道上发送大量消息。每个worker的任务可能需要几分钟才能完成。

    下图总结了3名并发工作人员(W=3)的数据流。

    [worker: task 1] - 
             \ 
        [worker: task 2] - | --- [ channel ] --- [ processor ] -> [ client ] 
            /
        [worker: task 3] - 
    

    我想知道我是否应该请求过程中运行更多的工人(增加W)或更少的工人(减少W)。由于客户端通过非常不同的速度进行连接,因此每个请求可能会有所不同。

    +0

    您是否尝试过仅运行阻止配置文件? – JimB

    +0

    你可以使用'len()'和'cap()'作为频道 –

    +0

    @JimB我不清楚你的意思是“阻止配置文件”。 – turtlemonvh

    回答

    3

    达到目标的一种方法是使用“有限发送”和“有限接收”操作 - 如果您能够提出合理的轮询超时时间。

    当你的任何一个工作人员试图通过通道发送一个完整的结果时,不要让它永远阻塞(直到通道缓冲区中有空间);相反,只允许它阻止一些最长时间。如果超时发生在通道缓冲区中有空间之前,您可以对该情况做出反应:计算发生次数,调整未来最后期限,节流或减少工作人员数量等等。

    同样,对于从处理器接收结果的“处理器”,您可以限制其阻塞的时间量。如果超时发生在有可用值之前,则处理器将停止运行。创造更多的工作人员来更快地喂食(假设工人将从这种并行性中受益)。

    这种方法的缺点是每个sendreceive operation的开销creating timers

    草图,这些声明可以访问到每个工人:

    const minWorkers = 3 
    var workers uint32 
    

    在每个工人够程:

    atomic.AddUint32(&workers, 1) 
    for { 
        result, ok := produce() 
        if !ok { 
         break 
        } 
        // Detect when channel "p"'s buffer is full. 
        select { 
        case p <- result: 
        case <-time.After(500 * time.Millisecond): 
         // Hand over the pending result, no matter how long it takes. 
         p <- result 
         // Reduce worker count if above minimum. 
         if current := atomic.LoadUint32(&workers); current > minWorkers && 
          atomic.CompareAndSwapUint32(&workers, current, current-1) { 
          return 
         } 
         // Consider whether to try decrementing the working count again 
         // if we're still above the minimum. It's possible another one 
         // of the workers also exited voluntarily, changing the count. 
        } 
    } 
    atomic.AddUint32(&workers, -1) 
    

    注意,按照上面的方法,你可以实现通过定时如何同样的效果长时间需要发送到通道p来完成,并且对其做出反应花费了太长的时间,而不是进行一个有界发送,然后是潜在的阻塞发送。不过,我这样描绘了它,因为我怀疑这样的代码会成熟到在超时到期时包含日志和仪器计数器颠簸。

    同样,在你的处理器的goroutine,你可以限制你阻止工人接收值的时间量:

    for { 
        select { 
        case result <- p: 
         consume(result) 
        case <-time.After(500 * time.Millisecond): 
         maybeStartAnotherWorker() 
        } 
    } 
    

    显然,有很多的旋钮可以附加到这个玩意儿。你最终将生产者的安排与消费者和生产者本身联系起来。引入生产者和消费者“抱怨”延迟的不透明“听众”,可以让您打破这种循环关系,更轻松地改变管理您对拥堵的反应的政策。

    +0

    对不起,延迟回复。这将适用于通道使用的超时始终小于从通道读取的延迟时间的情况。如果我们想捕获负载的周期性波动,我们可以依靠[nyqist](https://en.wikipedia.org/wiki/Nyquist%E2%80%93Shannon_sampling_theorem)告诉我们,我们的采样率需要非常高。在这种情况下,我们将把一个离散的数据流解释为一个连续的数据流,然后再次离散化为样本。 – turtlemonvh

    +0

    当然,我所说的获得频道的长度也有同样的问题。您的解决方案意味着我们不需要使用缓冲通道来获得准确的测量结果(并且所有事情都有更多的事件驱动);我需要一个后台进程来完成抽样,但是没有定时器的开销。您的计时器可能需要与发送到通道之间的估计时间绑定,这在某些情况下可能很难计算。它可能不是我所需要的最佳解决方案,但它绝对是一个很酷的主意 - 谢谢! – turtlemonvh