2016-12-02 99 views
0

到JSON我有一个pairRDD看起来像火花/斯卡拉字符串中的地图

(1, {"id":1, "picture": "url1"}) 
(2, {"id":2, "picture": "url2"}) 
(3, {"id":3, "picture": "url3"}) 
... 

,其中第二个元素是一个字符串,我是从功能的get()从http://alvinalexander.com/scala/how-to-write-scala-http-get-request-client-source-fromurl了。这里是该功能:

@throws(classOf[java.io.IOException]) 
@throws(classOf[java.net.SocketTimeoutException]) 
def get(url: String, 
     connectTimeout: Int = 5000, 
     readTimeout: Int = 5000, 
     requestMethod: String = "GET") = 
{ 
    import java.net.{URL, HttpURLConnection} 
    val connection = (new URL(url)).openConnection.asInstanceOf[HttpURLConnection] 
    connection.setConnectTimeout(connectTimeout) 
    connection.setReadTimeout(readTimeout) 
    connection.setRequestMethod(requestMethod) 
    val inputStream = connection.getInputStream 
    val content = io.Source.fromInputStream(inputStream).mkString 
    if (inputStream != null) inputStream.close 
    content 
} 

现在我想将该字符串转换为json以从中获取图片url。 (从这个https://stackoverflow.com/a/38271732/1456026

val step2 = pairRDD_1.map({case(x,y)=>{ 
val jsonStr = y 
val rdd = sc.parallelize(Seq(jsonStr)) 
val df = sqlContext.read.json(rdd) 
(x,y("picture")) 
}}) 

但我经常收到

异常线程 “main” org.apache.spark.SparkException:任务不 序列化

当我打印出前20个元素,并尝试将字符串转换为json,然后手动将它们一个接一个地外部化。

val rdd = sc.parallelize(Seq("""{"id":1, "picture": "url1"}""")) 
val df = sqlContext.read.json(rdd) 
println(df) 
>>>[id: string, picture: string] 

如何将字符串转换为spark/scala中的json inside .map?

回答

0

通常,当您看到此消息时,这是因为您正在使用地图函数中的资源(读取匿名函数),该资源是在其外部定义的,并且无法序列化。

以集群模式运行时,匿名函数将在不同的机器上运行。在这个独立的机器上,你的应用程序的一个新实例被实例化,并且它的状态(变量/值/ etc)由驱动程序序列化并发送到新实例的数据设置。如果你的匿名函数是一个闭包(即利用它范围之外的变量),那么这些资源必须是可序列化的,以便发送给工作节点。

例如,map函数可能会尝试使用数据库连接来获取RDD中每个记录的一些信息。该数据库连接仅在创建它的主机上有效(当然,从网络的角度来看),这通常是驱动程序,所以它不能被序列化,发送和从不同的主机使用。在这个特定的例子中,你会做一个mapPartitions()来实例化一个来自工作者本身的数据库连接,然后map()该分区中的每个记录来查询数据库。

如果没有完整的代码示例,我无法提供更多帮助,以查看哪些潜在值或变量无法序列化。

1

在分布式操作中不能使用SparkContext。在上面的代码中,您无法访问map操作pairRDD_1上的SparkContext。

考虑使用JSON库来执行转换。

0

其中一个答案是使用json4s库。 来源:http://muster.json4s.org/docs/jawn_codec.html

//case class defined outside main() 
case class Pictures(id: String, picture: String) 

// import library 
import muster._ 
import muster.codec.jawn._ 

// here all the magic happens 
val json_read_RDD = pairRDD_1.map({case(x,y) => 
     { 
      val json_read_to_case_class = JawnCodec.as[Pictures](y) 
      (x, json_read_to_case_class.picture) 
    }}) 

// add to build.sbt 
libraryDependencies ++= Seq(
"org.json4s" %% "muster-codec-json" % "0.3.0", 
"org.json4s" %% "muster-codec-jawn" % "0.3.0") 

学分去特拉维斯HEGNER,谁解释了为什么原来的代码没有工作 和安东Okolnychyi使用JSON库的建议。