我正在使用AWS SDK从将数据发布到Kinesis流的Java应用程序写入数据。使用下面的代码一次批量完成10条记录;将Kinesis中的数据写入S3
// Convert to JSON object, and then to bytes...
ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
String json = ow.writeValueAsString(transaction);
// Add byte array to PutRecordsRequestEntry
PutRecordsRequestEntry record = new PutRecordsRequestEntry();
record.setPartitionKey(String.valueOf(java.util.UUID.randomUUID()));
record.setData(ByteBuffer.wrap(json.getBytes()));
// Add to list...
batch.add(record);
// Check and send batches
if(counter>=batchLimit){
logger.info("Sending batch of " + batchLimit + " rows.");
putRecordsRequest.setRecords(batch);
PutRecordsResult result = amazonKinesisClient.putRecords(putRecordsRequest);
batch = new ArrayList<>();
counter=0;
}else{
counter++;
}
然后我有室壁运动收到了被触发。每笔交易的NodeJS lambda函数,这种想法是为它写从室壁运动来进行交易,并把它们转化为他们的数据来流水流保存到S3。
var AWS = require('aws-sdk');
var firehose = new AWS.Firehose();
exports.handler = function(event, context) {
console.log(event);
var params = {
DeliveryStreamName: "transaction-postings",
Record: {
Data: decodeURIComponent(event)
}
};
firehose.putRecord(params, function(err, data) {
if (err) console.log(err, err.stack); // an error occurred
else {
console.log(data); // successful response
}
context.done();
});
};
然而,在S3上的数据看时,我看到的是下面的,并没有像我期待的对象的JSON列表...
[object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object]
可有人请点我什么我错过了将Kinesis中的数据作为JSON对象流向s3吗?
刚做了改变,仍然得到“[object object]”“[object object]”“[object object]”“[object Object]”“[object Object]”“[object Object]” “[对象对象]”“[对象对象]”“[对象对象]”“[对象对象]”“[对象对象]”“[对象对象]结果是 – Mez
实际上,当我将这行代码更改为“Data:event”时,我在S3中得到了例子:{“Records”:[{“kinesis”:{“kinesisSchemaVersion”:“1.0”,“partitionKey “:”2a4bb9d9-a023-4c03-8616-ef3e7c567459“,”sequenceNumber“:”49571132156681255058105982946244422009241197082071531522“,”data“:”ewogICJyb3dJZCIgOiA3MjEzMTU0NSwKICAi ......为什么我没有得到上面发送的实际JSON对象? – Mez
我认为这是因为当设置数据record.setData(ByteBuffer.wrap(json.getBytes()))...我需要转换回utf8。 – Mez