2017-10-19 103 views
0

首先,为标题道歉,我不确定如何简洁地描述这一点。Spark多个输出路径导致多个输入读取

我有一个Spark日志解析到JSON,然后使用spark-sql将特定列转换为ORC并写入各种路径。例如:

val logs = sc.textFile("s3://raw/logs") 
val jsonRows = logs.mapPartitions(partition => { 
    partition.map(log => { 
    logToJson.parse(log) 
    } 
} 

jsonRows.foreach(r => { 
    val contentPath = "s3://content/events/" 
    val userPath = "s3://users/events/" 
    val contentDf = sqlSession.read.schema(contentSchema).json(r) 
    val userDf = sqlSession.read.schema(userSchema).json(r) 
    val userDfFiltered = userDf.select("*").where(userDf("type").isin("users") 
    // Save Data 
    val contentWriter = contentDf.write.mode("append").format("orc") 
    eventWriter.save(contentPath) 
    val userWriter = userDf.write.mode("append").format("orc") 
    userWriter.save(userPath) 

当我写这篇文章时,我预计解析会发生一次,然后它会写入相应的位置。但是,它似乎在执行文件中的所有代码两次 - 一次为content,一次为users。这是预期的吗?我宁愿我最终不会从S3传输数据和解析两次,因为这是最大的瓶颈。我从Spark UI中附加了一个图像,以显示单个“流”窗口的任务重复。感谢您的任何帮助,您可以提供! Spark Application UI

回答

0

好的,这种嵌套的DFs是不行的。 DataFrames的意思是数据结构为数据集将不适合正常的数据结构(如SeqList),并且需要以分布式方式处理。它是而不是只是另一种阵列。你在这里试图做的是每个日志行创建一个DataFrame,这没什么意义。

据我可以告诉你在这里发布的(不完整)代码,你想创建两个新的DataFrames从你原来的输入(日志),然后你想存储在两个不同的位置。这样的事情:

val logs = sc.textFile("s3://raw/logs") 
val contentPath = "s3://content/events/" 
val userPath = "s3://users/events/" 

val jsonRows = logs 
    .mapPartitions(partition => { 
    partition.map(log => logToJson.parse(log)) 
    } 
    .toDF() 
    .cache() // Or use persist() if dataset is larger than will fit in memory 

jsonRows 
    .write 
    .format("orc") 
    .save(contentPath) 

jsonRows 
    .filter(col("type").isin("users")) 
    .write 
    .format("orc") 
    .save(userPath) 

希望这会有所帮助。