2017-10-12 152 views
0

我试图使用Spark Scala代码流式传输twitter数据。我能够获取数据并创建数据框并查看它。但是,当尝试提取status.getPlace.getCountry()时,我得到显示java.lang.NullPointerException。使用Spark的Twitter流式传输

星火版本:2.0.0, 斯卡拉版本:

2.11.8试图用if条件,检查值等,但不成功。

代码:

val spark = SparkSession.builder().appName("Twitter Spark Example").getOrCreate() 
val ssc = new StreamingContext(spark.sparkContext,Seconds(5)) 

val filters:Seq[String] = Seq("hadoop") 
val cb = new ConfigurationBuilder() 
     .setOAuthConsumerKey("******") 
     .setOAuthConsumerSecret("******") 
     .setOAuthAccessToken("********") 
     .setOAuthAccessTokenSecret("******").build() 

val twitter_auth = new TwitterFactory(cb) 
val a = new OAuthAuthorization(cb) 
val atwitter:Option[twitter4j.auth.Authorization] = Some(twitter_auth.getInstance(a).getAuthorization()) 

val tweetsdstream = TwitterUtils.createStream(ssc, atwitter, filters, StorageLevel.MEMORY_AND_DISK_SER_2) 
val data = tweetsdstream.map {status => 
     val places = status.getPlace 
     val id = status.getUser.getId 
     val date = status.getUser.getCreatedAt.toString() 
     val user = status.getUser.getName() 
     val place = places.getCountry() 

     (id,date,user,place) 
     } 
data.foreachRDD{rdd => 
     import spark.implicits._ 
     rdd.toDF("id","date","user","place").show() 
    } 

ssc.start() 
ssc.awaitTermination() 

是否有来自Twitter的访问位置信息有任何限制? 任何建议都会有帮助。

感谢

+3

实际上大部分时间'getPlace'和'getCountry'都包含null值,您可以尝试使用geoLocation而不是 –

回答

0

您可以使用Option处理null S:

val data = tweetsdstream.map { 
    status => 
    val place = Option(status.getPlace).map(_.getCountry).orNull 
    val id = status.getUser.getId 
    val user = status.getUser.getName 
    val date = status.getUser.getCreatedAt.toString 
    (id, date, user, place) 
} 

这样一来,你就能够想象的所有微博,无论他们是否有一个国家或没有(和它在国家未定义的情况下将为空)。

Option对于处理可能丢失的数据非常有用,可以将其用于其他可能的空字段。

+0

,您的解决方案适用于我。非常感谢。 –