我已经写了一个从s3存储区触发的lambda来解压zip文件并处理里面的文本文档。由于lambda的内存限制,我需要将我的进程移至类似AWS批处理的内容。纠正我,如果我错了,但我的工作流程应该看起来像这样。AWS Lambda/Aws批处理工作流程
我beleive我需要写一个lambda把S3桶的位置上亚马逊SQS是一个AWS一批可以读取的位置和做所有的解压缩/数据处理他们的是他们的是更多的内存。
这是我当前的lambda,它接受s3存储桶触发的事件,检查它是否为zip文件,然后将s3 Key的名称推送到SQS。 我应该告诉AWS批处理在我的lambda中开始阅读队列吗? 我完全陌生于AWS,不确定是否会从这里出发。
public class dockerEventHandler implements RequestHandler<S3Event, String> {
private static BigData app = new BigData();
private static DomainOfConstants CONST = new DomainOfConstants();
private static Logger log = Logger.getLogger(S3EventProcessorUnzip.class);
private static AmazonSQS SQS;
private static CreateQueueRequest createQueueRequest;
private static Matcher matcher;
private static String srcBucket, srcKey, extension, myQueueUrl;
@Override
public String handleRequest(S3Event s3Event, Context context)
{
try {
for (S3EventNotificationRecord record : s3Event.getRecords())
{
srcBucket = record.getS3().getBucket().getName();
srcKey = record.getS3().getObject().getKey().replace('+', ' ');
srcKey = URLDecoder.decode(srcKey, "UTF-8");
matcher = Pattern.compile(".*\\.([^\\.]*)").matcher(srcKey);
if (!matcher.matches())
{
log.info(CONST.getNoConnectionMessage() + srcKey);
return "";
}
extension = matcher.group(1).toLowerCase();
if (!"zip".equals(extension))
{
log.info("Skipping non-zip file " + srcKey + " with extension " + extension);
return "";
}
log.info("Sending object location to key" + srcBucket + "//" + srcKey);
//pass in only the reference of where the object is located
createQue(CONST.getQueueName(), srcKey);
}
}
catch (IOException e)
{
log.error(e);
}
return "Ok";
}
/*
*
* Setup connection to amazon SQS
* TODO - Find updated api for sqs connection to eliminate depreciation
*
* */
@SuppressWarnings("deprecation")
public static void sQSConnection() {
app.setAwsCredentials(CONST.getAccessKey(), CONST.getSecretKey());
try{
SQS = new AmazonSQSClient(app.getAwsCredentials());
Region usEast1 = Region.getRegion(Regions.US_EAST_1);
SQS.setRegion(usEast1);
}
catch(Exception e){
log.error(e);
}
}
//Create new Queue
public static void createQue(String queName, String message){
createQueueRequest = new CreateQueueRequest(queName);
myQueueUrl = SQS.createQueue(createQueueRequest).getQueueUrl();
sendMessage(myQueueUrl,message);
}
//Send reference to the s3 objects location to the queue
public static void sendMessage(String SIMPLE_QUE_URL, String S3KeyName){
SQS.sendMessage(new SendMessageRequest(SIMPLE_QUE_URL, S3KeyName));
}
//Fire AWS batch to pull from que
private static void initializeBatch(){
//TODO
}
我已经安装了docker并且了解了docker镜像。我相信我的Docker镜像应该包含读取队列的所有代码,将文件解压缩,处理并将这些文件整合到一个docker镜像/容器中。
我在找一个有类似工作的人,他们可以分享来帮忙。
S3先生:沿东西线嗨拉姆达我有一个文件
LAMBDA先生:好吧S3我看你,嘿嘿AWS批你能解压缩和做的东西这个
先生批处理:Gotchya mr lambda,生病了,并将其放入RDS或某些数据库之后。
我还没有编写类/泊坞窗的图像,但我已经完成了所有的代码来处理/解压缩并启动rds完成。由于一些文件大于或等于1GB,Lambda仅限于内存。
也许一个不同的方向,任何人都可以告诉我一个拉姆达的例子,触发EMR火花或什么东西关闭传入的S3桶触发器? –