2017-10-12

How do I map an input document field to the Elasticsearch _id field?

I have a very simple pipeline that takes JSON messages from Kafka and sends them to Elasticsearch:

input {
    kafka {
        bootstrap_servers => "kafka04-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka05-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka01-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka03-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka02-prod01.messagehub.services.eu-de.bluemix.net:9093"
        topics => [ "transactions_load" ]
    }
}
filter {
    # Parse the JSON payload in "message" into top-level event fields
    json {
        source => "message"
    }
    # Drop fields that should not end up in the indexed document
    mutate {
        remove_field => ["kafka", "@version", "@timestamp", "message"]
        remove_tag => ["multiline"]
    }
}
output {
    elasticsearch {
        hosts => [
            "xxxxx.ibm-343.composedb.com:16915",
            "xxxxx.ibm-343.composedb.com:16915"
        ]
        ssl => true
        user => "logstash_kafka"
        password => "*****"
        index => "pos_transactions"
    }
}

The JSON records have a TransactionID field that uniquely identifies each record:

{"TransactionID": "5440772161", "InvoiceNo": 5440772, "StockCode": 22294, "Description": "HEART FILIGREE DOVE SMALL", "Quantity": 4, "InvoiceDate": 1507777440000, "UnitPrice": 1.25, "CustomerID": 14825, "Country": "United Kingdom", "LineNo": 16, "InvoiceTime": "03:04:00", "StoreID": 1} 
{"TransactionID": "5440772191", "InvoiceNo": 5440772, "StockCode": 21733, "Description": "RED HANGING HEART T-LIGHT HOLDER", "Quantity": 4, "InvoiceDate": 1507777440000, "UnitPrice": 2.95, "CustomerID": 14825, "Country": "United Kingdom", "LineNo": 19, "InvoiceTime": "03:04:00", "StoreID": 1} 

Can I configure Logstash to use TransactionID as the _id field, so that if I process duplicate records for the same transaction, the updates are idempotent?

Answer


I figured out the answer myself. Posting it here in case it is useful to others:

output {
    elasticsearch {
        hosts => [
            "xxxxx.ibm-343.composedb.com:16915",
            "xxxxx.ibm-343.composedb.com:16915"
        ]
        ssl => true
        user => "logstash_kafka"
        password => "*****"
        index => "pos_transactions"
        # Use the event's TransactionID as the Elasticsearch document _id
        document_id => "%{TransactionID}"
    }
}

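The document_id => "%{TransactionID}" setting uses the incoming document's TransactionID field as the Elasticsearch _id. Because the output's default action is "index", a record that arrives again with the same TransactionID replaces the existing document instead of creating a duplicate, so reprocessing is idempotent.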
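One caveat worth guarding against: if an event has no TransactionID field, the %{TransactionID} reference is not resolved and is used literally, so all such events would overwrite a single document with that literal _id. The fragment below is a sketch that only applies document_id when the field is present; sending key-less events to stdout is a hypothetical fallback chosen for illustration, not part of the original answer. The explicit action => "index" just spells out the default behavior.

```
output {
    if [TransactionID] {
        elasticsearch {
            hosts => [
                "xxxxx.ibm-343.composedb.com:16915",
                "xxxxx.ibm-343.composedb.com:16915"
            ]
            ssl => true
            user => "logstash_kafka"
            password => "*****"
            index => "pos_transactions"
            # "index" (the default) creates the document, or replaces the
            # existing one with the same _id, so duplicates are idempotent
            action => "index"
            document_id => "%{TransactionID}"
        }
    } else {
        # Hypothetical fallback: print events that lack the key for inspection
        stdout { codec => rubydebug }
    }
}
```

With this layout, a replayed Kafka message for transaction 5440772161 overwrites the same pos_transactions document, while a malformed message never collides with real data.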
