0

我正在使用Cassandra 2.2.8,JDK8,spark-cassandra-connector-java_2.10,spark-cassandra-connector_2.11-2.0.0-M3,cassandra-driver-core-3.1 .0及以下 Cassandra Spark Connector Example JavaDemo。此演示必须修复以使用新的2.1 Connetcor API进行编译。我有固定的几件事情,但是这下面的一个被绊倒了我:在这行 编译错误:Cassandra Spark连接器JavaDemo编译错误

JavaPairRDD<Integer, BigDecimal> allSalesRDD = joinedRDD.flatMap(new PairFlatMapFunction<Tuple2<Integer, Tuple2<Sale, Product>>, Integer, BigDecimal>() { 
     @Override 
     public Iterable<Tuple2<Integer, BigDecimal>> call(Tuple2<Integer, Tuple2<Sale, Product>> input) throws Exception { 

错误:

The method 
    flatMap(FlatMapFunction<Tuple2<Integer,Tuple2<SparkJavaDemo.Sale,SparkJavaDemo.Product>>,U>) in the type 
     AbstractJavaRDDLike<Tuple2<Integer,Tuple2<SparkJavaDemo.Sale,SparkJavaDemo.Product>>,JavaPairRDD<Integer,Tuple2<SparkJavaDemo. 
     Sale,SparkJavaDemo.Product>>> is not applicable for the arguments (new 
     PairFlatMapFunction<Tuple2<Integer,Tuple2<SparkJavaDemo.Sale,SparkJavaDemo.Product>>,Integer,BigDecimal>(){}) 

感谢

回答

1

可以使用flatMapToPair代替flatMap如下所示。

JavaPairRDD<Integer, BigDecimal> allSalesRDD = joinedRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<Integer, Tuple2<Sale, Product>>, Integer, BigDecimal>() { 
     @Override 
     public Iterator<Tuple2<Integer, BigDecimal>> call(Tuple2<Integer, Tuple2<Sale, Product>> input) throws Exception { 
      Tuple2<Sale, Product> saleWithProduct = input._2(); 
      List<Tuple2<Integer, BigDecimal>> allSales = new ArrayList<>(saleWithProduct._2().getParents().size() + 1); 
      allSales.add(new Tuple2<>(saleWithProduct._1().getProduct(), saleWithProduct._1().getPrice())); 
      for (Integer parentProduct : saleWithProduct._2().getParents()) { 
       allSales.add(new Tuple2<>(parentProduct, saleWithProduct._1().getPrice())); 
      } 
      return allSales.iterator(); 
     } 
    }); 

我在https://gist.github.com/baghelamit/f2963d9e37acc55474559104f5f16cf1

+0

试图创造了更新的代码要点这个**得到一个不同的错误,现在**:'公众的Iterator >通话(Tuple2 <整数,Tuple2 <销售,产品>>输入)**返回类型不兼容**与 \t PairFlatMapFunction >,Integer,BigDecimal> .call(Tuple2 >)' –

+0

当我使用核心3.1 java驱动程序(与2.1协同工作)时,会出现单独的编译错误资源类型Session会not implementation \t java.lang.AutoCloseable try(Session session = connector.openSession())' –

+0

我已经更新了JavaDemo的pom.xml和java类。更新的文件位于https://gist.github.com/baghelamit/f2963d9e37acc55474559104f5f16cf1。你检查了吗? – abaghel