2017-10-15 117 views
0

出于某种原因,我必须同时使用gRPC和Akka。当这个演员成为顶级演员时,没有任何问题(在这个小演示中)。但是,当它成为一个儿童演员,也没有收到任何消息,并且以下记录:ExecutionContext导致Akka死信

[default-akka.actor.default-dispatcher-6] [akka://default/user/Grpc] Message [AkkaMessage.package$GlobalStart] from Actor[akka://default/user/TrackerCore#-808631363] to Actor[akka://default/user/Grpc#-1834173068] was not delivered. [1] dead letters encountered. 

的例子核心:

class GrpcActor() extends Actor { 
    val ec = scala.concurrent.ExecutionContext.global 
    val service = grpcService.bindService(new GrpcServerImpl(), ec) 
    override def receive: Receive = { 
     case GlobalStart() => { 
      println("GlobalStart") 
     } 
     ... 
    } 
} 

我试图创建一个新的ExecutionContext,如:

scala.concurrent.ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10)) 

为什么会发生这种情况,以及如何调试像这样的死信问题(不会引发异常)?

更新:

对不起,我没有在这里列出一切。我使用普通的Main方法来测试GrpcActor作为顶级演员,而ScalaTest将其测试为小孩演员,这是一个错误。

class GrpcActorTest extends FlatSpec with Matchers{ 
    implicit val system = ActorSystem() 
    val actor: ActorRef = system.actorOf(Props[GrpcActor]) 
    actor ! GlobalStart() 
} 

这是一个空的测试套件,主动关闭整个actor系统。但问题出在这条线上

val service = grpcService.bindService(new GrpcServerImpl(), ec) 

GlobalStart()的交付延迟后关闭。

没有该行,消息可以在关闭之前传递。

这是正常行为吗?

(我的猜测:它发生的GlobalStart()与该行,其中做了一些繁重的工作和取得的时间差的停产消息后排队)

+0

感谢@chunjef纠正我的语法错误,并完善这个问题。 – Skye347

回答

0

一种方法来解决这个问题是让service一个lazy val

class GrpcActor extends Actor { 
    ... 
    lazy val service = grpcService.bindService(new GrpcServerImpl(), ec) 
    ... 
} 

一个lazy val是有用的长期运行的操作:在在这种情况下,它首先推迟service的初始化,直到它被首次使用。如果没有lazy修改器,则在创建actor时初始化service

另一种方法是在您的测试,以防止演员系统从演员之前关停添加Thread.sleep完全初始化:

class GrpcActorTest extends FlatSpec with Matchers { 
    ... 
    actor ! GlobalStart() 
    Thread.sleep(5000) // or whatever length of time is needed to initialize the actor 
} 

(作为一个方面说明,考虑使用阿卡Testkit您演员测试。)

0

添加主管战略,其母公司,添加的println到演员生命周期。有什么会杀死你的演员。最后,如果你提供一个完整的例子也许我可以多说:)