Spark Structured Streaming: multiple sinks
2017-08-11

1) We consume from Kafka using Structured Streaming and write the processed dataset to S3. We also want to write the processed data on to Kafka; is it possible to do that from the same streaming query? (Spark version 2.1.1)

2) In the logs I see the streaming query progress output, and below is a sample of the durationMs JSON from one of them. Can someone provide more clarity on addBatch and getBatch?

3) triggerExecution – does it cover both processing the fetched data and writing it to the sink?

"durationMs" : { 
    "addBatch" : 2263426, 
    "getBatch" : 12, 
    "getOffset" : 273, 
    "queryPlanning" : 13, 
    "triggerExecution" : 2264288, 
    "walCommit" : 552 
    }, 

Regards,
aravias

Answers


1) Yes.

In Spark 2.1.1, you can use writeStream.foreach to write data to Kafka. There is an example in this blog post: https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html
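A minimal sketch of that approach, assuming a processed Dataset<String> named processed, a broker at localhost:9092, an output topic out-topic, and a checkpoint path (all placeholders, not taken from the question):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.sql.ForeachWriter;

// Sketch only: a ForeachWriter that opens a Kafka producer per partition
// and forwards every record of the micro-batch to a Kafka topic.
class KafkaForeachWriter extends ForeachWriter<String> {
    private transient KafkaProducer<String, String> producer; // not serializable, so created in open()

    @Override
    public boolean open(long partitionId, long version) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
        return true; // process every partition
    }

    @Override
    public void process(String value) {
        producer.send(new ProducerRecord<>("out-topic", value)); // placeholder topic
    }

    @Override
    public void close(Throwable errorOrNull) {
        if (producer != null) {
            producer.close();
        }
    }
}

It would be wired into the query roughly like this:

StreamingQuery kafkaQuery = processed.writeStream()
    .foreach(new KafkaForeachWriter())
    .option("checkpointLocation", "/tmp/checkpoints/kafka-foreach") // placeholder path
    .start();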

Or you can use Spark 2.2.0, which adds a Kafka sink with official support for writing to Kafka.
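For reference, a sketch of what the built-in sink looks like (broker, topic and checkpoint path are placeholders; the streaming Dataset needs a string or binary "value" column, which a Dataset<String> already provides):

StreamingQuery kafkaQuery = processed.writeStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "out-topic")
    .option("checkpointLocation", "/tmp/checkpoints/kafka-sink")
    .start();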

2) getBatch measures the time taken to create the DataFrame from the source. This is usually fast. addBatch measures the time taken to run the DataFrame in the sink.

3) triggerExecution measures how long it takes to run a trigger execution; it is usually close to getOffset + getBatch + addBatch.
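These are the same numbers that can be read programmatically from a running query; a small sketch, assuming query is the StreamingQuery handle returned by start():

import java.util.Map;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryProgress;

// lastProgress() returns the progress of the most recent trigger,
// or null before the first trigger has completed.
StreamingQueryProgress progress = query.lastProgress();
if (progress != null) {
    Map<String, Long> durations = progress.durationMs();
    System.out.println("getOffset        = " + durations.get("getOffset"));
    System.out.println("getBatch         = " + durations.get("getBatch"));
    System.out.println("addBatch         = " + durations.get("addBatch"));
    System.out.println("triggerExecution = " + durations.get("triggerExecution"));
}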


Thanks for the reply. Can you please clarify the following: when writing a Dataset created from the source topic to both S3 and Kafka, checkpoints are specified separately for each sink, so is it fair to expect that even though the same Dataset created from the source is used to write to these two different sinks, the data will be read twice from the source topic? – user2221654


If you have two sinks, it means you have two queries. Each query has its own Kafka consumer and fetches data from Kafka independently. – zsxwing
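To illustrate that point, a sketch of what two sinks on the same source Dataset look like: two separate queries, each with its own checkpoint location and its own Kafka consumer behind the scenes (paths, topic and broker address are placeholders):

// Query 1: write the processed data to S3 as Parquet.
StreamingQuery s3Query = processed.writeStream()
    .format("parquet")
    .option("path", "s3a://my-bucket/output")
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/s3-sink")
    .start();

// Query 2: write the same processed data to Kafka (Spark 2.2.0 sink).
// Both queries read the source topic independently.
StreamingQuery kafkaQuery = processed.writeStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "out-topic")
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/kafka-sink")
    .start();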


I had a similar issue while trying to write data to two Kafka sinks. I am getting a ClassCastException as shown below. The code looks like this:

final Dataset<String> eventDataset = feedMessageDataset
    .map(toEvent(nodeCodeToAliasBroadcast), OBSERVED_EVENT_ENCODER)
    .map(SparkFeedReader::serializeToJson, STRING());
final StreamingQuery eventQuery = kafkaStreamWriterForEvents(eventDataset, configuration, feedReaderEngineName).start();

final Dataset<String> splunkEventDataset = feedMessageDataset
    .map(toSplunkEvent(), SPLUNK_OBSERVED_EVENT_ENCODER)
    .filter(event -> !event.getIndicatorCode().equals(HEARBEAT_INDICATOR_CODE))
    .map(SparkFeedReader::serializeToJson, STRING());

final StreamingQuery splunkEventQuery = kafkaStreamWriterForSplunkEvents(splunkEventDataset, configuration, feedReaderEngineName).start();

If I comment out one of the sinks it works fine. This happens in Spark 2.2.0.

java.lang.ClassCastException: x.SplunkObservedEvent cannot be cast to x.ObservedEvent 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395) 
    at org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:47) 
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply$mcV$sp(KafkaWriter.scala:91) 
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91) 
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91) 
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337) 
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:91) 
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:89) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:108) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
    at java.lang.Thread.run(Thread.java:748) 