2017-10-19 122 views
1

我知道星火知道如何分配需要为将在例如另一个节点故障的情况下,开始了一个新的节点上完成的工作。星火数据集选择重新计算

我想知道这是否可以在其他用例中使用。

假设我有转换和操作的树。当其中一个数据集/数据框得到更新时会发生什么(例如,导入了新文件)。在这种情况下,我只想重复那些受到影响并与此变化相关联的转换和操作。其他不相关的转换和操作应该从缓存中使用,因为它们不受影响。

现在,我应该只有一些这些数据框和转换和行动,我可以手动执行。但是我有几十个甚至更多的这样的DF和动作,我正试图理解,如果spark能够在框架内部构建一些内容,可以帮助我在这里。

这里是我的代码示例:

val carLines = spark 
    .read 
    .option("header", "true") 
    .schema(carLineSchema) 
    .csv("src/test/resources/cars") 

val ageMappingFunction: Int => String = (age: Int) => if (age > 80) "old" else "young" 
// 
val _age = udf.register("_age", ageMappingFunction) 

val personLines = spark 
    .read 
    .option("header", "true") 
    .schema(personLineSchema) 
    .csv("src/test/resources/persons") 
    .withColumn("_age", _age($"age")) 

val accidentsLines = spark 
    .read 
    .option("header", "true") 
    .schema(accidentLineSchema) 
    .csv("src/test/resources/accidents") 

val carOwners = personLines 
    .withColumnRenamed("id", "driver_id") 
    .join(carLines, Seq("driver_id"), "left") 
    .withColumnRenamed("id", "car_id") 
    .withColumnRenamed("car_make", "car_maker") 
    .withColumnRenamed("driver_id", "id") 

现在对于一些转变:

val accidentsWithDrivers = accidentsLines 
    .join(personLines.withColumnRenamed("id", "driver_id"), "driver_id") 

val accidentsPerDriverID = accidentsWithDrivers 
    .groupBy("driver_id") 
    .agg(Map(
    "name" -> "count" 
)) 
    .withColumnRenamed("count(name)", "accident_count") 
    .withColumnRenamed("driver_id", "id") 

val finalTable = carOwners 
    .join(numberOfCarsPerDriver, Seq("id", "name", "age", "_age")) 
    .join(accidentsPerDriverID, "id") 

然后我做一些动作(为简单起见,我将使用 '秀'):

carOwners.show(true) 
numberOfCarsPerDriver.show(true) 
finalTable.show(true) 

所以 - 我问的是如果accidentsLines已更改,但不是carLinespersonLines。我们可以做carOwners转型与carLinespersonLines缓存的值?

其他语言: 假设我想将它保存在内存中的spark集群中,我能以某种方式使用RDD#cache()api以在不同的驱动程序运行之间生存吗?

回答

1

原来我需要或者使用job-server或Apache的点燃使用IgniteRDD支持:

//WRITE 
val igniteContext = new IgniteContext(spark.sparkContext, "ignite-config.xml", true) 
val schema = dataframe.schema 
val rdd = dataframe.rdd 
igniteContext.ignite().getOrCreateCache("ignite-cache").put("schema", schema) 
igniteContext.fromCache(name).saveValues(rdd) 

//READ 
val schema = igniteContext.ignite() 
    .getOrCreateCache[String, StructType]("ignite-cache") 
    .get("schema") 
    .asInstanceOf[StructType] 

    val igniteRdd: IgniteRDD[String, Row] = igniteContext.fromCache(name) 
    val rdd = igniteRdd.map(a => a._2) 
    val dataframe = spark.createDataFrame(rdd, schema)