2017-08-01 93 views
0

我在Apache Flink的scala中运行一个简单的脚本。 我从Apache Kafka制作人处读取数据。这是我的代码。卡夫卡/ Flink与地图功能的集成问题

import java.util.Properties 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010 
import org.apache.flink.streaming.util.serialization.SimpleStringSchema 
import org.json4s._ 
import org.json4s.native.JsonMethods._ 
import org.json4s.native.Serialization 
import org.json4s.native.Serialization.{read, write} 

object App { 

    def main(args : Array[String]) { 

case class Sensor2(sensor_name: String, start_date: String, end_date: String, data: String, stt: Int) 

val properties = new Properties(); 
    properties.setProperty("bootstrap.servers", "localhost:9092"); 
    properties.setProperty("group.id", "test"); 

    val env = StreamExecutionEnvironment.getExecutionEnvironment() 
    val consumer1 = new FlinkKafkaConsumer010[String]("topics1", new SimpleStringSchema(), properties) 
    val stream1 = env 
    .addSource(consumer1) 
    .flatMap(raw => JsonMethods.parse(raw).toOption) 

    env.execute() 

} 

} 

我上flatMap一个“缺少的参数类型”错误(但同样的错误我得到的,当我尝试使用其他功能,如地图或过滤器)。 我不知道要解决这个问题。 有什么帮助吗?

LF

回答

0

您应该使用的StreamExecutionEnvironment阶API版本。

改变你的进口:

import java.util.Properties 
import org.apache.flink.streaming.api.scala._ 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010 
import org.apache.flink.streaming.util.serialization.SimpleStringSchema 
import org.json4s._ 
import org.json4s.native.JsonMethods 
+0

谢谢。这解决了缺少的参数类型错误。 –

+0

为了将来自Kafka的DataStream [String]解析为DataStream [Sensor2]类,我不得不对代码进行一些修改。工作的代码是: 'VAL consumer1 =新FlinkKafkaConsumer010 [字符串]( “topics1”,新SimpleStringSchema(),属性) VAL流1 = ENV .addSource(consumer1) VAL S1 = {stream1.map X = > { 隐式val格式= DefaultFormats JsonMethods.parse(x).extract [Sensor2] } } –