0

我使用Spark Structured Streaming进行实时机器学习,并且希望将预测存储在我的Cassandra集群中。Spark(SQL /结构化流式传输)Cassandra - PreparedStatement

由于我处于流式上下文中,每秒执行多次相同的请求,所以一个强制优化是使用PreparedStatement。

在卡桑德拉火花驱动程序(https://github.com/datastax/spark-cassandra-connector)有没有办法在使用PreparedStatement(Scala中或Python,我不考虑Java作为一个选项)

我应该用一个斯卡拉(https://github.com/outworkers/phantom)/蟒蛇( https://github.com/datastax/python-driver)cassandra驱动程序? 它是如何工作的,那么我的连接对象需要被序列化以传递给工人?

如果有人能帮助我!

谢谢:)

回答

1

为了在卡桑德拉做一个准备好的声明,然后注册数据和结构化的火花流,同时还处理流,你需要:

  • 进口com.datastax.driver.core。会议
  • 进口com.datastax.spark.connector.cql.CassandraConnector

然后,建立你的连接器:

val connector = CassandraConnector.apply(sparkSession.sparkContext.getConf) 

有两个会议连接器,你现在就可以给你打电话声明斯卡拉类

connector.withSessionDo { session => 
Statements.PreparedStatement() 

}

你终于可以写的准备好的声明功能用Cassandra将数据写入以下函数完成,cql是结合变量到准备好的声明,并执行它的功能:

private def processRow(value: Commons.UserEvent) = { 
    connector.withSessionDo { session => 
    session.execute(Statements.cql(value.device_id, value.category, value.window_time, value.m1_sum_downstream, value.m2_sum_downstream)) 
} 

}

当然,你必须在foreach作家调用这个函数(processRow

 // This Foreach sink writer writes the output to cassandra. 
import org.apache.spark.sql.ForeachWriter 
val writer = new ForeachWriter[Commons.UserEvent] { 
    override def open(partitionId: Long, version: Long) = true 
    override def process(value: Commons.UserEvent) = { 
    processRow(value) 
    } 
    override def close(errorOrNull: Throwable) = {} 
} 

val query = 
    ds.writeStream.queryName("aggregateStructuredStream").outputMode("complete").foreach(writer).start 
相关问题