2016-02-29 64 views
0

我将在一个更大的文件中执行以下操作。现在,我有一个具有以下值的示例输入文件。Hadoop Mapreduce:reducer的值是相反的顺序

1000,SMITH,JERRY 
1001,JOHN,TIA 
1002,TWAIN,MARK 
1003,HARDY,DENNIS 
1004,CHILD,JACK 
1005,CHILD,NORTON 
1006,DAVIS,JENNY 
1007,DAVIS,KAREN 
1008,MIKE,JOHN 
1009,DENNIS,SHERIN 

现在我正在做的是运行mapreduce作业来加密每个记录的姓氏并写回输出。我使用映射器分区号作为键和修改后的文本作为值。

所以从映射器输出将是,

0 1000,Mj4oJyk=,,JERRY 
0 1001,KzwpPQ,TIA 
0 1002,NSQgOi8,MARK 
0 1003,KTIzNzg,DENNIS 
0 1004,IjsoPyU,JACK 
0 1005,IjsoPyU,NORTON 
0 1006,JTI3OjI,JENNY 
0 1007,JTI3OjI,KAREN 
0 1008,LDoqNg,JOHN 
0 1009,JTYvPSgg,SHERIN 

我不希望有任何的排序是done.I还使用了减速,因为在一个较大的文件的情况下,将有多个映射器和如果没有reducer,则会写入多个输出文件。所以我使用单个reduce从所有映射器合并值并写入单个文件。 现在,减速器的输入值以相反顺序出现,并且按照映射器的顺序排列。它如下所示:

1009,JTYvPSgg,SHERIN 
1008,LDoqNg==,JOHN 
1007,JTI3OjI=,KAREN 
1006,JTI3OjI=,JENNY 
1005,IjsoPyU=,NORTON 
1004,IjsoPyU=,JACK 
1003,KTIzNzg=,DENNIS 
1002,NSQgOi8=,MARK 
1001,KzwpPQ==,TIA 
1000,Mj4oJyk=,JERRY 

为什么它颠倒了顺序?以及我如何维护mapper的相同顺序?任何建议将是有益的

EDIT 1:

驱动程序代码,

Configuration conf = new Configuration(); 
Job job = Job.getInstance(conf); 
    job.setJobName("encrypt"); 
    job.setJarByClass(TestDriver.class); 
    job.setMapperClass(TestMap.class); 
    job.setNumReduceTasks(1); 
    job.setReducerClass(TestReduce.class); 
    job.setMapOutputKeyClass(IntWritable.class); 
    job.setMapOutputValueClass(Text.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(IntWritable.class); 

    FileInputFormat.addInputPath(job, new Path(hdfsInputPath)); 
    FileOutputFormat.setOutputPath(job, new Path(hdfsOutputPath)); 
System.exit(job.waitForCompletion(true) ? 0 : 1); 

映射器代码是,

 inputValues = value.toString().split(","); 
     stringBuilder = new StringBuilder(); 
     TaskID taskId = context.getTaskAttemptID().getTaskID(); 
     int partition = taskId.getId(); 

// the mask(inputvalue) method is called to encrypt input values and write to stringbuilder in appropriate format 
     mask(inputvalues); 
     context.write(new IntWritable(partition), new Text(stringBuilder.toString())); 

减速器代码是,

 for(Text value : values) { 
     context.write(new Text(value), null); 
     } 
+1

请分享您的代码映射器和缩减器代码。直到我知道框架排序键上的地图输出,然后传递给reducer任务,由于这可能会倒过来。 –

回答

0

MapReduce的基本思想是事物完成的顺序是不相关的。 所以你不能(也不需要)控制

  • 输入记录通过映射器的顺序。
  • 关键和相关的值通过减速器。

您可以控制的唯一一件事是值放置在reducer中可用的迭代器中的顺序。

为此,您可以使用Object key来维护值的顺序。 LongWritable部分(或关键字)是文件中行的位置(不是行号,而是文件起始位置)。 您可以使用该部分来跟踪哪一行是第一行。

那么你的映射器代码将改为

protected void map(Object key, Text value, Mapper<Object, Text, LongWritable, Text>.Context context) 
     throws IOException, InterruptedException { 
    inputValues = value.toString().split(","); 
    stringBuilder = new StringBuilder(); 
    mask(inputValues); 
    // the mask(inputvalue) method is called to encrypt input values and write to stringbuilder in appropriate format 
    context.write(new LongWritable(((LongWritable) key).get()), value); 

} 

注意:您可以更改您的代码中的所有IntWritableLongWritable,但要小心。

0
inputValues = value.toString().split(","); 
    stringBuilder = new StringBuilder(); 
    TaskID taskId = context.getTaskAttemptID().getTaskID(); 
    //preserve the number value for sorting 
    IntWritable idNumber = new IntWritable(Integer.parseInt(inputValue[0]) 

    // the mask(inputvalue) method is called to encrypt input values and write to stringbuilder in appropriate format 
    mask(inputvalues); 
    context.write(idNumber, new Text(stringBuilder.toString())); 

我做了一些假设,因为您没有完整的mapper代码。由于toString()输出,我认为inputValues是一个字符串数组。数组的第一个值应该是来自输入的数值,但现在它是一个字符串。您必须将数字转换回IntWritable以匹配您的映射器正在发射的内容IntWritable,Text。 hadoop框架将按键排序,键为IntWritable的键将按升序排序。您提供的代码是使用任务ID并通过阅读API https://hadoop.apache.org/docs/r2.4.1/api/org/apache/hadoop/mapred/TaskAttemptID.html#getTaskID()目前还不清楚这是否会按照您的意愿为您的值提供订单。要控制输出的顺序,我建议使用字符串数组的第一个值并将其转换为IntWritable。我不知道这是否违反了你掩盖inputValues的意图。

编辑

要使用您的评论跟进。您可以简单地乘以partition-1这将导致hadoop框架颠倒顺序。

int partition = -1*taskId.getId(); 
+0

你说的话可以做。但就像我之前提到的那样,我给出的数据只是一个例子。在实际情况下,我将使用的数据并不总是具有ID列。所以,我不能采用这种方法!不过谢谢你的解决方案。这可能是真正有用的另一种情况下,我正在研究:) – abisheksampath

+0

颠倒顺序的另一种方式是在从映射器中发射它之前,乘以“int partition * -1”。这将导致hadoop框架以相反的顺序对其进行排序。 –