Saving a PySpark RDD to HBase raises an AttributeError. I am trying to write a Spark RDD to an HBase table using PySpark. Printed with rdd.take(rdd.count()), the RDD looks like this:
[Decimal('0.39326837'), Decimal('0.03643601'), Decimal('0.06031798'), Decimal('0.08885452')]
When I try to write the RDD to the HBase table using the function SaveRecord:
def SaveRecord(tx_fee_rdd):
    host = 'localhost'  # sys.argv[1]
    table = 'tx_fee_table'  # needs to be created beforehand in the hbase shell
    conf = {"hbase.zookeeper.quorum": host,
            "hbase.mapred.outputtable": table,
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
    # row key = id, column family = tx_fee_col, column name = tx_fee, column value = x
    datamap = tx_fee_rdd.map(lambda x: ("tx_fee_col", "tx_fee", x))
    datamap.saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)

tx_fee_rdd.foreach(SaveRecord)
I get the following error:
AttributeError: 'Decimal' object has no attribute 'map'
How can I fix this?
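From what I understand, rdd.foreach(f) applies f to every element of the RDD, so SaveRecord receives a single Decimal rather than the RDD itself. A plain-Python sketch of that misapplication (no Spark needed; names here are illustrative only):

```python
from decimal import Decimal

def save_record(rdd_like):
    # expects an object with a .map method, such as an RDD
    return rdd_like.map(str)

values = [Decimal('0.39326837'), Decimal('0.03643601')]

# Simulating tx_fee_rdd.foreach(SaveRecord): foreach hands each
# element to the function, so save_record gets a bare Decimal,
# which has no .map attribute -- hence the AttributeError.
for v in values:
    try:
        save_record(v)
    except AttributeError as err:
        print(err)
```

Calling SaveRecord(tx_fee_rdd) directly, as suggested below, avoids this particular error.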
Following @zero323's suggestion, I now get the following error:
Traceback (most recent call last):
File "/home/ubuntu/unix_practice/bcrpc/bitcoin-inspector-webserver/bitcoin/bctxfee_text3.py", line 66, in <module>
SaveRecord(tx_fee_rdd)
File "/home/ubuntu/unix_practice/bcrpc/bitcoin-inspector-webserver/bitcoin/bctxfee_text3.py", line 29, in SaveRecord
datamap.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1348, in saveAsNewAPIHadoopDataset
File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsHadoopDataset.
: org.apache.spark.SparkException: RDD element of type [Ljava.lang.Object; cannot be used
at org.apache.spark.api.python.SerDeUtil$.pythonToPairRDD(SerDeUtil.scala:237)
at org.apache.spark.api.python.PythonRDD$.saveAsHadoopDataset(PythonRDD.scala:801)
at org.apache.spark.api.python.PythonRDD.saveAsHadoopDataset(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
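For what it's worth, the SparkException above seems to indicate that StringListToPutConverter expects key-value pairs rather than flat tuples: a (rowkey, [rowkey, column_family, qualifier, value]) pair with every field a string. A sketch of such a mapping (using the element's index as the row key is my own assumption):

```python
from decimal import Decimal

def to_hbase_record(value, row_id):
    # Build (rowkey, [rowkey, column_family, qualifier, value]),
    # all as strings, which the Put converter appears to require.
    key = str(row_id)
    return (key, [key, "tx_fee_col", "tx_fee", str(value)])

# Intended use inside SaveRecord (untested here):
# datamap = tx_fee_rdd.zipWithIndex() \
#                     .map(lambda kv: to_hbase_record(kv[0], kv[1]))
```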
A question with working code shouldn't be downvoted without good reason. Not everyone speaks English fluently or can express their question well. Let's be a welcoming community. – user2979872
如果我可能会问,你如何提交这个脚本?你需要下载一个特定的jar吗? –