2017-03-16

I am writing data from a Java application to a Kinesis stream using the AWS SDK. Batches of 10 records are posted at a time with the code below; the goal is to get the data from Kinesis written to S3.

// Convert to JSON object, and then to bytes...
ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
String json = ow.writeValueAsString(transaction);

// Add byte array to PutRecordsRequestEntry
PutRecordsRequestEntry record = new PutRecordsRequestEntry();
record.setPartitionKey(java.util.UUID.randomUUID().toString());
record.setData(ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8)));

// Add to list...
batch.add(record);

// Flush once the batch is full; checking batch.size() directly avoids
// the off-by-one of maintaining a separate counter
if (batch.size() >= batchLimit) {

    logger.info("Sending batch of " + batch.size() + " rows.");

    putRecordsRequest.setRecords(batch);
    PutRecordsResult result = amazonKinesisClient.putRecords(putRecordsRequest);
    batch = new ArrayList<>();
}
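The batch-and-flush pattern above can be sketched in a few lines of Node.js; `makeBatcher` and `sendBatch` are hypothetical names used for illustration, not AWS SDK APIs.

```javascript
// Minimal sketch of the batch-and-flush pattern: accumulate records,
// hand a full batch to a send callback, then start a fresh batch.
function makeBatcher(batchLimit, sendBatch) {
    let batch = [];
    return {
        add(record) {
            batch.push(record);
            if (batch.length >= batchLimit) {
                sendBatch(batch);
                batch = [];
            }
        }
    };
}

// Usage: flushes each time 10 records have accumulated.
const sent = [];
const batcher = makeBatcher(10, (b) => sent.push(b.length));
for (let i = 0; i < 25; i++) batcher.add({ id: i });
// sent is now [10, 10]; the remaining 5 records wait for the next flush
```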

I then have a NodeJS lambda function that is triggered for each transaction Kinesis receives. The idea is for it to take the transactions coming from Kinesis and put them onto a Firehose delivery stream, which saves the data to S3.

var AWS = require('aws-sdk');
var firehose = new AWS.Firehose();

exports.handler = function(event, context) {

    console.log(event);

    var params = {
        DeliveryStreamName: "transaction-postings",
        Record: {
            Data: decodeURIComponent(event)
        }
    };

    firehose.putRecord(params, function(err, data) {
        if (err) console.log(err, err.stack); // an error occurred
        else console.log(data);               // successful response

        context.done();
    });
};

However, when I look at the data on S3, all I see is the following, and not the list of JSON objects I was expecting...

[object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object] 

Can someone please point out what I am missing to stream the data from Kinesis to S3 as JSON objects?

Answers

1
Data: decodeURIComponent(event) 

You need to serialize the event, because Lambda automatically deserializes the incoming parameter for you. That is:

Data: JSON.stringify(decodeURIComponent(event)) 
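A minimal Node.js sketch of the distinction (the `amount` field is an invented sample value; `rowId` matches the decoded payload shown in the comments below): passing a plain object where a string is expected falls back to `Object.prototype.toString`, which produces exactly the `[object Object]` text seen in S3.

```javascript
// A plain object coerced to a string gives "[object Object]";
// JSON.stringify produces the JSON text you actually want to store.
const transaction = { rowId: 72131545, amount: 9.99 }; // sample values

console.log(String(transaction));         // "[object Object]"
console.log(JSON.stringify(transaction)); // '{"rowId":72131545,"amount":9.99}'
```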
Just made the change, and I am still getting "[object Object]""[object Object]""[object Object]""[object Object]""[object Object]""[object Object]""[object Object]""[object Object]""[object Object]""[object Object]""[object Object]""[object Object]" as the result – Mez

Actually, when I change this line of code to "Data: event", what I get in S3 is, for example: {"Records":[{"kinesis":{"kinesisSchemaVersion":"1.0","partitionKey":"2a4bb9d9-a023-4c03-8616-ef3e7c567459","sequenceNumber":"49571132156681255058105982946244422009241197082071531522","data":"ewogICJyb3dJZCIgOiA3MjEzMTU0NSwKICAi... Why am I not getting the actual JSON object I sent above? – Mez

I think it's because when the data is set with record.setData(ByteBuffer.wrap(json.getBytes()))... I need to convert it back to UTF-8. – Mez
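A short sketch of the round trip described in this comment, using an assumed sample payload: the UTF-8 bytes the producer writes arrive base64-encoded in the Kinesis record, and `Buffer` converts them back.

```javascript
// The producer serializes the transaction to UTF-8 bytes; Kinesis hands
// them to the consumer base64-encoded in the record's "data" attribute.
const json = JSON.stringify({ rowId: 72131545 }); // assumed sample payload

const base64Data = Buffer.from(json, "utf8").toString("base64"); // as in record.kinesis.data
const decoded = Buffer.from(base64Data, "base64").toString("utf8");

console.log(decoded === json); // true
```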

0

For those wondering about the required code change... to write to S3 the actual message sent from the producer, the data attribute of each record in the Kinesis event needs to be base64-decoded. In other words, the code blocks below show the lambda used to parse the data coming off the Kinesis stream, and the producer dependency in use...

var AWS = require('aws-sdk');
var firehose = new AWS.Firehose();
var firehoseStreamName = "transaction-postings";

exports.handler = function(event, context) {

    // Each entry in event.Records is an actual transaction, encapsulated
    // with Kinesis Put properties
    var records = event.Records.map(function(record) {

        // The data attribute is base64-encoded; decode it back to the
        // readable UTF-8 JSON string the producer sent
        var jsonString = Buffer.from(record.kinesis.data, "base64").toString("utf8");

        // Append a newline so consecutive records do not run together in S3
        return { Data: jsonString + "\n" };
    });

    // Store data!
    firehose.putRecordBatch({
        DeliveryStreamName: firehoseStreamName,
        Records: records
    }, function(err, data) {
        if (err) {

            // This needs to be fired to Kinesis in the future...
            console.log(err, err.stack);
        }
        else {
            console.log(data);
        }

        context.done();
    });
};

This is because a record written using the AWS producer dependency below

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-producer</artifactId>
    <version>0.12.3</version>
</dependency>

arrives at the consumer looking like this:

{ 
    "kinesisSchemaVersion": "1.0", 
    "partitionKey": "cb3ff3cd-769e-4d48-969d-918b5378e81b", 
    "sequenceNumber": "49571132156681255058105982949134963643939775644952428546", 
    "data": "[base64 string]", 
    "approximateArrivalTimestamp": 1490191017.614 
}
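As a sketch, here is how the original JSON can be recovered from a record of this shape. The `data` value below is a synthetic stand-in built on the fly, since the real base64 string is elided above.

```javascript
// A record shaped like the sample above, with a synthetic base64 payload
const record = {
    kinesisSchemaVersion: "1.0",
    partitionKey: "cb3ff3cd-769e-4d48-969d-918b5378e81b",
    data: Buffer.from('{"rowId":72131545}', "utf8").toString("base64")
};

// Decode the data attribute and parse it back into an object
const payload = JSON.parse(Buffer.from(record.data, "base64").toString("utf8"));

console.log(payload.rowId); // 72131545
```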