2017-02-24 102 views
0

在独立的spark中,我试图从一个数据框写入Elasticsearch。虽然我可以得到它的工作,但我无法弄清楚如何写入格式为'index_name- {ts_col:{YYYY-mm-dd}}'的动态命名索引,其中'ts_col'是一个日期时间字段在数据集中。是否有可能使用elasticsearch-hadoop/spark写入带有格式化日期的动态创建的Elasticsearch索引?

我见过各种各样的帖子说这种类型的语法应该可以工作,但是当我尝试它时,我会收到包含在底部的错误。它似乎首先检查在创建索引之前索引是否存在,但它将未格式化的索引名称传递给该索引,而不是动态创建索引名称。我已经尝试使用python elasticsearch模块以相同语法首先创建索引,但它无法处理动态索引名称。

是否有任何解决方案可用于我,或者是否必须遍历Spark中的数据集才能找到所表示的每个日期,创建我需要的索引,然后一次写入一个索引?我错过了明显的东西吗? Logstash很轻松地做到这一点,我不明白为什么我不能在Spark中使用它。

下面是我使用的写命令(尝试了它不同的变化也是如此):

df.write.format("org.elasticsearch.spark.sql") 
    .option('es.index.auto.create', 'true') 
    .option('es.resource', 'index_name-{ts_col:{YYYY.mm.dd}}/type_name') 
    .option('es.mapping.id', 'es_id') 
    .save() 

下面是我使用的jar:

elasticsearch-hadoop-5.0.0/dist/elasticsearch-spark-20_2.11-5.0.0.jar 

这是我得到的错误,当我使用上面的写命令:

ERROR NetworkClient: Node [##.##.##.##:9200] failed (Invalid target URI [email protected]/index_name-{ts_col:{YYYY.mm.dd}}/type_name); selected next node [##.##.##.##:9200]

...

...

Py4JJavaError: An error occurred while calling o114.save. : org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed;

如果我设置改写为True,我得到:

Py4JJavaError: An error occurred while calling o58.save. : org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: no such index null at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:488) at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:446) at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:436) at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:363) at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:92) at org.elasticsearch.hadoop.rest.RestRepository.delete(RestRepository.java:455) at org.elasticsearch.spark.sql.ElasticsearchRelation.insert(DefaultSource.scala:500) at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:94) at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:442) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:280) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:745)

如果我尝试使用Elasticsearch Python客户端可以提前创建索引我得到:

RequestError: TransportError(400, u'invalid_index_name_exception', u'Invalid index name [index_name-{ts_col:YYYY.MM.dd}], must be lowercase')

回答

1

你并不需要再次把日期格式大括号内。你可以阅读更多关于它的here

.option('es.resource', 'index_name-{ts_col:{YYYY.mm.dd}}/type_name')

改变上面,如下图所示:

.option('es.resource', 'index_name-{ts_col:YYYY.mm.dd}/type_name') 

注:确保您ts_col领域有适当的日期格式。

+0

对不起,延迟响应,但我终于回到尝试这个,它的工作原理!我有两个问题。我的花括号过多,我使用的是时间戳列,而不仅仅是日期列。一旦我添加了一个新的日期列,我就可以基于此创建索引。下面是工作的示例代码: df.write \ \t .format( “org.elasticsearch.spark.sql”)\ \t。选项( 'es.index.auto.create', '真')\ \t。选项( 'es.write.operation', 'UPSERT')\ \t .mode( '追加')\ \t。选项( 'es.mapping.id', 'ES_ID')\ \t .save( “%s- {es_date:YYYY.MM.dd} /%s”%(index,type)) – Jim

相关问题