寻找专业知识在下面指导我解决问题。当应用pyspark ALS的“recommendProductsForUsers”(尽管> 300GB Ram集群可用)时,StackOverflow错误
背景:
- 我试图让与灵感的this example
- 作为部署的基础设施我使用谷歌云Dataproc集群基本PySpark脚本去。
- 基石在我的代码是功能“recommendProductsForUsers”记载here这使我回顶的X产品,为所有用户在模型
问题,我承担
的ALS。培训脚本运行平稳,并在GCP上很好地扩展(轻松> 100万用户)。
然而,应用预测:即使用函数'PredictAll'或'recommendationProductsForUsers',根本不缩放。我的脚本平滑运行一个小数据集(< 100 Customer with < 100 products)。但是,它带来的业务相关的大小的时候,我不管理它的规模(例如> 50K客户和> 10K产品)
错误,那么我得到的是如下:
16/08/16 14:38:56 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 22.0 in stage 411.0 (TID 15139, productrecommendation-high-w-2.c.main-nova-558.internal): java.lang.StackOverflowError at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
我甚至为获得一个300 GB的群集(1个108GB的主节点+2个108 GB RAM的节点)来尝试运行它;它的工作原理为50K的客户,但没有什么更多的
野心是有一个设置在那里我可以为> 80万个客户
运行详细
代码行失败
predictions = model.recommendProductsForUsers(10).flatMap(lambda p: p[1]).map(lambda p: (str(p[0]), str(p[1]), float(p[2])))
pprint.pprint(predictions.take(10))
schema = StructType([StructField("customer", StringType(), True), StructField("sku", StringType(), True), StructField("prediction", FloatType(), True)])
dfToSave = sqlContext.createDataFrame(predictions, schema).dropDuplicates()
你如何建议继续?我觉得脚本结尾处的“合并”部分(即,当我将其写入dfToSave时)会导致错误;有没有办法绕开这个零件保存?
嗨丹尼斯,谢谢你的想法。我确实看到了其他线程,我同意ALS.train确实有一个可以自定义的检查点间隔参数。然而,预测所有或者推荐的产品用户功能都有这个参数;然后检查点如何工作? –
更新:已实施检查点(感谢dennis提示)。尽管它可以很好地扩展ALS.train功能(容易超过1百万的用户),但它不适用于预测:i。e。使用PredictAll的功能或推荐产品为用户。对此有何建议? –
在应用检查点之后,当抛出异常或发生异常时,是否还会看到涉及ObjectInputStream的相同堆栈跟踪? –