dstream

    0热度

    1回答

    列表的DSTREAM我有串的名单,但我不能找到一个办法列表改为火花流的DSTREAM。 我尝试这样做: val tmpList = List("hi", "hello") val rdd = sqlContext.sparkContext.parallelize(Seq(tmpList)) val rowRdd = rdd.map(v => Row(v: _*)) 但日食说sparkCo

    2热度

    3回答

    我已经通过this stackoverflow的问题,根据答案它创建一个DStream与批间隔只有一个RDD。 例如: 我的批次间隔是1分钟和Spark流作业从卡夫卡主题消耗数据。 我的问题是,DStream中可用的RDD是否在最后一分钟内提取/包含整个数据?我们需要设置什么标准或选项来提取最后一分钟创建的所有数据? 如果我有一个带有3个分区的卡夫卡主题,并且所有3个分区都包含最后一分钟的数据,那

    1热度

    1回答

    我有一个从Kafka消耗的流式作业(使用createDstream)。 它的“身份证” [id1,id2,id3 ..] 流我有一个接受的id的数组,并做了一些外部呼叫并接收回一些信息说“T”每个ID [id:t1,id2:t2,id3:t3...] 的工具或API 我希望在调用实用程序保留Dstream时保留DStream。我不能在Dstream rdd上使用地图转换,因为它会调用每个i

    0热度

    1回答

    我有一个问题,从spark流(pyspark)索引数据到elasticserach。数据类型为dstream。下面它的外观 (u'01B', 0) (u'1A5', 1) .... 下面是我使用的弹力指数:指数=的CLU和类型=数据 GET /clus/_mapping/data { "clus": { "mappings": { "data": {

    1热度

    1回答

    我是Spark Streaming上的新手,我陷入困境,试图找出如何处理这个问题,因为我发现了很多单个(K,V)对的例子,但还有更多。为了找到使用Spark对Java进行转换的最佳方法,我将不胜感激。 让我简要介绍一下情况, 的目标是获得一个集中的时间窗口内的元素的错误率。 考虑下面的输入, (A, Error) (B, Success) (B, Error) (B, Success) (

    4热度

    1回答

    在以下代码中,似乎函数fn1 & fn2以顺序方式应用于inRDD,正如我在Spark Web UI的阶段部分中看到的。 DstreamRDD1.foreachRDD(new VoidFunction<JavaRDD<String>>() { public void call(JavaRDD<String> inRDD) { inRDD.foreach(fn1

    2热度

    1回答

    我有通过DStream从Kafka到达的数据。我想要执行特征提取以获得一些关键字。 我不想等待所有数据的到来(因为它打算是连续的流,可能永远不会结束),所以我希望能够以大块的方式执行提取 - 如果精度将会忍受一点。 到目前为止,我放在一起类似的东西: def extractKeywords(stream: DStream[Data]): Unit = { val spark: Spar

    0热度

    1回答

    以下两个相同吗? val dstream = stream.window(Seconds(60), Seconds(1)) val x = dstream.map(x => ...) 和 val dstream = stream.window(Seconds(60), Seconds(1)) val x = dstream.transform(rdd => rdd.map(x => ...

    0热度

    1回答

    我正在使用Dstream(Spark Streaming)的Transform API对数据进行排序。 我正在使用netcat从TCP套接字读取数据。 继使用的代码行: myDStream.transform(rdd => rdd.sortByKey()) 无法找到函数sortByKey。任何人都可以请帮助这一步中的问题是什么?

    1热度

    1回答

    我正在设置Apache Spark长时间运行的流式作业,以使用InputDStream执行(非并行化)流式传输。 我想要实现的是当队列上的批处理时间过长(基于用户定义的超时时间)时,我希望能够跳过批处理并完全放弃它 - 并继续其余部分执行。 我无法在Spark API或在线上找到解决这个问题的方法 - 我使用StreamingContext awaitTerminationOrTimeout进行了