2016-12-07 101 views
0

我有以下卡桑德拉表用法:分区键检索与joinWithCassandraTable

CREATE TABLE listener.snapshots_geohash 
(
    created_date text, -- date when record have come to the system 
    geo_part text, -- few signs of geo hash - just for partitioning 
    when timestamp, -- record creation date 
    device_id text, -- id of device produced json data (see snapshot column) 
    snapshot text, -- json data, should be aggregated by spark 
    PRIMARY KEY ((created_date, geo_part), when, device_id) 
) 

每天早上聚合应用程序应该加载前一天和快照列总JSON数据。聚合将通过geohash对数据进行分组,这就是为什么它的部分被选为分区键的一部分。

我知道使用joinWithCassandraTable加载Cassandra中的数据是有效的 - 但为此,我必须从(created_date,geo_part)对中构建RDD。尽管我知道created_date值,但我无法列出geo_part值 - 因为它只是geohash的一部分,并且其值不连续。所以我有办法运行select distinct created_date, geo_part from ks.snapshots并从其结果创建RDD。问题是如何使用spark 2.0.2和cassandra-connector 2.0.0-M3运行此选择,或者可能有其他方法?

回答

0

我发现的方式运行CQL查询与CassandraConnector获取卡桑德拉分区键:

val cassandraConnector = CassandraConnector(spark.sparkContext.getConf) 
val distinctRows = cassandraConnector.withSessionDo(session => { 
    session.execute(s"select distinct created_date, geo_part from ${keyspace}.$snapshots_table") 
}).all().map(row => {TableKeyM(row.getString("created_date"), row.getString("geo_part"))}).filter(k => {days.contains(k.created_date)}) 
val data_x = spark.sparkContext.parallelize(distinctRows) 

表结构设计有以下问题:卡桑德拉不允许添加WHERE CREATED_DATE =“...”条款到选择不同的created_date,geo_part,它需要获取整个列表对并在应用程序中对其进行过滤。

替代解决方案可以使分区键连续。如果聚合是按小时完成的 - 那么分区键可以是(created_date,hour),24小时可以在应用程序中列出。如果每天有24个分区不够用,并且聚合有组by by geohash,可以坚持geohash的重要部分 - 但它应该被翻译成可数的东西 - 例如geoPart.hash()%desiredNumberOfSubpartitions