2017-09-02 62 views
0

这是一个在YARN集群模式下运行的Spark Streaming应用程序,它在三个Kafka Brokers中生成消息。YARN上的Spark流 - 没有足够的内存让Java运行时环境继续

只要达到150K打开的文件失败:

There is insufficient memory for the Java Runtime Environment to continue 
Native memory allocation (mmap) failed to map 12288 bytes for committing reserved memory. 

Job aborted due to stage failure ... : 
org.apache.kafka.common.KafkaException: Failed to construct kafka producer 
..... 
Caused by: java.lang.OutOfMemoryError: unable to create new native thread 

当运行该遗嘱执行人我可以看到TCP连接吨(高达90K)从主机服务器的Java进程做lsof -p <PID>卡夫卡经纪人:

host:portXXX->kafkabroker1:XmlIpcRegSvc (ESTABLISHED)

host:portYYY->kafkabroker2:XmlIpcRegSvc (ESTABLISHED)

host:portZZZ->kafkabroker3:XmlIpcRegSvc (ESTABLISHED)

我试着将执行程序内核的数量从8个减少到6个,但打开的文件数量没有一个差异(仍然达到150K),然后保持失败。

该库从星火流连接到卡夫卡是:

org.apache.spark.streaming.kafka010.KafkaUtils 
org.apache.spark.streaming.dstream.InputDStream 
org.apache.kafka.clients.producer.kafkaproducer 

代码:

foreachRDD{ 
    get kafkaProducer 
    do some work on each RDD... 
    foreach(record => { 
     kafkaProducer.send(record._1,record._2) 
    } 
    kafkaProducer.close() 
} 
+0

您使用什么库来连接来自Spark的Kafka?你能告诉我们一些代码吗? –

+0

更新了更多信息 –

+0

你见过这个答案吗? https://stackoverflow.com/a/16789621/2796894 – dawsaw

回答

0

这是一个小学生的错误。这很好的解释article帮助解决了这个问题。卡夫卡制片人没有关闭连接,所以我们使用广播和懒惰评估技术来解决问题。

相关问题