2016-02-27 52 views
2

我是Apache Hadoop的初学者,并尝试使用Apache的Word Counting程序,它工作正常。但现在我想制作自己的户外温度计算程序来计算每日平均值。平均计算不符合我的预期;没有组合&数据的平均。Apache Hadoop并没有在我的程序中进行组合和减少工作,它应该这样做

更具体地讲,这里是我sample2.txt输入文件的一部分:

25022016 00:00:00 -10.3 
25022016 00:01:00 -10.3 
25022016 00:02:00 -10.3 
25022016 00:03:00 -10.3 
... 
25022016 00:59:00 -11.2 

而且我想有应该是输出:

25022016 7.9 

这是平均该日期的所有温度观测值。所以我有60个观察结果,需要一个平均值。将来我想在同一个程序的更多日子里处理更多的观测数据。 1.列是日期(文本),2.时间和第三个是温度。温度计算是在代码中用Java的float数据类型完成的。

现在发生的是,该输出是:

25022016 -10.3 
25022016 -10.3 
25022016 -10.3 
25022016 -10.3 
... 
25022016 -11.2 

所以平均计数的每一个观测(从一个数计算一个数的平均值)。我希望获得60个观测值的平均值(一个数字)!

所以我的输入和输出文件都在上面。我的Java代码(我在Windows 7上运行 - > VirtualBox的 - > Ubuntu的64位)以下:


package hadoop; 

import java.io.IOException; 
import java.util.StringTokenizer; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.FloatWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.util.GenericOptionsParser; 

import org.apache.commons.cli.Options; 

public class ProcessUnits2 
{ 
    public static class E_EMapper extends 
    Mapper<Object, Text, Text, FloatWritable> 
    { 
     private FloatWritable temperature = new FloatWritable(); 
     private Text date = new Text();  

     public void map(Object key, Text value, 
     Context context) throws IOException, InterruptedException 
     { 
      StringTokenizer dateTimeTemperatures = new StringTokenizer(value.toString()); 

      while(dateTimeTemperatures.hasMoreTokens()) { 
       date.set(dateTimeTemperatures.nextToken()); 

       while(dateTimeTemperatures.hasMoreTokens()) { 
        dateTimeTemperatures.nextToken();  
        temperature.set(Float.parseFloat(dateTimeTemperatures.nextToken())); 

        context.write(date, temperature); 
       } 
      } 
     } 
    } 


    public static class E_EReduce extends Reducer<Text,Text,Text,FloatWritable> 
    { 
     private FloatWritable result = new FloatWritable(); 

     public void reduce(Text key, Iterable<FloatWritable> values, Context context 
     ) throws IOException, InterruptedException 
     { 
      float sumTemperatures=0, averageTemperature; 
      int countTemperatures=0; 

      for (FloatWritable val : values) { 
       sumTemperatures += val.get(); 
       countTemperatures++; 
      } 

      averageTemperature = sumTemperatures/countTemperatures; 

      result.set(averageTemperature); 
      context.write(key, result); 

     } 
    } 

    public static void main(String args[])throws Exception 
    { 
     Configuration conf = new Configuration(); 
     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 

     if (otherArgs.length < 2) { 
      System.err.println("Usage: wordcount <in> [<in>...] <out>"); 
      System.exit(2); 
     } 
     Job job = Job.getInstance(conf, "VuorokaudenKeskilampotila"); 
     job.setJarByClass(ProcessUnits2.class); 

     job.setMapperClass(E_EMapper.class); 
     job.setCombinerClass(E_EReduce.class); 
     job.setReducerClass(E_EReduce.class); 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(FloatWritable.class); 
     for (int i = 0; i < otherArgs.length - 1; ++i) { 
      FileInputFormat.addInputPath(job, new Path(otherArgs[i])); 
     } 
     FileOutputFormat.setOutputPath(job, 
     new Path(otherArgs[otherArgs.length - 1])); 
     job.setNumReduceTasks(0); 

     System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
} 
--------------------------------------------------- 

的Hadoop版本是2.7.2和Ubuntu 14.04 LTS。我以独立模式运行hadoop(最基本的设置)。

下面是命令我用它来构建程序(如果有帮助?):

rm -rf output2 
javac -Xdiags:verbose -classpath hadoop-core-1.2.1.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-cli-1.2.jar -d units2 ProcessUnits2.java 
jar -cvf units2.jar -C units2/ . 
hadoop jar units2.jar hadoop.ProcessUnits2 input2 output2 
cat output2/part-m-00000 

作为一个初学者我很困惑,并在我的脑海Hadoop是不是在这里做任何合并&减少(=平均)在它的默认设置中工作,这应该是最终目的。我承认我从这里到那里(代码)都选择了代码,因为没有任何工作,我相信这只是解决方案的一小步,但我无法猜测它是什么。我可以很容易地用C++做到这一点,而不需要任何减少地图的框架,但是我希望基本操作能够正常工作,这样我就可以继续使用更复杂的示例,并且最终生产使用和真正的分布式映射 - 合并 - 减少。

我会很感激任何形式的帮助。我现在陷入了困境(很多时候......)。如果您需要任何额外的数据来帮助寻找解决方案,我会发送它们。

+0

谢谢吉姆提出这个更好的问题! –

回答

2

您没有正确实施减速器。它应该是:

public static class E_EReduce extends Reducer<Text, FloatWritable, Text, FloatWritable> 
{ 
    @Override 
    public void reduce(Text key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException 
    { 

永远不要忘记@Override,否则编译器不会赶上错误。

+0

我做出了改变,行为与以前相似。我想映射器输出: –

+0

新的审判;我的5分钟评论时间没有了:我做出了改变,行为与以前相似。我希望mapper输出为25022016 -10.3 25022016 -17.6 ...并希望减速器在同一日期获得同样的关键值,并且可以重复所有六十个温度值。为什么没有类似键的温度(= 25022016)结合在一起,我无法在一次缩减调用中迭代它们?这是关于减速器设置,全局hadoop配置还是其他?无论如何,谢谢你的帮助! –

1

现在我发现是什么问题:

行:

job.setNumReduceTasks(0); 

说没有减速。我将它改为job.setNumReduceTasks(1);,甚至完全删除它,现在程序运行。为什么它在那里? =>因为遇到麻烦,你尽可能地尝试一切,没有时间阅读文档。

谢谢所有参与的人。我继续研究这个系统。

+2

完全删除它相当于默认的'job.setNumReduceTasks(1);' – vefthym