2017-06-13 111 views
0

我有org.apache.avro.generic.GenericRecordavro schema利用这一点,我们需要与SQLContext API的帮助下创建dataframe列表,创建dataframe需要RDDorg.apache.spark.sql.Rowavro schema。创建DF的先决条件是我们应该拥有org.apache.spark.sql.Row的RDD,它可以通过使用下面的代码来实现,但也可以通过使用下面的代码来实现,但一些它不能正常工作并给出错误代码。转换org.apache.avro.generic.GenericRecord到org.apache.spark.sql.Row

1. Convert GenericRecord to Row 
    import org.apache.spark.sql.Row 
    import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema 
    import org.apache.avro.Schema 
    import org.apache.spark.sql.types.StructType 
    def convertGenericRecordToRow(genericRecords: Seq[GenericRecord], avroSchema: Schema, schemaType: StructType): Seq[Row] = 
    { 
     val fields = avroSchema.getFields 
     var rows = new Seq[Row] 
     for (avroRecord <- genericRecords) { 
     var avroFieldsSeq = Seq[Any](); 
     for (i <- 0 to fields.size - 1) { 
      avroFieldsSeq = avroFieldsSeq :+avroRecord.get(fields.get(i).name) 
     } 
     val avroFieldArr = avroFieldsSeq.toArray 
     val genericRow = new GenericRowWithSchema(avroFieldArr, schemaType) 
     rows = rows :+ genericRow 
     } 
     return rows; 
    } 

2. Convert `Avro schema` to `Structtype` 
    Use `com.databricks.spark.avro.SchemaConverters -> toSqlType` function , it will convert avro schema to StructType 

3. Create `Dataframe` using `SQLContext` 
    val rowSeq= convertGenericRecordToRow(genericRecords, avroSchema, schemaType) 
    val rowRdd = sc.parallelize(rowSeq, 1) 
    val finalDF =sqlContext.createDataFrame(rowRDD,structType) 

但它在创建DataFrame抛出一个错误。有人可以帮助我在上面的代码中出现什么问题。此外,如果有人有不同的逻辑转换和创建dataframe

每当我将调用的数据帧的任何行动,它将执行DAG,并尝试建立DF对象,但在这一点,与下面的异常失败的

ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job 
Error :Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, hdpoc-c01-r06-01, executor 1): java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; local class incompatible: stream classdesc serialVersionUID = 2, local class serialVersionUID = 1 
         at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617) 
         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622) 

在此之后,我想给正确版本的jar在罐子火花的参数与其它参数提交和--conf spark.driver.userClassPathFirst =真 但现在它与MAPR未按照

ERROR CLDBRpcCommonUtils: Exception during init 
java.lang.UnsatisfiedLinkError: com.mapr.security.JNISecurity.SetClusterOption(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;) 
        at com.mapr.security.JNISecurity.SetClusterOption(Native Method) 
        at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.init(CLDBRpcCommonUtils.java:163) 
        at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<init>(CLDBRpcCommonUtils.java:73) 
        at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<clinit>(CLDBRpcCommonUtils.java:63) 
        at org.apache.hadoop.conf.CoreDefaultProperties.<clinit>(CoreDefaultProperties.java:69) 
        at java.lang.Class.forName0(Native Method) 

我们正在使用的MAPR分布和火花类路径变更后提交,它由于上述例外而失败。

有人可以请帮助这里或我的基本需要它将Avro GenericRecord转换成Spark行,所以我可以创建Dataframe与它,请帮助
谢谢。

+1

请告诉我确切的错误?并请使用genericRecords示例avroSchema更新问题。 –

+0

@RameshMaharjan驱动堆栈跟踪: org.apache.spark.SparkException:由于阶段故障导致作业中止:阶段0.0中的任务0失败4次,最近发生故障:阶段0.0中丢失的任务0.3(TID 3,hdpoc-c01-r03 -01,executor 2):java.io.InvalidClassException:org.apache.commons.lang3.time.FastDateFormat; local class incompatible:stream classdesc serialVersionUID = 2,local class serialVersionUID = 1 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622) –

+0

error看起来像流式数据源和本地转换代码之间的版本错误。您将不得不使用源使用的相同版本的FastDateFormat软件包。并请更新问题中的错误,以便其他人也可以帮助你。 –

回答