2017-06-22 219 views
2

我已经写了一个从s3存储区触发的lambda来解压zip文件并处理里面的文本文档。由于lambda的内存限制,我需要将我的进程移至类似AWS批处理的内容。纠正我,如果我错了,但我的工作流程应该看起来像这样。AWS Lambda/Aws批处理工作流程

work flow

我beleive我需要写一个lambda把S3桶的位置上亚马逊SQS是一个AWS一批可以读取的位置和做所有的解压缩/数据处理他们的是他们的是更多的内存。

这是我当前的lambda,它接受s3存储桶触发的事件,检查它是否为zip文件,然后将s3 Key的名称推送到SQS。 我应该告诉AWS批处理在我的lambda中开始阅读队列吗? 我完全陌生于AWS,不确定是否会从这里出发。

public class dockerEventHandler implements RequestHandler<S3Event, String> { 

private static BigData app = new BigData(); 
private static DomainOfConstants CONST = new DomainOfConstants(); 
private static Logger log = Logger.getLogger(S3EventProcessorUnzip.class); 

private static AmazonSQS SQS; 
private static CreateQueueRequest createQueueRequest; 
private static Matcher matcher; 
private static String srcBucket, srcKey, extension, myQueueUrl; 

@Override 
public String handleRequest(S3Event s3Event, Context context) 
{ 
    try { 
     for (S3EventNotificationRecord record : s3Event.getRecords()) 
     { 
      srcBucket = record.getS3().getBucket().getName(); 
      srcKey = record.getS3().getObject().getKey().replace('+', ' '); 
      srcKey = URLDecoder.decode(srcKey, "UTF-8"); 
      matcher = Pattern.compile(".*\\.([^\\.]*)").matcher(srcKey); 

      if (!matcher.matches()) 
      { 
       log.info(CONST.getNoConnectionMessage() + srcKey); 
       return ""; 
      } 
      extension = matcher.group(1).toLowerCase(); 

      if (!"zip".equals(extension)) 
      { 
       log.info("Skipping non-zip file " + srcKey + " with extension " + extension); 
       return ""; 
      } 
      log.info("Sending object location to key" + srcBucket + "//" + srcKey); 

      //pass in only the reference of where the object is located 
      createQue(CONST.getQueueName(), srcKey); 
     } 
    } 
    catch (IOException e) 
    { 
     log.error(e);   
    } 
    return "Ok"; 
} 

/* 
* 
* Setup connection to amazon SQS 
* TODO - Find updated api for sqs connection to eliminate depreciation 
* 
* */ 
@SuppressWarnings("deprecation") 
public static void sQSConnection() { 
    app.setAwsCredentials(CONST.getAccessKey(), CONST.getSecretKey());  
    try{ 
     SQS = new AmazonSQSClient(app.getAwsCredentials()); 
     Region usEast1 = Region.getRegion(Regions.US_EAST_1); 
     SQS.setRegion(usEast1); 
    } 
    catch(Exception e){ 
     log.error(e);  
    } 
} 

//Create new Queue 
public static void createQue(String queName, String message){ 
    createQueueRequest = new CreateQueueRequest(queName); 
    myQueueUrl = SQS.createQueue(createQueueRequest).getQueueUrl(); 
    sendMessage(myQueueUrl,message); 
} 

//Send reference to the s3 objects location to the queue 
public static void sendMessage(String SIMPLE_QUE_URL, String S3KeyName){ 
    SQS.sendMessage(new SendMessageRequest(SIMPLE_QUE_URL, S3KeyName)); 
} 

//Fire AWS batch to pull from que 
private static void initializeBatch(){ 
    //TODO 
} 

我已经安装了docker并且了解了docker镜像。我相信我的Docker镜像应该包含读取队列的所有代码,将文件解压缩,处理并将这些文件整合到一个docker镜像/容器中。

我在找一个有类似工作的人,他们可以分享来帮忙。

S3先生:沿东西线嗨拉姆达我有一个文件

LAMBDA先生:好吧S3我看你,嘿嘿AWS批你能解压缩和做的东西这个

先生批处理:Gotchya mr lambda,生病了,并将其放入RDS或某些数据库之后。

我还没有编写类/泊坞窗的图像,但我已经完成了所有的代码来处理/解压缩并启动rds完成。由于一些文件大于或等于1GB,Lambda仅限于内存。

+0

也许一个不同的方向,任何人都可以告诉我一个拉姆达的例子,触发EMR火花或什么东西关闭传入的S3桶触发器? –

回答

1

好了,通过批量查看AWS文档后,您不需要SQS队列。批处理有一个称为作业队列的概念,它类似于SQS FIFO队列,但不同之处在于这些作业队列具有优先级,其中的作业可能依赖于其他作业。其基本过程是:

  1. 首先怪异的一部分是建立IAM角色,使用集装箱代理可以倾诉的集装箱服务,AWS一批能够开展各种实例时,它需要(也有一个独立的角色如果你确实需要实例)。有关所需权限的详细信息,请参阅本文档(PDF),网址为around page 54
  2. 现在,当你完成设置计算环境。这些是EC2按需或现场实例,它们持有您的容器。作业在容器级别上运行。这个想法是,您的计算环境是您的作业容器可以使用的最大资源分配。一旦达到这个限制,你的工作必须等待资源被释放。
  3. 现在您创建一个作业队列。这将作业与您创建的计算环境关联起来。
  4. 现在您创建一个作业定义。那么,从技术上讲,你不必通过lambda来做到这一点,但是这使得事情变得更简单。你的工作定义将指出你的工作需要什么样的容器资源(当然你也可以用lambda覆盖)
  5. 现在完成了这一切,你会想创建一个lambda函数。这将由您的S3存储桶事件触发。该功能需要必要的IAM权限才能针对批处理服务(以及任何其他权限)运行提交作业。基本上所有的lambda需要做的就是将提交作业调用到AWS批处理。您需要的基本参数是作业队列和作业定义。您还需要将所需zip的S3密钥设置为作业的参数。
  6. 现在,当触发相应的S3事件时,它会调用lambda,然后将该作业提交给AWS批处理作业队列。然后假设设置是好的,它会愉快地提取资源来处理你的工作。请注意,根据分配的EC2实例大小和容器资源,这可能需要一点时间(比准备Lambda函数要长得多)。
+0

谢谢,这个文件清理了一下。我不确定为什么,但是我很难理解这些文档,但是我对AWS和云计算也很新。所以选择不使用SQS,因为我确实开始注意到它与批处理队列不同。相反,我有我的lambda发送PUT触发器中的每个新的s3密钥(zip)到数据库以存储所有记录/ s3keys。从他们我会刚从批处理/码头集装箱数据表中读取,然后删除它完成后。接缝是简单的解决方案。我不能找到Java API提交工作?? –

+0

@JohnHanewich提交作业的Java API调用可以在[这里找到](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/batch/AbstractAWSBatch.html#submitJob-com .amazonaws.services.batch.model.SubmitJobRequest-)。另一种选择是使用SQS来存储S3密钥,然后让CloudWatch触发一个警报来激活一个实例。该实例(具有适当的IAM角色)可以利用运行实际脚本的用户数据脚本来处理数据(自定义AMI或具有用户数据从某处拖动脚本)。 –