2016-04-22 61 views
2

为什么线数继续增加?Akka的线程数量不断增加。什么可能是错的?

查看此图片的右下角

Thread count keeps increasing

整个流程是这样的:

Akka HTTP Server API 
    -> on http request, sendMessageTo DataProcessingActor 
     -> sendMessageTo StorageActor 
      -> sendMessageTo DataBaseActor 
      -> sendMessageTo IndexActor 

这是阿卡HTTP API的定义(在伪代码):

Main { 
    path("input/") { 
    post { 
     dataProcessingActor forward message 
    } 
    } 
} 

以下是在角色定义(在伪代码中):

DataProcessingActor { 
    case message => 
    message = parse message 
    storageActor ! message 
} 


StorageActor { 
    case message => 
    indexActor ! message 
    databaseActor ! message 
} 


DataBaseActor { 
    case message => 
    val c = get monogCollection 
    c.store(message) 
} 

IndexActor { 
    case message => 
    elasticSearch.index(message) 
} 

后,我运行此安装程序,并在发送多个HTTP requsts为 “输入/” HTTP终结,我得到的错误:

for(i <- 0 until 1000000) { 
    post("input/", someMessage+i) 
} 

错误:

[ERROR] [04/22/2016 13:20:54.016] [Main-akka.actor.default-dispatcher-15] [akka.tcp://[email protected]:2558/system/IO-TCP/selectors/$a/0] Accept error: could not accept new connection 
java.io.IOException: Too many open files 
    at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method) 
    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422) 
    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250) 
    at akka.io.TcpListener.acceptAllPending(TcpListener.scala:107) 
    at akka.io.TcpListener$$anonfun$bound$1.applyOrElse(TcpListener.scala:82) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:480) 
    at akka.io.TcpListener.aroundReceive(TcpListener.scala:32) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:495) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:224) 
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

编辑1

这里是使用的application.conf文件:

akka { 
    loglevel = "INFO" 
    stdout-loglevel = "INFO" 
    logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" 

    actor { 
    default-dispatcher { 
     throughput = 10 
    } 
    } 

    actor { 
    provider = "akka.remote.RemoteActorRefProvider" 
    } 

    remote { 
    enabled-transports = ["akka.remote.netty.tcp"] 
    netty.tcp { 
     hostname = "127.0.0.1" 
     port = 2558 
    } 
    } 
} 
+0

可能是蒙戈驱动 – Haspemulator

+0

只有正在使用一个蒙戈数据库连接,并且它是在一个阶对象'对象DB {lazy val db = mongodb.connect()}'。基本上,在一个演员内部没有新的连接。新连接仅在首次初始化DB对象时生成,并引用'db'。它认为它不能是MongoDB连接。我错过了什么吗? – tuxdna

+0

我会附加一个调试器/分析器,看看那些成千上万的线程正在做什么。 – Haspemulator

回答

0

我发现ElasticSearch是个问题。我正在使用Java API for ElasticSearch,并且因为它从Java API使用的方式而泄露套接字。现在按照此处所述解决。

下面是使用Java API

trait ESClient { def getClient(): Client } 

case class ElasticSearchService() extends ESClient { 
    def getClient(): Client = { 
    val client = new TransportClient().addTransportAddress(
     new InetSocketTransportAddress(Config.ES_HOST, Config.ES_PORT) 
    ) 
    client 
    } 
} 

这是这是造成泄漏的演员弹性搜索客户服务:

class IndexerActor() extends Actor { 

    val elasticSearchSvc = new ElasticSearchService() 
    lazy val client = elasticSearchSvc.getClient() 

    override def preStart = { 
    // initialize index, and mappings etc. 
    } 

    def receive() = { 
    case message => 
     // do indexing here 
     indexMessage(ES.client, message) 
    } 
} 

注意:每次创建一个主角实例,正在建立新的连接。

每次调用new ElasticSearchService()都会创建一个到ElasticSearch的新连接。我动议到一个单独的对象,如下所示,也演员使用该对象,而不是:

object ES { 
    val elasticSearchSvc = new ElasticSearchService() 
    lazy val client = elasticSearchSvc.getClient() 
} 


class IndexerActor() extends Actor { 

    override def preStart = { 
    // initialize index, and mappings etc. 
    } 

    def receive() = { 
    case message => 
     // do indexing here 
     indexMessage(ES.client, message) 
    } 
} 
相关问题