2012-04-10 63 views
6

我建模Scala和阿卡一个简单的P2P:阿卡演员“问”和“等待”与TimeoutException异常

class Node() extends Peer with Actor { 

    var peers: List[ActorRef] = List() 

    def receive = { 
    case _register(peer: ActorRef, p: Option[Int]) => { 
     println("registering [" + peer + "] for [" + this + "]") 
     peers = peer :: peers 
    } 
    } 

} 

sealed case class _register(val peer: ActorRef, var p: Option[Int] = None) 

,然后一个简单的网络:

class Network() extends Actor { 

    def this(name: String) = { 

    this() 

    val system = ActorSystem(name) 

    val s1 = system.actorOf(Props(new Node()), name = "s1") 
    val s2 = system.actorOf(Props(new Node()), name = "s2") 

    val c1 = system.actorOf(Props(new Node()), name = "c1") 
    val c2 = system.actorOf(Props(new Node()), name = "c2") 
    val c3 = system.actorOf(Props(new Node()), name = "c3") 
    val c4 = system.actorOf(Props(new Node()), name = "c4") 

    implicit val timeout = Timeout(5 second) 

    s1 ? _register(c1) 
    s1 ? _register(c2) 
    s1 ? _register(c3) 
    val lastRegistered = s2 ? _register(c4) 
    Await.ready(lastRegistered, timeout.duration) 

    println("initialized nodes") 
    } 
} 

的输出,我“M得到的是总是这样的:

registering [Actor[akka://p2p/user/c1]] for [[email protected]] 
registering [Actor[akka://p2p/user/c2]] for [[email protected]] 
registering [Actor[akka://p2p/user/c3]] for [[email protected]] 
registering [Actor[akka://p2p/user/c4]] for [[email protected]] 
[ERROR] [04/10/2012 22:07:04.34] [main-akka.actor.default-dispatcher-1] [akka://main/user/p2p] error while creating actor 
java.util.concurrent.TimeoutException: Futures timed out after [5000] milliseconds 
    at akka.dispatch.DefaultPromise.ready(Future.scala:834) 
    at akka.dispatch.DefaultPromise.ready(Future.scala:811) 
    at akka.dispatch.Await$.ready(Future.scala:64) 
    at nl.cwi.crisp.examples.p2p.scala.Network.<init>(Node.scala:136) 
    at nl.cwi.crisp.examples.p2p.scala.Main$$anonfun$11.apply(Node.scala:164) 
    at nl.cwi.crisp.examples.p2p.scala.Main$$anonfun$11.apply(Node.scala:164) 
    at akka.actor.ActorCell.newActor(ActorCell.scala:488) 
    at akka.actor.ActorCell.create$1(ActorCell.scala:506) 
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:591) 
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:191) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:160) 
    at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:505) 
    at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259) 
    at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:997) 
    at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1495) 
    at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104) 

我遵循的文档Akka参考文档中的。交换Await.readyAwait.result不起任何作用。该日志显示上次注册已成功。

我该如何解决这个问题?

回答

8

您正在等待节点actor返回的消息,但节点actor不会将消息发送回sender actorRef,因此s1 ? _register创建的Future [Any]将永远不会收到响应,所以未来永远不会完成。您可以从节点receive方法中添加sender ! something来发送响应,我不确定在这种情况下something是什么意思。

+1

谢谢,这是一个马虎的错误! '发件人!没有'会为我工作。 – nobeh 2012-04-10 20:41:09

8

炖的这样做是正确的,但你有一些令人担忧的代码在你的网络中演员:

val system = ActorSystem(name) 

val s1 = system.actorOf(Props(new Node()), name = "s1") 
val s2 = system.actorOf(Props(new Node()), name = "s2") 

val c1 = system.actorOf(Props(new Node()), name = "c1") 
val c2 = system.actorOf(Props(new Node()), name = "c2") 
val c3 = system.actorOf(Props(new Node()), name = "c3") 
val c4 = system.actorOf(Props(new Node()), name = "c4") 

你为什么要创建一个新的ActorSystem,为什么是你创建的演员系统内的顶级演员?

如果您需要访问演员的系统,你简单的调用:

context.system 

而且你应该避免创建顶级演员“只是因为”对,你不应该弄乱根同样的原因通过将您的所有文件放置在您的文件系统中。 要建立儿童演员到网络,只是做:

context.actorOf(...) 

现在你只要你创建在同一个系统中有多个网络,演员有一个问题,因为它会尝试创建顶部同名的演员。

+1

感谢您提供清晰的提示。 – nobeh 2012-04-11 09:41:58

+1

非常欢迎你。快乐hkk! – 2012-04-11 11:08:26