2016-08-04 86 views
3

因此,我正在学习通过Apache Spark从ElasticSearch获取数据。 假设我已连接到具有“用户”索引的ElasticSearch。Spark:使用ElasticSearch索引进行优化连接

sqlContext = SQLContext(sc) 
usersES=sqlContext.read.format('org.elasticsearch.spark.sql').option('es.nodes','mynode').load('users/user') 

解释(usersES)显示我:

==实际规划==

扫描ElasticsearchRelation(MAP(es.nodes - > MYNODE, es.resource - > 用户/用户),org.apache.spark.sql.SQLContext @ 6c78e806,无)[关于#145,#活动146,bdate#147, UID#148]

当我使用过滤器:

usersES.filter(usersES.uid==1566324).explain() 

==实际规划==过滤器(UID#203L = 1566324) + - 扫描ElasticsearchRelation(MAP(es.nodes - > MYNODE,es.resource - > 用户/用户),org.apache.spark.sql.SQLContext @ 6c78e806,None)[about#145,activities#146,bdate#147,uid#148] PushedFilters:[EqualTo(uid,1566324)]

如您所见,Spark优雅地将过滤器推送到ElasticSearch,使索引搜索更快速舒适。

但是,当我尝试将usersES与其他数据框加入时,我始终得到相同的问题: Spark扫描整个ElasticSearch索引,而不是推送任何我给它的过滤器。 例如:

a = sc.parallelize([1566324,1566329]).map(Row('id')).toDF() 
a.join(usersES, usersES.uid==a.id).explain() 

所示:

SortMergeJoin [ID#210L],[UID#203L]: - 排序[ID#210L ASC],假,0: + - TungstenExchange散列分区(id#210L,200),无:+ - ConvertToUnsafe:+ - 扫描ExistingRDD [id#210L] + - Sort [uid#203L ASC],false,0 + - TungstenExchange hashpartitioning(uid#203L,200),无 + - ConvertToUnsafe + - Scan ElasticsearchRelation(Map(es.nodes - > mynode,es.resource - > user S /用户),org.apache.spark.sql.SQLContext @ 6c78e806,无)[约145#,#活动146,bdate#147,#UID 148]

请告诉我,是可能的在Elasticsearch内部推入过滤器内部的连接?

回答

2

这是一个预期的行为,是elaticsearch-hadoop连接器支持下推谓词,但加入时不会推送。

这是因为连接操作不知道密钥在数据框中的分区方式。

默认情况下,此操作将散列两个数据框的所有密钥,将具有相同密钥散列的所有元素通过网络发送到同一台计算机,然后将该元素与该计算机上的相同密钥连接在一起。

这就是为什么你得到的执行计划没有被推下谓词。

编辑:似乎自2.1版的IN条款像连接器支持。如果你的DataFrame a不大,你应该使用它。

+0

谢谢!似乎这是唯一的方法,虽然不符合我的需要(在IN的df到py中有数千条记录) –

+0

比唯一的方法是使用spark来加入而不是使用In子句,或者甚至可以使用广播变量。 – eliasah

+0

你能否请至少接受解决问题的答案? – eliasah