2

我想从Kafka读取数据并通过Spark RDD存储到Cassandra表中。值分裂不是(String,String)的成员

获取错误,而编译代码:

/root/cassandra-count/src/main/scala/KafkaSparkCassandra.scala:69: value split is not a member of (String, String) 

[error]  val lines = messages.flatMap(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble)) 
[error]            ^
[error] one error found 

[error] (compile:compileIncremental) Compilation failed 

下面的代码:当我通过互动spark-shell手动运行该代码,它工作正常,但而​​错误编译代码来。

// Create direct kafka stream with brokers and topics 
val topicsSet = Set[String] (kafka_topic) 
val kafkaParams = Map[String, String]("metadata.broker.list" -> kafka_broker) 
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) 

// Create the processing logic 
// Get the lines, split 
val lines = messages.map(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble)) 
lines.saveToCassandra("stream_poc", "US_city", SomeColumns("city_name", "jan_temp", "lat", "long")) 
+0

@RameshMaharjan:请不要格式化专有名词作为代码。卡夫卡和卡桑德拉只需要一个初始资金,就是这样 - 他们本身并不是代码。然而,像'spark-shell'这样的东西都可以,因为代码格式适合于控制台I/O(假定'spark-shell'是一个键入的命令)。 – halfer

回答

1

KafkaUtils.createDirectStream返回的键和值的元组(因为在卡夫卡消息被任选键控)。在你的情况下,它的类型是(String, String)。如果您要拆分的,你必须首先把它拿出来:

val lines = 
    messages 
    .map(line => line._2.split(',')) 
    .map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble)) 

或者使用部分函数的语法:

val lines = 
    messages 
    .map { case (_, value) => value.split(',') } 
    .map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble)) 
2

在卡夫卡的所有消息是有方向性的。原始的Kafka流,在这种情况下为messages,是一个元组流(key,value)

而且由于编译错误指出,元组上没有split方法。

我们要在这里做的是:

messages.map{ case (key, value) => value.split(','))} ... 
相关问题