2017-08-16 132 views
1

是否可以将JSON字符串从Spark立即索引到Elasticsearch?我不想有任何中间Scala案例类或POJOS。将Spark的Json字符串从Spark立即索引到Elasticsearch

我正在使用Spark,Scala和Elastic 5.5。

我的代码如下所示:

val s = xml 

    .map { x => 
     import org.json.XML 

     XML.toJSONObject(x).toString 

    }.top(1) 

    spark.sparkContext.makeRDD(s).saveToEs("test/article") 

不过,我不断收到:

org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [127.0.0.1:9200] returned Bad Request(400) - failed to parse; Bailing out.. 
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138) 

即使我可以索引JSON字符串中使用Kibana或其他ES客户。

在这段代码中,我只是将带有XML内容的字符串的RDD与JSON进行对话,然后尝试在ES中进行索引。

回答

1

您可以使用下一个方法,如果你的对象是已经在JSON格式:

import org.apache.spark.SparkContext 
import org.elasticsearch.spark.rdd.EsSpark 
val jsonField = .....//some json          
val rdd = sc.makeRDD(jsonField)    
EsSpark.saveToEs(rdd, "spark/docs") 

如果你的对象是不是在JSON类写入操作之前,你可以简化你的生活,并使用它转换成JSON格式的方法:

val persitedObject = .....//some json             
sparkContext.makeRDD(persitedObject) 
         .saveJsonToEs("spark/docs") 

有关详情,请specification