2014-11-05 91 views
4

我有一个DStream“Crowd”,我想将“Crowd”中的每个元素写入套接字。当我试图从该套接字读取时,它不打印任何东西。我使用的代码下面一行:将Spark Streaming输出写入套接字

val server = new ServerSocket(4000,200); 
val conn = server.accept() 
val out = new PrintStream(conn.getOutputStream()); 
crowd.foreachRDD(rdd => {rdd.foreach(record=>{out.println(record)})}) 

但如果使用(这不是我想要的,虽然):

crowd.foreachRDD(rdd => out.println(rdd)) 

它写的东西套接字。

我怀疑使用rdd.foreach()有问题。虽然它应该工作。我不知道我错过了什么。

回答

4

DStream封闭外的代码在驱动程序中执行,而rdd.foreach(...)将在RDD的每个分布式分区上执行。 因此,在驱动程序的机器上创建了一个套接字,并且该作业试图在另一台机器上写入该套接字 - 这不会出于显而易见的原因。

DStream.foreachRDD在驱动程序上执行,所以在这种情况下,套接字和计算在同一个主机上执行。因此,它的工作。

由于RDD计算的分布式特性,这种服务器套接字方法将很难做到,因为动态服务发现成为一个挑战,即“我的服务器套接字在哪里打开?”。看看一些可以让你集中访问分布式数据的系统。对于这种流式传输过程,Kafka是一个很好的选择。

+0

谢谢。理解这很有帮助。所以为了克服这个服务发现挑战,它修改了以下语句:crowd.foreachRDD(rdd => {rdd.collect.foreach(record => {out.println(record)})})。这将工作(它的工作),因为它会从工人收集rdd分区,并将其发送到将写入套接字的驱动程序。我希望这是正确的方法。你怎么看待这个...... collect.foreach .....东西? – vick 2014-11-06 09:39:47

0
crowd.foreachRDD(rdd => {rdd.collect.foreach(record=>{out.println(record)})}) 

您在您的意见建议的代码将正常工作,但在这种情况下,你必须收集RDD的所有记录中的驱动程序。如果记录的数量很少,那么可以,但是如果记录的数量大于驾驶员的记忆将成为瓶颈。你的第一次尝试应该总是在客户端处理数据。请记住,RDD分布在工作机器上,这意味着首先需要将RDD中的所有记录都添加到驱动程序中,从而增加通信量,这在分布式计算中是一种牺牲。如上所述,只有当RDD中的记录有限时,您的代码才会正常。

我正在处理类似的问题,我一直在搜索如何将连接池和序列化到客户机。如果有些机构有任何答案,将会很好。

0

Here in the official documentation你有答案!

你必须创建foreachRDD函数内的连接,如果你想这样做优化,你需要创建一个连接的“池”,然后把你想要的foreachPartition功能的内部连接,并呼叫到foreach函数通过该连接发送元素。这是最好的方式做示例代码:

dstream.foreachRDD { rdd => 
    rdd.foreachPartition { partitionOfRecords => 
    // ConnectionPool is a static, lazily initialized pool of connections 
    val connection = ConnectionPool.getConnection() 
    partitionOfRecords.foreach(record => connection.send(record)) 
    ConnectionPool.returnConnection(connection) // return to the pool for future reuse 
    } 
} 

在任何情况下,检查的其他意见,因为他们提供了关于这个问题的背景下良好的知识。