2016-06-28 258 views
1

我正在使用influxDB来存储我的时间序列数据。如何写入使用golang客户端连续写入influxdb

我写了一个简单的golang应用程序来读取文件time.log中的行。

文档在https://github.com/influxdata/influxdb/blob/master/client/README.md#inserting-data说:

插入数据

时间序列数据又名点被写入使用批量插入数据库。该机制是创建一个或多个点,然后创建一个批次aka批处理点并将其写入给定的数据库和系列。一个系列是测量(时间/值)和一组标签的组合。

在本示例中,我们将创建一个1,000点的批次。每个点都有一个时间和一个值,以及2个表示形状和颜色的标签。我们使用名为形状的度量将这些点写入名为square_holes的数据库。

注意:您可以指定一个RetentionPolicy作为批点的一部分。如果未提供,InfluxDB将使用数据库默认保留策略。

func writePoints(clnt client.Client) { 
    sampleSize := 1000 
    rand.Seed(42) 

    bp, _ := client.NewBatchPoints(client.BatchPointsConfig{ 
     Database: "systemstats", 
     Precision: "us", 
    }) 

    for i := 0; i < sampleSize; i++ { 
     regions := []string{"us-west1", "us-west2", "us-west3", "us-east1"} 
     tags := map[string]string{ 
      "cpu": "cpu-total", 
      "host": fmt.Sprintf("host%d", rand.Intn(1000)), 
      "region": regions[rand.Intn(len(regions))], 
     } 

     idle := rand.Float64() * 100.0 
     fields := map[string]interface{}{ 
      "idle": idle, 
      "busy": 100.0 - idle, 
     } 

     bp.AddPoint(client.NewPoint(
      "cpu_usage", 
      tags, 
      fields, 
      time.Now(), 
     )) 
    } 

    err := clnt.Write(bp) 
    if err != nil { 
     log.Fatal(err) 
    } 
} 

但因为我不断地从日志中读取数据。我从来没有读过日志。那么,我写入入流服务器的最佳方式是什么?

这是我的当前代码:

cmdBP := client.NewBatchPoints(...) 
for line := range logFile.Lines { 
    pt := parseLine(line.Text) 
    cmdBP.AddPoint(pt) 
} 

influxClient.Write(cmdBP) 

基本上范围logFile.Lines永远不会终止,因为它是基于一个频道上。

+0

那么每处理N行日志时只调用'client.Write(cmdBP)'怎么办? – sberry

+0

所以我需要使用范围循环内的计数器? –

回答

1

使用的批次分时间进行组合(运行作为一个够程):

func (h *InfluxDBHook) loop() { 
    var coll []*client.Point 
    tick := time.NewTicker(h._batchInterval) 

    for { 
     timeout := false 

     select { 
     case pt := <-h._points: 
      coll = append(coll, pt) 
     case <-tick.C: 
      timeout = true 
     } 

     if (timeout || len(coll) >= h._batchSize) && len(coll) > 0 { 
      bp, err := client.NewBatchPoints(h._batchPointsConfig) 
      if err != nil { 
       //TODO: 
      } 
      bp.AddPoints(coll) 
      err = h._client.Write(bp) 
      if err != nil { 
       //TODO: 
      } else { 
       coll = nil 
      } 
     } 
    } 
} 

顺便说一句,你可以用一个钩子与logrus日志记录包,以日志发送到InfluxDB(示例代码从logrusInfluxDB hook)。