2016-12-30 567 views
1

我是Spark新手。Spark将rdd字段值替换为另一个值

我可以用看在我elasticsearch数据库中的第一RDD的内容:

print(es_rdd.first()) 
>>>(u'1', {u'name': u'john'}) 

我也可以用得到我的DSTREAM所需的值:

kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list":brokers}) 
name=kvs.map(lambda x: x[1]) 
name.pprint() 
>>>>robert 

我打算更换rdd“name”:“john”with“robert”,然后在弹性搜索中使用saveAsNewAPIHadoopFile()插入新的rdd

我该怎么做? 有没有办法将“robert”映射到一个新的rdd中?喜欢的东西..

new_rdd=es_rdd.map(lambda item: {item[0]:name}) 

感谢

回答

2

我们可以根据索引列表与另一个RDD代替RDD的一部分。例如,将(RDD)中的元素从1,2,3,4替换为2,3,4,4。

a = sc.parallelize([1,2,3,4]) 
repVals = sc.parallelize([2,3,4]) 
idx = sc.parallelize([0,1,2]) . # idx has the same number of values with repVals 

a = a.zipWithIndex() 
ref = idx.zip(repVals).collectAsMap() # create a dictionary of format {idex:repValue} 

anew = a.map(lambda x:ref[x[1]] if x[1] in ref else x[0]) 
anew.collect() 

结果表明[2,3,4,4-]

相关问题