2017-05-04 69 views
1

我想创建一个使用远程actor的示例Akka应用程序。目标是创建例如16个以顺序方式交换消息的演员(演员16与演员15,15到14等会谈,并且1与演员16会谈)。但是,我在通信方面遇到了问题,因为我连续出现此错误。使用Akka创建消息环的死信

[INFO] [2017年5月4日15:45:58.248] [ActorFlasks-akka.actor.default-调度-4] [阿卡:// ActorFlasks/deadLetters]消息[java.lang中。[String] from Actor [akka:// ActorFlasks/user/16#-2022012132] to Actor [akka:// ActorFlasks/deadLetters] was not delivery。 [1]遇到遇难信 。

为此,我运行应用程序的16个终端实例,并始终使用不同的配置文件。我创建在每种情况下actorsystem像这样:

object Main extends App { 

    val localId = args(0) 

    val configFile = getClass.getClassLoader.getResource(s"application$localId.conf").getFile 
    val config = ConfigFactory.parseFile(new File(configFile)) 
    val system = ActorSystem("ActorFlasks" , config) 
    val remote = system.actorOf(Props[CyclonManager], name=localId) 

    remote ! "START" 
} 

配置文件的一个例子是这样的:

akka { 
    actor { 
    provider = remote 
    } 
    remote { 
    enabled-transports = ["akka.remote.netty.tcp"] 
    netty.tcp { 
     hostname = "localhost" 
     port = 50001 
    } 
} 
} 

和演员的定义是这样的:

class CyclonManager extends Actor { 

    def propagateMessage(): Unit = { 
    val localId = self.path.name.toInt 
    val currentPort = 50000 + localId 
    val nextHopPort = if (currentPort == 50001) 50016 else currentPort - 1 
    val nextHopId = localId-1 

    val nextHopRef = context.actorSelection(s"akka.tcp://[email protected]:$nextHopPort/user/$nextHopId") 

    nextHopRef ! "NEXT" 
    } 

    override def receive: Receive = { 
    case "START" => 
     if (self.path.name == "16") { 
     propagateMessage() 
     } 
    case "NEXT" => 
     propagateMessage() 
    case _ => 
     println("Unrecognized message") 
    } 
} 

这是一个让我开始的简单例子,但无论我尝试什么,我都无法实现它。有人知道我失败的地方吗?

谢谢你在前进,

编辑:

akka { 
    actor { 
    provider = "akka.remote.RemoteActorRefProvider" 
    } 
    remote { 
    enabled-transports = ["akka.remote.netty.tcp"] 
    netty.tcp { 
     hostname = "localhost" 
     port = 50015 
    } 
} 
} 
+0

你确定演员#15存在于端口50015吗? – Josef

+0

除非我对如何创建演员和演员系统有错误的理解,是的,我敢肯定。 – PablodeAcero

回答

1

重建并运行你的例子后,我发现在propagateMessage功能一个错误。

val nextHopId = localId-1 

应该

val nextHopId = if (currentPort == 50001) 16 else localId-1 

如果不解决您的问题,请尝试运行我的快速和肮脏的,但工作代码,看看是如何从你这不同:https://gist.github.com/grantzvolsky/4a53ce78610038a9d44788d7151dc416

在我代码我只使用角色14,15和16.你可以运行每个使用sbt "run 16"等。

+0

是的,我只是在提交问题后才看到这个错误,但修复它并没有帮助。我将我的代码更改为您的代码,但它仍然无法工作......问题可能与我的本地主机配置有关吗?或者使用我正在使用的akka​​版本(2.5.0) – PablodeAcero

+0

我添加了'akka-remote'依赖项,它位于我的build.sbt的最后一行。从理论上讲,你的防火墙可能会阻止通信,但似乎不太可能。你会提供所有三个节点的全部输出吗?这里是我的:(注意我首先启动节点14,然后是15,然后是16):http://imgur.com/a/l53pV – Josef

+0

嗨!我试过你的代码和配置。节点14和15具有正常输出。但是,节点16给我以下内容:http://imgur.com/a/tZi6T – PablodeAcero