0

我使用弗林克1.1.2和Maven中已经添加ElesticSearch依赖性如下客户端未连接到任何Elasticsearch节点弗林克

<dependency> 
      <groupId>org.apache.flink</groupId> 
      <artifactId>flink-connector-elasticsearch2_2.10</artifactId> 
      <version>1.2.0</version> 
</dependency> 

我的程序包含以下代码是从卡夫卡与插入读取数据到弹性搜索

public class ReadFromKafka { 


    public static void main(String[] args) throws Exception { 
     // create execution environment 
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

     Properties properties = new Properties(); 
     properties.setProperty("bootstrap.servers", "localhost:9092"); 
     properties.setProperty("zookeeper.connect", "localhost:2181"); 
     properties.setProperty("group.id", "test"); 


     DataStream<JoinedStreamEvent> message = env.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test", 
       new JoinSchema(), properties)); 

     System.out.println("reading form kafka "); 

     message.print(); 


     Map<String, String> config = new HashMap<>(); 
     config.put("bulk.flush.max.actions", "1"); // flush inserts after every event 
     config.put("cluster.name", "elasticsearch_amar"); // default cluster name 

     List<InetSocketAddress> transports = new ArrayList<>(); 
// set default connection details 
     transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); 

     message.addSink(new ElasticsearchSink<>(config,transports,new ElasticInserter())); 

     env.execute(); 


    } //main 

    public static class ElasticInserter implements ElasticsearchSinkFunction<JoinedStreamEvent>{ 


     @Override 
     public void process(JoinedStreamEvent record, RuntimeContext runtimeContext, RequestIndexer requestIndexer) { 

      Map<String, Integer> json = new HashMap<>(); 

      json.put("Time", record.getPatient_id()); 
      json.put("heart Rate ", record.getHeartRate()); 
      json.put("resp rete", record.getRespirationRate()); 


      IndexRequest rqst = Requests.indexRequest() 
        .index("nyc-places")   // index name 
        .type("popular-locations")  // mapping name 
        .source(json); 

      requestIndexer.add(rqst); 

     } //process 


    } //ElasticInserter 

} //ReadFromKafka 

我已经使用homebrew安装ElesticSearch,然后使用如下所示

命令它开始

enter image description here

然而,当我开始我的节目,我有以下错误 enter image description here

回答

0

我的低于50的声誉,无法发表评论。

我有一点建议的:

  • 首先检查是否ES达, 看到Can't Connect to Elasticsearch (through Curl)
  • 建议使用docker容器启动ES,例如。 搬运工运行-d --name ES -p 9200:9200 elasticsearch:2 -Des.network.host = 0.0.0.0
  • 顺便说一句,你可以试试:
:在ES配置 elasticsearch.yml修改 es.network.host0.0.0.0