dstream

    5热度

    1回答

    我正在使用Apache Spark Streaming 1.6.1编写连接两个键/值数据流并将输出写入HDFS的Java应用程序。这两个数据流包含K/V字符串,并通过使用textFileStream()从HDFS周期性地从Spark中获取。 两个数据流不同步,这意味着在时间t0在stream1中的一些密钥可能在时间t1在stream2中出现,反之亦然。因此,我的目标是加入这两个流并计算“剩余”键,

    1热度

    1回答

    我有这个简单的卡夫卡流 val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) // Each Kafka message is a flight val flights = messages.map(_._2

    0热度

    1回答

    我使用Spark笛卡尔函数来生成一个列表N对值。 我然后这些值映射到每个用户之间产生一距离度量: val cartesianUsers: org.apache.spark.rdd.RDD[(distance.classes.User, distance.classes.User)] = users.cartesian(users) cartesianUsers.map(m => manDista

    2热度

    1回答

    在函数中,有没有办法在使用filter后返回两个DStream? 例如,当我过滤DStream时,已过滤的将存储在DStream中,未过滤的将存储在另一个DStream中。

    -1热度

    1回答

    我加入了一些DSTREAM的在一起,使DSTREAM当前“数据类型”看起来像这样(键和值): DStream[(Long,((DateTime,Int),((Int,Double),Double)))] 但我想: DStream[(Long,DateTime,Int,Int,Double,Double)] 或 DStream[(Long,(DateTime,Int,Int,Double,D

    0热度

    1回答

    我有两个窗口的dstream,我想拉链像RDD中的正常压缩。 注意:主要目标是计算窗口dstream的均值和stdv,以防有更好的方法来计算。

    3热度

    1回答

    例如之间的差别,怎么会 x.foreach(rdd => rdd.cache()) 是不同 x.foreachRDD(rdd => rdd.cache()) 注意x是这里的DStream。

    0热度

    1回答

    是否有可能从spark中的单个DStream中获取多个DStream? 我的用例如下:我从HDFS文件获取日志数据流。 日志行包含一个id(id = xyz)。 我需要根据ID以不同的方式处理日志行。 所以我试图从输入Dstream的每个ID不同的Dstream。 我找不到任何与文档相关的东西。 有谁知道这是如何在Spark中实现的,或指向任何链接。 感谢

    2热度

    1回答

    我正在寻找最佳的解决方案来积累Spark DStream中最后N个消息。我还想指定要保留的消息数量。 例如,给出下面的流,我想保留最后的3个要素: Iteration New message Downstream 1 A [A] 2 B [A, B] 3 C [A, B, C] 4 D [B, C, D] 到目前为止,我期待在上DSTREAM以下方法:

    2热度

    1回答

    Spark Streaming中的DStream中的每个微批处理结束时是否可以执行某些操作?我的目标是计算Spark处理的事件的数量。 Spark Streaming给了我一些数字,但平均值似乎也总结为零值(因为一些微批是空的)。 例如我确实收集了一些统计数据并希望将它们发送到我的服务器,但收集数据的对象仅在某个批处理中存在,并且将从头开始初始化以用于下一批处理。我希望能够在完成批处理和对象消失之