
I have a Spark Streaming application in which multiple streams (DStreams) write to the same Cassandra table. While testing the application on a large amount of random data, I got an error from the Spark Cassandra Connector that is not very helpful for debugging. It looks like this:

java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.InvalidQueryException: Key may not be empty 
    at com.baynote.shaded.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299) 
    at com.baynote.shaded.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286) 
    at com.baynote.shaded.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) 
    at com.datastax.spark.connector.rdd.CassandraJoinRDD$$anonfun$fetchIterator$1.apply(CassandraJoinRDD.scala:268) 
    at com.datastax.spark.connector.rdd.CassandraJoinRDD$$anonfun$fetchIterator$1.apply(CassandraJoinRDD.scala:268) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189) 
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: Key may not be empty 
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:136) 
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:179) 
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:184) 
    at com.datastax.driver.core.RequestHandler.access$2500(RequestHandler.java:43) 
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:798) 
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:617) 
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1005) 
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:928) 
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) 
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) 
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) 
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) 
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846) 
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:831) 
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:346) 
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:254) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) 
    ... 1 more 

The problem is that I can't tell which DStream, or which data, caused it. I could inspect every DStream that writes to Cassandra, or write my own data validation, but I'm looking for a more generic solution.

The other problem is that the error kills the whole job instead of being ignored so that the remaining data can still be written. In a simple non-Spark writer I would basically catch the exception, log it, and continue writing the rest of the data. Is there a way to do something similar with the Spark Cassandra Connector?
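For reference, the non-Spark pattern mentioned above might look roughly like this in Scala with the plain DataStax Java driver; the contact point, keyspace, table, and the rows collection are all placeholders:

    import com.datastax.driver.core.Cluster
    import com.datastax.driver.core.exceptions.InvalidQueryException

    val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
    val session = cluster.connect("my_keyspace")
    val insertStmt = session.prepare("INSERT INTO my_table (key, value) VALUES (?, ?)")

    // `rows` stands in for whatever Seq[(String, String)] is being written.
    rows.foreach { case (key, value) =>
      try {
        session.execute(insertStmt.bind(key, value))
      } catch {
        case e: InvalidQueryException =>
          // Log the offending row and keep writing the rest.
          println(s"Skipping invalid row ($key, $value): ${e.getMessage}")
      }
    }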

So, is there anything I can do about these two problems?


How are you saving the data? Is it an RDD or a DataFrame, and are you using a case class or the plain approach? – Kaushal


It's an RDD of tuples. – Soid


Instead of tuples, try using a case class with default values. You could also add some validation checks on your input data. – Kaushal
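A minimal sketch of that suggestion, assuming the tuples are (String, String) and that the key/value field names, keyspace, and table below match your schema (all placeholders):

    import com.datastax.spark.connector._

    // Case class with defaults, so a missing field cannot produce an empty key.
    case class Record(key: String = "unknown", value: String = "")

    val records = tupleRdd.map { case (k, v) =>
      Record(Option(k).filter(_.nonEmpty).getOrElse("unknown"), Option(v).getOrElse(""))
    }
    records.saveToCassandra("my_keyspace", "my_table")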

Answer


I think there are two things to consider here:

  1. Validate your input data to make sure the key (the Cassandra key columns) is not empty and the data format is valid.

  2. Your data is an RDD, so you can filter it before calling the save method to drop the invalid rows, as in the sketch below.
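A minimal sketch of the second point, assuming an RDD[(String, String)] whose first element is the Cassandra key; the keyspace, table, and column names are placeholders:

    import com.datastax.spark.connector._

    // Separate rows with an empty key from the valid ones.
    val valid   = rdd.filter { case (key, _) => key != null && key.nonEmpty }
    val invalid = rdd.filter { case (key, _) => key == null || key.isEmpty }

    // Log what is being dropped, then save only the valid rows, so one bad
    // record no longer kills the whole job.
    invalid.foreach(row => println(s"Dropping row with empty key: $row"))
    valid.saveToCassandra("my_keyspace", "my_table", SomeColumns("key", "value"))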