Creating a custom generator Hadoop InputFormat with no input data

2016-11-18

I am trying to create an InputFormat that generates data on its own rather than reading it from an external location. It reads from the job configuration how much data to generate before finishing. The goal is to help profile an OutputFormat outside of a testing environment. Unfortunately, I cannot find any references to a generator-style InputFormat like this.

The InputFormat I have so far is:

public static class GeneratorInputFormat extends InputFormat<LongWritable, LongWritable> {

    @Override
    public RecordReader<LongWritable, LongWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new GeneratorRecordReader();
    }

    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
        long splitCount = job.getConfiguration().getLong(SPLITS_COUNT_KEY, 0);
        long splitSize = job.getConfiguration().getLong(SPLITS_SIZE_KEY, 0);
        List<InputSplit> splits = new ArrayList<InputSplit>();
        for (int i = 0; i < splitCount; i++) {
            splits.add(new TestInputSplit(splitSize));
        }
        return splits;
    }
}

public static class TestInputSplit extends InputSplit {

    private final long size;

    public TestInputSplit(long size) {
        this.size = size;
    }

    @Override
    public long getLength() throws IOException, InterruptedException {
        return size;
    }

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
        return new String[0];
    }
}
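For context, a driver for this input format needs to define the two configuration keys referenced in getSplits and register the format on the job. A minimal sketch, where the key names, values, and the GeneratorDriver class are assumptions rather than code from the question:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class GeneratorDriver {
    // Hypothetical key names; the question does not show how
    // SPLITS_COUNT_KEY and SPLITS_SIZE_KEY are actually defined.
    public static final String SPLITS_COUNT_KEY = "generator.splits.count";
    public static final String SPLITS_SIZE_KEY = "generator.splits.size";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setLong(SPLITS_COUNT_KEY, 4);     // one split per desired map task
        conf.setLong(SPLITS_SIZE_KEY, 1000L);  // records to generate per split
        Job job = Job.getInstance(conf, "generator-bench");
        job.setJarByClass(GeneratorDriver.class);
        job.setInputFormatClass(GeneratorInputFormat.class);
        // ... mapper, reducer, and output format configuration as usual ...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}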

The record reader simply generates numbers up to the input length.
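The reader itself is not shown in the question; a minimal sketch of what such a generator reader might look like, reconstructed from the description above (the class body is an assumption):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Hypothetical sketch: emits the numbers 0..length-1 as keys,
// with the value mirroring the key.
public static class GeneratorRecordReader extends RecordReader<LongWritable, LongWritable> {

    private long length;
    private long current = -1;
    private final LongWritable key = new LongWritable();
    private final LongWritable value = new LongWritable();

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // The split's length doubles as the number of records to generate.
        length = split.getLength();
    }

    @Override
    public boolean nextKeyValue() {
        current++;
        if (current >= length) {
            return false;
        }
        key.set(current);
        value.set(current);
        return true;
    }

    @Override
    public LongWritable getCurrentKey() { return key; }

    @Override
    public LongWritable getCurrentValue() { return value; }

    @Override
    public float getProgress() {
        return length == 0 ? 1.0f : Math.min(1.0f, (float) current / length);
    }

    @Override
    public void close() {}
}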

The error I am getting is a missing-file exception:

16/11/18 03:28:54 INFO mapreduce.JobSubmitter: Cleaning up the staging area /tmp/hadoop-yarn/staging/root/.staging/job_1479265882561_0037 
Exception in thread "main" java.lang.NullPointerException 
     at org.apache.hadoop.mapreduce.split.JobSplitWriter.writeNewSplits(JobSplitWriter.java:132) 
     at org.apache.hadoop.mapreduce.split.JobSplitWriter.createSplitFiles(JobSplitWriter.java:79) 
     at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:307) 
     at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:318) 
     at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196) 
     at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290) 
     at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287) 
     at java.security.AccessController.doPrivileged(Native Method) 
     at javax.security.auth.Subject.doAs(Subject.java:422) 
     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) 
     at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287) 
     at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1308) 
     at com.gmail.mooman219.cloud.hadoop.WordCountBench.main(WordCountBench.java:208) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:498) 
     at org.apache.hadoop.util.RunJar.run(RunJar.java:221) 
     at org.apache.hadoop.util.RunJar.main(RunJar.java:136) 
     at com.google.cloud.hadoop.services.agent.job.shim.HadoopRunJarShim.main(HadoopRunJarShim.java:12) 
16/11/18 03:28:54 WARN hdfs.DFSClient: DataStreamer Exception 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /tmp/hadoop-yarn/staging/root/.staging/job_1479265882561_0037/job.split (inode 34186): File does not exist. Holder DFSClient_NONMAPREDUCE_232487306_1 does not have any open files. 
     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3430) 
     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3233) 
     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3071) 
     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3031) 
     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:725) 
     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492) 
     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) 
     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616) 
     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982) 
     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049) 
     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045) 
     at java.security.AccessController.doPrivileged(Native Method) 
     at javax.security.auth.Subject.doAs(Subject.java:422) 
     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) 
     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043) 

I find this odd because at no point do I reference any files on the input side.

Answers

It appears that the problem of Hadoop trying to read a file that does not exist stems from the InputSplit. When a job is submitted, JobSplitWriter serializes every split into the staging area (the job.split file); if the split class does not define its own serialization, Hadoop's SerializationFactory finds no serializer for it, which produces the NullPointerException above. The LeaseExpiredException that follows is just fallout from the staging directory being cleaned up afterwards.

This can be solved by implementing Writable on the InputSplit. In my case, the resulting InputSplit implementation ended up looking like this:

public static class TestInputSplit extends InputSplit implements Writable {

    @Override
    public long getLength() throws IOException, InterruptedException {
        return 0L;
    }

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
        return new String[0];
    }

    // This version of the split carries no state, so serialization is a no-op.
    @Override
    public void readFields(DataInput in) throws IOException {}

    @Override
    public void write(DataOutput out) throws IOException {}
}
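Note that this version drops the size field entirely. If the split still needs to carry the generated length from getSplits to the record reader, that state has to round-trip through write and readFields, and the class needs a no-argument constructor so Hadoop can instantiate it reflectively before deserializing. A sketch under that assumption:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;

public static class TestInputSplit extends InputSplit implements Writable {

    private long size;

    // Required so Hadoop can instantiate the split reflectively
    // before calling readFields.
    public TestInputSplit() {}

    public TestInputSplit(long size) {
        this.size = size;
    }

    @Override
    public long getLength() throws IOException, InterruptedException {
        return size;
    }

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
        return new String[0];
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(size);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        size = in.readLong();
    }
}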
The error clearly states that the file could not be found.

Comment: No file is defined in the first place; it should not be reading from anywhere.
