Getting all source fields from Elasticsearch into a Spark RDD

I have the following data in Elasticsearch (a local single-node server):
Search command:

curl -XPOST 'localhost:9200/sparkdemo/_search?pretty' -d '{ "query": { "match_all": {} } }'
OUTPUT:
{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 10,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "sparkdemo",
      "_type" : "hrinfo",
      "_id" : "AVNAY_H0lYe0cQl--Bin",
      "_score" : 1.0,
      "_source" : {
        "date" : "9/Mar/2016",
        "pid" : "1",
        "propName" : "HEARTRATE",
        "var" : null,
        "propValue" : 86,
        "avg" : 86,
        "stage" : "S1"
      }
    }, {
      "_index" : "sparkdemo",
      "_type" : "hrinfo",
      "_id" : "AVNAY_KklYe0cQl--Bir",
      "_score" : 1.0,
      "_source" : {
        "date" : "13/Mar/2016",
        "pid" : "1",
        "propName" : "HEARTRATE",
        "var" : null,
        "propValue" : 86,
        "avg" : 87,
        "stage" : "S1"
      }
    }, {
      "_index" : "sparkdemo",
      "_type" : "hrinfo",
      "_id" : "AVNAY-TolYe0cQl--Bii",
      "_score" : 1.0,
      "_source" : {
        "date" : "4/Mar/2016",
        "pid" : "1",
        "propName" : "HEARTRATE",
        "var" : null,
        "propValue" : 82,
        "avg" : 82,
        "stage" : "S0"
      }
    },
    ... a few more records ...
    {
      "_index" : "sparkdemo",
      "_type" : "hrinfo",
      "_id" : "AVNAY_KklYe0cQl--Biq",
      "_score" : 1.0,
      "_source" : {
        "date" : "12/Mar/2016",
        "pid" : "1",
        "propName" : "HEARTRATE",
        "var" : null,
        "propValue" : 91,
        "avg" : 89,
        "stage" : "S1"
      }
    } ]
  }
}
I want to fetch all of this data in a Spark program (a local standalone program run from Eclipse):
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.elasticsearch.spark._

object Test1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("HRInfo")
    val sc = new SparkContext(conf)

    // Read every document of sparkdemo/hrinfo as (documentId, fieldMap) pairs.
    val esRdd = sc.esRDD("sparkdemo/hrinfo", "?q=*")

    val searchResultRDD = esRdd.map(t => {
      println("id:" + t._1 + ", map:" + t._2)
      t._2
    })

    // collect() brings the results to the driver; foreach returns Unit,
    // so its result is not worth assigning to a val.
    searchResultRDD.collect().foreach(map => {
      val stage = map.get("stage")
      val pid = map.get("pid")
      val date = map.get("date")
      val propName = map.get("propName")
      val propValue = map.get("propValue")
      val avg = map.get("avg")
      val variation = map.get("var")
      println("Info(" + stage + "," + pid + "," + date + "," + propName + "," + propValue + "," + avg + "," + variation + ")")
    })
  }
}
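For completeness, a configuration sketch (not a tested program; it assumes the elasticsearch-hadoop connector is on the classpath and the same local single-node server the curl command talks to). `es.nodes` and `es.port` are standard elasticsearch-hadoop settings; their defaults are already localhost/9200, so they are shown only to make the connection explicit:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

// Explicit connector settings for the local single-node cluster.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("HRInfo")
  .set("es.nodes", "localhost")
  .set("es.port", "9200")
val sc = new SparkContext(conf)

// esRDD also accepts a full query-DSL JSON string instead of the URI form "?q=*".
val esRdd = sc.esRDD("sparkdemo/hrinfo", """{"query":{"match_all":{}}}""")
```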
However, the program does not fetch the complete records stored in Elasticsearch.
Program output:
id:AVNAY_H0lYe0cQl--Bin, map:Map(date -> 9/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY_KklYe0cQl--Bir, map:Map(date -> 13/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY-TolYe0cQl--Bii, map:Map(date -> 4/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY_H0lYe0cQl--Bio, map:Map(date -> 10/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY_KklYe0cQl--Bip, map:Map(date -> 11/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY-TolYe0cQl--Bij, map:Map(date -> 5/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY-Y9lYe0cQl--Bil, map:Map(date -> 7/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY-Y9lYe0cQl--Bim, map:Map(date -> 8/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY-Y9lYe0cQl--Bik, map:Map(date -> 6/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY_KklYe0cQl--Biq, map:Map(date -> 12/Mar/2016, pid -> 1, propName -> HEARTRATE)
Info(None,Some(1),Some(9/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(13/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(4/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(10/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(11/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(5/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(7/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(8/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(6/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(12/Mar/2016),Some(HEARTRATE),None,None,None)
The program fetches all of the records, but for each record the remaining fields (i.e. stage, propValue, avg and var) are missing. Why? Many thanks.
Thanks @Steve Willcock, using a non-null value for "var" solved my problem. But one thing confuses me: for this to work correctly, I should not put 'null' values in Elasticsearch documents, should I? – Mahendra
Hi Mahendra, glad this is working for you. Elasticsearch certainly supports null values, although there are some caveats; see https://www.elastic.co/guide/en/elasticsearch/guide/current/_dealing_with_null_values.html for details. Querying the data via curl shows that the null is returned in the JSON document as you would expect, so the problem seems to occur during deserialization of the JSON document in the elasticsearch-spark library. I am not sure whether that is the intended behaviour; it seems a little odd. –
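The None values in the output above can be reproduced with plain Scala, independent of Spark: `Map#get` returns an `Option`, so any field the connector dropped during deserialization surfaces as `None`. A minimal sketch (the record literal is hypothetical, mirroring the truncated maps printed above):

```scala
object NoneFieldDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical record mirroring the truncated maps in the output above:
    // only three of the seven fields survived deserialization.
    val incomplete: Map[String, Any] =
      Map("date" -> "9/Mar/2016", "pid" -> "1", "propName" -> "HEARTRATE")

    // get on a missing key returns None, which is exactly what the
    // Info(None,Some(1),...) lines show.
    println(incomplete.get("stage"))     // prints None
    println(incomplete.get("propValue")) // prints None

    // getOrElse supplies a fallback value instead of an Option wrapper.
    println(incomplete.getOrElse("stage", "UNKNOWN")) // prints UNKNOWN
  }
}
```

This confirms the missing fields were never present in the deserialized map, rather than being present with null values.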