I have two datasets that were partitioned with the same partitioner and stored in HDFS. The datasets are the output of two different Spark jobs that we have no control over. Now I want to join these two datasets to produce combined information. How do I join two data files in HDFS with Spark?

Example: 

Data Set 1: 
ORDER_ID CUSTOMER_ID ITEMS 
OD1  C1   1,2,3 -> partition 0 
OD2  C2   3,4,5 -> partition 0 
OD3  C4   1,2,3 -> partition 1 
OD4  C3   1,3  -> partition 1 

Data Set 2: 
ORDER_ID CUSTOMER_ID REFUND_ITEMS 
OD1  C1   1  -> partition 0 
OD2  C2   5  -> partition 0 
OD3  C4   2,3 -> partition 1 
OD4  C3   3  -> partition 1 

Options are: 

1) Create two RDDs from the datasets and join them (a rough sketch of this follows the list). 
2) Create one RDD using one of the datasets. 
    -> For each partition in the RDD, derive the actual partition id, i.e. OD1 -> 0, OD3 -> 1 (using some custom logic) 
    -> Load the data of that partition of dataset 2 from HDFS 
    -> Iterate over both datasets and produce the combined result. 
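
For option 1, what I have in mind is a plain pair-RDD join keyed on (ORDER_ID, CUSTOMER_ID). The HDFS paths and the tab delimiter below are assumptions based on the example above: 

// Option 1 sketch: key both datasets on (ORDER_ID, CUSTOMER_ID) and join. 
// Paths are hypothetical; sc is the SparkContext. 
val orders = sc.textFile("hdfs:///data/dataset1") 
    .map(_.split("\t")) 
    .map(f => ((f(0), f(1)), f(2)))  // ((ORDER_ID, CUSTOMER_ID), ITEMS) 

val refunds = sc.textFile("hdfs:///data/dataset2") 
    .map(_.split("\t")) 
    .map(f => ((f(0), f(1)), f(2)))  // ((ORDER_ID, CUSTOMER_ID), REFUND_ITEMS) 

// This join shuffles both sides: RDDs loaded from text files carry no 
// partitioner information, even though the files on disk are co-partitioned. 
val joined = orders.join(refunds)    // ((ORDER_ID, CUSTOMER_ID), (ITEMS, REFUND_ITEMS)) 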

For option 2, I don't know how to read a specific file from HDFS inside the Spark executor (I have the full URI of the file's location). 
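
To make option 2 concrete, I imagine using the Hadoop FileSystem API inside mapPartitionsWithIndex, but I don't know whether this is the right way to open a file from inside an executor. ordersRdd and the part-file naming scheme below are hypothetical: 

import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.fs.Path 
import scala.io.Source 

// Option 2 sketch: for each partition of dataset 1, open the matching 
// part-file of dataset 2 directly via the Hadoop FileSystem API. 
val combined = ordersRdd.mapPartitionsWithIndex { (idx, rows) => 
    val path = new Path(f"hdfs:///data/dataset2/part-$idx%05d") // naming scheme assumed 
    val fs = path.getFileSystem(new Configuration())            // runs on the executor 
    val refundLines = Source.fromInputStream(fs.open(path)).getLines().toList 
    // ... merge `rows` with `refundLines` here to produce the combined records ... 
    rows // placeholder for the real merge logic 
} 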

Answer


You can try creating two DataFrames and joining them with SQL. Please find the code below.

// For implicit conversions from RDDs to DataFrames 
import spark.implicits._ 

case class struc_dataset(ORDER_ID: String, CUSTOMER_ID: String, ITEMS: String) 
case class struc_refunds(ORDER_ID: String, CUSTOMER_ID: String, REFUND_ITEMS: String) 

// Read file1 (tab-delimited: ORDER_ID, CUSTOMER_ID, ITEMS) 
val File1DF = spark.sparkContext 
    .textFile("temp/src/file1.txt") 
    .map(_.split("\t")) 
    .map(attributes => struc_dataset(attributes(0), attributes(1), attributes(2))).toDF() 

// Register as temp view - Dataset1 
File1DF.createOrReplaceTempView("Dataset1") 

// Read file2 (tab-delimited: ORDER_ID, CUSTOMER_ID, REFUND_ITEMS) 
val File2DF = spark.sparkContext 
    .textFile("temp/src/file2.txt") 
    .map(_.split("\t")) 
    .map(attributes => struc_refunds(attributes(0), attributes(1), attributes(2))).toDF() 

// Register as temp view - Dataset2 
File2DF.createOrReplaceTempView("Dataset2") 

// SQL statement to create final dataframe (JOIN) 
val finalDF = spark.sql("SELECT * FROM Dataset1 ds1 JOIN Dataset2 ds2 on ds1.ORDER_ID=ds2.ORDER_ID AND ds1.CUSTOMER_ID=ds2.CUSTOMER_ID") 

finalDF.show()
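
The same join can also be written with the DataFrame API, without registering temp views; joining on a sequence of column names additionally keeps a single ORDER_ID and CUSTOMER_ID column in the result: 

// Equivalent join via the DataFrame API; the Seq form de-duplicates the join columns. 
val finalDF2 = File1DF.join(File2DF, Seq("ORDER_ID", "CUSTOMER_ID")) 
finalDF2.show()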