2016-03-03
0

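Parsing a JSON stream with pyspark

I am very new to Spark Streaming, and I am trying to read and analyze a JSON stream from Kafka using pyspark. Reading the stream works, and I can also pprint() the RDDs. Each message looks like this: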

{"Address":"22.79.52.79","AlarmProfile":"-1","Amps":"11.98","AmpsLimit":"90","AssetTag":"-1","AssetTag_1":"-1","Blank":"0","CAN":"0","Chain":"2","Config":"\u003cUnknown\u003e",...,"WattsLimit":"-1"} 

I want to parse the JSON so that I can use, for example, my_parsed_json["Amps"].

But I don't know how to apply json.loads() to the messages in the stream.
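For reference, here is a minimal sketch of what json.loads() does with a single message of this shape, outside Spark. The record is a trimmed copy of the sample above containing only a few of its fields, and the variable names are illustrative. The values arrive as JSON strings, so a numeric field needs an explicit conversion.

```python
import json

# Trimmed copy of the sample message (only a few of the original fields).
raw = r'{"Address":"22.79.52.79","Amps":"11.98","AmpsLimit":"90","Config":"\u003cUnknown\u003e"}'

my_parsed_json = json.loads(raw)       # str -> dict

amps = float(my_parsed_json["Amps"])   # values are strings, so convert explicitly
config = my_parsed_json["Config"]      # \u003c and \u003e decode to < and >
print(amps, config)
```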

I run the script this way:

/data/spark/bin/spark-submit --master spark://des01:7077 --total-executor-cores 2 --jars /data/dev/2.10/spark-streaming-kafka-assembly_2.10-1.5.2.jar test.py pkb01:9092 topicname 

where "pkb01:9092" is the Kafka broker and "topicname" is the Kafka topic.

My Python code:

from __future__ import print_function 

import sys 
import json 

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils 

# sc is the Spark context 

sc = SparkContext(appName="mitest") 
ssc = StreamingContext(sc, 2) 

brokers, topico = sys.argv[1:] 
kvs = KafkaUtils.createDirectStream(ssc, [topico], {"metadata.broker.list": brokers}) 

dstream = kvs.map(lambda x: x[1]) 

dstream.pprint() 

I would like to have something like this:

my_parsed_json = dstream.map(lambda x: json.loads(x)) 

but I get an error from Spark. Any help?

The error says:

Traceback (most recent call last): 
    File "/home/spark/test.py", line 28, in <module> 
    ssc.start() 
    File "/data/spark/python/lib/pyspark.zip/pyspark/streaming/context.py", line 237, in start 
    File "/data/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ 
    File "/data/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value 
    py4j.protocol.Py4JJavaError: An error occurred while calling o21.start. 
    : java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute 
      at scala.Predef$.require(Predef.scala:233) 
      at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163) 
      at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:551) 
      at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:609) 
      at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:608) 
      at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:623) 
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
      at java.lang.reflect.Method.invoke(Method.java:606) 
      at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
      at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) 
      at py4j.Gateway.invoke(Gateway.java:259) 
      at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
      at py4j.commands.CallCommand.execute(CallCommand.java:79) 
      at py4j.GatewayConnection.run(GatewayConnection.java:207) 
      at java.lang.Thread.run(Thread.java:745) 
+1

Please post the error. – javadba

+0

Thanks, javadba. I added the error, but I think the problem must be in how I reference the RDD objects rather than the DStreams, as with foreachRDD() in Scala. – jcalbo

Answers

2

You need to call one of the operations listed here:

https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html

Transformation | Meaning
map(func) | Return a new DStream by passing each element of the source DStream through a function func.
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items.
filter(func) | Return a new DStream by selecting only the records of the source DStream on which func returns true.
repartition(numPartitions) | Changes the level of parallelism in this DStream by creating more or fewer partitions.
count() | Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream.
reduce(func) | Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed in parallel.
countByValue() | When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream.

...and so on.

One or more of these need to be called on your dstream.
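Note that the error in the question complains about missing output operations. Transformations such as map() are lazy: they only describe a new DStream. Something like pprint() or foreachRDD() has to be attached to the resulting DStream before ssc.start() is called. A minimal sketch, assuming kvs is the DStream returned by KafkaUtils.createDirectStream in the question; the helper name register_parsed_output is hypothetical.

```python
import json


def register_parsed_output(kvs):
    """Parse the value of each (key, value) Kafka record and print every batch.

    `kvs` is assumed to be the DStream of (key, value) pairs from the question.
    """
    # Transformation: lazy, nothing runs yet.
    parsed = kvs.map(lambda kv: json.loads(kv[1]))
    # Output operation: registers work for ssc.start() to execute.
    parsed.pprint()
    return parsed
```

In the script this would be called as register_parsed_output(kvs) before ssc.start(), typically followed by ssc.awaitTermination().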

+0

Thanks for the great summary of the options. – disruptive

2

Why not do it like this:

dstream = kvs.map(lambda x: json.loads(x[1])) 

dstream.pprint()
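
If a malformed message can reach the topic, json.loads() would raise inside the map and the batch would fail. One possible variation is to use flatMap so that a bad record maps to zero outputs. This is only a sketch: parse_or_skip is a hypothetical helper, not part of pyspark, and silently dropping bad records is an assumption about what is wanted.

```python
import json


def parse_or_skip(raw):
    """Return [parsed_dict] for a valid JSON object, or [] so flatMap drops it."""
    try:
        record = json.loads(raw)
    except (TypeError, ValueError):  # invalid JSON raises ValueError; None raises TypeError
        return []
    # Keep only JSON objects, so later lookups like rec["Amps"] are valid.
    return [record] if isinstance(record, dict) else []


# With the stream from the question this could be used as:
#   dstream = kvs.flatMap(lambda x: parse_or_skip(x[1]))
#   dstream.map(lambda rec: float(rec["Amps"])).pprint()
```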