2017-04-08 74 views
0

有可能使用updateStateByKey()函数与元组作为值吗?我使用PySpark,我的输入是(word, (count, tweet_id)),这意味着word是一个关键字,而一个元组(count, tweet_id)是一个值。 updateStateByKey的任务是为每个单词总结他们的计数并创建包含该单词的所有tweet_id的列表。Spark Streaming updateStateByKey与元组作为值

我实现了以下更新功能,但我得到了错误列表索引超出范围new_values索引1:

def updateFunc(new_values, last_sum): 
    count = 0 
    tweets_id = [] 
    if last_sum: 
    count = last_sum[0] 
    tweets_id = last_sum[1] 
    return sum(new_values[0]) + count, tweets_id.extend(new_values[1]) 

并调用方法:

running_counts.updateStateByKey(updateFunc) 
+0

可以分享pyspark代码,我可以自己试试这样一个例子。想知道为什么-1被给出 – thebluephantom

+0

我在这个项目中遇到了这个问题https://github.com/dmacjam/twitter-word-cloud/blob/master/processing/trending_words.py –

+0

thx,会尝试并摆脱,如果减1 – thebluephantom

回答

1

我已经找到了解决办法。问题出在checkpointing,这意味着当前状态在发生故障时会保存到磁盘。它造成了一些问题,因为当我改变了我的状态定义时,在检查点它处于没有元组的旧状态。因此,我从磁盘中删除检查点并实现最终的解决方案为:

def updateFunc(new_values, last_sum): 
    count = 0 
    counts = [field[0] for field in new_values] 
    ids = [field[1] for field in new_values] 
    if last_sum: 
    count = last_sum[0] 
    new_ids = last_sum[1] + ids 
    else: 
    new_ids = ids 
    return sum(counts) + count, new_ids 

最后,回答我的问题是:是的,状态可以是一个元组或用于存储更多值任何其它数据类型。