2017-01-06 37 views
1

我有一个从Kafka消耗的流式作业(使用createDstream)。 它的“身份证”在Spark流式作业中调用实用程序(外部)

[id1,id2,id3 ..] 

流我有一个接受的id的数组,并做了一些外部呼叫并接收回一些信息说“T”每个ID

[id:t1,id2:t2,id3:t3...] 
的工具或API

我希望在调用实用程序保留Dstream时保留DStream。我不能在Dstream rdd上使用地图转换,因为它会调用每个id,而且该实用程序正在接受id的集合。

Dstream.map(x=> myutility(x)) -- ruled out 

如果我用

Dstream.foreachrdd(rdd=> myutility(rdd.collect.toarray)) 

我失去了DStream。我需要保留DStream用于下游处理。

+0

重新设计'myutility',以便它可以正确地并行工作?在Spark中有单一的本地收藏是不行的。并行的 – user7337271

+0

@ user7337271由Dstream.foreachrdd(rdd => myutility(rdd.collect.toarray))实现,但是丢失了DStream –

+0

这里没有并行性。整个机构'foreachrdd(rdd => myutility(rdd.collect.toarray))'在驱动程序本地执行。你可以'转换(rdd => sc.parallelize(myutility(rdd.collect.toarray)))',但它并不能解决这个问题。 – user7337271

回答

3

实现外部批量调用的方法是直接在分区级别转换DStream中的RDD。

的图案看起来像这样:

val transformedStream = dstream.transform{rdd => 
    rdd.mapPartitions{iterator => 
     val externalService = Service.instance() // point to reserve local resources or make server connections. 
     val data = iterator.toList // to act in bulk. Need to tune partitioning to avoid huge data loads at this level 
     val resultCollection = externalService(data) 
     resultCollection.iterator 
    } 
} 

该方法过程中使用的簇中的可用资源并联底层RDD的每个分区。请注意,需要为每个分区(而不是每个元素)实例化与外部系统的连接。

+0

谢谢,得到了我一直在寻找的东西 –

相关问题