
I am trying to use AWS Data Pipeline to transfer CSV data from an S3 bucket to DynamoDB. Below is my pipeline definition, but it does not work correctly.

CSV file structure:

Name,Designation,Company
A,TL,C1
B,Prog,C2

DynamoDB table: N_Table, with Name as the hash key.

{ 
"objects": [ 
    { 
     "id": "Default", 
     "scheduleType": "cron", 
     "name": "Default", 
     "role": "DataPipelineDefaultRole", 
     "resourceRole": "DataPipelineDefaultResourceRole" 
    }, 
    { 
     "id": "DynamoDBDataNodeId635", 
     "schedule": { 
      "ref": "ScheduleId639" 
     }, 
     "tableName": "N_Table", 
     "name": "MyDynamoDBData", 
     "type": "DynamoDBDataNode" 
    }, 
    { 
     "emrLogUri": "s3://onlycsv/error", 
     "id": "EmrClusterId636", 
     "schedule": { 
      "ref": "ScheduleId639" 
     }, 
     "masterInstanceType": "m1.small", 
     "coreInstanceType": "m1.xlarge", 
     "enableDebugging": "true", 
     "installHive": "latest", 
     "name": "ImportCluster", 
     "coreInstanceCount": "1", 
     "logUri": "s3://onlycsv/error1", 
     "type": "EmrCluster" 
    }, 
    { 
     "id": "S3DataNodeId643", 
     "schedule": { 
      "ref": "ScheduleId639" 
     }, 
     "directoryPath": "s3://onlycsv/data.csv", 
     "name": "MyS3Data", 
     "dataFormat": { 
      "ref": "DataFormatId1" 
     }, 
     "type": "S3DataNode" 
    }, 
    { 
     "id": "ScheduleId639", 
     "startDateTime": "2013-08-03T00:00:00", 
     "name": "ImportSchedule", 
     "period": "1 Hours", 
     "type": "Schedule", 
     "endDateTime": "2013-08-04T00:00:00" 
    }, 
    { 
     "id": "EmrActivityId637", 
     "input": { 
      "ref": "S3DataNodeId643" 
     }, 
     "schedule": { 
      "ref": "ScheduleId639" 
     }, 
     "name": "MyImportJob", 
     "runsOn": { 
      "ref": "EmrClusterId636" 
     }, 
     "maximumRetries": "0", 
     "myDynamoDBWriteThroughputRatio": "0.25", 
     "attemptTimeout": "24 hours", 
     "type": "EmrActivity", 
     "output": { 
      "ref": "DynamoDBDataNodeId635" 
     }, 
     "step": "s3://elasticmapreduce/libs/script-runner/script-runner.jar,s3://elasticmapreduce/libs/hive/hive-script,--run-hive-script,--hive-versions,latest,--args,-f,s3://elasticmapreduce/libs/hive/dynamodb/importDynamoDBTableFromS3,-d,DYNAMODB_OUTPUT_TABLE=#{output.tableName},-d,S3_INPUT_BUCKET=#{input.directoryPath},-d,DYNAMODB_WRITE_PERCENT=#{myDynamoDBWriteThroughputRatio},-d,DYNAMODB_ENDPOINT=dynamodb.us-east-1.amazonaws.com" 
    }, 
    { 
     "id": "DataFormatId1", 
     "name": "DefaultDataFormat1", 
     "column": [ 
      "Name", 
      "Designation", 
      "Company" 
     ], 
     "columnSeparator": ",", 
     "recordSeparator": "\n", 
     "type": "Custom" 
    } 
 ]
}

When the pipeline runs, two of its four steps complete, but the execution never finishes fully.

Answers

Answer (0 votes):

I would suggest using the CSV data format provided by Data Pipeline instead of the custom one.
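For example, the "Custom" format node from the question could be replaced with something along these lines. This is a minimal sketch using the built-in CSV data format; the column type declarations are my assumption and should be checked against the CSV Data Format reference:

{
 "id": "DataFormatId1",
 "name": "CSVDataFormat",
 "type": "CSV",
 "column": [
  "Name STRING",
  "Designation STRING",
  "Company STRING"
 ]
}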

To debug errors on the cluster, you can look up the job flow in the EMR console and inspect the log files of the failed tasks.

Answer (5 votes):

Currently (2015-04) the default import pipeline template does not support importing CSV files.

If your CSV file is not too large (under 1 GB or so), you can create a ShellCommandActivity that converts the CSV to the DynamoDB JSON format, plus an EmrActivity that imports it into your table.
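A rough sketch of what that extra conversion activity could look like; every id here, the converter script location, and the EC2 resource are hypothetical placeholders, not objects from the question's pipeline:

{
 "id": "CsvToDynamoDBJson",
 "type": "ShellCommandActivity",
 "input": { "ref": "MyS3Data" },
 "output": { "ref": "ConvertedS3Data" },
 "stage": "true",
 "scriptUri": "s3://onlycsv/scripts/convert.sh",
 "runsOn": { "ref": "Ec2ResourceId1" }
}

The EmrActivity would then read its input from ConvertedS3Data instead of the raw CSV node.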

As a first step, you can create a sample DynamoDB table that includes all the field types you need, fill it with dummy values, and then export the records with a pipeline (the Export/Import button in the DynamoDB console). This will give you an idea of the format the import pipeline expects. The type names are not obvious, and the import activity is very sensitive to correct casing (for example, you should use bool for a boolean field).

After that, it should be easy to create an awk script (or any other text converter; at least with awk you can use the default AMI image for your shell activity) that you can feed to the ShellCommandActivity. Don't forget to enable the "staging" flag, so that your output is uploaded back to S3 for the import activity to pick up.
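With staging enabled, Data Pipeline copies the S3 input down to a local directory before the command runs and uploads whatever the command writes into the output directory back to S3. The activity's command field might then look roughly like this, where convert.awk and the file names are hypothetical placeholders:

"command": "awk -f convert.awk ${INPUT1_STAGING_DIR}/data.csv > ${OUTPUT1_STAGING_DIR}/items.ddbjson"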