2017-08-28 28 views
0

我是Spark的新手。我试图在本地模式(Windows)中使用spark java将csv文件保存为parquet。我有这个错误。另存为火花java中的Parquet文件

产生的原因:org.apache.spark.SparkException:任务失败,而写行

我提到的其他线程和残疾人火花炒作

集( “spark.speculation”, “假”)

我仍然得到错误。我在csv中只使用了两列来进行测试,但仍然出现在这个问题中。

输入:

portfolio_id;portfolio_code 
1000042;CHNTESTPF04 
1000042;CHNTESTPF04 
1000042;CHNTESTPF04 
1000042;CHNTESTPF04 
1000042;CHNTESTPF04 
1000042;CHNTESTPF04 
1000042;CHNTESTPF04 

我的代码:

JavaPairRDD<Integer, String> rowJavaRDD = pairRDD.mapToPair(new PairFunction<String, Integer, String>() { 
    private Double[] splitStringtoDoubles(String s){ 
     String[] splitVals = s.split(";"); 
     Double[] vals = new Double[splitVals.length]; 
     for(int i= 0; i < splitVals.length; i++){ 
      vals[i] = Double.parseDouble(splitVals[i]); 
     } 
     return vals; 
    } 

    @Override 
    public Tuple2<Integer, String> call(String arg0) throws Exception { 
     // TODO Auto-generated method stub 
     return null; 
    } 
}); 


SQLContext SQLContext; 
SQLContext = new org.apache.spark.sql.SQLContext(sc); 

Dataset<Row> fundDF = SQLContext.createDataFrame(rowJavaRDD.values(), funds.class); 
fundDF.printSchema(); 

fundDF.write().parquet("C:/test"); 

请帮助什么,我在这里失踪。

+0

把完整的错误和堆栈跟踪你的问题,谢谢。 – Tim

+0

我通过添加的功能在Tuple2()这样分割解决错误的:public void运行(串T,字符串U){ \t公共Tuple2 <字符串,字符串>呼叫(字符串REC){ \t字符串[] tokens = rec.split(“;”); \t String [] vals = new String [tokens.length];对于(int i = 0; i (tokens [0],tokens [1]); \t}}); – Ans8

+0

@ Ans8请将您的答案放在答案中并接受它,以便它从“未答复”部分中消失。 –

回答

0

请找到 1)加载CSV我的Java /星火代码印度星火数据集 2)保存数据集,以实木复合地板

import org.apache.spark.sql.Dataset; 
import org.apache.spark.sql.SparkSession; 

SparkSession spark = SparkSession 
        .builder() 
        .appName("csv2parquet") 
        .config("spark.sql.warehouse.dir", "/file:/tmp") 
        .master("local") 
        .getOrCreate(); 

final String dir = "test.csv"; 


Dataset<Row> ds = spark.read().option("header", true).option("inferSchema", true).csv(dir); 

final String parquetFile = "test.parquet"; 
final String codec = "parquet"; 

ds.write().format(codec).save(parquetFile); 

spark.stop(); 

添加到您的POM

<dependency> 
       <groupId>org.apache.hadoop</groupId> 
       <artifactId>hadoop-mapreduce-client-core</artifactId> 
       <version>2.8.1</version> 
</dependency> 
1

这里回答,所以正如@Glennie Helles Sindholt所讲的那样,它从未回答的部分开始。对不起,在张贴这

延迟我加入的功能Tuple2()分裂这样解决了错误:

public void run(String t, String u) 

    { 

    public Tuple2<String,String> call(String rec){ 
     String[] tokens = rec.split(";"); 
     String[] vals = new String[tokens.length]; 
     for(int i= 0; i < tokens.length; i++) 
     { 
      vals[i] =tokens[i]; 
     } 

     return new Tuple2<String, String>(tokens[0], tokens[1]); 

    } 
    });