我有org.apache.avro.generic.GenericRecord
,avro schema
利用这一点,我们需要与SQLContext
API的帮助下创建dataframe
列表,创建dataframe
需要RDD
org.apache.spark.sql.Row
和avro schema
。创建DF的先决条件是我们应该拥有org.apache.spark.sql.Row的RDD,它可以通过使用下面的代码来实现,但也可以通过使用下面的代码来实现,但一些它不能正常工作并给出错误代码。转换org.apache.avro.generic.GenericRecord到org.apache.spark.sql.Row
1. Convert GenericRecord to Row
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType
def convertGenericRecordToRow(genericRecords: Seq[GenericRecord], avroSchema: Schema, schemaType: StructType): Seq[Row] =
{
val fields = avroSchema.getFields
var rows = new Seq[Row]
for (avroRecord <- genericRecords) {
var avroFieldsSeq = Seq[Any]();
for (i <- 0 to fields.size - 1) {
avroFieldsSeq = avroFieldsSeq :+avroRecord.get(fields.get(i).name)
}
val avroFieldArr = avroFieldsSeq.toArray
val genericRow = new GenericRowWithSchema(avroFieldArr, schemaType)
rows = rows :+ genericRow
}
return rows;
}
2. Convert `Avro schema` to `Structtype`
Use `com.databricks.spark.avro.SchemaConverters -> toSqlType` function , it will convert avro schema to StructType
3. Create `Dataframe` using `SQLContext`
val rowSeq= convertGenericRecordToRow(genericRecords, avroSchema, schemaType)
val rowRdd = sc.parallelize(rowSeq, 1)
val finalDF =sqlContext.createDataFrame(rowRDD,structType)
但它在创建DataFrame
抛出一个错误。有人可以帮助我在上面的代码中出现什么问题。此外,如果有人有不同的逻辑转换和创建dataframe
。
每当我将调用的数据帧的任何行动,它将执行DAG,并尝试建立DF对象,但在这一点,与下面的异常失败的
ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
Error :Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, hdpoc-c01-r06-01, executor 1): java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; local class incompatible: stream classdesc serialVersionUID = 2, local class serialVersionUID = 1
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
在此之后,我想给正确版本的jar在罐子火花的参数与其它参数提交和--conf spark.driver.userClassPathFirst =真 但现在它与MAPR未按照
ERROR CLDBRpcCommonUtils: Exception during init
java.lang.UnsatisfiedLinkError: com.mapr.security.JNISecurity.SetClusterOption(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)
at com.mapr.security.JNISecurity.SetClusterOption(Native Method)
at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.init(CLDBRpcCommonUtils.java:163)
at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<init>(CLDBRpcCommonUtils.java:73)
at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<clinit>(CLDBRpcCommonUtils.java:63)
at org.apache.hadoop.conf.CoreDefaultProperties.<clinit>(CoreDefaultProperties.java:69)
at java.lang.Class.forName0(Native Method)
我们正在使用的MAPR分布和火花类路径变更后提交,它由于上述例外而失败。
有人可以请帮助这里或我的基本需要它将Avro GenericRecord转换成Spark行,所以我可以创建Dataframe与它,请帮助
谢谢。
请告诉我确切的错误?并请使用genericRecords示例avroSchema更新问题。 –
@RameshMaharjan驱动堆栈跟踪: org.apache.spark.SparkException:由于阶段故障导致作业中止:阶段0.0中的任务0失败4次,最近发生故障:阶段0.0中丢失的任务0.3(TID 3,hdpoc-c01-r03 -01,executor 2):java.io.InvalidClassException:org.apache.commons.lang3.time.FastDateFormat; local class incompatible:stream classdesc serialVersionUID = 2,local class serialVersionUID = 1 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622) –
error看起来像流式数据源和本地转换代码之间的版本错误。您将不得不使用源使用的相同版本的FastDateFormat软件包。并请更新问题中的错误,以便其他人也可以帮助你。 –