2014-10-17 61 views

I want to create my own MapReduce job. How do I define my own Map and Reduce classes?

The output of my Map class is: Text (key), Text (value).

The output of my Reduce class is: Text (key), IntWritable (value).

The way I tried to implement it is as follows:

import java.io.IOException; 
import java.util.*; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class artistandTrack { 

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            String line = value.toString();
            String[] names = line.split(" ");
            Text artist_name = new Text(names[2]);
            Text track_name = new Text(names[3]);

            output.collect(artist_name, track_name);
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, IntWritable> {
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += 1;
                Text x1 = values.next();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception { 
    JobConf conf = new JobConf(artistandTrack.class); 
    conf.setJobName("artisttrack"); 

    conf.setOutputKeyClass(Text.class); 
    conf.setOutputValueClass(IntWritable.class); 

     conf.setMapOutputKeyClass(Text.class); 
     conf.setMapOutputValueClass(Text.class); 

    conf.setMapperClass(Map.class); 
    conf.setCombinerClass(Reduce.class); 
    conf.setReducerClass(Reduce.class); 

    conf.setInputFormat(TextInputFormat.class); 
    conf.setOutputFormat(TextOutputFormat.class); 

    FileInputFormat.setInputPaths(conf, new Path(args[0])); 
    FileOutputFormat.setOutputPath(conf, new Path(args[1])); 

    JobClient.runJob(conf); 
    } 
} 

When I try to run it, it prints the output below and terminates:

WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
14/10/17 06:09:15 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id 
14/10/17 06:09:15 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId= 
14/10/17 06:09:16 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized 
14/10/17 06:09:18 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 
14/10/17 06:09:19 INFO mapred.FileInputFormat: Total input paths to process : 1 
14/10/17 06:09:19 INFO mapreduce.JobSubmitter: number of splits:1 
14/10/17 06:09:19 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local803195645_0001 
14/10/17 06:09:20 WARN conf.Configuration: file:/app/hadoop/tmp/mapred/staging/userloki803195645/.staging/job_local803195645_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring. 
14/10/17 06:09:20 WARN conf.Configuration: file:/app/hadoop/tmp/mapred/staging/userloki803195645/.staging/job_local803195645_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring. 
14/10/17 06:09:20 WARN conf.Configuration: file:/app/hadoop/tmp/mapred/local/localRunner/userloki/job_local803195645_0001/job_local803195645_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring. 
14/10/17 06:09:20 WARN conf.Configuration: file:/app/hadoop/tmp/mapred/local/localRunner/userloki/job_local803195645_0001/job_local803195645_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring. 
14/10/17 06:09:20 INFO mapreduce.Job: The url to track the job: http://localhost:8080/ 
14/10/17 06:09:20 INFO mapreduce.Job: Running job: job_local803195645_0001 
14/10/17 06:09:20 INFO mapred.LocalJobRunner: OutputCommitter set in config null 
14/10/17 06:09:20 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter 
14/10/17 06:09:20 INFO mapred.LocalJobRunner: Waiting for map tasks 
14/10/17 06:09:20 INFO mapred.LocalJobRunner: Starting task: attempt_local803195645_0001_m_000000_0 
14/10/17 06:09:20 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ] 
14/10/17 06:09:20 INFO mapred.MapTask: Processing split: hdfs://localhost:54310/project5/input/sad.txt:0+272 
14/10/17 06:09:21 INFO mapred.MapTask: numReduceTasks: 1 
14/10/17 06:09:21 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
14/10/17 06:09:21 INFO mapreduce.Job: Job job_local803195645_0001 running in uber mode : false 
14/10/17 06:09:21 INFO mapreduce.Job: map 0% reduce 0% 
14/10/17 06:09:22 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584) 
14/10/17 06:09:22 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100 
14/10/17 06:09:22 INFO mapred.MapTask: soft limit at 83886080 
14/10/17 06:09:22 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600 
14/10/17 06:09:22 INFO mapred.MapTask: kvstart = 26214396; length = 6553600 
14/10/17 06:09:25 INFO mapred.LocalJobRunner: 
14/10/17 06:09:25 INFO mapred.MapTask: Starting flush of map output 
14/10/17 06:09:25 INFO mapred.MapTask: Spilling map output 
14/10/17 06:09:25 INFO mapred.MapTask: bufstart = 0; bufend = 120; bufvoid = 104857600 
14/10/17 06:09:25 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214384(104857536); length = 13/6553600 
14/10/17 06:09:25 INFO mapred.LocalJobRunner: map task executor complete. 
14/10/17 06:09:25 WARN mapred.LocalJobRunner: job_local803195645_0001 
java.lang.Exception: java.io.IOException: wrong value class: class org.apache.hadoop.io.IntWritable is not class org.apache.hadoop.io.Text 
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462) 
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522) 
Caused by: java.io.IOException: wrong value class: class org.apache.hadoop.io.IntWritable is not class org.apache.hadoop.io.Text 
at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:199) 
at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1307) 
at artistandTrack$Reduce.reduce(artistandTrack.java:44) 
at artistandTrack$Reduce.reduce(artistandTrack.java:37) 
at org.apache.hadoop.mapred.Task$OldCombinerRunner.combine(Task.java:1572) 
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1611) 
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1462) 
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:437) 
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342) 
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243) 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 


14/10/17 06:09:26 INFO mapreduce.Job: Job job_local803195645_0001 failed with state FAILED due to: NA 
14/10/17 06:09:26 INFO mapreduce.Job: Counters: 11 
Map-Reduce Framework 
    Map input records=4 
    Map output records=4 
    Map output bytes=120 
    Map output materialized bytes=0 
    Input split bytes=97 
    Combine input records=0 
    Combine output records=0 
    Spilled Records=0 
    Failed Shuffles=0 
    Merged Map outputs=0 
File Input Format Counters 
    Bytes Read=272 
Exception in thread "main" java.io.IOException: Job failed! 
at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:836) 
at artistandTrack.main(artistandTrack.java:68) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:483) 
at org.apache.hadoop.util.RunJar.main(RunJar.java:212) 

Where is the wrong-class error coming from:

java.lang.Exception: java.io.IOException: wrong value class: class   org.apache.hadoop.io.IntWritable is not class org.apache.hadoop.io.Text 

And why does the job fail?

Exception in thread "main" java.io.IOException: Job failed! 
at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:836) 
at artistandTrack.main(artistandTrack.java:68) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at  sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:483) 
at org.apache.hadoop.util.RunJar.main(RunJar.java:212) 

I don't see where it goes wrong. Any help is appreciated.

Answers


I think the problem is in this line:

conf.setCombinerClass(Reduce.class); 

Your Map produces (Text, Text) pairs. Your combiner then consumes them and produces (Text, IntWritable) pairs, but the shuffle and your Reducer expect Text values, hence the exception. Try removing the combiner setting.
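To see why the combiner's output types are constrained, here is a minimal single-JVM sketch (plain Java, not real Hadoop; the class name `CombinerContract` and the sample lines are made up for illustration) of the map → reduce flow from the question. A combiner sits between the two phases, so its output pairs are fed back into the same shuffle the reducer reads, meaning it would have to emit (String, String) here; reusing the counting reducer as a combiner would turn the values into Integers, which is the analogue of the "wrong value class" error:

```java
import java.util.*;

public class CombinerContract {

    // Map phase: emit (artist, track) pairs, mirroring the question's Mapper,
    // which takes fields 3 and 4 of each whitespace-separated line.
    static List<Map.Entry<String, String>> map(List<String> lines) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (String line : lines) {
            String[] names = line.split(" ");
            out.add(new AbstractMap.SimpleEntry<>(names[2], names[3]));
        }
        return out;
    }

    // Reduce phase: count values per key, mirroring the question's Reducer.
    static Map<String, Integer> reduce(List<Map.Entry<String, String>> pairs) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, String> p : pairs)
            counts.merge(p.getKey(), 1, Integer::sum);
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "1 2 artistA track1",
            "1 2 artistA track2",
            "1 2 artistB track3");
        // A combiner inserted between map() and reduce() would have to map
        // (String, String) pairs to (String, String) pairs; applying reduce()
        // there would hand reduce() Integer values on the second pass.
        System.out.println(reduce(map(lines)));  // prints {artistA=2, artistB=1}
    }
}
```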


OK, I removed the combiner line, but it is still the same problem – numanumu 2014-10-17 09:31:30


Should the data types of the input (key, value) pair and the output (key, value) pair of the reducer class be the same, or can they differ? – numanumu 2014-10-17 09:55:01


They can differ. Does this exception still appear in the output: 'java.lang.Exception: java.io.IOException: wrong value class: class org.apache.hadoop.io.IntWritable is not class org.apache.hadoop.io.Text'? – 2014-10-17 11:17:52


Use the main method below:

public static void main(String[] args) throws Exception { 
JobConf conf = new JobConf(artistandTrack.class); 
conf.setJobName("artisttrack"); 

FileInputFormat.setInputPaths(conf, new Path(args[0])); 
FileOutputFormat.setOutputPath(conf, new Path(args[1])); 

conf.setMapperClass(Map.class); 
//conf.setCombinerClass(Reduce.class); 
conf.setReducerClass(Reduce.class); 

//conf.setOutputKeyClass(Text.class); 
//conf.setOutputValueClass(IntWritable.class); 

conf.setMapOutputKeyClass(Text.class); 
conf.setMapOutputValueClass(Text.class); 

conf.setInputFormat(TextInputFormat.class); 
conf.setOutputFormat(TextOutputFormat.class); 

JobClient.runJob(conf); 
} 
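For reference, a sketch of the type declarations that matter here, as I understand the old `mapred` API (a configuration fragment, not a tested job): the map output types must be declared explicitly because they differ from the job's final output types, and no combiner should be set, since this job has no class that reduces (Text, Text) to (Text, Text):

```java
// the mapper emits (Text, Text), so declare that explicitly:
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(Text.class);

// the reducer emits (Text, IntWritable); declaring it keeps the
// configuration honest even if TextOutputFormat does not enforce it:
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);

// no conf.setCombinerClass(...): a combiner's output is fed back into the
// same shuffle the reducer reads, so it would have to emit (Text, Text),
// while Reduce emits (Text, IntWritable)
```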

Tried it, but the result is the same – numanumu 2014-10-17 10:18:28


Last idea: remove conf.setMapOutputKeyClass(Text.class); conf.setMapOutputValueClass(Text.class); conf.setCombinerClass(Reduce.class); – www 2014-10-17 11:15:44


Tried it, it didn't help. Any idea why the exception persists? – numanumu 2014-10-17 13:49:10