2014-08-28 84 views
0

我看过关于这个主题的其他线程,仍然没有找到答案......猪访问分布式缓存StoreFunc

简单地说,我想从一个猪StoreFunc访问Hadoop分布式高速缓存,以及不是直接在UDF内。

相关PIG行代码:

DEFINE CustomStorage KeyValStorage('param1','param2','param3'); 
... 
STORE BLAH INTO /path/ using CustomStorage(); 

相关的Java代码:

public class KeyValStorage<M extends Message> extends BaseStoreFunc /* ElephantBird Storage which inherits from StoreFunc */ { 

... 
public KeyValStorage(String param1, String param2, String param3) { 
    ... 
     try { 
      InputStream is = new FileInputStream(configName); 
      try { 
       prop.load(is); 
      } catch (IOException e) { 
       System.out.println("PROPERTY LOADING FAILED"); 
       e.printStackTrace(); 
      } 
     } catch (FileNotFoundException e) { 
      System.out.println("FILE NOT FOUND"); 
      e.printStackTrace(); 
     } 
    } 
... 
} 

配置名称是本地文件,我应该能够从分布式缓存读取的名字,但是,我得到一个FileNotFoundException。当我直接在PIG UDF中使用EXACT相同的代码时,该文件被找到,所以我知道该文件通过分布式缓存发送。我设置适当的参数,以确保发生这种情况:

<property><name>mapred.cache.files</name><value>/path/to/file/file.properties#configName</value></property> 

任何想法如何解决这个问题?

谢谢!

回答

0

StroreFunc的构造函数被称为前端后端。当它从前端被调用时(在作业启动之前),那么你会得到FileNotFoundException,因为此时来自分布式缓存的文件还没有被复制到节点的本地磁盘。
您可以检查您是否在后端(时正在执行的任务),只有在这种情况下加载该文件e.g:

DEFINE CustomStorage KeyValStorage('param1','param2','param3'); 
set mapreduce.job.cache.files hdfs://host/user/cache/file.txt#config 
... 
STORE BLAH INTO /path/ using CustomStorage(); 

public KeyValStorage(String param1, String param2, String param3) { 
    ... 
    try { 
    if (!UDFContext.getUDFContext().isFrontend()) { 
     InputStream is = new FileInputStream("./config"); 
     BufferedReader br = new BufferedReader(new InputStreamReader(is)); 
     ... 
    ... 
}