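AWS Lambda function reads records from a Kinesis stream infinitely

I have a Kinesis stream with one shard and a Lambda function written in Python. I added the Kinesis stream as an event source with a batch size of 5. I put a few hundred records into Kinesis, and the Lambda function was invoked and executed correctly. For the last 3 records, however, the Lambda function is invoked over and over, even though the function returns success.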
Lambda function:
```
from __future__ import print_function

import base64
import json
import urllib2

print('Loading function')


def is_valid_url(url):
    try:
        urllib2.urlopen(url)
        print("Valid URL ...")
        return True  # URL exists
    except ValueError as ex:
        print("URL is not formatted...")
        return False  # URL is not well formatted
    except urllib2.URLError as ex:
        print("Invalid URL ...")
        return False  # URL does not seem to be alive


def lambda_handler(event, context):
    for record in event['Records']:
        # Kinesis data is base64 encoded so decode here
        payload = base64.b64decode(record['kinesis']['data'])
        params = json.loads(payload)
        print("Decoded payload: " + payload + " : " + str(is_valid_url(params['url'])) + " : " + str(len(event['Records'])))
    return 'Successfully processed {} records.'.format(len(event['Records']))
```
When I look at the CloudWatch logs:
START RequestId: d6033244-1c43-40ea-8886-f38b8c48daa3 Version: $LATEST
Loading function
Valid URL ...
Decoded payload: { "url": "https://google.com" }
Valid URL ...
Decoded payload: { "url": "https://google.com" }
Valid URL ...
Decoded payload: { "url": "https://google.com" }
Valid URL ...
Decoded payload: { "url": "https://google.com" }
END RequestId: d6033244-1c43-40ea-8886-f38b8c48daa3
REPORT RequestId: d6033244-1c43-40ea-8886-f38b8c48daa3 Duration: 3003.00 ms Billed Duration: 3000 ms Memory Size: 128 MB Max Memory Used: 10 MB
2016-03-04T17:32:01.030Z d6033244-1c43-40ea-8886-f38b8c48daa3 Task timed out after 3.00 seconds
I cannot figure out what is happening with the Lambda function. Can someone provide some insight into this error?
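One thing the log does show is that the invocation ends with `Task timed out after 3.00 seconds`, so it does not actually return success. A timed-out invocation counts as a failure, and for a Kinesis event source the same batch is normally retried, which would explain the repeated calls. Below is a minimal sketch of the same handler with an explicit per-request timeout, written for Python 3 (`urllib.request` instead of `urllib2`). The `URL_TIMEOUT_SECONDS` value and the injectable `check` parameter are assumptions made for illustration, not part of the original function.

```python
import base64
import json
import socket
import urllib.error
import urllib.request

# Assumed value: batch size * this timeout must fit inside the function timeout.
URL_TIMEOUT_SECONDS = 2


def is_valid_url(url, timeout=URL_TIMEOUT_SECONDS):
    """Return True if the URL answers within `timeout` seconds."""
    try:
        with urllib.request.urlopen(url, timeout=timeout):
            return True
    except ValueError:
        return False  # URL is not well formatted
    except (urllib.error.URLError, socket.timeout):
        return False  # unreachable, HTTP error, or too slow


def lambda_handler(event, context, check=is_valid_url):
    # `check` is a hypothetical hook so the URL test can be stubbed out.
    for record in event['Records']:
        # Kinesis data is base64 encoded, so decode it first
        payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')
        params = json.loads(payload)
        print("Decoded payload:", payload, ":", check(params['url']))
    return 'Successfully processed {} records.'.format(len(event['Records']))
```

With a batch size of 5 and a 2 second limit per URL, the worst case is about 10 seconds, so the 3 second function timeout seen in the log would also need to be raised in the function configuration.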
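A note about reading only 5 events per batch: you can quickly start accumulating lag in your processing. A shard can receive 1,000 events per second, and you should plan to process all of those events every second. – Guy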
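The arithmetic behind this comment can be sketched as follows. The 0.5 second duration per batch is a made-up figure for illustration, and the sketch assumes one batch is processed at a time per shard.

```python
import math


def backlog_growth_per_second(ingest_rate, batch_size, seconds_per_batch):
    """Records per second by which the unprocessed backlog grows."""
    processed_per_second = batch_size / seconds_per_batch
    return max(0.0, ingest_rate - processed_per_second)


def min_batch_size(ingest_rate, seconds_per_batch):
    """Smallest batch size that keeps up with the ingest rate."""
    return math.ceil(ingest_rate * seconds_per_batch)


# 1,000 records/s arriving, batches of 5, hypothetical 0.5 s per batch
growth = backlog_growth_per_second(1000, 5, 0.5)
needed = min_batch_size(1000, 0.5)
print(growth, needed)
```

Under these assumptions only 10 records per second are consumed, so the backlog grows by 990 records every second, and a batch size of at least 500 would be needed to keep up.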