2017-09-23

I have a dataframe. JSON parsing inside a Spark UDF is producing unexpected output.

Every column in the dataframe has type string. Some of the columns hold JSON strings.

+--------+---------+--------------------------+
|event_id|event_key|                    rights|
+--------+---------+--------------------------+
|     410|(default)| {"conditions":[{"devic...|
+--------+---------+--------------------------+

I want to parse the JSON string myself, take one value from it, and add that value as a new column. I am using the Jackson parser (via json4s) to do this.

Here is the content of "rights":

{ 
"conditions": [ 
    { 
     "devices": [ 
      { 
       "connection": [ 
        "BROADBAND", 
        "MOBILE" 
       ], 
       "platform": "IOS", 
       "type": "MOBILE", 
       "provider": "TELETV" 
      }, 
      { 
       "connection": [ 
        "BROADBAND", 
        "MOBILE" 
       ], 
       "platform": "ANDROID", 
       "type": "MOBILE", 
       "provider": "TELETV" 
      }, 
      { 
       "connection": [ 
        "BROADBAND", 
        "MOBILE" 
       ], 
       "platform": "IOS", 
       "type": "TABLET", 
       "provider": "TELETV" 
      }, 
      { 
       "connection": [ 
        "BROADBAND", 
        "MOBILE" 
       ], 
       "platform": "ANDROID", 
       "type": "TABLET", 
       "provider": "TELETV" 
      } 
     ], 
     "endDateTime": "2017-01-09T22:59:59.000Z", 
     "inclusiveGeoTerritories": [ 
      "DE", 
      "IT", 
      "ZZ" 
     ], 
     "mediaType": "Linear", 
     "offers": [ 
      { 
       "endDateTime": "2017-01-09T22:59:59.000Z", 
       "isRestartable": true, 
       "isRecordable": true, 
       "isCUTVable": false, 
       "recordingMode": "UNIQUE", 
       "retentionCUTV": "P7DT2H", 
       "retentionNPVR": "P2Y6M5DT12H35M30S", 
       "offerId": "MOTOGP-RACE", 
       "offerType": "IPPV", 
       "startDateTime": "2017-01-09T17:00:00.000Z" 
      } 
     ], 
     "platformName": "USA", 
     "startDateTime": "2017-01-09T17:00:00.000Z", 
     "territory": "USA" 
    } 
] 
} 

Now I want to build a new column on the existing dataframe from this value. The name of the new column to add is "provider":

conditions -> devices -> provider 

I want to do this for every row in the dataframe. So I created a UDF: I pass the column holding the JSON string to the UDF, and inside the UDF I want to parse the JSON string and return a single string value for the new column.

My Spark code:

import org.apache.spark.sql.functions.udf 
import org.apache.spark.sql.functions._ 
import org.json4s._ 
import org.json4s.jackson.JsonMethods 
import org.json4s.jackson.JsonMethods._ 


    // some code to derive the base dataframe

    val fetchProvider_udf = udf(fetchProvider _)
    val result = df.withColumn("provider", fetchProvider_udf(col("rights")))
    result.select("event_id", "event_key", "rights", "provider").show(10)


    def fetchProvider(jsonStr: String): String = {
      val json = JsonMethods.parse(jsonStr)
      val providerData = json \\ "conditions" \\ "devices" \\ "provider"
      compact(render(providerData))
    }

Also, how do I handle the case where a key on the navigation path is not present? Will it throw an exception? Say "conditions" is there and "devices" is there, but the "provider" key is missing from the JSON string. How do I handle that?

Can someone help me?

Expected output:

+--------+---------+--------------------------+--------+
|event_id|event_key|                    rights|provider|
+--------+---------+--------------------------+--------+
|     410|(unknown)| {"conditions":[{"devic...|  TELETV|
+--------+---------+--------------------------+--------+

But I am getting the output below:

+--------+---------+--------------------------+----------------------------------------------------------------------------------+
|event_id|event_key|                    rights|                                                                          provider|
+--------+---------+--------------------------+----------------------------------------------------------------------------------+
|     410|(unknown)| {"conditions":[{"devic...|{"provider":"TELETV","provider":"TELETV","provider":"TELETV","provider":"TELETV"} |
+--------+---------+--------------------------+----------------------------------------------------------------------------------+
+1

Any reason for not using Spark's 'get_json_object'? – Mariusz

+0

The requirement is to use a Scala parser. –
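For reference, the `get_json_object` approach from Mariusz's comment would avoid the UDF entirely: Spark evaluates a JsonPath expression directly against the string column. A sketch (the three-column sample frame here is a hypothetical stand-in for the real base dataframe):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, get_json_object}

val spark = SparkSession.builder.master("local[*]").appName("rights-demo").getOrCreate()
import spark.implicits._

// Hypothetical stand-in for the base dataframe from the question.
val df = Seq(
  (410, "(default)", """{"conditions":[{"devices":[{"provider":"TELETV"}]}]}""")
).toDF("event_id", "event_key", "rights")

// "$.conditions[0].devices[0].provider" indexes into both arrays,
// so only the first device's provider is returned.
val result = df.withColumn(
  "provider",
  get_json_object(col("rights"), "$.conditions[0].devices[0].provider"))
```

`get_json_object` returns null when the path does not match, which also answers the missing-key question without any exception handling.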

Answer

0

If you want to extract only the value of the first provider, you should use the following code inside the UDF:

(json \\ "conditions" \\ "devices")(0) \\ "provider"

Your current code fetches all the providers (as a single object with repeated fields) and then converts that object to a string as the UDF result.

You should also make sure your UDF never throws an exception (an exception would fail the whole job). The simplest approach is to return null instead; then:

  • if you want to investigate the invalid rows, filter with col("provider").isNull
  • if you want to keep only the valid rows, filter with col("provider").isNotNull
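Putting both points together, a null-safe version of the question's UDF function could look like this (a sketch; `fetchProviderSafe` is a hypothetical name). It uses json4s' single-step `\` operator, which yields `JNothing` for a missing key instead of failing:

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods

// Walks conditions(0) -> devices(0) -> provider and returns null on any
// missing key or on malformed JSON, so the UDF never throws.
def fetchProviderSafe(jsonStr: String): String = {
  try {
    val json = JsonMethods.parse(jsonStr)
    ((json \ "conditions")(0) \ "devices")(0) \ "provider" match {
      case JString(provider) => provider
      case _                 => null // key missing or not a string
    }
  } catch {
    case _: Exception => null // malformed JSON
  }
}
```

Registering it with `udf(fetchProviderSafe _)` as before then lets the isNull/isNotNull filters separate the good rows from the bad ones.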