I have a PySpark application that has to process 5GB of compressed data (strings). I'm using a small server with 12 cores (24 threads) and 72GB of RAM. My PySpark program consists of just 2 map operations, helped by 3 very big regexes (3GB each once compiled) that are loaded with pickle. Spark runs in standalone mode, with the worker and the master on the same machine. Is this environment enough for Spark to handle the job?
My question is: does Spark replicate each variable for every executor core? That would explain why it uses all the available memory and then eats a lot of swap space. Or does it perhaps load all the partitions into RAM? The RDD contains about 10 million strings that have to be searched by the 3 regexes, and it is split into about 1000 partitions. I have trouble finishing this task: after a few minutes the memory is full, Spark starts using the swap space, and everything becomes extremely slow. I noticed that the situation is the same without the regexes.
Here is my code. It strips all the useless fields from the tweets and scans the tweets' text and description for specific words:
import json
import re
import twitter_util as twu
import pickle
from pyspark import SparkContext
sc = SparkContext()
prefix = '/home/lucadiliello'
source = prefix + '/data/tweets'
dest = prefix + '/data/complete_tweets'
# Paths of the pickled regex and dictionaries
companies_names_regex = prefix + '/data/comp_names_regex'
companies_names_dict = prefix + '/data/comp_names_dict'
companies_names_dict_to_legal = prefix + '/data/comp_names_dict_to_legal'
# Loading the pickled objects (binary mode, so this also works on Python 3)
comp_regex = pickle.load(open(companies_names_regex, 'rb'))
comp_dict = pickle.load(open(companies_names_dict, 'rb'))
comp_dict_legal = pickle.load(open(companies_names_dict_to_legal, 'rb'))
# Load the RDD from the text file, parsing each line as JSON
tx = sc.textFile(source).map(lambda a: json.loads(a))
def get_device(input_text):
    # Strip HTML tags, e.g. the <a> wrapper around the 'source' field
    output_text = re.sub('<[^>]*>', '', input_text)
    return output_text
def filter_data(a):
    res = {}
    try:
        res['mentions'] = a['entities']['user_mentions']
        res['hashtags'] = a['entities']['hashtags']
        res['created_at'] = a['created_at']
        res['id'] = a['id']
        res['lang'] = a['lang']
        if 'place' in a and a['place'] is not None:
            res['place'] = {}
            res['place']['country_code'] = a['place']['country_code']
            res['place']['place_type'] = a['place']['place_type']
            res['place']['name'] = a['place']['name']
            res['place']['full_name'] = a['place']['full_name']
        res['source'] = get_device(a['source'])
        res['text'] = a['text']
        res['timestamp_ms'] = a['timestamp_ms']
        res['user'] = {}
        res['user']['created_at'] = a['user']['created_at']
        res['user']['description'] = a['user']['description']
        res['user']['followers_count'] = a['user']['followers_count']
        res['user']['friends_count'] = a['user']['friends_count']
        res['user']['screen_name'] = a['user']['screen_name']
        res['user']['lang'] = a['user']['lang']
        res['user']['name'] = a['user']['name']
        res['user']['location'] = a['user']['location']
        res['user']['statuses_count'] = a['user']['statuses_count']
        res['user']['verified'] = a['user']['verified']
        res['user']['url'] = a['user']['url']
    except KeyError:
        return []
    return [res]
results = tx.flatMap(filter_data)
def setting_tweet(tweet):
    text = tweet['text'] if tweet['text'] is not None else ''
    descr = tweet['user']['description'] if tweet['user']['description'] is not None else ''
    del tweet['text']
    del tweet['user']['description']
    tweet['text'] = {}
    tweet['user']['description'] = {}
    del tweet['mentions']
    # tweet text
    tweet['text']['original_text'] = text
    tweet['text']['mentions'] = twu.find_retweet(text)
    tweet['text']['links'] = []
    for j in twu.find_links(text):
        tmp = {}
        try:
            tmp['host'] = twu.get_host(j)
            tmp['link'] = j
            tweet['text']['links'].append(tmp)
        except ValueError:
            pass
    tweet['text']['companies'] = []
    for x in comp_regex.findall(text.lower()):
        tmp = {}
        tmp['id'] = comp_dict[x.lower()]
        tmp['name'] = x
        tmp['legalName'] = comp_dict_legal[x.lower()]
        tweet['text']['companies'].append(tmp)
    # user description
    tweet['user']['description']['original_text'] = descr
    tweet['user']['description']['mentions'] = twu.find_retweet(descr)
    tweet['user']['description']['links'] = []
    for j in twu.find_links(descr):
        tmp = {}
        try:
            tmp['host'] = twu.get_host(j)
            tmp['link'] = j
            tweet['user']['description']['links'].append(tmp)
        except ValueError:
            pass
    tweet['user']['description']['companies'] = []
    for x in comp_regex.findall(descr.lower()):
        tmp = {}
        tmp['id'] = comp_dict[x.lower()]
        tmp['name'] = x
        tmp['legalName'] = comp_dict_legal[x.lower()]
        tweet['user']['description']['companies'].append(tmp)
    return tweet
res = results.map(setting_tweet)
res.map(lambda a: json.dumps(a)).saveAsTextFile(dest, compressionCodecClass="org.apache.hadoop.io.compress.BZip2Codec")
UPDATE: After about 1 hour the memory (72GB) is completely full, and so is the swap (72GB). Using broadcast is not a solution in my case.
UPDATE 2: Without loading the 3 variables with pickle, the job has no problem using up to 10GB of RAM instead of 144GB! (72GB RAM + 72GB swap)
The code would be nice, but even without it the answer to your question is: Spark uses as many copies of local variables as there are threads (cores) assigned to the Python workers. There are some workarounds, but they are typically fairly complex. – zero323
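In other words, with 24 threads available there can be up to 24 concurrent Python workers, each holding its own deserialized copy of the three objects. A blunt mitigation is to cap the number of cores the application may use; a minimal sketch, assuming standalone mode (the value 4 is purely illustrative):

from pyspark import SparkConf, SparkContext

# Fewer concurrent tasks -> fewer simultaneous in-memory copies
conf = SparkConf().set('spark.cores.max', '4')
sc = SparkContext(conf=conf)

The job gets slower, but peak memory drops roughly in proportion to the number of concurrent workers.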
Given the code, you should add +1 for the driver copy, +1 on the driver for the pickled version, and +1 per executor JVM (more or less). You could improve this a bit by using broadcast or by loading the data directly on the executors. – zero323
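For the broadcast route, a minimal sketch (the _bc names are mine): the objects are shipped to each node once instead of being serialized into every task closure, although each Python worker that touches .value still deserializes its own in-memory copy:

comp_regex_bc = sc.broadcast(comp_regex)
comp_dict_bc = sc.broadcast(comp_dict)
comp_dict_legal_bc = sc.broadcast(comp_dict_legal)

def find_companies(text):
    # Read from the broadcast variables instead of capturing driver globals
    regex = comp_regex_bc.value
    names, legal = comp_dict_bc.value, comp_dict_legal_bc.value
    return [{'id': names[x.lower()], 'name': x, 'legalName': legal[x.lower()]}
            for x in regex.findall(text.lower())]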
Isn't there a trick to keep a single instance of the regexes in memory per executor process? If not, I think I'll reduce the number of executors... –
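That is essentially the second workaround zero323 mentions: don't ship the objects at all, and let each task load them from disk on the executor, cached at module level so each Python worker process deserializes them at most once. A sketch, assuming the pickle files are readable from the workers (true here, since worker and master share the machine), that setting_tweet is refactored to take the three objects as parameters, and that this helper lives in a module shipped to the executors (e.g. via --py-files) so the cache survives across reused workers:

_CACHE = None  # per-process cache, filled on first use in each Python worker

def _load_models():
    global _CACHE
    if _CACHE is None:
        with open(companies_names_regex, 'rb') as f:
            regex = pickle.load(f)
        with open(companies_names_dict, 'rb') as f:
            names = pickle.load(f)
        with open(companies_names_dict_to_legal, 'rb') as f:
            legal = pickle.load(f)
        _CACHE = (regex, names, legal)
    return _CACHE

def setting_partition(tweets):
    # mapPartitions pays the loading cost at most once per partition,
    # and at most once per worker process thanks to the cache above
    comp_regex, comp_dict, comp_dict_legal = _load_models()
    for tweet in tweets:
        yield setting_tweet(tweet, comp_regex, comp_dict, comp_dict_legal)

res = results.mapPartitions(setting_partition)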