2014-09-21 59 views
0

我编写了一个 MapReduce 作业(使用旧 API),用于在存储于 HDFS 的输入文件中查找搜索字符串(通过命令行参数传递)出现的位置。MapReduce 旧 API —— 将命令行参数传递到 Map

下面是我的Driver类 -

public class StringSearchDriver 
{ 
    /**
     * Configures and submits the string-search job (old {@code mapred} API).
     *
     * Expected arguments: {@code <input path> <output path> <search word>}.
     *
     * @throws IOException if job submission fails
     */
    public static void main(String[] args) throws IOException 
    { 
     // Fail fast with a usage message instead of an
     // ArrayIndexOutOfBoundsException on args[2].
     if (args.length != 3) 
     { 
      System.err.println("Usage: StringSearchDriver <input path> <output path> <search word>"); 
      System.exit(-1); 
     } 
     JobConf jc = new JobConf(StringSearchDriver.class); 
     // Ship the search word to the tasks through the job configuration;
     // the mapper reads it back in configure().
     jc.set("SearchWord", args[2]); 
     jc.setJobName("String Search"); 
     FileInputFormat.addInputPath(jc, new Path(args[0])); 
     FileOutputFormat.setOutputPath(jc, new Path(args[1])); 
     jc.setMapperClass(StringSearchMap.class); 
     jc.setReducerClass(StringSearchReduce.class); 
     // NOTE: Text must be org.apache.hadoop.io.Text, NOT javax.xml.soap.Text —
     // a wrong auto-import here is what produced the reported ClassCastException.
     jc.setOutputKeyClass(Text.class); 
     jc.setOutputValueClass(IntWritable.class); 
     JobClient.runJob(jc); 
    } 
}

下面是我的映射类 -

public class StringSearchMap extends MapReduceBase implements 
     Mapper<LongWritable, Text, Text, IntWritable> 
{ 
    /** Search word, read from the job configuration set by the driver. */
    private String searchWord; 

    @Override 
    public void configure(JobConf jc) 
    { 
     searchWord = jc.get("SearchWord"); 
    } 

    /**
     * Emits {@code (word, 1)} for every whitespace-separated token of the
     * input line that equals the configured search word, ignoring case.
     */
    @Override 
    public void map(LongWritable key, Text value, 
      OutputCollector<Text, IntWritable> out, Reporter reporter) 
      throws IOException 
    { 
     // BUG FIX: split("") broke the line into single characters, so a
     // multi-character search word could never match and the job always
     // produced empty output. Split on whitespace to get real words.
     String[] words = value.toString().split("\\s+"); 

     for (String word : words) 
     { 
      if (word.equalsIgnoreCase(searchWord)) 
      { 
       out.collect(new Text(word), new IntWritable(1)); 
      } 
     } 
    } 
}

在运行作业时(通过命令行传入搜索字符串 “HI”),我得到下面的错误 -

14/09/21 22:35:41 INFO mapred.JobClient: Task Id : attempt_201409212134_0005_m_000001_2, Status : FAILED 
java.lang.ClassCastException: interface javax.xml.soap.Text 
    at java.lang.Class.asSubclass(Class.java:3129) 
    at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:795) 
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:964) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:422) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:366) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:416) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 

请建议。

+0

'ClassCastException:interface javax.xml.soap.Text'看起来像在源代码中自动导入了错误的Text类? – jkovacs 2014-09-21 21:49:32

回答

1

您自动导入了错误的类:应该导入 org.apache.hadoop.io.Text,而不是 javax.xml.soap.Text。

你可以在此 blog 中找到类似错误导入的示例。

另外,建议最好采用新的 API。

编辑

我使用新的API

import java.io.IOException; 
import java.util.StringTokenizer; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 

/** 
* @author Unmesha sreeveni 
* @Date 23 sep 2014 
*/ 
public class StringSearchDriver extends Configured implements Tool { 

