2017-02-20 60 views
1

使用PySpark从S3加载多个JSON文件时出现错误,并且如果文件丢失,Spark作业将失败。PySpark作业在加载多个文件时失败,其中一个丢失

产生的原因:org.apache.hadoop.mapred.InvalidInputException:输入模式S3N://example/example/2017-02-18/*.json匹配0文件

这是怎么了我使用PySpark将最后5天添加到我的工作中。

days = 5 
x = 0 
files = [] 

while x < days: 
    filedate = (date.today() - timedelta(x)).isoformat() 
    path = "s3n://example/example/"+filedate+"/*.json" 
    files.append(path) 
    x += 1 

rdd = sc.textFile(",".join(files))      
df = sql_context.read.json(rdd, schema) 

我该如何让PySpark忽略丢失的文件并继续工作?

回答

1

使用试图加载文件的函数,如果文件丢失则失败并返回False。

from py4j.protocol import Py4JJavaError 

def path_exist(sc, path): 
    try: 
     rdd = sc.textFile(path) 
     rdd.take(1) 
     return True 
    except Py4JJavaError as e: 
     return False 

这可以让你检查文件是否为它们添加到您的列表,而不必使用AWS CLI或S3命令之前可用。

days = 5 
x = 0 
files = [] 

while x < days: 
    filedate = (date.today() - timedelta(x)).isoformat() 
    path = "s3n://example/example/"+filedate+"/*.json" 
    if path_exist(sc, path): 
     files.append(path) 
    else: 
     print('Path does not exist, skipping: ' + path) 
    x += 1 

rdd = sc.textFile(",".join(files))      
df = sql_context.read.json(rdd, schema) 

我发现这个解决方案在http://www.learn4master.com/big-data/pyspark/pyspark-check-if-file-exists