2017-04-10 100 views
0

我正在将地板文件加载为火花数据集。我可以查询并从查询中创建新的数据集。现在,我想向数据集(“hashkey”)添加一个新列并生成值(例如md5sum(nameValue))。我怎样才能做到这一点?将列添加到火花数据集并转换数据

public static void main(String[] args) { 

    SparkConf sparkConf = new SparkConf(); 

    sparkConf.setAppName("Hello Spark"); 
    sparkConf.setMaster("local"); 

    SparkSession spark = SparkSession.builder().appName("Java Spark SQL basic example") 
      .config("spark.master", "local").config("spark.sql.warehouse.dir", "file:///C:\\spark_warehouse") 
      .getOrCreate(); 

    Dataset<org.apache.spark.sql.Row> df = spark.read().parquet("meetup.parquet"); 
    df.show(); 

    df.createOrReplaceTempView("tmpview"); 

    Dataset<Row> namesDF = spark.sql("SELECT * FROM tmpview where name like 'Spark-%'"); 

    namesDF.show(); 

} 

输出看起来是这样的:

+-------------+-----------+-----+---------+--------------------+ 
|   name|meetup_date|going|organizer|    topics| 
+-------------+-----------+-----+---------+--------------------+ 
| Spark-H20| 2016-01-01| 50|airisdata|[h2o, repeated sh...| 
| Spark-Avro| 2016-01-02| 60|airisdata| [avro, usecases]| 
|Spark-Parquet| 2016-01-03| 70|airisdata| [parquet, usecases]| 
+-------------+-----------+-----+---------+--------------------+ 

回答

1

只需添加火花SQL函数MD5在您查询。

Dataset<Row> namesDF = spark.sql("SELECT *, md5(name) as modified_name FROM tmpview where name like 'Spark-%'"); 
0
Dataset<Row> ds = sqlContext.read() 
    .format("com.databricks.spark.csv") 
    .option("inferSchema", "true") 
    .option("header", "true") 
    .option("delimiter","|") 
    .load("/home/cloudera/Desktop/data.csv"); 
ds.printSchema(); 

将打印:将上述代码之后

root 
|-- ReferenceValueSet_Id: integer (nullable = true) 
|-- ReferenceValueSet_Name: string (nullable = true) 
|-- Code_Description: string (nullable = true) 
|-- Code_Type: string (nullable = true) 
|-- Code: string (nullable = true) 
|-- CURR_FLAG: string (nullable = true) 
|-- REC_CREATE_DATE: timestamp (nullable = true) 
|-- REC_UPDATE_DATE: timestamp (nullable = true) 

Dataset<Row> df1 = ds.withColumn("Key", functions.lit(1)); 
     df1.printSchema(); 

,它将附加具有恒定值一列。

root 
|-- ReferenceValueSet_Id: integer (nullable = true) 
|-- ReferenceValueSet_Name: string (nullable = true) 
|-- Code_Description: string (nullable = true) 
|-- Code_Type: string (nullable = true) 
|-- Code: string (nullable = true) 
|-- CURR_FLAG: string (nullable = true) 
|-- REC_CREATE_DATE: timestamp (nullable = true) 
|-- REC_UPDATE_DATE: timestamp (nullable = true) 
|-- Key: integer (nullable = true) 

您可以看到名称为Key的列被添加到数据集中。

如果你想添加一些列的constunt值的位置,你可以使用下面的代码来添加它。

Dataset<Row> df1 = ds.withColumn("Key", functions.lit(ds.col("Code"))); 
     df1.printSchema(); 
     df1.show(); 

现在,它将打印watever的值在列CODE中。到名为Key的新列中。