3

所以我有一个蟒蛇Stream-sourced DataFrame df,它具有所有我想要放入卡斯安德表的spark-cassandra-connector表中的数据。我已经在两个方面试着这样做:如何将流数据集写入Cassandra?

df.write \ 
    .format("org.apache.spark.sql.cassandra") \ 
    .mode('append') \ 
    .options(table="myTable",keyspace="myKeySpace") \ 
    .save() 

query = df.writeStream \ 
    .format("org.apache.spark.sql.cassandra") \ 
    .outputMode('append') \ 
    .options(table="myTable",keyspace="myKeySpace") \ 
    .start() 

query.awaitTermination() 

但是我不断获取此错误,分别为:

pyspark.sql.utils.AnalysisException: "'write' can not be called on streaming Dataset/DataFrame; 

java.lang.UnsupportedOperationException: Data source org.apache.spark.sql.cassandra does not support streamed writing. 

有反正我可以把我的流DataFrame放入我的Cassandra表中?

回答

6

Spark Cassandra Connector中目前没有用于Cassandra的流式传输Sink。您将需要实施自己的Sink或等待它变得可用。

如果您使用的是Scala或Java,则可以使用foreach运算符并使用Using Foreach中所述的ForeachWriter

+1

有什么办法可以将我的Streaming DataFrame转换为非Streaming数据框? – user2361174

+2

不,没有转换(至少没有我知道的) – RussS

+0

您是否在Java中有工作示例?看起来所有的解决方案来''CassandraConnector.withSessionDo'这需要Scala实现特质;所以没有运气与Kotlin或Java .. – Reith