2017-05-26 152 views
0

我有一个看起来像这样多个XML文件计数行:PySpark包含字符串

<?xml version="1.0" encoding="UTF-8"?> 
<parent> 
    <row AcceptedAnswerId="15" AnswerCount="5" Body="&lt;p&gt;How should 
I elicit prior distributions from experts when fitting a Bayesian 
model?&lt;/p&gt;&#10;" CommentCount="1" CreationDate="2010-07- 
19T19:12:12.510" FavoriteCount="17" Id="1" LastActivityDate="2010-09- 
15T21:08:26.077" OwnerUserId="8" PostTypeId="1" Score="26" 
Tags="&lt;bayesian&gt;&lt;prior&gt;&lt;elicitation&gt;" 
Title="Eliciting priors from experts" ViewCount="1457" /> 

我想能够使用PySpark数不包含该字符串的行:<row

我目前的想法:

def startWithRow(line): 
    if line.strip().startswith("<row"): 
     return True 
    else: 
     return False 

sc.textFile(localpath("folder_containing_xmg.gz_files")) \ 
    .filter(lambda x: not startWithRow(x)) \ 
    .count() 

我试图验证这一点,但我从甚至一个简单的计数线是没有意义得到结果(我下载的XML文件,并做了一个wc它与PySpark的字数不匹配。)

关于我上面的方法有什么突出的错/奇怪吗?

+0

的可能的复制[如何解析Apache中星火XML文件?(https://stackoverflow.com/questions/33280821/how-to-parse-xml-files-in-apache-spark) –

回答

0

我将只使用lxml库星火相结合来算符合row或筛选出来的东西。

from lxml import etree 

def find_number_of_rows(path): 
    try: 
     tree = etree.fromstring(path) 
    except: 
     tree = etree.parse(path) 
    return len(tree.findall('row')) 

rdd = spark.sparkContext.parallelize(paths) # paths is a list to all your paths 
rdd.map(lambda x: find_number_of_rows(x)).collect() 

举例来说,如果你有列表或XML字符串(只是玩具的例子),你可以做到以下几点:

text = [ 
    """ 
    <parent> 
     <row ViewCount="1457" /> 
     <row ViewCount="1457" /> 
    </parent> 
    """, 
    """ 
    <parent> 
     <row ViewCount="1457" /> 
     <row ViewCount="1457" /> 
     <row ViewCount="1457" /> 
    </parent> 
    """ 
] 

rdd = spark.sparkContext.parallelize(text) 
rdd.map(lambda x: find_number_of_rows(x)).collect() 

在你的情况,你的函数必须采取的路径,而不是文件。然后,您可以对这些行进行计数或过滤。我没有一个完整的文件来测试。让我知道你是否需要额外的帮助!

+0

感谢您及时的回复。但是,如果我跑你的榜样,它返回[0,0] –

+0

可以尝试只是做'find_number_of_rows(文[0])'在Python,看看功能为你的作品? – titipata

+1

感谢您的帮助,我终于明白了。我没有正确减少 –

0
def badRowParser(x):  
    try: 
     line = ET.fromstring(x.strip().encode('utf-8')) 
     return True 
    except: 
     return False 
posts = sc.textFile(localpath('folder_containing_xml.gz_files')) 
rejected = posts.filter(lambda l: "<row" in l.encode('utf- 
8')).map(lambda x: not badRowParser(x)) 
ans = rejected.collect() 

from collections import Counter 
Counter(ans)