I am trying to run an Amazon EMR job through the Java SDK.

But it doesn't start at all.

I'm pasting the code I'm using below. I've also looked at the documentation, but it didn't help much.

    package com.zedo.aws.emr;

    import com.amazonaws.auth.AWSCredentials;
    import com.amazonaws.auth.BasicAWSCredentials;
    import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
    import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
    import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
    import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
    import com.amazonaws.services.elasticmapreduce.model.StepConfig;
    import com.amazonaws.services.elasticmapreduce.util.StepFactory;

    public class ExampleEMR {

        public static void main(String[] args) {

            AWSCredentials credentials = new BasicAWSCredentials("<my key>", "<my secret key>");
            AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient(credentials);

            StepFactory stepFactory = new StepFactory();

            StepConfig enableDebugging = new StepConfig()
                .withName("Enable Debugging")
                .withActionOnFailure("TERMINATE_JOB_FLOW")
                .withHadoopJarStep(stepFactory.newEnableDebuggingStep());

            StepConfig installHive = new StepConfig()
                .withName("Install Hive")
                .withActionOnFailure("TERMINATE_JOB_FLOW")
                .withHadoopJarStep(stepFactory.newInstallHiveStep());

            // note: hiveScript is defined here but never added to withSteps(...) below
            StepConfig hiveScript = new StepConfig()
                .withName("Hive Script")
                .withActionOnFailure("TERMINATE_JOB_FLOW")
                .withHadoopJarStep(stepFactory.newRunHiveScriptStep("s3://<path to script>"));

            RunJobFlowRequest request = new RunJobFlowRequest()
                .withName("Hive Interactive")
                .withSteps(enableDebugging, installHive)
                .withLogUri("s3://myawsbucket/")
                .withInstances(new JobFlowInstancesConfig()
                    .withEc2KeyName("<my key>")
                    .withHadoopVersion("0.20")
                    .withInstanceCount(5)
                    .withKeepJobFlowAliveWhenNoSteps(true)
                    .withMasterInstanceType("m1.small")
                    .withSlaveInstanceType("m1.small"));

            RunJobFlowResult result = emr.runJobFlow(request);
        }
    }

Or can someone point me to some example links?

Answers


I have solved this problem by correcting the keys on my end.
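
For what it's worth, hardcoded keys are easy to get wrong. A minimal sketch (assuming the AWS SDK for Java v1) that picks up credentials from the default provider chain instead of hardcoding them:

    import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
    import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;

    // Resolves credentials from environment variables, JVM system properties,
    // the ~/.aws/credentials file, or an EC2 instance profile, in that order.
    AmazonElasticMapReduceClient emr =
            new AmazonElasticMapReduceClient(new DefaultAWSCredentialsProviderChain());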


This works for me:

    public void runScriptClientes(Calendar executionDate) {

        // create the AWS credentials
        BasicAWSCredentials awsCreds = new BasicAWSCredentials(rb.getString("awsAccessKey"),
                rb.getString("awsSecretKey"));

        // create the client used to connect to EMR
        AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient(awsCreds);
        emr.setRegion(Region.getRegion(Regions.EU_WEST_1));

        // work out which folders need processing
        Map<String, FolderS3> s3DataToProcessInput = getRutasInput(executionDate);
        //Map<String, Boolean> s3DataToProcessOut = getRutaInput();

        for (Entry<String, FolderS3> bucket_ : s3DataToProcessInput.entrySet()) {
            String nameBucket = bucket_.getKey();
            FolderS3 folderS3 = bucket_.getValue();
            // check that the folder exists in the bucket
            if (folderS3.getExistInBucket()) {
                listaConcurrente.add(folderS3);
                StepFactory stepFactory = new StepFactory();

                StepConfig stepHive = new StepConfig()
                        .withName(rb.getString("nameStepClientesS3") + ":" + nameBucket) /* name of the step to run */
                        .withActionOnFailure(ActionOnFailure.CONTINUE) /* action to take if the step fails */
                        .withHadoopJarStep(
                                stepFactory.newRunHiveScriptStep(rb.getString("scriptClienteS3"),
                                        "-d", "s3DataToProcess=s3://" + rb.getString("bucketPropio") + "/" + rb.getString("ruta_input_c1") + folderS3.getNameKey(),
                                        "-d", "s3DataToProcessOut=s3://" + rb.getString("bucketPropioOUT") + "/" + rb.getString("ruta_output_c1") + folderS3.getOutputFolder(),
                                        "-d", "windowTime=tablaparametro"));

                AddJobFlowStepsRequest jobFlow = new AddJobFlowStepsRequest()
                        .withJobFlowId(rb.getString("jobflowID"))
                        .withSteps(stepHive);

                AddJobFlowStepsResult result = emr.addJobFlowSteps(jobFlow);
                List<String> id = result.getStepIds();
                DescribeStepRequest describe = new DescribeStepRequest().withStepId(id.get(0));
                describe.setClusterId(rb.getString("jobflowID"));
                describe.setRequestCredentials(awsCreds);
                DescribeStepResult res = emr.describeStep(describe);
                StepStatus status = res.getStep().getStatus();
                String stas = status.getState();

                // poll while the step state is PENDING or RUNNING
                while (stas.equals(StepExecutionState.PENDING.name()) || stas.equals(StepExecutionState.RUNNING.name())) {
                    try {
                        Thread.sleep(5000);
                        res = emr.describeStep(describe);
                        status = res.getStep().getStatus();
                        stas = status.getState();
                        log.info(stas);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                if (stas.equals(StepExecutionState.COMPLETED.name())) {
                    folderS3.setProcessedInput(Boolean.TRUE);
                    listaConcurrente.remove(folderS3);
                    log.info("Step finished OK: " + folderS3);
                } else if (stas.equals(StepExecutionState.FAILED.name()) || stas.equals(StepExecutionState.CANCELLED.name())) {
                    listaConcurrente.remove(folderS3);
                    folderS3.setProcessedInput(Boolean.FALSE);
                    listaConcurrente.add(folderS3);
                    log.info("Step failed or was cancelled: " + folderS3);
                }

                // read the result data and load it into the database

            }
        }
    }

How do you get the "jobflowID"? – coderz 2015-02-06 14:33:55
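
Regarding the comment above, a minimal sketch (assuming the AWS SDK for Java v1, and reusing the emr client and request from the snippets above): the job flow ID is returned by runJobFlow when you create the cluster, and the IDs of clusters that are already up can be enumerated with ListClusters.

    import com.amazonaws.services.elasticmapreduce.model.ClusterSummary;
    import com.amazonaws.services.elasticmapreduce.model.ListClustersRequest;
    import com.amazonaws.services.elasticmapreduce.model.ListClustersResult;

    // If you start the cluster yourself, runJobFlow returns the ID directly:
    RunJobFlowResult created = emr.runJobFlow(request);
    String jobFlowId = created.getJobFlowId(); // e.g. "j-XXXXXXXXXXXX"

    // For a cluster that is already running, list active clusters and pick the one you want:
    ListClustersResult clusters = emr.listClusters(
            new ListClustersRequest().withClusterStates("RUNNING", "WAITING"));
    for (ClusterSummary summary : clusters.getClusters()) {
        System.out.println(summary.getId() + " : " + summary.getName());
    }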