2016-10-02 96 views
1

我有两个数据集,每个数据集都有两个元素。 下面是一些例子。如何通过scala中的键加入两个数据集spark

数据1:(名称,动物)

('abc,def', 'monkey(1)') 
('df,gh', 'zebra') 
... 

数据2:(姓名,水果)

('a,efg', 'apple') 
('abc,def', 'banana(1)') 
... 

结果预期:(姓名,动物,水果)

('abc,def', 'monkey(1)', 'banana(1)') 
... 

我想要通过使用第一列“名称”来加入这两个数据集。我试图做几个小时,但我无法弄清楚。谁能帮我?

val sparkConf = new SparkConf().setAppName("abc").setMaster("local[2]") 
val sc = new SparkContext(sparkConf) 
val text1 = sc.textFile(args(0)) 
val text2 = sc.textFile(args(1)) 

val joined = text1.join(text2) 

上面的代码不工作!

+0

你在哪里拆分输入文本到'(键,值)'元组? – maasg

+0

你会得到什么样的错误?它告诉你什么? – maasg

+0

@maasg它说''无法解析符号连接。' – tobby

回答

1

join上对RDDS定义的,即,类型的RDDS结果RDD[(K,V)]。 需要的第一步是将输入数据转换为正确的类型。

我们首先需要String类型的原始数据转化为对(Key, Value)

val parse:String => (String, String) = s => { 
    val regex = "^\\('([^']+)',[\\W]*'([^']+)'\\)$".r 
    s match { 
    case regex(k,v) => (k,v) 
    case _ => ("","") 
    } 
} 

(请注意,由于键包含逗号,我们不能用一个简单的split(",")表达式)

然后我们使用该函数来解析文本输入数据:

val s1 = Seq("('abc,def', 'monkey(1)')","('df,gh', 'zebra')") 
val s2 = Seq("('a,efg', 'apple')","('abc,def', 'banana(1)')") 

val rdd1 = sparkContext.parallelize(s1) 
val rdd2 = sparkContext.parallelize(s2) 

val kvRdd1 = rdd1.map(parse) 
val kvRdd2 = rdd2.map(parse) 

最后,我们使用join方法来连接两个RDDS

val joined = kvRdd1.join(kvRdd2) 

//让我们看看结果

joined.collect 

// res31: Array[(String, (String, String))] = Array((abc,def,(monkey(1),banana(1)))) 
+0

非常感谢! – tobby

+0

我还有一个问题。我怎样才能在数据中保留单引号? – tobby

+1

@tobby更改正则表达式以保留引号。 – maasg

0

您必须首先为您的数据集创建pairRDD,然后您必须应用连接转换。您的数据集看起来不准确。

请考虑下面的例子。

**Dataset1** 

a 1 
b 2 
c 3 

**Dataset2** 

a 8 
b 4 

您的代码应该像下面Scala中

val pairRDD1 = sc.textFile("/path_to_yourfile/first.txt").map(line => (line.split(" ")(0),line.split(" ")(1))) 

    val pairRDD2 = sc.textFile("/path_to_yourfile/second.txt").map(line => (line.split(" ")(0),line.split(" ")(1))) 

    val joinRDD = pairRDD1.join(pairRDD2) 

    joinRDD.collect 

下面是从阶壳

res10: Array[(String, (String, String))] = Array((a,(1,8)), (b,(2,4))) 
相关问题