2015-11-05 67 views
1

我有问题将二进制文件(这是作为序列文件存储在Hadoop中)复制到本地机器。问题是我从hdfs下载的二进制文件不是我在运行map-reduce任务时生成的原始二进制文件。我搜索了类似的问题,我想问题是,当我将序列文件复制到本地机器时,我得到了序列文件的头文件和原始文件。Java - 在Hadoop中下载序列文件

我的问题是:有没有办法避免下载的头,但仍保持我原来的二进制文件?

有两种方法我能想到的:

  1. 我可以转换二进制文件到像文本的其他格式,这样我可以尽量避免使用SequenceFile。在我做copyToLocal之后,我将它转换回二进制文件。

  2. 我仍然可以使用序列文件。但是当我生成二进制文件时,我还会生成一些关于相应序列文件的元信息(例如,头文件的长度和文件的原始长度)。在我做copyToLocal之后,我使用下载的二进制文件(包含头文件等)以及元信息来恢复我的原始二进制文件。

我不知道哪一个是可行的。谁能给我一个解决方案?你能不能给我看一些你给的解决方案的示例代码?

我非常感谢您的帮助。

+0

如何从SequenceFile下载二进制内容?请发布您的代码。 –

回答

1

我找到了解决此问题的方法。由于下载序列文件会给你在二进制文件中的标题和其他魔术字,我避免这个问题的方式是将我的原始二进制文件转换为Base64字符串并将其作为文本存储在HDFS中,并且当下载编码的二进制文件时,我解码它回到我原来的二进制文件。

我知道这将需要额外的时间,但目前我没有找到任何其他办法解决这个问题。直接删除序列文件中的标题和其他魔术字的难点在于Hadoop可能会在我的二进制文件之间插入一些“Sync”字。

如果有人有更好的解决这个问题,我会很高兴听到这一点。 :)

0

使用MapReduce的代码读取SequenceFile并使用SequenceFileInputFormat作为InputFileFormat读取HDFS序列文件。这会将文件分割为Key Value对,并且该值只有可用于创建二进制文件的二进制文件内容。

下面的代码片段,以拆分是由多张图片,并拆分成单独的二进制文件,并将其写入到本地文件系统中的序列文件。

import java.io.IOException; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.BytesWritable; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

public class CreateOrgFilesFromSeqFile { 

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 

     if (args.length !=2){ 
      System.out.println("Incorrect No of args (" + args.length + "). Expected 2 args: <seqFileInputPath> <outputPath>"); 
      System.exit(-1); 
     } 

     Path seqFileInputPath = new Path(args[0]); 
     Path outputPath = new Path(args[1]); 

     Configuration conf = new Configuration(); 
     Job job = Job.getInstance(conf, "CreateSequenceFile"); 

     job.setJarByClass(M4A6C_CreateOrgFilesFromSeqFile.class); 
     job.setMapperClass(CreateOrgFileFromSeqFileMapper.class); 

     job.setInputFormatClass(SequenceFileInputFormat.class); 

     job.setOutputKeyClass(NullWritable.class); 
     job.setOutputValueClass(Text.class); 

     FileInputFormat.addInputPath(job, seqFileInputPath); 
     FileOutputFormat.setOutputPath(job, outputPath); 

     //Delete the existing output File 
     outputPath.getFileSystem(conf).delete(outputPath, true); 

     System.exit(job.waitForCompletion(true)? 0 : -1); 

    } 

} 

class CreateOrgFileFromSeqFileMapper extends Mapper<Text, BytesWritable, NullWritable, Text>{ 

    @Override 
    public void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException{ 


     Path outputPath = FileOutputFormat.getOutputPath(context); 
     FileSystem fs = outputPath.getFileSystem(context.getConfiguration()); 

     String[] filePathWords = key.toString().split("/"); 
     String fileName = filePathWords[filePathWords.length-1]; 

     System.out.println("outputPath.toString()+ key: " + outputPath.toString() + "/" + fileName + "value length : " + value.getLength()); 

     try(FSDataOutputStream fdos = fs.create(new Path(outputPath.toString() + "/" + fileName));){ 

      fdos.write(value.getBytes(),0,value.getLength()); 
      fdos.flush(); 
     } 

      //System.out.println("value: " + value + ";\t baos.toByteArray().length: " + baos.toByteArray().length); 
      context.write(NullWritable.get(), new Text(outputPath.toString() + "/" + fileName));    
    } 
}