因此,我正在学习通过Apache Spark从ElasticSearch获取数据。 假设我已连接到具有“用户”索引的ElasticSearch。Spark:使用ElasticSearch索引进行优化连接
sqlContext = SQLContext(sc)
usersES=sqlContext.read.format('org.elasticsearch.spark.sql').option('es.nodes','mynode').load('users/user')
解释(usersES)显示我:
==实际规划==
扫描ElasticsearchRelation(MAP(es.nodes - > MYNODE, es.resource - > 用户/用户),org.apache.spark.sql.SQLContext @ 6c78e806,无)[关于#145,#活动146,bdate#147, UID#148]
当我使用过滤器:
usersES.filter(usersES.uid==1566324).explain()
==实际规划==过滤器(UID#203L = 1566324) + - 扫描ElasticsearchRelation(MAP(es.nodes - > MYNODE,es.resource - > 用户/用户),org.apache.spark.sql.SQLContext @ 6c78e806,None)[about#145,activities#146,bdate#147,uid#148] PushedFilters:[EqualTo(uid,1566324)]
如您所见,Spark优雅地将过滤器推送到ElasticSearch,使索引搜索更快速舒适。
但是,当我尝试将usersES与其他数据框加入时,我始终得到相同的问题: Spark扫描整个ElasticSearch索引,而不是推送任何我给它的过滤器。 例如:
a = sc.parallelize([1566324,1566329]).map(Row('id')).toDF()
a.join(usersES, usersES.uid==a.id).explain()
所示:
SortMergeJoin [ID#210L],[UID#203L]: - 排序[ID#210L ASC],假,0: + - TungstenExchange散列分区(id#210L,200),无:+ - ConvertToUnsafe:+ - 扫描ExistingRDD [id#210L] + - Sort [uid#203L ASC],false,0 + - TungstenExchange hashpartitioning(uid#203L,200),无 + - ConvertToUnsafe + - Scan ElasticsearchRelation(Map(es.nodes - > mynode,es.resource - > user S /用户),org.apache.spark.sql.SQLContext @ 6c78e806,无)[约145#,#活动146,bdate#147,#UID 148]
请告诉我,是可能的在Elasticsearch内部推入过滤器内部的连接?
谢谢!似乎这是唯一的方法,虽然不符合我的需要(在IN的df到py中有数千条记录) –
比唯一的方法是使用spark来加入而不是使用In子句,或者甚至可以使用广播变量。 – eliasah
你能否请至少接受解决问题的答案? – eliasah