2015-11-18 301 views
2

我想在PySpark中有效地将numpy数组从/到工作者机器(函数)读取到HDFS。我有两台机器A和B.A有主人和工人。 B有一名工人。对于例如我想实现如下的内容:如何从PySpark工作者保存numpy数组到HDFS或共享文件系统?

if __name__ == "__main__": 
    conf = SparkConf().setMaster("local").setAppName("Test") 
    sc = SparkContext(conf = conf) 
    sc.parallelize([0,1,2,3], 2).foreachPartition(func) 

def func(iterator): 
    P = << LOAD from HDFS or Shared Memory as numpy array>> 
    for x in iterator: 
     P = P + x 

    << SAVE P (numpy array) to HDFS/ shared file system >> 

有什么可以为这个快速和有效的方法?

回答

0

我偶然发现了同样的问题。并最终使用了HdfsCli module和Python3.4的临时文件的解决方法。

  1. 进口:
from hdfs import InsecureClient 
from tempfile import TemporaryFile 
  • 创建HDFS客户端。在大多数情况下,最好是在脚本中的某处有一个实用功能,像这样的:
  • def get_hdfs_client(): 
        return InsecureClient("<your webhdfs uri>", user="<hdfs user>", 
         root="<hdfs base path>") 
    
  • 负荷,节省了工人函数内部numpy的:
  • hdfs_client = get_hdfs_client() 
    
    # load from file.npy 
    path = "/whatever/hdfs/file.npy" 
    tf = TemporaryFile() 
    
    with hdfs_client.read(path) as reader: 
        tf.write(reader.read()) 
        tf.seek(0) # important, set cursor to beginning of file 
    
    np_array = numpy.load(tf) 
    
    ... 
    
    # save to file.npy 
    tf = TemporaryFile() 
    numpy.save(tf, np_array) 
    tf.seek(0) # important ! set the cursor to the beginning of the file 
    # with overwrite=False, an exception is thrown if the file already exists 
    hdfs_client.write("/whatever/output/file.npy", tf.read(), overwrite=True) 
    

    注:

    • 的URI用于创建HDFS客户端开始与http://,因为它使用hdfs文件系统的web界面;
    • 确保你传递给HDFS客户端的用户具有读取和写入权限在我的经验
    • ,开销不显著(至少在执行时间期限)
    • 使用临时文件的优势(与常规文件/tmp)是你确保脚本结束后没有垃圾文件留在群集机器中,通常与否
    相关问题