2017-09-25 99 views
0

我一直在为Apache Beam工作几天。我想快速迭代我正在工作的应用程序,并确保我构建的管道没有错误。在火花中,我们可以使用sc.parallelise,当我们应用某些操作时,我们可以获得我们可以检查的值。从Apache Beam管道收集输出并将其显示到控制台

同样,当我读到关于Apache梁,我发现,我们可以创建一个PCollection与它使用下面的语法

with beam.Pipeline() as pipeline: 
    lines = pipeline | beam.Create(["this is test", "this is another test"]) 
    word_count = (lines 
        | "Word" >> beam.ParDo(lambda line: line.split(" ")) 
        | "Pair of One" >> beam.Map(lambda w: (w, 1)) 
        | "Group" >> beam.GroupByKey() 
        | "Count" >> beam.Map(lambda (w, o): (w, sum(o)))) 
    result = pipeline.run() 

我其实是想打印结果到控制台中工作。但是我找不到任何文档。

有没有办法将结果打印到控制台,而不是每次都将其保存到文件中?

回答

0

在进一步探索和了解如何为我的应用程序编写测试用例之后,我想出了将结果打印到控制台的方法。请不要说我现在正在将所有东西都运行到单个节点机器上,并试图了解由apache提供的功能,以及如何在不影响行业最佳实践的情况下采用它。

所以,这是我的解决方案。在我们的管道的最后阶段,我们可以引入一个地图功能,将打印结果到控制台或累积的结果在一个变量以后,我们可以打印变量,看值

import apache_beam as beam 

# lets have a sample string 
data = ["this is sample data", "this is yet another sample data"] 

# create a pipeline 
pipeline = beam.Pipeline() 
counts = (pipeline | "create" >> beam.Create(data) 
    | "split" >> beam.ParDo(lambda row: row.split(" ")) 
    | "pair" >> beam.Map(lambda w: (w, 1)) 
    | "group" >> beam.CombinePerKey(sum)) 

# lets collect our result with a map transformation into output array 
output = [] 
def collect(row): 
    output.append(row) 
    return True 

counts | "print" >> beam.Map(collect) 

# Run the pipeline 
result = pipeline.run() 

# lets wait until result a available 
result.wait_until_finish() 

# print the output 
print output 
0

我知道它不是”你要求什么,但为什么不把它存储到一个文本文件?它总是比通过stdout打印它好,并且它不易挥发