
I followed this article (Writing data from pyspark to ElasticSearch) to send some data to AWS ES, and I used the elasticsearch-hadoop jar. Here is my script:

from pyspark import SparkContext, SparkConf 
from pyspark.sql import SQLContext 
if __name__ == "__main__": 
    conf = SparkConf().setAppName("WriteToES") 
    sc = SparkContext(conf=conf) 
    sqlContext = SQLContext(sc) 
    es_conf = {"es.nodes" : "https://search-elasticsearchdomaine.region.es.amazonaws.com/", 
    "es.port" : "9200","es.nodes.client.only" : "true","es.resource" : "sensor_counts/metrics"} 
    es_df_p = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("output/part-00000-c353bb29-f189-4189-b35b-f7f1af717355.csv") 
    es_df_pf= es_df_p.groupBy("network_key") 
    es_df_pf.saveAsNewAPIHadoopFile(
    path='-', 
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", 
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_conf) 

Then I run this command line:

spark-submit --jars elasticsearch-spark-20_2.11-5.3.1.jar write_to_es.py 

where write_to_es.py is the script above.

Here is the error I get:

17/05/05 17:51:52 INFO Executor: Running task 0.0 in stage 1.0 (TID 1) 
17/05/05 17:51:52 INFO HadoopRDD: Input split: file:/home/user/spark-2.1.0-bin-hadoop2.7/output/part-00000-c353bb29-f189-4189-b35b-f7f1af717355.csv:0+178633 
17/05/05 17:51:52 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1143 bytes result sent to driver 
17/05/05 17:51:52 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 11 ms on localhost (executor driver) (1/1) 
17/05/05 17:51:52 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
17/05/05 17:51:52 INFO DAGScheduler: ResultStage 1 (load at NativeMethodAccessorImpl.java:0) finished in 0,011 s 
17/05/05 17:51:52 INFO DAGScheduler: Job 1 finished: load at NativeMethodAccessorImpl.java:0, took 0,018727 s 
17/05/05 17:51:52 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.1.26:39609 in memory (size: 2.1 KB, free: 366.3 MB) 
17/05/05 17:51:52 INFO BlockManagerInfo: Removed broadcast_2_piece0 on 192.168.1.26:39609 in memory (size: 22.9 KB, free: 366.3 MB) 
17/05/05 17:51:52 INFO BlockManagerInfo: Removed broadcast_3_piece0 on 192.168.1.26:39609 in memory (size: 2.1 KB, free: 366.3 MB) 
Traceback (most recent call last): 
    File "/home/user/spark-2.1.0-bin-hadoop2.7/write_to_es.py", line 11, in <module> 
    es_df_pf.saveAsNewAPIHadoopFile(
    File "/home/user/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 964, in __getattr__ 
AttributeError: 'DataFrame' object has no attribute 'saveAsNewAPIHadoopFile' 
17/05/05 17:51:53 INFO SparkContext: Invoking stop() from shutdown hook 
17/05/05 17:51:53 INFO SparkUI: Stopped Spark web UI at http://192.168.1.26:4040 
17/05/05 17:51:53 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
17/05/05 17:51:53 INFO MemoryStore: MemoryStore cleared 
17/05/05 17:51:53 INFO BlockManager: BlockManager stopped 
17/05/05 17:51:53 INFO BlockManagerMaster: BlockManagerMaster stopped 
17/05/05 17:51:53 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 
17/05/05 17:51:53 INFO SparkContext: Successfully stopped SparkContext 
17/05/05 17:51:53 INFO ShutdownHookManager: Shutdown hook called 
17/05/05 17:51:53 INFO ShutdownHookManager: Deleting directory /tmp/spark-501c4efa-5402-430e-93c1-aaff4caddef0 
17/05/05 17:51:53 INFO ShutdownHookManager: Deleting directory /tmp/spark-501c4efa-5402-430e-93c1-aaff4caddef0/pyspark-52406fa8-e8d1-4aca-bcb6-91748dc87507 

How can I fix this error:

AttributeError: 'DataFrame' object has no attribute 'saveAsNewAPIHadoopFile' 

Any help or advice would be much appreciated.

Answers


I had the same problem.

After reading this article, I found the answer!

You have to convert the DataFrame to a PythonRDD, like this:

>>> type(df) 
<class 'pyspark.sql.dataframe.DataFrame'> 

>>> type(df.rdd) 
<class 'pyspark.rdd.RDD'> 

>>> df.rdd.saveAsNewAPIHadoopFile(...) # Got the same error message 

>>> df.printSchema() # My schema 
root 
|-- id: string (nullable = true) 
... 

# Let's convert to a PythonRDD
>>> python_rdd = df.rdd.map(lambda item: ('key', {
...     'id': item['id'],
...     ...
... }))

>>> python_rdd 
PythonRDD[42] at RDD at PythonRDD.scala:43 

>>> python_rdd.saveAsNewAPIHadoopFile(...) # Now, success 
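
For reference, here is how that approach could look applied to the script from the question. This is only a rough, untested sketch: it reuses the es_conf and CSV path from the question, drops the groupBy (which returns a GroupedData rather than a DataFrame), and uses Row.asDict() to turn each row into a plain Python dict before saving:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

if __name__ == "__main__":
    conf = SparkConf().setAppName("WriteToES")
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)
    es_conf = {
        "es.nodes": "https://search-elasticsearchdomaine.region.es.amazonaws.com/",
        "es.port": "9200",
        "es.nodes.client.only": "true",
        "es.resource": "sensor_counts/metrics"
    }
    es_df_p = sqlContext.read.format("com.databricks.spark.csv") \
        .option("header", "true") \
        .load("output/part-00000-c353bb29-f189-4189-b35b-f7f1af717355.csv")

    # saveAsNewAPIHadoopFile exists on RDDs, not DataFrames, and EsOutputFormat
    # expects map-like values, so convert each Row to a (key, dict) pair first.
    es_rdd = es_df_p.rdd.map(lambda row: ('key', row.asDict()))

    es_rdd.saveAsNewAPIHadoopFile(
        path='-',
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_conf)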

saveAsNewAPIHadoopFile is an RDD method, not a DataFrame method:

http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD

I think that line should be

es_df_pf.rdd.saveAsNewAPIHadoopFile 

When I tried it, it gave me a huge error: '17/05/09 09:40:21 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2) net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)' – Somar
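
The PickleException in the comment above is the usual symptom of passing Row objects straight to saveAsNewAPIHadoopFile: the Java-side pickler cannot rebuild pyspark.sql Row instances. A commonly suggested workaround (not verified in this thread) is to map each Row to a plain dict first, reusing es_df_p and es_conf from the question, for example:

# Hypothetical fix for the PickleException: convert each Row to a plain dict
# (the key is ignored when keyClass is NullWritable) before saving.
es_rdd = es_df_p.rdd.map(lambda row: ('key', row.asDict()))
es_rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_conf)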