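I am new to Storm and am using it for a university project. How can I stop tuple processing in Storm and then run other code?
I created a topology with a spout connected to a MySQL database and two bolts. The first bolt, which is connected to the spout, prepares the tuples and strips out the information that is not needed; the second bolt filters the tuples.
I am working in local mode.
My first question is: why, after running the topology, do I see output like the following lines in my console?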
38211 [Thread-14-movie-SPOUT] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
67846 [Thread-10-__acker] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
67846 [Thread-8-cleaning-genre-bolt] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
67852 [Thread-10-__acker] INFO backtype.storm.daemon.task - Emitting: __acker __metrics [#<TaskInfo [email protected]> [#<DataPoint [__emit-count = {}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=0, write_pos=1, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=-1, write_pos=-1, capacity=1024, population=0}]> #<DataPoint [__execute-count = {}]>]]
67853 [Thread-8-cleaning-genre-bolt] INFO backtype.storm.daemon.task - Emitting: cleaning-genre-bolt __metrics [#<TaskInfo [email protected]> [#<DataPoint [__emit-count = {default=1680}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=1621, write_pos=1622, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {default=1680}]> #<DataPoint [__execute-latency = {movie-SPOUT:default=0.15476190476190477}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=1680, write_pos=1680, capacity=1024, population=0}]> #<DataPoint [__execute-count = {movie-SPOUT:default=1680}]>]]
67854 [Thread-13-filtering-genre-BOLT] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
67855 [Thread-13-filtering-genre-BOLT] INFO backtype.storm.daemon.task - Emitting: filtering-genre-BOLT __metrics [#<TaskInfo [email protected]> [#<DataPoint [__emit-count = {}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=1681, write_pos=1682, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {cleaning-genre-bolt:default=0.08333333333333333}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=-1, write_pos=-1, capacity=1024, population=0}]> #<DataPoint [__execute-count = {cleaning-genre-bolt:default=1680}]>]]
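I have read that seeing these lines after the last tuple has been processed is considered normal. Is that right?
How can I run other code after submitting the topology? For example, I want to print the filtering results produced by the second bolt, which are stored in a HashMap. If I put that code after the line containing the submitTopology() call, it runs before the tuples have finished processing.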
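To illustrate the timing problem, here is a minimal sketch in plain Java with no Storm dependency. It is only an analogy: the ExecutorService stands in for the topology, and the class and method names (SubmitThenWaitSketch, countGenres) are hypothetical, not part of my project or of Storm's API. The submit call returns immediately, so the result map is only complete once the calling thread has explicitly waited for the worker.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SubmitThenWaitSketch {

    // Hypothetical stand-in for the filtering bolt: counts items per genre
    // on a background thread and returns the map once the work is done.
    static Map<String, Integer> countGenres(List<String> genres) throws InterruptedException {
        Map<String, Integer> counts = new ConcurrentHashMap<>();
        ExecutorService worker = Executors.newSingleThreadExecutor();

        // Like submitTopology(), submit() returns right away;
        // the counting itself runs on another thread.
        worker.submit(() -> {
            for (String genre : genres) {
                counts.merge(genre, 1, Integer::sum);
            }
        });

        // Reading counts at this point could observe a partial result.

        // Stop accepting new work, then block until the submitted task ends.
        worker.shutdown();
        if (!worker.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("worker did not finish in time");
        }
        return counts;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countGenres(List.of("drama", "comedy", "drama")));
    }
}
```

Unlike this worker, a Storm topology has no natural end, so I do not know what the equivalent of the waiting step would be.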
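My second and last question is: why do I see "Thread.sleep(1000)" in the spout in every Storm example?
Maybe it is related to my first question.
I hope my questions are clear. Thanks in advance!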
Thanks for the thorough reply, Matthias!
If this answers your question, feel free to accept and/or upvote my answer.