I'm using DSE 5.1 (Spark 2.0.2.6 and Cassandra 3.10.0.1652) and a Spark job (Scala) to write a value of type date to Cassandra.
My Cassandra table:
CREATE TABLE ks.tbl (
dk int,
date date,
ck int,
val int,
PRIMARY KEY (dk, date, ck)
) WITH CLUSTERING ORDER BY (date DESC, ck ASC);
The data looks like this:
dk | date | ck | val
----+------------+----+-----
1 | 2017-01-01 | 1 | 100
1 | 2017-01-01 | 2 | 200
My code must read this data and write the same rows back, but with yesterday's date (it compiles successfully):
package com.datastax.spark.example
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
import com.github.nscala_time.time._
import com.github.nscala_time.time.Imports._
object test extends App {
  val conf = new SparkConf().setAppName("DSE calculus app TEST")
  val sc = new SparkContext(conf)
  val yesterday = (DateTime.now - 1.days).toString(StaticDateTimeFormat.forPattern("yyyy-MM-dd"))
  val tbl = sc.cassandraTable("ks", "tbl").select("dk", "date", "ck", "val").where("dk=1")
  tbl.map(row => (row.getInt("dk"), yesterday, row.getInt("ck"), row.getInt("val")))
    .saveToCassandra("ks", "tbl")
  sc.stop()
  sys.exit(0)
}
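For comparison, the same "yesterday" string can be built with only the JDK's java.time, with no nscala_time/joda-time dependency at all. This is just an alternative formulation to help isolate the problem, not necessarily the fix:

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Format yesterday's date as "yyyy-MM-dd" using only the JDK,
// so no joda-time / nscala_time classes are needed on the executors
val yesterday: String =
  LocalDate.now().minusDays(1).format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))
```

If a joda-time version already on the DSE classpath conflicts with the one nscala_time expects, removing that dependency would at least rule the formatting step in or out as the source of the null.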
When I run it with:
dse spark-submit --class com.datastax.spark.example.test test-assembly-0.1.jar
it does not write to Cassandra correctly. It seems the date variable is not inserted into the map correctly. The error I get is:
Error:
WARN 2017-05-08 22:23:16,472 org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, <IP of one of my nodes>): java.io.IOException: Failed to write statements to ks.tbl.
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:207)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:175)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:175)
at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:162)
at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:149)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
However, the code does insert the data correctly when I hard-code the date as a string directly in the map statement:
tbl.map(row => (row.getInt("dk"), "2017-02-02", row.getInt("ck"), row.getInt("val"))).saveToCassandra("ks", "tbl")
It also inserts the data correctly if I set yesterday to an integer (days since the epoch). That would be optimal, but I can't get 'yesterday' expressed that way.
EDIT: Actually, this does not insert the data correctly. Whether I set 'yesterday' to 1 or to 100,000,000, it always inserts the epoch date ('1970-01-01').
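For reference, 'yesterday' expressed as days since the Unix epoch can be computed directly with java.time; whether the connector then accepts a plain Int for a CQL date column is exactly what is in question here:

```scala
import java.time.LocalDate

// Yesterday as a count of days since 1970-01-01 (the Unix epoch)
val yesterdayDays: Int = LocalDate.now().minusDays(1).toEpochDay.toInt

// The Int round-trips back to the same calendar date
val check: LocalDate = LocalDate.ofEpochDay(yesterdayDays.toLong)
```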
The code that fails behaves correctly, and as I expect, when run in the DSE Spark console.
I just can't figure out what I'm doing wrong. Any help is welcome.
EDIT 2: The stderr log of executor 0 does show that it tried to insert a null value into the date column, which is obviously impossible since it's a clustering column.
You need to post the executor logs. Since the driver only sees "Failed to write statements", it can't trace the individual attempts and failures that caused it. – RussS
Do you mean the stderr output of all the executors for the application in the Spark master UI? – Mematematica
At least you got the exception :) :) – RussS