1

寻找专业知识在下面指导我解决问题。当应用pyspark ALS的“recommendProductsForUsers”(尽管> 300GB Ram集群可用)时,StackOverflow错误

背景:

  • 我试图让与灵感的this example
  • 作为部署的基础设施我使用谷歌云Dataproc集群基本PySpark脚本去。
  • 基石在我的代码是功能“recommendProductsForUsers”记载here这使我回顶的X产品,为所有用户在模型

问题,我承担

  • 的ALS。培训脚本运行平稳,并在GCP上很好地扩展(轻松> 100万用户)。

  • 然而,应用预测:即使用函数'PredictAll'或'recommendationProductsForUsers',根本不缩放。我的脚本平滑运行一个小数据集(< 100 Customer with < 100 products)。但是,它带来的业务相关的大小的时候,我不管理它的规模(例如> 50K客户和> 10K产品)

  • 错误,那么我得到的是如下:

    16/08/16 14:38:56 WARN org.apache.spark.scheduler.TaskSetManager: 
        Lost task 22.0 in stage 411.0 (TID 15139, 
        productrecommendation-high-w-2.c.main-nova-558.internal): 
        java.lang.StackOverflowError 
         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) 
         at scala.collection.immutable.$colon$colon.readObject(List.scala:362) 
         at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) 
         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
         at java.lang.reflect.Method.invoke(Method.java:498) 
         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) 
         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909) 
         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) 
         at scala.collection.immutable.$colon$colon.readObject(List.scala:362) 
    
  • 我甚至为获得一个300 GB的群集(1个108GB的主节点+2个108 GB RAM的节点)来尝试运行它;它的工作原理为50K的客户,但没有什么更多的

  • 野心是有一个设置在那里我可以为> 80万个客户

运行详细

代码行失败

predictions = model.recommendProductsForUsers(10).flatMap(lambda p: p[1]).map(lambda p: (str(p[0]), str(p[1]), float(p[2]))) 
pprint.pprint(predictions.take(10)) 
schema = StructType([StructField("customer", StringType(), True), StructField("sku", StringType(), True), StructField("prediction", FloatType(), True)]) 
dfToSave = sqlContext.createDataFrame(predictions, schema).dropDuplicates() 

你如何建议继续?我觉得脚本结尾处的“合并”部分(即,当我将其写入dfToSave时)会导致错误;有没有办法绕开这个零件保存?

回答

1

从堆栈跟踪Spark gives a StackOverflowError when training using ALS

基本上,星火表示RDD血统递归让你深深嵌套对象结束时,事情还没有偷懒过评估的过程中,这似乎是同样的问题,迭代工作量。调用sc.setCheckpointDir并调整检查点间隔将减少此RDD沿袭的长度。

+0

嗨丹尼斯,谢谢你的想法。我确实看到了其他线程,我同意ALS.train确实有一个可以自定义的检查点间隔参数。然而,预测所有或者推荐的产品用户功能都有这个参数;然后检查点如何工作? –

+0

更新:已实施检查点(感谢dennis提示)。尽管它可以很好地扩展ALS.train功能(容易超过1百万的用户),但它不适用于预测:i。e。使用PredictAll的功能或推荐产品为用户。对此有何建议? –

+0

在应用检查点之后,当抛出异常或发生异常时,是否还会看到涉及ObjectInputStream的相同堆栈跟踪? –