2
我在Spark Streaming中使用spark 1.6.0,并且遇到了广泛操作中的一个问题。当我从DStream加入PipelinedRDD和RDD时,应用程序挂起
代码示例: RDD名为“a”,其类型为:class'pyspark.rdd.PipelinedRDD'。
“一个” 被接收到为:
# Load a text file and convert each line to a Row.
lines = sc.textFile(filename)
parts = lines.map(lambda l: l.split(","))
clients = parts.map(lambda p: Row(client_id=int(p[0]), clientname=p[1] ...))
# Infer the schema, and register the DataFrame as a table.
schemaPeople = sqlContext.createDataFrame(clients)
schemaPeople.registerTempTable("clients")
client_list = sqlContext.sql("SELECT * FROM clients")
和后:
a = client_list.map(lambda entry: (int(entry[1]), value_from_entry(entry)))
有第二部分 “B” 型的类 'pyspark.streaming.dstream.TransformedDStream'。 我收到了 “B”,从水槽:
DStreamB = flumeStream.map(lambda tup: function_for_map(tup[1].encode('ascii','ignore')))
和
b = DStreamB.map(lambda event: (int(event[2]), value_from_event(event)))
问题是:当我尝试为加盟:
mult = b.transform(lambda rdd: rdd.join(a))
我的应用程序在此阶段挂起(现在我在b.pprint()之后和stage .join()之前显示屏幕)
但是,当我加入:
声明RDD “测试”:
test = sc.parallelize(range(1, 100000)).map(lambda k: (k, 'value'))
做:
mult0 = a.join(test) mult = b.transform(lambda rdd: rdd.join(mult0))`
然后它的工作原理(!! ):
我也可以这样做:
mult0 = b.transform(lambda rdd: rdd.join(test))
这样:
我有RDDS “a” 和 “测试”。 DStream“b”。 我可以乘:
- 一个*测试* B
- B *测试
但我不能这样做 'B *一个'。
任何帮助表示赞赏!谢谢!