2016-03-04

Fetching all source fields: my Spark RDD does not return all fields from Elasticsearch. I have the following data in a local single-node Elasticsearch server.

Search command: curl -XPOST 'localhost:9200/sparkdemo/_search?pretty' -d '{ "query": { "match_all": {} } }'

OUTPUT:

{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 10,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "sparkdemo",
      "_type" : "hrinfo",
      "_id" : "AVNAY_H0lYe0cQl--Bin",
      "_score" : 1.0,
      "_source" : {
        "date" : "9/Mar/2016",
        "pid" : "1",
        "propName" : "HEARTRATE",
        "var" : null,
        "propValue" : 86,
        "avg" : 86,
        "stage" : "S1"
      }
    }, {
      "_index" : "sparkdemo",
      "_type" : "hrinfo",
      "_id" : "AVNAY_KklYe0cQl--Bir",
      "_score" : 1.0,
      "_source" : {
        "date" : "13/Mar/2016",
        "pid" : "1",
        "propName" : "HEARTRATE",
        "var" : null,
        "propValue" : 86,
        "avg" : 87,
        "stage" : "S1"
      }
    }, {
      "_index" : "sparkdemo",
      "_type" : "hrinfo",
      "_id" : "AVNAY-TolYe0cQl--Bii",
      "_score" : 1.0,
      "_source" : {
        "date" : "4/Mar/2016",
        "pid" : "1",
        "propName" : "HEARTRATE",
        "var" : null,
        "propValue" : 82,
        "avg" : 82,
        "stage" : "S0"
      }
    },
    ... a few more records ...
    {
      "_index" : "sparkdemo",
      "_type" : "hrinfo",
      "_id" : "AVNAY_KklYe0cQl--Biq",
      "_score" : 1.0,
      "_source" : {
        "date" : "12/Mar/2016",
        "pid" : "1",
        "propName" : "HEARTRATE",
        "var" : null,
        "propValue" : 91,
        "avg" : 89,
        "stage" : "S1"
      }
    } ]
  }
}

I want to fetch all of this data from a Spark program (a local standalone program run from Eclipse).

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.elasticsearch.spark._

object Test1 {

  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("HRInfo")
    val sc = new SparkContext(conf)

    // Read every document of sparkdemo/hrinfo as (documentId, fieldMap) pairs.
    val esRdd = sc.esRDD("sparkdemo/hrinfo", "?q=*")

    val searchResultRDD = esRdd.map(t => {
      println("id:" + t._1 + ", map:" + t._2)
      t._2
    })

    // collect() brings the field maps to the driver; print each one.
    searchResultRDD.collect().foreach(map => {
      val stage = map.get("stage")
      val pid = map.get("pid")
      val date = map.get("date")
      val propName = map.get("propName")
      val propValue = map.get("propValue")
      val avg = map.get("avg")
      val variation = map.get("var")

      println("Info(" + stage + "," + pid + "," + date + "," + propName + "," +
        propValue + "," + avg + "," + variation + ")")
    })
  }
}
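Note that `map.get(key)` returns an `Option`, which is why the printed values come out as `Some(...)`/`None`. A minimal sketch (plain Scala, no Spark needed) of unwrapping such values with a fallback; the `field` helper and the `"N/A"` default are my own illustration, not part of the original program:

```scala
// Hypothetical helper: extract a field from a per-document map,
// falling back to a default when the field was not returned.
def field(doc: Map[String, Any], key: String, default: String = "N/A"): String =
  doc.get(key).map(_.toString).getOrElse(default)

val doc = Map[String, Any]("date" -> "9/Mar/2016", "pid" -> "1")
println(field(doc, "pid"))  // present: prints 1
println(field(doc, "avg"))  // missing: prints N/A
```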

However, the program does not fetch all the fields of the records stored in Elasticsearch.

Program output:

id:AVNAY_H0lYe0cQl--Bin, map:Map(date -> 9/Mar/2016, pid -> 1, propName -> HEARTRATE) 
id:AVNAY_KklYe0cQl--Bir, map:Map(date -> 13/Mar/2016, pid -> 1, propName -> HEARTRATE) 
id:AVNAY-TolYe0cQl--Bii, map:Map(date -> 4/Mar/2016, pid -> 1, propName -> HEARTRATE) 
id:AVNAY_H0lYe0cQl--Bio, map:Map(date -> 10/Mar/2016, pid -> 1, propName -> HEARTRATE) 
id:AVNAY_KklYe0cQl--Bip, map:Map(date -> 11/Mar/2016, pid -> 1, propName -> HEARTRATE) 
id:AVNAY-TolYe0cQl--Bij, map:Map(date -> 5/Mar/2016, pid -> 1, propName -> HEARTRATE) 
id:AVNAY-Y9lYe0cQl--Bil, map:Map(date -> 7/Mar/2016, pid -> 1, propName -> HEARTRATE) 
id:AVNAY-Y9lYe0cQl--Bim, map:Map(date -> 8/Mar/2016, pid -> 1, propName -> HEARTRATE) 
id:AVNAY-Y9lYe0cQl--Bik, map:Map(date -> 6/Mar/2016, pid -> 1, propName -> HEARTRATE) 
id:AVNAY_KklYe0cQl--Biq, map:Map(date -> 12/Mar/2016, pid -> 1, propName -> HEARTRATE) 
Info(None,Some(1),Some(9/Mar/2016),Some(HEARTRATE),None,None,None) 
Info(None,Some(1),Some(13/Mar/2016),Some(HEARTRATE),None,None,None) 
Info(None,Some(1),Some(4/Mar/2016),Some(HEARTRATE),None,None,None) 
Info(None,Some(1),Some(10/Mar/2016),Some(HEARTRATE),None,None,None) 
Info(None,Some(1),Some(11/Mar/2016),Some(HEARTRATE),None,None,None) 
Info(None,Some(1),Some(5/Mar/2016),Some(HEARTRATE),None,None,None) 
Info(None,Some(1),Some(7/Mar/2016),Some(HEARTRATE),None,None,None) 
Info(None,Some(1),Some(8/Mar/2016),Some(HEARTRATE),None,None,None) 
Info(None,Some(1),Some(6/Mar/2016),Some(HEARTRATE),None,None,None) 
Info(None,Some(1),Some(12/Mar/2016),Some(HEARTRATE),None,None,None) 

The program fetches all the records, but for each record it does not fetch the remaining fields (i.e. stage, propValue, avg and var). Why? Many thanks.

Answers


This is caused by the "var": null value in your documents. The "var": null field, and every field after it in each document, does not make it into the map on the Scala side.

You can demonstrate this by replacing one of the "var": null values with a non-null value (e.g. "var": "test"); then all the values are returned correctly, as you would expect. Alternatively, put a null value at the very start of a document, e.g.

curl -X POST 'http://localhost:9200/sparkdemo/hrinfo/5' -d '{"test":null,"date": "9/Mar/2016","pid": "1","propName": "HEARTRATE","propValue": 86,"avg": 86,"stage": "S1"}' 

and the map for that document will be empty:

id:5, map:Map() 
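Given the behaviour above, one possible workaround, assuming you control the indexing step, is to drop null-valued fields from each document before writing it to Elasticsearch, so the connector never encounters a null. A minimal Scala sketch (the document map is the example record from the question):

```scala
// Sketch: remove null-valued fields from a document map before indexing,
// so the fields that follow a null are not silently dropped on read.
val doc = Map[String, Any](
  "date" -> "9/Mar/2016", "pid" -> "1", "propName" -> "HEARTRATE",
  "var" -> null, "propValue" -> 86, "avg" -> 86, "stage" -> "S1")
val cleaned = doc.filter { case (_, v) => v != null }
println(cleaned.keys.toList.sorted)  // "var" is gone, the other six keys remain
```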

Thanks @Steve Willcock, giving "var" a non-null value solved my problem. But one thing confuses me: does this mean I should not put 'null' values in Elasticsearch documents at all if I want this to work correctly? – Mahendra


Hi Mahendra, glad this is working for you. Elasticsearch certainly supports null values, although with some caveats; see https://www.elastic.co/guide/en/elasticsearch/guide/current/_dealing_with_null_values.html for details. Querying the data via curl shows that null is returned in the JSON document as you would expect, so the problem appears to happen during deserialization of the JSON document in the elasticsearch-spark library. I'm not sure whether that is the intended behaviour; it seems a little odd. –


Try this:

import org.apache.spark.sql.SQLContext
import org.elasticsearch.spark.sql._

val sql = new SQLContext(sc)
val index1 = sql.esDF("index/type")
println(index1.schema.treeString)