2014-10-07 73 views
0

无处不在似乎在讨论从频道中读取应始终是阻止操作。态度似乎是这是Go的方式。这是有道理的,但我试图弄清楚如何从渠道中汇总信息。转到非阻止多频道接收

例如,发送http请求。假设我有一个可以产生数据流的流水线设置,所以我有一个产生队列/流点的通道。然后,我可以让一个goroutine监听这个频道,并发送一个HTTP请求将它存储在一个服务中。这有效,但我正在为每个点创建一个http请求。

我送它也端点可以让我在一个批处理发送多个数据点。我想要做的,是

  1. 读取尽可能多的值,直到我将阻止频道。
  2. 合并它们/发送单个http请求。
  3. 然后在信道被阻塞,直到我可以再次读取 之一。

这就是我将如何在C中完成事情,包括线程安全队列和选择语句。在可能的情况下基本上刷新整个/队列缓冲区。这是一个有效的技术吗?

看来去select语句确实给我类似C的选择的东西,但我仍然不知道,如果有一个通道上“非阻塞读”。

编辑:我也愿意接受什么,我打算可能不是去路,而是不断砸过不停的http请求也似乎我错了,特别是如果他们可以聚合。如果有人有一个很酷的替代架构,但我想避免这样的事情,例如神奇地缓冲N个物品,或等待X秒直到发送。

+0

相关:https://blog.golang.org/go-concurrency-patterns-timing-out-and – dyoo 2014-10-07 03:06:47

+1

虽然它不是真的超时。如果没有,我想阻止,然后在可能时读取多个。如果多个由1组成,那就很好。如果有100件事情,让我把它们全部处理掉。这样,http帖子就可以适应负载。 – 2014-10-07 03:21:51

回答

3

这里,直到信道是空的怎么批。变量batch是您的数据点类型的一部分。变量ch是数据点类型的通道。

var batch []someType 
for { 
    select { 
    case v := <-ch: 
     batch = append(batch, v) 
    default: 
     if len(batch) > 0 { 
      sendBatch(batch) 
      batch := batch[:0] 
     } 
     batch = append(batch, <-ch) // receiving a value here prevents busy waiting. 
    } 
} 

您应该防止该批次无限制地增长。这里有一个简单的方法来做到这一点:

var batch []someType 
for { 
    select { 
    case v := <-ch: 
     batch = append(batch, v) 
     if len(batch) >= batchLimit { 
      sendBatch(batch) 
      batch := batch[:0] 
     } 
    default: 
     if len(batch) > 0 { 
      sendBatch(batch) 
      batch := batch[:0] 
     } 
     batch = append(batch, <-ch) 
    } 
} 
+0

我认为这可能就像试图阅读我在选择中等待的东西:)。这是一个很好的例子。看起来确实很奇怪:) – 2014-10-07 08:48:05

+1

如果你不想自己实现它,这个模式也在我的频道包中的BatchingChannel中实现:https://godoc.org/github。com/eapache/channels#BatchingChannel – Evan 2014-10-07 14:23:20

2

Dewy Broto为您的问题提供了一个很好的解决方案。这是一个直截了当的直接解决方案,但我想更广泛地评论您将如何寻找针对不同问题的解决方案。

Go使用通信顺序过程代数(CSP)作为其信道,选择和轻量级进程(“够程”)的基础。 CSP保证事件的顺序;它只是通过做出选择而引入非确定性(又名select)。保证排序有时被称为“之前发生” - 它使编码比替代(广泛流行)非阻塞风格简单得多。它还为创建组件提供了更多空间:长期功能的单元,通过可预测的方式通过渠道与外部世界进行交互。

也许在频道上讨论阻止会给人们学习Go的方式带来心理障碍。我们在I/O块,但我们等待在频道上。在通道上等待是不应该皱眉的,只要系统作为一个整体具有足够的平行松弛度(即其他活动参数)来保持CPU忙碌。

可视化组件

所以,回到你的问题。让我们从组件的角度思考它,你有很多你需要探索的要点。假设每个源都是一个goroutine,它就会在您的设计中用输出通道形成一个组件。 Go允许共享通道端,因此许多来源可以安全地将它们的点按顺序交错到单个通道上。你不需要做任何事情 - 这只是渠道的工作方式。

Dewy Broto描述的配料功能本质上是另一个组件。作为一种学习练习,以这种方式表达它是一件好事。批处理组件具有一个点输入通道和一个批量输出通道。

最后,HTTP I/O行为也可能是一个带有一个输入通道和没有输出通道的组件,仅用于接收整个批次的点,然后通过HTTP发送它们。

以只有一个源的简单情况,这可能是描述是这样的:

+--------+  point  +---------+  batch  +-------------+ 
| source +------->-------+ batcher +------->-------+ http output | 
+--------+    +---------+    +-------------+ 

的这里的目的是在他们的基本水平描绘出不同的活动。这有点像数字电路图,这不是巧合。

你确实可以在Go中实现它,它会工作。它甚至可以工作得很好,但实际上您可能更喜欢通过组合成对元件来优化它,必要时重复。在这种情况下,很容易将batcher和http输出结合起来,并以Dewy Broto的解决方案结束。

重要的一点是,围棋并发发生的最简单的

  • (一)不用担心前面有关阻止;
  • (b)描述需要在相当细致的级别上进行的活动(您可以在简单的情况下在头上做这件事);
  • (c)如果需要,通过组合功能进行优化。

我将离开作为一个挑战更可视化移动通道结束(Pi-Calculus)的主题,其中通道用于将通道末端发送到其他goroutines。

+0

我的系统实际上是以这种方式构建的。我有大约7个左右的“管道组件”,每个组件都推到下一个。它的设计非常有趣。它或多或少地收集来自不同系统的数据点,并在将每个点存储在最终系统之前将其转换为新的点。我甚至有一个单点http输出组件的工作实现,但是它在1秒内丢弃了3天的数据,而且速度很慢。 259200 http连接的价格相当昂贵,因此想要弄清楚'组合'进入通道的事物的方式,以便他们可以采取行动:)。 – 2014-10-09 00:34:20

+0

虽然写得非常好。这对于解决这个问题的其他人来说是非常有用的。在做'GO方式'的时候要记住好东西, – 2014-10-09 00:35:06