
I am trying to write a Spark RDD to an HBase table using PySpark, but saving the RDD raises an AttributeError. The RDD looks like the following (output of print rdd.take(rdd.count())):

[Decimal('0.39326837'), Decimal('0.03643601'), Decimal('0.06031798'), Decimal('0.08885452')] 

When I try to write the RDD to the HBase table using the function SaveRecord:

def SaveRecord(tx_fee_rdd): 
    host = 'localhost' #sys.argv[1] 
    table = 'tx_fee_table' #needs to be created before hand in hbase shell 
    conf = {"hbase.zookeeper.quorum": host, 
      "hbase.mapred.outputtable": table, 
      "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat", 
      "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable", 
      "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"} 
    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter" 
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter" 
    #row key id,id, cfamily=tx_fee_col,column_name = tx_fee, column_value=x 
    datamap = tx_fee_rdd.map(lambda x: ("tx_fee_col","tx_fee",x)) 
    datamap.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv) 


tx_fee_rdd.foreach(SaveRecord) 

I get the following error:

AttributeError: 'Decimal' object has no attribute 'map' 

How can I deal with this?

Following @zero323's suggestion, I now get the following error:

Traceback (most recent call last): 
    File "/home/ubuntu/unix_practice/bcrpc/bitcoin-inspector-webserver/bitcoin/bctxfee_text3.py", line 66, in <module> 
    SaveRecord(tx_fee_rdd) 
    File "/home/ubuntu/unix_practice/bcrpc/bitcoin-inspector-webserver/bitcoin/bctxfee_text3.py", line 29, in SaveRecord 
    datamap.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv) 
    File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1348, in saveAsNewAPIHadoopDataset 
    File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ 
    File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsHadoopDataset. 
: org.apache.spark.SparkException: RDD element of type [Ljava.lang.Object; cannot be used 
    at org.apache.spark.api.python.SerDeUtil$.pythonToPairRDD(SerDeUtil.scala:237) 
    at org.apache.spark.api.python.PythonRDD$.saveAsHadoopDataset(PythonRDD.scala:801) 
    at org.apache.spark.api.python.PythonRDD.saveAsHadoopDataset(PythonRDD.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:207) 
    at java.lang.Thread.run(Thread.java:745) 

Working code shouldn't get a downvote without a valid reason. Not everyone speaks perfect English or can express their question well. Let's be a welcoming community. – user2979872


If I may ask, how do you submit this script? Do you need to download a specific jar? –

Answers


foreach operates on individual records, so your function receives Decimal objects, not RDDs. You cannot map over these, let alone call the saveAsNewAPIHadoopDataset method on them.

If you want to use saveAsNewAPIHadoopDataset, your function should operate on the RDD directly:

SaveRecord(tx_fee_rdd) 

Another possible problem is the following part:

datamap = tx_fee_rdd.map(lambda x: ("tx_fee_col","tx_fee",x)) 

saveAsNewAPIHadoopDataset expects key-value pairs, not triples like this, and it will probably not work with Decimal objects either. See the hbase_outputformat.py example for details.
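For illustration only, here is a minimal sketch (not part of the original answer) of how the question's RDD of Decimal values could be reshaped into the (rowkey, [rowkey, column family, qualifier, value]) string lists that StringListToPutConverter expects. The row keys derived from zipWithIndex and the save_record name are assumptions, since the question does not say what the real row keys should be:

def save_record(tx_fee_rdd):
    # Sketch under assumptions: tx_fee_rdd is the RDD of Decimal values from the
    # question, and an index-based row key is acceptable (hypothetical choice).
    host = 'localhost'
    table = 'tx_fee_table'  # must already exist in the hbase shell
    conf = {"hbase.zookeeper.quorum": host,
            "hbase.mapred.outputtable": table,
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
    # (rowkey, [rowkey, column family, qualifier, value]) -- everything as strings,
    # since the converters work on string keys and string lists
    datamap = tx_fee_rdd.zipWithIndex().map(
        lambda kv: (str(kv[1]), [str(kv[1]), "tx_fee_col", "tx_fee", str(kv[0])]))
    datamap.saveAsNewAPIHadoopDataset(conf=conf,
                                      keyConverter=keyConv,
                                      valueConverter=valueConv)

# called on the RDD itself, not passed to foreach
save_record(tx_fee_rdd)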


The link is broken, please check. – Bg1850


@zero323, your statement about the key and value is correct, but not the one about foreach. You can look at this blog.

Below is the code that writes to HBase:

from pyspark import SparkContext 
from jsonrpc.authproxy import AuthServiceProxy 
from pyspark.streaming import StreamingContext 
import json 


# Create a local StreamingContext using all available cores and a batch interval of 0.5 seconds 
sc = SparkContext("local[*]", "txcount") 
ssc = StreamingContext(sc, 0.5) #0.001 did 9710 blocks in 12 minutes 

#function SaveRecord: saves tx_fee for a block to hbase database 
def SaveRecord(tx_fee_rdd): 
    host = 'localhost' #sys.argv[1] 
    table = 'transaction_fee_table' #needs to be created before hand in hbase shell 
    conf = {"hbase.zookeeper.quorum": host, 
      "hbase.mapred.outputtable": table, 
      "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat", 
      "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable", 
      "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"} 
    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter" 
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter" 
    #row key id,id, cfamily=tx_fee_col,column_name = tx_fee, column_value=x 
    #datamap = tx_fee_rdd.map(lambda x: ("tx_fee",x)) 
    #(rowkey , [ row key , column family , column name , value ]) 
    datamap = tx_fee_rdd.map(lambda x: (str(x[0]), 
          [str(x[0]),"tx_fee_col","tx_fee",str(x[1])]) 
          ) 

    datamap.saveAsNewAPIHadoopDataset(conf=conf, 
             keyConverter=keyConv, 
             valueConverter=valueConv) 






lines = ssc.socketTextStream("localhost", 8888) 
dump_rdd = lines.map(lambda x: json.dumps(x)) 
#print dump_rdd.take(2) 
load_rdd = dump_rdd.map(lambda x: json.loads(x)).map(lambda x : x.decode('unicode_escape').encode('ascii','ignore')) 
#load_rdd.pprint(2) 

#load_rdd.pprint(100) 
#tx = load_rdd.flatMap(lambda x: x.split(":")) #this works 
split_blk_rdd = load_rdd.map(lambda x: x.split(":")) 
#split_blk_rdd.pprint() 

tx_fee_rdd = split_blk_rdd.map(lambda x : (x[14][1:7],x[15][1:-15])) #this gets transaction fee 
#tx_fee_rdd.pprint(200)  #works 
tx_fee_rdd.foreachRDD(SaveRecord)  #function call 
#tx_fee_rdd.saveAsTextFiles("hdfs://ec2-52-21-47-235.compute-1.amazonaws.com:9000/bitcoin/","txt") 

######tx_fee_rdd.pprint(1000) 

#gen_tx = tx.map(lambda x: x[8]) 

#gen_tx.pprint(100) 
#gen_tx = tx.map(parse_gen_tx) 
#print type(tx) 
#lst_tx = pprint.pprint(tx) 
#print lst_tx 
#print lst_tx[8][0] 
#print type(lst_tx[8]) 
#print "here" 
#print str(tx[8])[4:-4] #gives tx_id without the enclosing quotes 

#print type(lst_tx) 

ssc.start()    # Start the computation 
#ssc.awaitTermination() # Wait for the computation to terminate 
ssc.awaitTerminationOrTimeout(15000) # time out after 15000 seconds (about 4 hours) 
#ssc.stop() # Wait for the computation to terminate 

"foreachRDD" is not the same as "foreach", a "DStream" is not an "RDD", and the streaming code you pasted matches neither the code shown in the question nor the error message. – zero323


@zero323, thanks for your comment. Could you comment on the difference between foreachRDD and foreach, and also between DStream and RDD? Thanks again. – user2065276


A stream is a continuous sequence of RDDs used in Spark Streaming. 'foreachRDD' performs an action (surprise ;) on each RDD in the stream and has type 'RDD[T] => Unit'. An RDD (Resilient Distributed Dataset) is the unit of parallelism in Spark (not only in Streaming). 'foreach' is an action on an RDD with type 'T => Unit'; it performs some operation (side effect) on each element of the RDD. – zero323
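To make the distinction concrete, here is a minimal sketch (not part of the original thread) contrasting the two; the socket host, port, and timeout values are placeholders:

from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[*]", "foreach_vs_foreachRDD")

# RDD.foreach: the supplied function receives one *element* at a time (T => Unit).
# The prints run on the executors, so they appear in the worker logs.
rdd = sc.parallelize([1, 2, 3])
rdd.foreach(lambda element: print(element))

# DStream.foreachRDD: the supplied function receives one *RDD* per batch
# (RDD[T] => Unit), so RDD methods such as map or saveAsNewAPIHadoopDataset
# can be used inside it.
ssc = StreamingContext(sc, 1)
stream = ssc.socketTextStream("localhost", 9999)  # placeholder source
stream.foreachRDD(lambda batch_rdd: print(batch_rdd.count()))

ssc.start()
ssc.awaitTerminationOrTimeout(10)
ssc.stop()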