我是Apache Hadoop的初学者,并尝试使用Apache的Word Counting程序,它工作正常。但现在我想制作自己的户外温度计算程序来计算每日平均值。平均计算不符合我的预期;没有组合&数据的平均。Apache Hadoop并没有在我的程序中进行组合和减少工作,它应该这样做
更具体地讲,这里是我sample2.txt输入文件的一部分:
25022016 00:00:00 -10.3
25022016 00:01:00 -10.3
25022016 00:02:00 -10.3
25022016 00:03:00 -10.3
...
25022016 00:59:00 -11.2
而且我想有应该是输出:
25022016 7.9
这是平均该日期的所有温度观测值。所以我有60个观察结果,需要一个平均值。将来我想在同一个程序的更多日子里处理更多的观测数据。 1.列是日期(文本),2.时间和第三个是温度。温度计算是在代码中用Java的float数据类型完成的。
现在发生的是,该输出是:
25022016 -10.3
25022016 -10.3
25022016 -10.3
25022016 -10.3
...
25022016 -11.2
所以平均计数的每一个观测(从一个数计算一个数的平均值)。我希望获得60个观测值的平均值(一个数字)!
所以我的输入和输出文件都在上面。我的Java代码(我在Windows 7上运行 - > VirtualBox的 - > Ubuntu的64位)以下:
package hadoop;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.commons.cli.Options;
public class ProcessUnits2
{
public static class E_EMapper extends
Mapper<Object, Text, Text, FloatWritable>
{
private FloatWritable temperature = new FloatWritable();
private Text date = new Text();
public void map(Object key, Text value,
Context context) throws IOException, InterruptedException
{
StringTokenizer dateTimeTemperatures = new StringTokenizer(value.toString());
while(dateTimeTemperatures.hasMoreTokens()) {
date.set(dateTimeTemperatures.nextToken());
while(dateTimeTemperatures.hasMoreTokens()) {
dateTimeTemperatures.nextToken();
temperature.set(Float.parseFloat(dateTimeTemperatures.nextToken()));
context.write(date, temperature);
}
}
}
}
public static class E_EReduce extends Reducer<Text,Text,Text,FloatWritable>
{
private FloatWritable result = new FloatWritable();
public void reduce(Text key, Iterable<FloatWritable> values, Context context
) throws IOException, InterruptedException
{
float sumTemperatures=0, averageTemperature;
int countTemperatures=0;
for (FloatWritable val : values) {
sumTemperatures += val.get();
countTemperatures++;
}
averageTemperature = sumTemperatures/countTemperatures;
result.set(averageTemperature);
context.write(key, result);
}
}
public static void main(String args[])throws Exception
{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "VuorokaudenKeskilampotila");
job.setJarByClass(ProcessUnits2.class);
job.setMapperClass(E_EMapper.class);
job.setCombinerClass(E_EReduce.class);
job.setReducerClass(E_EReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FloatWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
job.setNumReduceTasks(0);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
---------------------------------------------------
的Hadoop版本是2.7.2和Ubuntu 14.04 LTS。我以独立模式运行hadoop(最基本的设置)。
下面是命令我用它来构建程序(如果有帮助?):
rm -rf output2
javac -Xdiags:verbose -classpath hadoop-core-1.2.1.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-cli-1.2.jar -d units2 ProcessUnits2.java
jar -cvf units2.jar -C units2/ .
hadoop jar units2.jar hadoop.ProcessUnits2 input2 output2
cat output2/part-m-00000
作为一个初学者我很困惑,并在我的脑海Hadoop是不是在这里做任何合并&减少(=平均)在它的默认设置中工作,这应该是最终目的。我承认我从这里到那里(代码)都选择了代码,因为没有任何工作,我相信这只是解决方案的一小步,但我无法猜测它是什么。我可以很容易地用C++做到这一点,而不需要任何减少地图的框架,但是我希望基本操作能够正常工作,这样我就可以继续使用更复杂的示例,并且最终生产使用和真正的分布式映射 - 合并 - 减少。
我会很感激任何形式的帮助。我现在陷入了困境(很多时候......)。如果您需要任何额外的数据来帮助寻找解决方案,我会发送它们。
谢谢吉姆提出这个更好的问题! –