2017-07-19 82 views

I have a requirement to load streaming data into a DynamoDB table. I tried the code below, but the DynamoDB client cannot be created on the Spark executors.

import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item, Table}
import org.apache.spark.sql.{Row, SparkSession}

object UnResolvedLoad {

    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("unresolvedload").enableHiveSupport().getOrCreate()
        val tokensDf = spark.sql("select * from unresolved_logic.unresolved_dynamo_load")
        // Write each partition from the executor that holds it
        tokensDf.foreachPartition { x => loadFunc(x) }
    }

    def loadFunc(iter: Iterator[Row]): Unit = {
        // One client per partition; building it is the step that fails on the executors
        val client: AmazonDynamoDB = AmazonDynamoDBClientBuilder.standard().build()
        val dynamoDB: DynamoDB = new DynamoDB(client)
        val table: Table = dynamoDB.getTable("UnResolvedTokens")

        while (iter.hasNext) {
            val cur = iter.next()
            // Column order in the source table: payload_id, payload_confirmation_code, receiverId, token
            val item: Item = new Item()
                .withString("receiverId", cur.getString(2))
                .withString("payload_id", cur.getString(0))
                .withString("payload_confirmation_code", cur.getString(1))
                .withString("token", cur.getString(3))

            table.putItem(item)
        }
    }
}

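When I run the job with spark-submit, the class cannot be instantiated. The error message is below; it says it could not initialize the class. Any help is appreciated. Is there a way to save a Spark Dataset to Amazon DynamoDB?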

, executor 5): java.lang.NoClassDefFoundError: Could not initialize class com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder 
     at com.dish.payloads.UnResolvedLoad$.loadFunc(UnResolvedLoad.scala:22) 
     at com.dish.payloads.UnResolvedLoad$$anonfun$main$1.apply(UnResolvedLoad.scala:16) 
     at com.dish.payloads.UnResolvedLoad$$anonfun$main$1.apply(UnResolvedLoad.scala:16) 
     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926) 
     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
     at org.apache.spark.scheduler.Task.run(Task.scala:99) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:748) 

17/07/19 17:35:15 INFO TaskSetManager: Lost task 26.0 in stage 0.0 (TID 26) on ip-10-176-225-151.us-west-2.compute.internal, executor 5: java.lang.NoClassDefFoundError (Could not initialize class com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder) [duplicate 1] 
17/07/19 17:35:15 WARN TaskSetManager: Lost task 6.0 in stage 0.0 (TID 6, ip-10-176-225-151.us-west-2.compute.internal, executor 5): java.lang.IllegalAccessError: tried to access class com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientConfigurationFactory from class com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder 
     at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder.<clinit>(AmazonDynamoDBClientBuilder.java:30) 
     at com.dish.payloads.UnResolvedLoad$.loadFunc(UnResolvedLoad.scala:22) 
     at com.dish.payloads.UnResolvedLoad$$anonfun$main$1.apply(UnResolvedLoad.scala:16) 
     at com.dish.payloads.UnResolvedLoad$$anonfun$main$1.apply(UnResolvedLoad.scala:16) 
     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926) 
     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
     at org.apache.spark.scheduler.Task.run(Task.scala:99) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:748) 
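
The IllegalAccessError in the trace above, where one class in com.amazonaws.services.dynamodbv2 cannot access another class in the same package, can indicate that the two classes were loaded from different jars or class loaders. A minimal diagnostic sketch, assuming nothing beyond the JDK, is to report where each class comes from. The object name ClassOrigin and the method locate are hypothetical helpers, not part of Spark or the AWS SDK.

```scala
object ClassOrigin {
  // Hypothetical helper: describe where a class was loaded from.
  // Class.forName is called with initialize = false so the static
  // initializer (the part that fails in the trace) is not triggered.
  def locate(className: String): String = {
    val loader = Thread.currentThread().getContextClassLoader
    try {
      val cls = Class.forName(className, false, loader)
      // CodeSource and its location can both be null (e.g. JDK classes)
      val source = Option(cls.getProtectionDomain.getCodeSource)
        .flatMap(cs => Option(cs.getLocation))
        .map(_.toString)
        .getOrElse("bootstrap or unknown")
      s"$className <- $source (loader: ${cls.getClassLoader})"
    } catch {
      case _: ClassNotFoundException => s"$className <- not found on classpath"
    }
  }
}
```

Calling ClassOrigin.locate inside the foreachPartition function for both AmazonDynamoDBClientBuilder and AmazonDynamoDBClientConfigurationFactory, and printing the results to the executor log, would show whether the two classes resolve to the same jar.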

1 Answer

I was finally able to solve it by using an older version of the DynamoDB API. EMR 5.7 only supports 1.10.75.1. Here is the code that works for me.

import com.amazonaws.regions.{Region, Regions}
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, PutItemRequest}
import org.apache.spark.sql.{Row, SparkSession}
import scala.collection.JavaConverters._

object UnResolvedLoad {

    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("unresolvedload").enableHiveSupport().getOrCreate()
        val tokensDf = spark.sql("select * from unresolved_logic.unresolved_dynamo_load")
        // Write each partition from the executor that holds it
        tokensDf.foreachPartition { x => loadFunc(x) }
    }

    def loadFunc(iter: Iterator[Row]): Unit = {
        // Low-level client from the older SDK; one instance per partition
        val client: AmazonDynamoDBClient = new AmazonDynamoDBClient()
        val usWest2 = Region.getRegion(Regions.US_WEST_2)
        client.setRegion(usWest2)

        while (iter.hasNext) {
            val cur = iter.next()

            // Build the attribute map by hand and convert it to a java.util.Map
            val putMap = Map(
                "receiverId" -> new AttributeValue(cur.getString(2)),
                "payload_id" -> new AttributeValue(cur.getString(0)),
                "payload_confirmation_code" -> new AttributeValue(cur.getString(1)),
                "token" -> new AttributeValue(cur.getString(3))).asJava

            val putItemRequest: PutItemRequest = new PutItemRequest("UnResolvedTokens", putMap)
            client.putItem(putItemRequest)
        }
    }
}
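
The version pin described in this answer could be expressed in the build roughly as follows. This is only a sketch under several assumptions: that the project uses sbt, that the DynamoDB client comes from the `aws-java-sdk-dynamodb` artifact, that version 1.10.75.1 (the number quoted in the answer) resolves from your repositories, and that Spark 2.1.1 matches the cluster. Marking the dependencies as `provided` is likewise an assumption, intended to let the cluster's own copies be used at runtime instead of bundling a second, conflicting one.

```scala
// build.sbt (sketch; everything except the 1.10.75.1 number is an assumption)
libraryDependencies ++= Seq(
  // Spark is supplied by the cluster, so keep it out of the assembled jar.
  "org.apache.spark" %% "spark-sql"  % "2.1.1" % "provided",
  "org.apache.spark" %% "spark-hive" % "2.1.1" % "provided",
  // Compile against the SDK version the cluster ships, without packaging it.
  "com.amazonaws" % "aws-java-sdk-dynamodb" % "1.10.75.1" % "provided"
)
```

If that exact version is not available to your resolver, the nearest published 1.10.x release may be a workable substitute for compilation, though this has not been verified here.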
Saved me a lot of time! Thanks. How did you figure this out? It is really hard to think in this direction. – KAs

I got the answer from the AWS support forum. –
