2015-09-24 37 views
0

Map Reduce filter records

I have a set of records and need to process only the male ones. In my MapReduce program an if condition filters for the male records, but the program below gives zero records as output.

Input file:

1,Brandon Buckner,avil,female,525
2,Veda Hopkins,avil,male,633
3,Zia Underwood,paracetamol,male,980
4,Austin Mayer,paracetamol,female,338
5,Mara Higgins,avil,female,153
6,Sybill Crosby,avil,male,193
7,Tyler Rosales,paracetamol,male,778
8,Ivan Hale,avil,female,454
9,Alika Gilmore,paracetamol,female,833
10,Len Burgess,metacin,male,325

MapReduce program:

package org.samples.mapreduce.training; 

import java.io.IOException; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 


public class patientrxMR_filter { 

    public static class MapDemohadoop extends 
      Mapper<LongWritable, Text, Text, IntWritable> { 

     // setup, map, run, cleanup 

     public void map(LongWritable key, Text value, Context context) 
       throws IOException, InterruptedException { 
      String line = value.toString(); 
      String[] elements = line.split(","); 

      String gender = elements[3]; 

      if (gender == "male") { 
       Text tx = new Text(elements[2]); 
       int i = Integer.parseInt(elements[4]); 
       IntWritable it = new IntWritable(i); 
       context.write(tx, it); 
      } 
     } 
    } 

    public static class Reduce extends 
      Reducer<Text, IntWritable, Text, IntWritable> { 

     // setup, reduce, run, cleanup 
     // input - para [150,100] 
     public void reduce(Text key, Iterable<IntWritable> values, 
       Context context) throws IOException, InterruptedException { 
      int sum = 0; 
      for (IntWritable val : values) { 
       sum += val.get(); 
      } 
      context.write(key, new IntWritable(sum)); 
     } 
    } 

    public static void main(String[] args) throws Exception { 

     if (args.length != 2) { 
      System.err.println("Insufficient args"); 
      System.exit(-1); 
     } 
     Configuration conf = new Configuration(); 

     //conf.set("fs.default.name","hdfs://localhost:50000"); 
     conf.set("mapred.job.tracker", "hdfs://localhost:50001"); 

//  conf.set("DrugName", args[3]); 
     Job job = new Job(conf, "Drug Amount Spent"); 

     job.setJarByClass(patientrxMR_filter.class); // class contains mapper and 
               // reducer class 

     job.setMapOutputKeyClass(Text.class); // map output key class 
     job.setMapOutputValueClass(IntWritable.class);// map output value class 
     job.setOutputKeyClass(Text.class); // output key type in reducer 
     job.setOutputValueClass(IntWritable.class);// output value type in 
                // reducer 

     job.setMapperClass(MapDemohadoop.class); 
     job.setReducerClass(Reduce.class); 
     job.setNumReduceTasks(1); 
     job.setInputFormatClass(TextInputFormat.class); // default -- inputkey 
                 // type -- longwritable 
                 // : valuetype is text 
     job.setOutputFormatClass(TextOutputFormat.class); 



     FileInputFormat.addInputPath(job, new Path(args[0])); 
     FileOutputFormat.setOutputPath(job, new Path(args[1])); 

     job.waitForCompletion(true); 

    } 

} 
+0

Is your map class picking up the equality? I mean, did you add any sysouts (if running on a cluster) and check the flow? – Ramzy

+0

Try adding sysouts to your mapper, that will help you... – madhu
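What the commenters are suggesting, as a minimal sketch: a hypothetical debug variant of the map method above. On a cluster, stdout lands in the per-task logs, and a counter is visible in the job UI, so both are shown.

public void map(LongWritable key, Text value, Context context) 
  throws IOException, InterruptedException { 
 String[] elements = value.toString().split(","); 
 // Debug output: check what the gender field actually contains 
 System.out.println("gender field = '" + elements[3] + "'"); 
 // A counter is easier to inspect from the job UI than sysouts 
 context.getCounter("DEBUG", "recordsSeen").increment(1); 
} 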

Answers

0
public void map(LongWritable key, Text value, Context context) 
throws IOException, InterruptedException { 
String line = value.toString(); 
String[] elements = line.split(","); 

Hadoop uses a distributed file system; in "String line = value.toString();", line is the file content of a block at the given offset (the key). In this case, that line would load the whole test file, which evidently fits into one block, rather than one line of the file per call as expected.
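Whichever way you frame it, the mapper should guard against blank or malformed lines before indexing into the split array. Below is a minimal sketch of a hardened map method for the mapper above; the trim() calls assume fields may carry stray spaces, and it uses equals-style comparison instead of ==, which the second answer explains.

public void map(LongWritable key, Text value, Context context) 
  throws IOException, InterruptedException { 
 // With TextInputFormat, value is one line of the input file 
 String[] elements = value.toString().split(","); 
 if (elements.length < 5) { 
  return; // skip blank or malformed lines instead of failing 
 } 
 String gender = elements[3].trim(); 
 if (gender.equalsIgnoreCase("male")) { 
  Text drug = new Text(elements[2].trim()); 
  IntWritable amount = new IntWritable(Integer.parseInt(elements[4].trim())); 
  context.write(drug, amount); 
 } 
} 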

2
if (gender == "male") 

This line does not perform an equality check; for string equality in Java, please use Object.equals(), i.e.

if (gender.equals("male"))
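A quick standalone illustration of why == fails here: == compares object references, while equals() compares contents, and a string parsed out of an input line is a fresh object, not the interned "male" literal.

public class EqualsDemo { 
 public static void main(String[] args) { 
  String gender = new String("male"); // simulates a field parsed from input 
  System.out.println(gender == "male");      // false: different references 
  System.out.println(gender.equals("male")); // true: same contents 
 } 
} 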