2017-04-15 192 views
0

我正在使用scala将json数据读入spark数据框。 架构如下:使用scala从spark中的数组中抽取结构值

root 
|-- metadata: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- playerId: string (nullable = true) 
| | |-- sources: array (nullable = true) 
| | | |-- element: struct (containsNull = true) 
| | | | |-- matchId: long (nullable = true) 

的数据如下所示:

{ "metadata" : [ { "playerId" : "1234", "sources" : [ { "matchId": 1 } ] }, { "playerId": "1235", "sources": [ { "matchId": 1 } ] } ] } 
{ "metadata" : [ { "playerId" : "1234", "sources" : [ { "matchId": 2 } ] }, { "playerId": "1248", "sources": [ { "score": 12.2 , "matchId": 1 } ] } ] } 
{ "metadata" : [ { "playerId" : "1234", "sources" : [ { "matchId": 3 } ] }, { "playerId": "1248", "sources": [ { "matchId": 3 } ] } ] } 

的目标是找出是否playerId是1234和matchId为1,则返回isPlayed为真。来源的结构不固定。可能有其他字段不是matchId。

我写了一个UDF考虑对象元数据类型WrappedArray [字符串]和我能够读取playerId列

def hasPlayer = udf((metadata: WrappedArray[String], playerId: String) => { 
    metadata.contains(playerId) 
    }) 

df.withColumn("hasPlayer", hasPlayer(col("metadata"), col("superPlayerId"))) 

但我无法弄清楚如何查询给出playerId的matchId领域。我尝试将该字段作为WrappedArray [WrappedArray [Long]]读取,但它在metadata.sources.matchId列的withColumn中提供了类型转换异常。

我对Spark比较新。任何帮助将深表感谢。

干杯!

回答

0

当您处理JSON时,请了解内置函数explode,该函数将包含WrappedArray的单个单元格变成表示内部结构的多行。我认为它有助于在这里(两次):

df.select(explode($"metadata").as("metadata")) 
    .select($"metadata.playerId", explode($"metadata.sources.matchId").as("matchId")) 
    .filter($"matchId".equalTo(1)) 
    .select($"matchId", lit(true).as("isPlayed")) 

基本上我用explode创建多个行(和重命名为方便),浏览对象树的JSON领域我想,重复explode /为重命名过程在matchId,并筛选出所有不1这让我终于用​​3210功能为“硬编码”的true值一个全新的列名为isPlayed因为这是不1消失了一切。

如果这不是你正在寻找的东西,希望它能给你一些指示。当你了解Spark时,functionslibrary对你很有帮助。

+0

嘿谢谢你的答案。这会是一个代价高昂的操作吗?还有一种方法可以保留行数。爆炸我认为会乘以阵列1中爆炸元素的数量*阵列2中爆炸元素的数量。将这个数据与原始表格连接起来效率高吗? –

+0

它使用了所有更高级别的Spark功能,这是尽可能让Catalyst优化事情的一种方法。但是,你为什么不试试看它是否“高效”? – Vidya