Spark Streaming - updateStateByKey and caching data

I have a problem using the updateStateByKey function while caching some large data at the same time. Here is an example.

Let's say I receive (lastname, age) records from Kafka. I want to keep the current age of each person, so I use updateStateByKey. I also want to know each person's first name, so I join the output with an external table (lastname, name), for example from Hive. Let's assume it is a really big table, so I don't want to load it in every batch. And there is a problem.

Everything works fine when I load the table in every batch, but when I try to cache the table, the StreamingContext does not start. I also tried using registerTempTable and joining the data later with SQL, but I got the same error.
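A rough sketch of that registerTempTable variant (a reconstruction rather than the exact code; 'names_cached' and 'batch' are placeholder temp table names):

from pyspark import SparkContext
from pyspark.sql import HiveContext

# register the Hive table once on the driver
hc = HiveContext.getOrCreate(SparkContext.getOrCreate())
hc.sql('select lastname,name from `default`.`names`').registerTempTable('names_cached')

def joinTable(time, rdd):
    # join each micro-batch against the registered table with SQL
    if not rdd.isEmpty():
        hc = HiveContext.getOrCreate(SparkContext.getOrCreate())
        hc.createDataFrame(rdd, ['lname', 'age']).registerTempTable('batch')
        df2 = hc.sql('select b.lname, b.age, n.name '
                     'from batch b left outer join names_cached n '
                     'on b.lname = n.lastname')
        return df2.map(lambda row: row)
    # pass the empty RDD through unchanged
    return rdd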

It seems the problem is the checkpointing required by updateStateByKey. When I remove updateStateByKey but leave the checkpoint, I still get the error; when I remove the checkpoint as well, it works.

The error I am getting: pastebin

Here is the code:

import sys 

from pyspark import SparkContext, SparkConf 
from pyspark.sql import SQLContext, HiveContext 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils 

# update function to keep the most recent value (the latest age) per key
def updateFunc(channel, actualChannel):
    # channel holds the new values from the current batch,
    # actualChannel holds the previous state
    if actualChannel is None or channel is not None:
        try:
            actualChannel = channel[-1]
        except Exception:
            pass
    if channel is None:
        channel = actualChannel
    return actualChannel

def splitFunc(row): 
    row = row.strip() 
    lname,age = row.split() 
    return (lname,age)  


def createContext(brokers,topics): 
    # some conf 
    conf = SparkConf().setAppName(appName).set("spark.streaming.stopGracefullyOnShutdown","true").set("spark.dynamicAllocation.enabled","false").\ 
    set("spark.serializer","org.apache.spark.serializer.KryoSerializer").set("spark.sql.shuffle.partitions",'100') 
    # create SparkContext 
    sc = SparkContext(conf=conf) 

    # create HiveContext 
    sqlContext = HiveContext(sc) 

    # create Streaming Context 
    ssc = StreamingContext(sc, 5) 

    # read big_df and cache it (does not work: the StreamingContext does not start)
    big_df = sqlContext.sql('select lastname,name from `default`.`names`') 
    big_df.cache().show(10) 

    # join each micro-batch with the big table
    def joinTable(time, rdd):
        if not rdd.isEmpty():
            df = HiveContext.getOrCreate(SparkContext.getOrCreate()).createDataFrame(rdd, ['lname', 'age'])

            # reading big_df here in every batch works
            #big_df = HiveContext.getOrCreate(SparkContext.getOrCreate()).sql('select lastname,name from `default`.`names`')

            # join with big_df (DMS)
            df2 = df.join(big_df, df.lname == big_df.lastname, "left_outer")

            return df2.map(lambda row: row)

    # streaming 
    kvs = KafkaUtils.createDirectStream(ssc, [topics], {'metadata.broker.list': brokers})   
    kvs.map(lambda (k,v): splitFunc(v)).updateStateByKey(updateFunc).transform(joinTable).pprint() 

    return ssc 

if __name__ == "__main__": 
    appName="SparkCheckpointUpdateSate" 
    if len(sys.argv) != 3:
        print("Usage: SparkCheckpointUpdateSate.py <broker_list> <topic>")
        exit(-1)

    brokers, topics = sys.argv[1:] 

    # getOrCreate Context 
    checkpoint = 'SparkCheckpoint/checkpoint' 
    ssc = StreamingContext.getOrCreate(checkpoint,lambda: createContext(brokers,topics)) 

    # start streaming 
    ssc.start() 
    ssc.awaitTermination() 

Can you tell me how to cache the data properly when checkpointing is enabled? Maybe there is a workaround I am not aware of.

Spark ver. 1.6

Answer

I got this working with a lazily instantiated global instance of big_df. Something similar is done in recoverable_network_wordcount.py.

def getBigDf():
    # lazily instantiate a single global instance of the DataFrame
    if 'bigdf' not in globals():
        globals()['bigdf'] = HiveContext.getOrCreate(SparkContext.getOrCreate()).sql('select lastname,name from `default`.`names`')
    return globals()['bigdf']

def createContext(brokers,topics): 
    ... 
    def joinTable(time, rdd):
        ...
        # lazily get (or create) the global big_df (works)
        big_df = getBigDf()

        # join with big_df (DMS)
        df2 = df.join(big_df, df.lname == big_df.lastname, "left_outer")

        return df2.map(lambda row: row)
    ... 

It seems that in streaming the data has to be cached inside the stream processing, not beforehand.
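If the table should also actually stay cached, the cache() call can be moved into the lazy initializer so it is only triggered on first use inside the stream processing (a sketch built on the answer above, not separately verified):

def getBigDf():
    # lazily create and cache the DataFrame on first use inside the stream;
    # later batches reuse the same cached global instance
    if 'bigdf' not in globals():
        df = HiveContext.getOrCreate(SparkContext.getOrCreate()).sql('select lastname,name from `default`.`names`')
        globals()['bigdf'] = df.cache()
    return globals()['bigdf']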