0
我在Apache Flink的scala中运行一个简单的脚本。 我从Apache Kafka制作人处读取数据。这是我的代码。卡夫卡/ Flink与地图功能的集成问题
import java.util.Properties
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization
import org.json4s.native.Serialization.{read, write}
object App {
def main(args : Array[String]) {
case class Sensor2(sensor_name: String, start_date: String, end_date: String, data: String, stt: Int)
val properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val consumer1 = new FlinkKafkaConsumer010[String]("topics1", new SimpleStringSchema(), properties)
val stream1 = env
.addSource(consumer1)
.flatMap(raw => JsonMethods.parse(raw).toOption)
env.execute()
}
}
我上flatMap一个“缺少的参数类型”错误(但同样的错误我得到的,当我尝试使用其他功能,如地图或过滤器)。 我不知道要解决这个问题。 有什么帮助吗?
LF
谢谢。这解决了缺少的参数类型错误。 –
为了将来自Kafka的DataStream [String]解析为DataStream [Sensor2]类,我不得不对代码进行一些修改。工作的代码是: 'VAL consumer1 =新FlinkKafkaConsumer010 [字符串]( “topics1”,新SimpleStringSchema(),属性) VAL流1 = ENV .addSource(consumer1) VAL S1 = {stream1.map X = > { 隐式val格式= DefaultFormats JsonMethods.parse(x).extract [Sensor2] } } –