2017-09-22 16 views
0

下面的简单程序从kafka流中读取并每隔5分钟写入一次CSV文件及其火花流。在“驱动程序”(不在执行程序中)中进行微批处理后,是否可以调用Java函数?在Spark流中的每个微批次之后调用java函数

我同意它不是一个很好的习惯来调用流中的任意代码,但这是我们的数据量很低的特殊情况。请adivse。谢谢。

public static void main(String[] args) throws Exception { 

    if (args.length == 0) 
     throw new Exception("Usage program configFilename"); 
    String configFilename = args[0]; 

    addShutdownHook(); 

    ConfigLoader.loadConfig(configFilename); 
    sparkSession = SparkSession 
      .builder() 
      .appName(TestKafka.class.getName()) 
      .master(ConfigLoader.getValue("master")).getOrCreate(); 
    SparkContext context = sparkSession.sparkContext(); 
    context.setLogLevel(ConfigLoader.getValue("logLevel")); 

    SQLContext sqlCtx = sparkSession.sqlContext(); 
    System.out.println("Spark context established"); 

    DataStreamReader kafkaDataStreamReader = sparkSession.readStream() 
      .format("kafka") 
      .option("kafka.bootstrap.servers", ConfigLoader.getValue("brokers")) 
      .option("group.id", ConfigLoader.getValue("groupId")) 
      .option("subscribe", ConfigLoader.getValue("topics")) 
      .option("failOnDataLoss", false); 
    Dataset<Row> rawDataSet = kafkaDataStreamReader.load(); 
    rawDataSet.printSchema(); 
    rawDataSet.createOrReplaceTempView("rawEventView1"); 

    rawDataSet = rawDataSet.withColumn("rawEventValue", rawDataSet.col("value").cast("string")); 
    rawDataSet.printSchema(); 
    rawDataSet.createOrReplaceTempView("eventView1"); 
    sqlCtx.sql("select * from eventView1") 
      .writeStream() 
      .format("csv") 
      .option("header", "true") 
      .option("delimiter", "~") 
      .option("checkpointLocation", ConfigLoader.getValue("checkpointPath")) 
      .option("path", ConfigLoader.getValue("recordsPath")) 
      .outputMode(OutputMode.Append()) 
      .trigger(ProcessingTime.create(Integer.parseInt(ConfigLoader.getValue("kafkaProcessingTime")) 
        , TimeUnit.SECONDS)) 
      .start() 
      .awaitTermination(); 
} 
+0

您想运行哪种代码?它是一种副作用,因为它不返回任何值?为什么它必须在司机中发生? – raam86

+0

这可能类似于通过电子邮件通知微量批次完成, – Manjesh

回答

0

你应该能够做到这一点的是这样的:

kafkaDataStreamReader.map{value -> mySideEffect(); value} 

这将每次从卡夫卡收到微量分批时间调用函数mySideEffect,怎么过的我不推荐这样做,更好的方法是观看保存CSV文件的文件夹,或者仅仅检查网络用户界面,考虑到每隔几秒发生一次微量批次,您最多只会收到一封电子邮件。如果您想确保流式传输应用程序已启动,您可以每隔几秒查询一次Spark REST API并确保它仍然运行。 https://spark.apache.org/docs/latest/monitoring.html

相关问题