Akka Streams Reactive Kafka - OutOfMemoryError under high load

2017-09-27

I'm running an Akka Streams Reactive Kafka application that is expected to work under heavy load. After running the application for about 10 minutes, it goes down with an OutOfMemoryError. I tried to debug the heap dump and found that akka.dispatch.Dispatcher is taking up ~5 GB of memory. Below are my configuration files.

Akka version: 2.4.18

Reactive Kafka version: 2.4.18
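For reference, the heap dump was captured with the standard JDK tooling (the PID here is a placeholder):

    jmap -dump:live,format=b,file=heap.hprof <pid>

Adding -XX:+HeapDumpOnOutOfMemoryError to the launch command below would also make the JVM write a dump automatically when the error occurs.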

1. application.conf

consumer {
  num-consumers = "2"
  c1 {
    bootstrap-servers = "localhost:9092"
    bootstrap-servers = ${?KAFKA_CONSUMER_ENDPOINT1}
    groupId = "testakkagroup1"
    subscription-topic = "test"
    subscription-topic = ${?SUBSCRIPTION_TOPIC1}
    message-type = "UserEventMessage"
    poll-interval = 100ms
    poll-timeout = 50ms
    stop-timeout = 30s
    close-timeout = 20s
    commit-timeout = 15s
    wakeup-timeout = 10s
    use-dispatcher = "akka.kafka.default-dispatcher"
    kafka-clients {
      enable.auto.commit = true
    }
  }
}
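(The Settings / consumerConfig helpers used in the actor below are not shown in the question; presumably they just flatten one c<n> block of this file into a Map via Typesafe Config. A hypothetical sketch, not the actual implementation:)

    import com.typesafe.config.ConfigFactory

    // Hypothetical sketch of the elided consumerConfig helper: flattens
    // one consumer.c<n> block into the Map[String, String] expected by
    // startStreamConsumer below. Key names match application.conf above;
    // the id-to-path mapping is an assumption.
    def consumerConfig(id: String): Map[String, String] = {
      val c = ConfigFactory.load().getConfig(s"consumer.$id") // e.g. "consumer.c1"
      Map(
        "bootstrap-servers"  -> c.getString("bootstrap-servers"),
        "groupId"            -> c.getString("groupId"),
        "subscription-topic" -> c.getString("subscription-topic")
      )
    }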

2. Launch command

java -Xmx6g \ 
-Dcom.sun.management.jmxremote.port=27019 \ 
-Dcom.sun.management.jmxremote.authenticate=false \ 
-Dcom.sun.management.jmxremote.ssl=false \ 
-Djava.rmi.server.hostname=localhost \ 
-Dzookeeper.host=$ZK_HOST \ 
-Dzookeeper.port=$ZK_PORT \ 
-jar ./target/scala-2.11/test-assembly-1.0.jar 

3. Source and Sink actors:

import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.util.Timeout
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.concurrent.duration._

class EventStream extends Actor with ActorLogging {

  implicit val actorSystem = context.system
  implicit val timeout: Timeout = Timeout(10.seconds)
  implicit val materializer = ActorMaterializer()
  import context.dispatcher

  val settings = Settings(actorSystem).KafkaConsumers

  override def receive: Receive = {
    case StartUserEvent(id) =>
      startStreamConsumer(consumerConfig("EventMessage" + ".c" + id))
  }

  def startStreamConsumer(config: Map[String, String]) = {
    val consumerSource = createConsumerSource(config)
    val consumerSink = createConsumerSink()

    // actorA, actorB and actorC are defined elsewhere; they do the
    // business-logic processing against the database.
    val messageProcessor = startMessageProcessor(actorA, actorB, actorC)

    log.info("Starting the UserEventStream processing")

    // Every Kafka record is sent to the processor actor via ask.
    val future = consumerSource.map { message =>
      val m = s"${message.record.value()}"
      messageProcessor ? m
    }.runWith(consumerSink)

    future.onComplete { case _ =>
      actorSystem.stop(messageProcessor)
    }
  }

  def startMessageProcessor(actorA: ActorRef, actorB: ActorRef, actorC: ActorRef) = {
    actorSystem.actorOf(Props(classOf[MessageProcessor], actorA, actorB, actorC))
  }

  def createConsumerSource(config: Map[String, String]) = {
    val kafkaMBAddress = config("bootstrap-servers")
    val groupID = config("groupId")
    val topicSubscription = config("subscription-topic").split(',').toList
    println(s"Subscription topics: $topicSubscription")

    val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers(kafkaMBAddress)
      .withGroupId(groupID)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")

    Consumer.committableSource(consumerSettings, Subscriptions.topics(topicSubscription: _*))
  }

  def createConsumerSink() = {
    Sink.foreach(println)
  }
}

In this case, actorA, actorB and actorC are doing some business-logic processing with database interaction. Am I missing anything in how the Akka Reactive Kafka consumers are handled, such as commit, error, or throttling configuration? Looking at the heap dump, my guess is that messages are piling up.

Answer


One thing I would change is the following:

val future = consumerSource.map { message => 
    val m = s"${message.record.value()}" 
    messageProcessor ? m 
}.runWith(consumerSink) 

In the above code, you are using ask to send messages to the messageProcessor actor and expect replies, but for ask to work as a backpressure mechanism you need to use it with mapAsync (more information in the documentation). Something like the following:

val future =
  consumerSource
    .mapAsync(parallelism = 5) { message =>
      val m = s"${message.record.value()}"
      messageProcessor ? m
    }
    .runWith(consumerSink)

Adjust the parallelism as needed.
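To make the effect concrete, here is a minimal, self-contained sketch of the same pattern without Kafka (the EchoActor standing in for messageProcessor is hypothetical). With mapAsync, at most `parallelism` asks are in flight at any time, so upstream demand, and with it the source, pauses until replies come back instead of letting unread messages pile up in the actor's mailbox:

    import akka.actor.{Actor, ActorSystem, Props}
    import akka.pattern.ask
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Sink, Source}
    import akka.util.Timeout

    import scala.concurrent.duration._

    // Hypothetical stand-in for messageProcessor: replies to every message.
    class EchoActor extends Actor {
      def receive: Receive = { case m => sender() ! s"processed: $m" }
    }

    object MapAsyncBackpressureDemo extends App {
      implicit val system = ActorSystem("demo")
      implicit val materializer = ActorMaterializer()
      implicit val timeout: Timeout = Timeout(10.seconds)
      import system.dispatcher

      val processor = system.actorOf(Props[EchoActor])

      // At most 5 asks are in flight at once; the source only emits a
      // new element when one of the pending futures completes.
      Source(1 to 100)
        .mapAsync(parallelism = 5)(n => processor ? n)
        .runWith(Sink.foreach(println))
        .onComplete(_ => system.terminate())
    }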


Thanks for the solution. Saved my day. – Deepakkumar