2016-08-23 45 views
1

我是Storm的新手。我正在将它用于大学项目。如何停止Storm中的元组处理并执行其他代码

我创建了我的拓扑,连接到MySql数据库的Spout和两个Bolts。与喷口相连的第一个螺栓准备并去除元组中不需要的信息;第二,做元组的过滤。

我在本地模式下工作。

我的问题是: 为什么运行拓扑后,在我的控制台中,我看到输出像下面的行?

38211 [Thread-14-movie-SPOUT] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30] 
67846 [Thread-10-__acker] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60] 
67846 [Thread-8-cleaning-genre-bolt] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60] 
67852 [Thread-10-__acker] INFO backtype.storm.daemon.task - Emitting: __acker __metrics [#<TaskInfo [email protected]> [#<DataPoint [__emit-count = {}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=0, write_pos=1, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=-1, write_pos=-1, capacity=1024, population=0}]> #<DataPoint [__execute-count = {}]>]] 
67853 [Thread-8-cleaning-genre-bolt] INFO backtype.storm.daemon.task - Emitting: cleaning-genre-bolt __metrics [#<TaskInfo [email protected]> [#<DataPoint [__emit-count = {default=1680}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=1621, write_pos=1622, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {default=1680}]> #<DataPoint [__execute-latency = {movie-SPOUT:default=0.15476190476190477}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=1680, write_pos=1680, capacity=1024, population=0}]> #<DataPoint [__execute-count = {movie-SPOUT:default=1680}]>]] 
67854 [Thread-13-filtering-genre-BOLT] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60] 
67855 [Thread-13-filtering-genre-BOLT] INFO backtype.storm.daemon.task - Emitting: filtering-genre-BOLT __metrics [#<TaskInfo [email protected]> [#<DataPoint [__emit-count = {}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=1681, write_pos=1682, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {cleaning-genre-bolt:default=0.08333333333333333}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=-1, write_pos=-1, capacity=1024, population=0}]> #<DataPoint [__execute-count = {cleaning-genre-bolt:default=1680}]>]] 

,我读了这些行处理的最后一个元组后,被认为是正常的。不是吗?

如何在提交拓扑后运行其他代码?例如,我想打印在第二个螺栓中完成的过滤结果,保存在HashMap中。 如果我将代码放在包含submitTopology()方法的行之后,代码将在元组完成之前运行。

第二个和最后一个问题是:为什么在风暴的每一个例子,我看到在喷

“的Thread.sleep(1000)”?

也许它与我的第一个问题有关。

我希望我的问题很清楚。 提前谢谢!

回答

0

我读到这些行处理后的最后一个元组被认为是正常的。不是吗?

这些只是INFO消息。所以不用担心它们。

如果我把代码放在包含submitTopology()方法的行后面,代码就会在元组完成之前运行。

如果您提交了拓扑结构,拓扑将在后台执行(即多线程)。这是必需的,因为拓扑运行是“永久”的(直到你明确地停止它 - 或者你的Java应用程序终止,因为你正在运行本地模式)。

“拓扑完成后”的运行代码与Storm概念并不一致,因为Strom是一个流式处理系统,并且“处理中没有结束”(输入流处于无限状态,因此处理将永久运行)。如果你想处理一个有限的数据集,你可能想要考虑像Flink或Spark这样的批处理框架。

因此,如果您想在Storm中完成这项工作,您需要能够确定何时处理所有数据。因此,在拓扑提交之后,在处理完所有数据之后,您将显式阻止并等待。

但是,对于您的用例,您为什么不直接从最后一个螺栓中打印结果?

关于Thread.sleep()我不确定你指的是什么样的例子。不知道为什么有人应该把它用于生产。也许是为了演示目的而人为地减慢处理速度。

+0

谢谢你的详尽回复,马提亚! –

+0

如果这回答你的问题,随时接受和/或upvote我的答案。 –