    /**
     * Mapper: emits {@code (token, 1)} for each whitespace token that exactly
     * equals the search word stored under the "word" configuration key.
     */
    public static class Map extends 
    Mapper<LongWritable, Text, Text, IntWritable> { 

     private final static IntWritable one = new IntWritable(1); 
     private Text word = new Text(); 

     @Override 
     public void map(LongWritable key, Text value, Context context) 
       throws IOException, InterruptedException { 
      Configuration conf = context.getConfiguration(); 
      String line = value.toString(); 
      // Search word was placed in the configuration by run().
      String searchString = conf.get("word"); 
      StringTokenizer tokenizer = new StringTokenizer(line); 
      while (tokenizer.hasMoreTokens()) { 
       String token = tokenizer.nextToken(); 
       if (token.equals(searchString)) { 
        word.set(token); 
        context.write(word, one); 
       } 
      } 
     } 
    } 

    /** Reducer: sums the per-token counts emitted by the mapper. */
    public static class Reduce extends 
    Reducer<Text, IntWritable, Text, IntWritable> { 

     @Override 
     public void reduce(Text key, Iterable<IntWritable> values, 
       Context context) throws IOException, InterruptedException { 

      int sum = 0; 
      for (IntWritable val : values) { 
       sum += val.get(); 
      } 
      context.write(key, new IntWritable(sum)); 
     } 
    } 

    /** Entry point: delegates to {@link ToolRunner} so generic options are parsed. */
    public static void main(String[] args) throws Exception { 
     Configuration conf = new Configuration(); 
     int res = ToolRunner.run(conf, new StringSearchDriver(), args); 
     System.exit(res); 
    } 

    /**
     * Builds and runs the search job.
     *
     * @param args {@code <input dir> <output dir> <search word>}
     * @return 0 on success, 1 on failure
     */
    @Override 
    public int run(String[] args) throws Exception { 
     if (args.length != 3) { 
      System.out 
      .printf("Usage: Search String <input dir> <output dir> <search word> \n"); 
      System.exit(-1); 
     } 

     String source = args[0]; 
     String dest = args[1]; 
     String searchword = args[2]; 
     // BUG FIX: use the Configuration injected by ToolRunner instead of
     // creating a fresh one — otherwise generic options (-D, -files, ...)
     // parsed in main() are silently discarded.
     Configuration conf = getConf(); 
     conf.set("word", searchword); 
     Job job = new Job(conf, "Search String"); 
     job.setJarByClass(StringSearchDriver.class); 
     FileSystem fs = FileSystem.get(conf); 

     Path in = new Path(source); 
     Path out = new Path(dest); 
     // Remove a pre-existing output directory so the job can be re-run.
     if (fs.exists(out)) { 
      fs.delete(out, true); 
     } 

     job.setMapOutputKeyClass(Text.class); 
     job.setMapOutputValueClass(IntWritable.class); 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(IntWritable.class); 
     job.setMapperClass(Map.class); 
     job.setReducerClass(Reduce.class); 
     job.setInputFormatClass(TextInputFormat.class); 
     job.setOutputFormatClass(TextOutputFormat.class); 
     FileInputFormat.addInputPath(job, in); 
     FileOutputFormat.setOutputPath(job, out); 
     boolean success = job.waitForCompletion(true); 
     return (success ? 0 : 1); 
    } 
}

这工作。

+0

谢谢SreeVeni。我没有意识到错误的软件包在自动导入时已导入。我更正了软件包并且成功运行,没有发生任何错误。现在,问题是我没有得到期望的输出。我通过“hi”作为搜索词,我的HDFS示例文件包含这个词。运行作业后生成的输出文件仍为空。逻辑看起来不正确? – Hadooper 2014-09-22 19:13:54

+0

请参阅我的编辑。希望这可以帮助你 – 2014-09-23 05:01:39

+0

这对于新的API来说是很好的。但我应该使用旧的API对其进行编码。 – Hadooper 2014-09-23 07:11:33

0

对于 Text,需要的 Hadoop 包是 org.apache.hadoop.io。请检查你的导入语句。

import java.io.IOException; 
import java.util.*; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*;