How do I map a string to JSON in Spark/Scala? I have a pairRDD that looks like this:
(1, {"id":1, "picture": "url1"})
(2, {"id":2, "picture": "url2"})
(3, {"id":3, "picture": "url3"})
...
where the second element is a string I obtained from the get() function described at http://alvinalexander.com/scala/how-to-write-scala-http-get-request-client-source-fromurl. Here is that function:
@throws(classOf[java.io.IOException])
@throws(classOf[java.net.SocketTimeoutException])
def get(url: String,
        connectTimeout: Int = 5000,
        readTimeout: Int = 5000,
        requestMethod: String = "GET") = {
  import java.net.{URL, HttpURLConnection}
  val connection = (new URL(url)).openConnection.asInstanceOf[HttpURLConnection]
  connection.setConnectTimeout(connectTimeout)
  connection.setReadTimeout(readTimeout)
  connection.setRequestMethod(requestMethod)
  val inputStream = connection.getInputStream
  val content = io.Source.fromInputStream(inputStream).mkString
  if (inputStream != null) inputStream.close()
  content
}
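For context, the pairRDD is built roughly along these lines (the endpoint URL here is a placeholder, not the real service):

```scala
// Hypothetical sketch: fetch one JSON document per id and pair it with that id.
// "https://example.com/api/items/" is a made-up placeholder endpoint.
val ids = sc.parallelize(Seq(1, 2, 3))
val pairRDD_1 = ids.map(id => (id, get(s"https://example.com/api/items/$id")))
```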
Now I want to convert that string to JSON so I can extract the picture url from it. (Following this answer: https://stackoverflow.com/a/38271732/1456026)
val step2 = pairRDD_1.map { case (x, y) =>
  val jsonStr = y
  val rdd = sc.parallelize(Seq(jsonStr))
  val df = sqlContext.read.json(rdd)
  (x, y("picture"))
}
But I keep getting

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
It does work when I print out the first 20 elements and convert the strings to JSON manually, one by one, outside of .map:
val rdd = sc.parallelize(Seq("""{"id":1, "picture": "url1"}"""))
val df = sqlContext.read.json(rdd)
println(df)
>>>[id: string, picture: string]
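and pulling the field out of that one-row DataFrame also works in that manual setting, e.g. something like:

```scala
// Extract the "picture" column from the single row of the DataFrame.
val url = df.first().getAs[String]("picture")
```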
How can I convert a string to JSON inside .map in Spark/Scala?
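For reference, I imagine the fix is a plain, serializable per-element parser instead of creating an RDD/DataFrame per element; a sketch assuming the json4s library that Spark bundles (untested, and the empty-string fallback is my own choice):

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Parse each JSON string with a library call that runs entirely on the
// executor; sc and sqlContext must not be referenced inside .map.
val step2 = pairRDD_1.map { case (x, y) =>
  val picture = parse(y) \ "picture" match {
    case JString(url) => url
    case _            => ""
  }
  (x, picture)
}
```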