2016-08-05 93 views
1

我有csv数据并使用read_csv创建了pnadas数据帧并强制所有列作为字符串。 然后,当我尝试从熊猫数据框创建火花数据帧时,我收到下面的错误消息。pandas数据帧触发数据帧“无法合并类型错误”

from pyspark import SparkContext 
    from pyspark.sql import SQLContext 
    from pyspark.sql.types import * 
    z=pd.read_csv("mydata.csv", dtype=str) 
    z.info() 
<class 'pandas.core.frame.DataFrame'> 
Int64Index: 74044003 entries, 0 to 74044002 
Data columns (total 12 columns): 
primaryid  object 
event_dt  object 
age    object 
age_cod   object 
age_grp   object 
sex    object 
occr_country object 
drug_seq  object 
drugname  object 
route   object 
outc_cod  object 
pt    object 

q= sqlContext.createDataFrame(z) 

File "<stdin>", line 1, in <module> 
File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/context.py", line 425, in createDataFrame 
rdd, schema = self._createFromLocal(data, schema) 
File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/context.py", line 341, in _createFromLocal 
struct = self._inferSchemaFromList(data) 
File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/context.py", line 241, in _inferSchemaFromList 
schema = reduce(_merge_type, map(_infer_schema, data)) 
File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/types.py", line 862, in _merge_type 
for f in a.fields] 
File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/types.py", line 856, in _merge_type 
raise TypeError("Can not merge type %s and %s" % (type(a), type(b))) 
TypeError: Can not merge type <class 'pyspark.sql.types.DoubleType'> and <class 'pyspark.sql.types.StringType'> 

这里是一个例子。我正在下载公共数据并创建pandas数据框,但spark并不会从pandas数据框中创建spark数据框。

 import pandas as pd 
     from pyspark import SparkContext 
     from pyspark.sql import SQLContext 
     from pyspark.sql.types import * 

     url ="http://www.nber.org/fda/faers/2016/demo2016q1.csv.zip" 

     import requests, zipfile, StringIO 
     r = requests.get(url, stream=True) 
     z = zipfile.ZipFile(StringIO.StringIO(r.content)) 
     z.extractall() 


     z=pd.read_csv("demo2016q1.csv") # creates pandas dataframe 

    Data_Frame = sqlContext.createDataFrame(z) 
+0

一)为什么你读本地数据只是并行。这是反模式。 b)被标记为“object”的所有列都会显示一些Spark DataFrames不支持的异构数据。 – zero323

+0

你是对的,这是不正确的方式来本地阅读,但由于其他选项失败我希望来自熊猫的数据框将很容易火花处理。正如你所说,这些列是异构的。有没有可以尝试的解决方法? –

+0

你能提供[mcve]吗?一些玩具样品,将说明那里正在发生什么... – zero323

回答

3

长话短说不依赖于模式推理。一般而言,这是昂贵和棘手的。特别是数据中的某些列(例如event_dt_num)缺少值,这会推动Pandas将它们表示为混合类型(字符串不丢失,NaN缺失值)。

如果您有疑问,最好将所有数据作为字符串读取并在之后进行投射。如果您有权访问代码簿,则应始终提供架构以避免问题并降低总体成本。

最后从驱动程序传递数据是反模式。你应该能够读取该数据直接使用csv格式(火花2.0.0+)或spark-csv库(星火1.6及以下):

df = (spark.read.format("csv").options(header="true") 
    .load("/path/tp/demo2016q1.csv")) 

## root 
## |-- primaryid: string (nullable = true) 
## |-- caseid: string (nullable = true) 
## |-- caseversion: string (nullable = true) 
## |-- i_f_code: string (nullable = true) 
## |-- i_f_code_num: string (nullable = true) 
## ... 
## |-- to_mfr: string (nullable = true) 
## |-- occp_cod: string (nullable = true) 
## |-- reporter_country: string (nullable = true) 
## |-- occr_country: string (nullable = true) 
## |-- occp_cod_num: string (nullable = true) 

在这种特殊情况下增加inferSchema="true"选项应该工作以及,但它是更好地避免它。直接

from pyspark.sql.types import StructType 

schema = StructType.fromJson({'fields': [{'metadata': {}, 
    'name': 'primaryid', 
    'nullable': True, 
    'type': 'integer'}, 
    {'metadata': {}, 'name': 'caseid', 'nullable': True, 'type': 'integer'}, 
    {'metadata': {}, 'name': 'caseversion', 'nullable': True, 'type': 'integer'}, 
    {'metadata': {}, 'name': 'i_f_code', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 
    'name': 'i_f_code_num', 
    'nullable': True, 
    'type': 'integer'}, 
    {'metadata': {}, 'name': 'event_dt', 'nullable': True, 'type': 'integer'}, 
    {'metadata': {}, 'name': 'event_dt_num', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'mfr_dt', 'nullable': True, 'type': 'integer'}, 
    {'metadata': {}, 'name': 'mfr_dt_num', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'init_fda_dt', 'nullable': True, 'type': 'integer'}, 
    {'metadata': {}, 
    'name': 'init_fda_dt_num', 
    'nullable': True, 
    'type': 'string'}, 
    {'metadata': {}, 'name': 'fda_dt', 'nullable': True, 'type': 'integer'}, 
    {'metadata': {}, 'name': 'fda_dt_num', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'rept_cod', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 
    'name': 'rept_cod_num', 
    'nullable': True, 
    'type': 'integer'}, 
    {'metadata': {}, 'name': 'auth_num', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'mfr_num', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'mfr_sndr', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'lit_ref', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'age', 'nullable': True, 'type': 'double'}, 
    {'metadata': {}, 'name': 'age_cod', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'age_grp', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'age_grp_num', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'sex', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'e_sub', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'wt', 'nullable': True, 'type': 'double'}, 
    {'metadata': {}, 'name': 'wt_cod', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'rept_dt', 'nullable': True, 'type': 'integer'}, 
    {'metadata': {}, 'name': 'rept_dt_num', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'to_mfr', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'occp_cod', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 
    'name': 'reporter_country', 
    'nullable': True, 
    'type': 'string'}, 
    {'metadata': {}, 'name': 'occr_country', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 
    'name': 'occp_cod_num', 
    'nullable': True, 
    'type': 'integer'}], 
'type': 'struct'}) 

读者:您也可以提供架构如下

(spark.read.schema(schema).format("csv").options(header="true") 
    .load("/path/to/demo2016q1.csv")) 
+0

谢谢你的解释。实际上,我转而使用Pandas是因为我无法成功地将spark-csv库添加到Jupyter。我使用HDP 2.4(Spark 1.6),我安装了Jupyter。我下载了spark-csv和commons-csv,并在Jupyter笔记本启动程序中指定了这些jar的路径,但是当我尝试读取csv数据时,它失败,说它无法获取库。现在,我尝试了火花外壳,一切正常。你有没有在Jupyter(ipython)笔记本中使用spark-csv库? –

+0

当然,这种方法应该工作得很好http://stackoverflow.com/a/35762809/1560062 – zero323

+0

它的作品像魅力!谢谢你一百万。我尝试了很多其他的选择,花了很多时间。您提供的链接帮助我在几分钟内完成它。 –