2016-05-16 108 views
1

从下面的代码我不明白两两件事:不了解路径分布路径

  1. DistributedCache.addcachefile(new URI ('/abc.dat'), job.getconfiguration())

我不明白URI路径必须存在于HDFS。纠正我,如果我错了。

  • 什么是p.getname().equals()从下面的代码:

    public class MyDC { 
    
    public static class MyMapper extends Mapper < LongWritable, Text, Text, Text > { 
    
        private Map < String, String > abMap = new HashMap < String, String >(); 
    
        private Text outputKey = new Text(); 
    
        private Text outputValue = new Text(); 
    
        protected void setup(Context context) throws 
        java.io.IOException, InterruptedException { 
    
         Path[] files = DistributedCache.getLocalCacheFiles(context.getConfiguration()); 
    
         for (Path p: files) { 
    
          if (p.getName().equals("abc.dat")) { 
    
           BufferedReader reader = new BufferedReader(new FileReader(p.toString())); 
    
           String line = reader.readLine(); 
    
           while (line != null) { 
    
            String[] tokens = line.split("\t"); 
    
            String ab = tokens[0]; 
    
            String state = tokens[1]; 
    
            abMap.put(ab, state); 
    
            line = reader.readLine(); 
    
           } 
    
          } 
    
         } 
    
         if (abMap.isEmpty()) { 
    
          throw new IOException("Unable to load Abbrevation data."); 
    
         } 
    
        } 
    
        protected void map(LongWritable key, Text value, Context context) 
        throws java.io.IOException, InterruptedException { 
    
         String row = value.toString(); 
    
         String[] tokens = row.split("\t"); 
    
         String inab = tokens[0]; 
    
         String state = abMap.get(inab); 
    
         outputKey.set(state); 
    
         outputValue.set(row); 
    
         context.write(outputKey, outputValue); 
    
        } 
    
    } 
    
    public static void main(String[] args) 
    throws IOException, ClassNotFoundException, InterruptedException { 
    
        Job job = new Job(); 
    
        job.setJarByClass(MyDC.class); 
    
        job.setJobName("DCTest"); 
    
        job.setNumReduceTasks(0); 
    
        try { 
    
         DistributedCache.addCacheFile(new URI("/abc.dat"), job.getConfiguration()); 
    
        } catch (Exception e) { 
    
         System.out.println(e); 
    
        } 
    
        job.setMapperClass(MyMapper.class); 
    
        job.setMapOutputKeyClass(Text.class); 
    
        job.setMapOutputValueClass(Text.class); 
    
    
        FileInputFormat.addInputPath(job, new Path(args[0])); 
    
        FileOutputFormat.setOutputPath(job, new Path(args[1])); 
    
        job.waitForCompletion(true); 
    
    } 
    
    } 
    
  • 回答

    0

    DistributedCache是​​用于添加一个文件或一组在存储器中的文件的,将可为每一个API数据节点是否可以使用map-reduce。使用分布式缓存的一个例子是地图边连接。

    DistributedCache.addcachefile(new URI('/abc.dat'),job.getconfiguration())将在缓存区域中添加abc.dat文件。缓存中可以有n个文件,p.getName()。equals(“abc.dat”))会检查你需要的文件。 HDFS中的每条路径都将在Path []下进行map-reduce处理。例如:

    FileInputFormat.addInputPath(job, new Path(args[0])); 
    
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 
    

    第一路径(参数[0])是第一个参数 (输入文件位置)传递而罐执行与路径(参数[1])是第二个参数,其输出文件位置。一切都被视为Path数组。

    以同样的方式,当你添加任何文件缓存时,它将被排列在Path数组中,你不能使用下面的代码来检索它。

    Path [] files = DistributedCache.getLocalCacheFiles(context.getConfiguration());

    它将返回缓存中的所有文件,并且您将通过p.getName()。equals()方法获取文件名。

    +0

    谢谢你!这非常有帮助 – Sri

    1

    分布式缓存的想法是做一些静态的数据之前提供给任务节点它开始执行。

    文件已存在于HDFS,以便它可以将其添加到分布式缓存(每个任务节点)

    DistributedCache.getLocalCacheFile基本上得到所有存在于任务节点的缓存文件。通过if (p.getName().equals("abc.dat")) {您正在获取适当的缓存文件以供您的应用程序处理。

    请参考文档如下:

    https://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html#DistributedCache

    https://hadoop.apache.org/docs/r1.2.1/api/org/apache/hadoop/filecache/DistributedCache.html#getLocalCacheFiles(org.apache.hadoop.conf.Configuration)

    +0

    感谢您的精彩回应!我清楚明白了! – Sri