To use a prepared statement against Cassandra while writing data from a Spark Structured Streaming job, you need to:
- import com.datastax.driver.core.Session
- import com.datastax.spark.connector.cql.CassandraConnector
Then, build your connector:
val connector = CassandraConnector.apply(sparkSession.sparkContext.getConf)
With both the Session and the CassandraConnector in scope, you can now call the Scala class that holds your prepared statements:
connector.withSessionDo { session =>
Statements.PreparedStatement()
}
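The Statements class itself is not shown in this excerpt. Below is a minimal sketch of what it might contain; the keyspace and table names and the exact CQL are assumptions for illustration, and only the column names are taken from the processRow call shown later:

```scala
// Hypothetical sketch of the Statements object referenced above.
// The keyspace/table names are assumptions; the five bind markers match
// the five values passed in processRow (device_id, category, window_time,
// m1_sum_downstream, m2_sum_downstream).
object Statements {
  val insertCql: String =
    """INSERT INTO my_keyspace.user_events
      |  (device_id, category, window_time, m1_sum_downstream, m2_sum_downstream)
      |VALUES (?, ?, ?, ?, ?)""".stripMargin

  // With a live com.datastax.driver.core.Session you would prepare once
  // and then bind per row, e.g.:
  //   val prepared = session.prepare(insertCql)
  //   session.execute(prepared.bind(deviceId, category, windowTime, m1, m2))
}
```

Preparing the statement once and binding per row is what makes prepared statements cheaper than re-parsing the CQL on every insert.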
Finally, you can write the function that uses the prepared statement to persist the data to Cassandra. The function below does this; Statements.cql binds the given values to the prepared statement and executes it:
private def processRow(value: Commons.UserEvent) = {
connector.withSessionDo { session =>
session.execute(Statements.cql(value.device_id, value.category, value.window_time, value.m1_sum_downstream, value.m2_sum_downstream))
}
}
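Commons.UserEvent is also not defined in this excerpt. A plausible case-class shape, inferred from the fields read in processRow, would look like the following; the concrete field types are guesses, not from the source:

```scala
// Hypothetical shape of the event row written to Cassandra. Field names
// come from processRow above; the types (String/Double) are assumptions.
object Commons {
  case class UserEvent(
    device_id: String,
    category: String,
    window_time: String,      // e.g. the window start, formatted as text
    m1_sum_downstream: Double,
    m2_sum_downstream: Double
  )
}

val sample = Commons.UserEvent("dev-1", "video", "2016-01-01 00:00", 10.5, 3.0)
```

Whatever the real types are, they must line up positionally with the bind markers in the prepared statement.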
Of course, you must call this function (processRow) from a ForeachWriter:
// This Foreach sink writer writes the output to cassandra.
import org.apache.spark.sql.ForeachWriter
val writer = new ForeachWriter[Commons.UserEvent] {
override def open(partitionId: Long, version: Long) = true
override def process(value: Commons.UserEvent) = {
processRow(value)
}
override def close(errorOrNull: Throwable) = {}
}
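Spark drives a ForeachWriter per partition and epoch in a fixed order: open, then process for each row (only if open returned true), then close. A small stand-alone sketch of that contract, using a stand-in trait rather than Spark's real API:

```scala
// Illustration of the open -> process* -> close lifecycle that Spark
// follows for a ForeachWriter. SimpleWriter and runPartition are
// stand-ins written for this example, not Spark classes.
import scala.collection.mutable.ListBuffer

trait SimpleWriter[T] {
  def open(partitionId: Long, version: Long): Boolean
  def process(value: T): Unit
  def close(errorOrNull: Throwable): Unit
}

// Drives a writer the way Spark drives a ForeachWriter for one partition:
// rows are only processed when open() returns true.
def runPartition[T](writer: SimpleWriter[T], partitionId: Long,
                    version: Long, rows: Seq[T]): Unit = {
  if (writer.open(partitionId, version)) {
    try rows.foreach(writer.process)
    finally writer.close(null)
  }
}

val calls = ListBuffer.empty[String]
val w = new SimpleWriter[String] {
  override def open(partitionId: Long, version: Long) = { calls += s"open:$partitionId"; true }
  override def process(value: String) = { calls += s"process:$value" }
  override def close(errorOrNull: Throwable) = { calls += "close" }
}
runPartition(w, 0L, 1L, Seq("a", "b"))
// calls now records: open:0, process:a, process:b, close
```

Returning false from open lets you skip a partition (for instance, one already written in a previous attempt), which is why the writer above returns true unconditionally only because it has no deduplication to do.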
val query = ds.writeStream
  .queryName("aggregateStructuredStream")
  .outputMode("complete")
  .foreach(writer)
  .start()