2017-02-22 57 views
1

我在conf文件中有以下条目。但我不知道这是否调度程序设置被拾起,什么是正在使用如何检查akka应用程序中配置了哪个调度程序

 akka{ 
     actor{ 
     default-dispatcher { 
       type = Dispatcher 
       executor = "fork-join-executor" 
       throughput = 3 
       fork-join-executor { 
       parallelism-min = 40 
       parallelism-factor = 10 
       parallelism-max = 100 
       } 
      } 
     } 
    } 

终极并行数值我已经8核的机器,所以我希望并行线程是处于就绪状态 40分钟(8×10因子)< 100max。我想看看akka使用最大并行线程的价值。 我创建了45个子actor,在我的日志中,我正在打印线程ID [application-akka.actor.default-dispatcher-xx],并且看不到超过20个线程并行运行。

回答

0

假设您有一个ActorSystem实例,您可以检查在其配置中设置的值。这是你可以如何得到你在配置文件中设置的值:

val system = ActorSystem() 
val config = system.settings.config.getConfig("akka.actor.default-dispatcher") 
config.getString("type") 
config.getString("executor") 
config.getString("throughput") 
config.getInt("fork-join-executor.parallelism-min") 
config.getInt("fork-join-executor.parallelism-max") 
config.getDouble("fork-join-executor.parallelism-factor") 

我希望这有助于。您也可以参考this页面了解有关特定配置设置的更多详细信息。

更新

我已经挖了一点在阿卡找出到底是什么,它使用了您的设置。正如你可能已经预计它使用ForkJoinPool。用于构建它的并行计算公式如下:

object ThreadPoolConfig { 
    ... 
    def scaledPoolSize(floor: Int, multiplier: Double, ceiling: Int): Int = 
math.min(math.max((Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt, floor), ceiling) 
    ... 
} 

该功能用于在某个点建立一个ForkJoinExecutorServiceFactory

new ForkJoinExecutorServiceFactory(
    validate(tf), 
    ThreadPoolConfig.scaledPoolSize(
    config.getInt("parallelism-min"), 
    config.getDouble("parallelism-factor"), 
    config.getInt("parallelism-max")), 
    asyncMode) 

无论如何,这是将要使用的并行创建ForkJoinPool,这实际上是java.lang.ForkJoinPool的一个实例。现在我们必须问这个池有多少个线程?简单的答案是,只有在需要时才会使用整个容量(在我们的案例中为80个)。

为了说明这种情况,我已经在actor中使用了Thread.sleep的各种用法,进行了一些测试。我发现的是,它可以在大约10个线程(如果没有进行睡眠呼叫)的某处使用到最多80个线程(如果我叫睡1秒)。测试是在一台8芯机器上进行的。

综上所述,您需要检查Akka使用的实现,以确切了解如何使用这种并行性,这就是我查看ForkJoinPool的原因。除了看配置文件,然后检查特定的实现,我不认为你可以做不幸:(

我希望这个澄清答案 - 最初我以为你想看看如何配置演员系统的调度。

+0

是的,这会给出由akka系统从配置读取的值。不过,我很想知道是否有办法知道akka调度员最终从3个字段中选择的并行度 - min,factor和max。 – Sree

+0

我在运行一些测试后更新了响应。另外,@Stefano Bonetti是对的 - 为了最大限度地发挥所有演员需要同时工作的并行性:) –

0

为了最大化并行性因子,所有参与者都需要同时处理一些消息。您确定这是您的应用程序中的情况吗?

就拿下面的代码

object Test extends App { 

    val system = ActorSystem() 

    (1 to 80).foreach{ _ => 
    val ref = system.actorOf(Props[Sleeper]) 
    ref ! "hello" 
    } 
} 

class Sleeper extends Actor { 
    override def receive: Receive = { 
    case msg => 
     //Thread.sleep(60000) 
     println(msg) 
    } 
} 

如果你认为你的配置和8个内核,你会看到正在产生的线程少量(4,5?)作为消息的处理过快速建立一些真正的并行性。

相反,如果你把你的演员CPU忙取消对讨厌Thread.sleep,你会看到线程数会一下子提高到80。然而,这只会持续1分钟,在此之后,线程将被逐渐被从游泳池退休。

我想主要的诀窍是:不要想到每个演员都在一个单独的线程上运行。每当一个或多个消息出现在调度员醒来的参与者邮箱上,并且确实将消息处理任务分派到指定的池中。

+0

我有相当类似的代码。我在for循环中产生了45名演员。根据我的日志,在儿童演员中完成计算大约需要1到1.5秒。我希望能有一个线程派发给每个孩子演员并行。我想这不会发生。任何方式来执行它? – Sree

+0

如果没有足够的任务,则无法强制执行45个正在生成的线程。机会是,在你的情况下没有。没有看到你的代码,很难说。 –

相关问题