-1

我们考虑使用Apache Spark来更快地匹配记录,但我们发现它比使用select语句的SQL匹配效率低。
使用,
JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setAppName("AIRecordLinkage").setMaster("local[*]"));<br> Dataset<Row> sourceFileContent = spark.read().jdbc("jdbc:oracle:thin:@//Connection_IP/stage", "Database_name.Table_name", connectionProperties);Apache Spark性能问题

我们能够进口约180万条记录引发存储在数据集对象的环境。 现在用滤波函数 targetFileContent.filter(COL( “TARGETUPC”)。equalTo(upcValue))

上述筛选语句是在一个循环中,其中up​​cValue得到大约46K的ID更新。

该程序正在执行几个小时,但我们使用sql IN操作符尝试了相同的操作,其中我们保留了所有在一分钟内执行的所有46k UPC标识。

配置:
火花SQL 2.11
火花芯2.11
JDK 8
视窗10,一节点4个核3GHz的,16GB的RAM。
C盘 - > 12 GB可用空间。 Eclipse - >运行配置 - > -Xms15000m。

请帮助我们分析和理解是否有任何错误,并告诉我们需要做些什么来提高性能。

@Component("upcExactMatch") 
    public class UPCExactMatch { 
     @Autowired 
     private Environment envirnoment; 

     @Autowired 
     private LoadCSV loadCSV; 

     @Autowired 
     private SQLHandler sqlHandler; 

     public ArrayList<Row> perform(){ 

      ArrayList<Row> upcNonMatchedItemIDs=new ArrayList<Row>(); 
      ArrayList<Row> upcMatchedItemIDs=new ArrayList<Row>(); 

      JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]")); 
      SQLContext sqlContext = new SQLContext(javaSparkContext); 
      SparkSession sparkSession = SparkSession.builder().appName("JavaStopWordshandlerTest").getOrCreate(); 

      try{ 
       Dataset<Row> sourceFileContent =loadCSV.load(sourceFileName,sourceFileLocation,javaSparkContext,sqlContext); 

       // load target from database 
       Dataset<Row> targetFileContent = spark.read().jdbc("jdbc:oracle:thin:@//Connection_IP/stage", "Database_name.Table_name", connectionProperties); 
       System.out.println("File counts :"+sourceFileContent.count()+" : "+targetFileContent.count()); 

       ArrayList<String> upcMatched = new ArrayList<String>(); 
       ArrayList<String> partNumberMatched = new ArrayList<String>(); 

       List<Row> sourceFileContents = sourceFileContent.collectAsList(); 

       int upcColumnIndex=-1; 
       int itemIDColumnIndex=-1; 
       int partNumberTargetIndex=-1; 
       String upcValue=""; 

       StructType schema = targetFileContent.schema(); 
       List<Row> data = Arrays.asList(); 
       Dataset<Row> upcMatchedRows = sparkSession.createDataFrame(data, schema); 

       for(Row rowSourceFileContent: sourceFileContents){ 

        upcColumnIndex=rowSourceFileContent.fieldIndex("Vendor UPC"); 

        if(!rowSourceFileContent.isNullAt(upcColumnIndex)){ 

         upcValue=rowSourceFileContent.get(upcColumnIndex).toString(); 
         upcMatchedRows=targetFileContent.filter(col("TARGETUPC").equalTo(upcValue)); 

         if(upcMatchedRows.count() > 0){ 

          for(Row upcMatchedRow: upcMatchedRows.collectAsList()){ 
           partNumberTargetIndex=upcMatchedRow.fieldIndex("PART_NUMBER"); 

           if(partNumberTargetIndex != -1){ 
            upcMatched.add(upcValue); 
            partNumberMatched.add(upcMatchedRow.get(partNumberTargetIndex).toString()); 
            System.out.println("Source UPC : "+upcValue +"\tTarget part number :"+ upcMatchedRow.get(partNumberTargetIndex)); 

           } 
          } 

         } 

        } 

       } 

       for(int i=0;i<upcMatched.size();i++){ 
        System.out.println("Matched Exact UPC ids are :"+upcMatched.get(i) + "\t:Target\t"+partNumberMatched.get(i)); 

       } 

      }catch(Exception e){ 
       e.printStackTrace(); 
      }finally{ 
       sparkSession.stop(); 
       sqlContext.clearCache(); 
       javaSparkContext.close(); 
      } 

      return upcMatchedItemIDs; 

     } 

    } 

回答

0

尝试在匹配记录的两个数据集数据框之间进行内连接。

+0

谢谢内部联接工作匹配记录 – Nischay