我正在设置火花流光与kinesis和红移。我每10秒钟从kinesis读取数据,处理它,并使用spark-redshift lib将它写入红移。火花红移花费很多时间写红移
问题是它只花了很多时间只写了300行。
这就是它显示我在控制台
[Stage 56:====================================================> (193 + 1)/200]
看我的日志df.write.format是这样做的。
我在带有4GB内存和2个核心的亚马逊EC2的机器上运行spark-setup,使用--master local [*]模式运行。
这是我如何创建流
kinesisStream = KinesisUtils.createStream(ssc, APPLICATION_NAME, STREAM_NAME, ENDPOINT, REGION_NAME, INITIAL_POS, CHECKPOINT_INTERVAL, awsAccessKeyId =AWSACCESSID, awsSecretKey=AWSSECRETKEY, storageLevel=STORAGE_LEVEL)
CHECKPOINT_INTERVAL = 60
storageLevel = memory
kinesisStream.foreachRDD(writeTotable)
def WriteToTable(df, type):
if type in REDSHIFT_PAGEVIEW_TBL:
df = df.groupby([COL_STARTTIME, COL_ENDTIME, COL_CUSTOMERID, COL_PROJECTID, COL_FONTTYPE, COL_DOMAINNAME, COL_USERAGENT]).count()
df = df.withColumnRenamed('count', COL_PAGEVIEWCOUNT)
# Write back to a table
url = ("jdbc:redshift://" + REDSHIFT_HOSTNAME + ":" + REDSHIFT_PORT + "/" + REDSHIFT_DATABASE + "?user=" + REDSHIFT_USERNAME + "&password="+ REDSHIFT_PASSWORD)
s3Dir = 's3n://' + AWSACCESSID + ':' + AWSSECRETKEY + '@' + BUCKET + '/' + FOLDER
print 'Start writing to redshift'
df.write.format("com.databricks.spark.redshift").option("url", url).option("dbtable", REDSHIFT_PAGEVIEW_TBL).option('tempdir', s3Dir).mode('Append').save()
print 'Finished writing to redshift'
请让我知道采取这一多时间
不要只写3行的东西需要4分钟左右的时间很多。此外,即使我有5000行写,仍然4分钟是很多时间 – Nipun
哇,我没有意识到它花了那么长时间。在这种情况下,可能发生的情况是分区太多(从上面的输出中可以看出这种情况)。这可能会导致从机器写入S3的瓶颈。我不确定这是否适用于流媒体,但对于常规的spark工作,如df.coalesce(1).write.format(“com.databricks.spark.redshift”)。option(“url”,url)。选项(“dbtable”,REDSHIFT_PAGEVIEW_TBL)。选项('tempdir',s3Dir).mode('Append')。save()会起作用。你可以玩分区的数量来合并。 – DemetriKots
我尝试过,使用coalesce(4)和缓存,但它花费了相同的时间。这很奇怪,但是4分钟就像写了10条记录或1000条记录一样。我尝试联系AWS,但它也没有帮助。尝试从s3直接加载csv到红移使用命令,看看是否需要时间,但这也是几秒钟。 – Nipun