2016-07-29

Searching for terms in a DataFrame

I am very new to Apache Spark, and this is mostly an exercise for myself. I have two JSON files.

File 1) companies.json:
[ 
{"symbol":...,"name":...,"description":...} 
. 
. 
] 

File 2) emails.json: 
[ 
{"from":...,"to":...,"subject":...,"body":...} 
] 

I have now read each of these files into a DataFrame:

val companies = spark.read.json("hdfs://symbols.json") 
val emails = spark.read.json("hdfs://emails-out.json")

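What I want to do is take all the email rows, process them in parallel, and keep only the emails that contain a search term taken from (symbol, name) in companies.json. I match (symbol, name) against the from, to, subject and body fields of each email.

What is the best approach to this problem? Should I just convert the emails to an RDD, parallelize the rows, then retrieve each individual search term and match it against the emails? As soon as an email contains any of the terms from companies.json, I would return it in the list.

I have been at this all day, since I am new to this kind of development.

Thanks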

Answers

1

Building on Narendra's answer, using a broadcast variable.

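import org.apache.spark.sql.functions.{col, udf}

// jsonFile is deprecated since Spark 1.4; read.json is its replacement
val email = sqlContext.read.json("/Users/raviramadoss/emails.json")
val companies = sqlContext.read.json("/Users/raviramadoss/companies.json")

// Inferred columns are ordered alphabetically (description, name, symbol),
// so indices 1 and 2 are name and symbol
val companyMap = companies.rdd.flatMap(x => List(x.getString(1), x.getString(2))).collect()
val bcCompany = sc.broadcast(companyMap)

// True if the argument contains any of the broadcast terms (case-sensitive, null-safe)
val func: (String => Boolean) = (arg: String) => arg != null && bcCompany.value.exists(term => arg.contains(term))

val sqlfunc = udf(func)

email.show(false)
companies.show(false)
// Only the from column is checked here
email.filter(sqlfunc(col("from"))).show()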

Output:

+----------------------------------------------------+-----------------+-----------------+
|body                                                |from             |to               |
+----------------------------------------------------+-----------------+-----------------+
|First email from a company in the known company list|[email protected]|[email protected]|
|This email should be filtered out                   |[email protected]|[email protected]|
+----------------------------------------------------+-----------------+-----------------+

+-----------+----------+------+
|description|name      |symbol|
+-----------+----------+------+
|Citigroup  |Citi      |citi  |
|Capital One|capitalone|capone|
+-----------+----------+------+

+--------------------+-----------------+-----------------+
|                body|             from|               to|
+--------------------+-----------------+-----------------+
|First email from ...|[email protected]|[email protected]|
+--------------------+-----------------+-----------------+
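
The filter above checks only the from column, while the question asks about from, to, subject and body. Here is a minimal sketch of one way to extend it to all four fields, assuming Spark 2.x with a SparkSession. The object name AllFieldsFilter, the helper matchingEmails and the sample rows are hypothetical and only there for illustration; matching stays case-sensitive, as in the code above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

object AllFieldsFilter {
  // Hypothetical helper: keeps rows where any of the given columns contains any term.
  def matchingEmails(
      spark: SparkSession,
      emails: DataFrame,
      terms: Seq[String],
      fields: Seq[String] = Seq("from", "to", "subject", "body")): DataFrame = {
    // Ship the (small) term list to the executors once.
    val bcTerms = spark.sparkContext.broadcast(terms)
    // Null-safe check: a missing field never matches.
    val containsTerm = udf((s: String) => s != null && bcTerms.value.exists(t => s.contains(t)))
    // OR the per-column checks together into a single predicate.
    val anyFieldMatches = fields.map(f => containsTerm(col(f))).reduce(_ || _)
    emails.filter(anyFieldMatches)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("all-fields-filter").getOrCreate()
    import spark.implicits._
    // Made-up sample rows, not the data shown in the output above.
    val emails = Seq(
      ("alice@example.com", "bob@example.com", "Quarterly report", "Notes on capone earnings"),
      ("carol@example.com", "dave@example.com", "Lunch", "See you at noon")
    ).toDF("from", "to", "subject", "body")
    matchingEmails(spark, emails, Seq("citi", "capone")).show(false)
    spark.stop()
  }
}
```

With these sample rows only the first email should survive, because its body contains "capone". Building one combined Column keeps the whole check in a single filter call instead of chaining four.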
2

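If your data volume is small, either approach will work. If the companies dataset is small, collect it into an array or map on the driver, then filter the RDD of emails against it:

email.filter(eachline => companies.exists(term => eachline(7).contains(term)))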
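
The matching logic in this answer can be tried without a cluster. Below is a sketch in plain Scala, assuming the terms have already been collected into a local Seq on the driver. The Email case class, the containsAnyTerm helper and the sample values are hypothetical. Unlike the filter above, this version lowercases both sides, so it is case-insensitive; drop the toLowerCase calls to keep the original behaviour.

```scala
object TermMatch {
  // Hypothetical record type mirroring the fields of emails.json.
  final case class Email(from: String, to: String, subject: String, body: String)

  // True if any non-null field contains any term, ignoring case.
  def containsAnyTerm(email: Email, terms: Seq[String]): Boolean = {
    val fields = Seq(email.from, email.to, email.subject, email.body)
      .filter(_ != null)
      .map(_.toLowerCase)
    val lowered = terms.map(_.toLowerCase)
    fields.exists(f => lowered.exists(t => f.contains(t)))
  }

  def main(args: Array[String]): Unit = {
    // Terms as they would look after collecting name and symbol on the driver.
    val terms = Seq("Citi", "citi", "capitalone", "capone")
    val emails = Seq(
      Email("alice@example.com", "bob@example.com", "Citi statement", "Attached."),
      Email("carol@example.com", "dave@example.com", "Lunch", "See you at noon")
    )
    // Prints only the emails that mention at least one term.
    emails.filter(containsAnyTerm(_, terms)).foreach(println)
  }
}
```

Because containsAnyTerm depends only on its arguments, the same function could also be passed to an RDD's filter once the term list is available on the executors, for example through a broadcast variable.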