2011-10-11 65 views
3

我正在斯卡拉建立一个基于演员的服务,消费者可以查询客户端是否被授权,也可以授权客户端。斯卡拉演员:receiveWithin()没有收到消息

如果消费者查询客户端的授权状态并且该客户端尚未获得授权,则参与者应该在指定的超时时间内等待传入的Authorize消息,然后发送回复。 IsAuthorized应该能够在消费者代码中同步执行,以便阻止并等待答复。像

service !? IsAuthorized(client) => { 
    case IsAuthorizedResponse(_, authorized) => // do something 
} 

然而receiveWithin()在我的演员东西永远不会收到一个消息,并始终跑入超时。

这里是我的代码

case object WaitingForAuthorization 
case class WaitingForAuthorizationResponse(clients: immutable.Set[Client]) 
case class IsAuthorized(client: Client) 
case class IsAuthorizedResponse(client: Client, authorized: Boolean) 
case class Authorize(client: Client) 

class ClientAuthorizationService { 
    private val authorized: mutable.Set[Client] = new mutable.HashSet[Client] with mutable.SynchronizedSet[Client] 
    private val waiting: mutable.Set[Client] = new mutable.HashSet[Client] with mutable.SynchronizedSet[Client] 

    def actor = Actor.actor { 
    loop { 
     react { 
     case IsAuthorized(client: Client) => reply { 
      if (authorized contains client) { 
      IsAuthorizedResponse(client, true) 
      } else { 
      waiting += client 
      var matched = false; 
      val end = Instant.now.plus(ClientAuthorizationService.AUTH_TIMEOUT) 

      while (!matched && Instant.now.isBefore(end)) { 
       // ERROR HERE: Never receives Authorize messages 
       receiveWithin(ClientAuthorizationService.AUTH_TIMEOUT) { 
       case Authorize(authorizedClient: Client) => { 
        authorizeClient(authorizedClient) 
        if (authorizedClient == client) matched = true 
       } 
       case TIMEOUT => // do nothing since we handle the timeout in the while loop 
       } 
      } 

      IsAuthorizedResponse(client, matched) 
      } 
     } 

     case Authorize(client: Client) => authorizeClient(client) 
     case WaitingForAuthorization => reply { 
      WaitingForAuthorizationResponse(immutable.Set() ++ waiting) 
     } 
     } 
    } 
    } 

    private def authorizeClient(client: Client) = synchronized { 
    authorized += client 
    waiting -= client 
    } 
} 

object ClientAuthorizationService { 
    val AUTH_TIMEOUT: Long = 60 * 1000; 
} 

当我发送Authorize消息给演员,而它在receiveWithin块由第二case语句抓住低于实际应该只捕获这些消息时没有消息当时正在等待答复。

我的代码有什么问题?

更新:

下面是相关的代码的简化版本,实际上代表了一个更简单,不同的逻辑,但也许更好地阐明了问题:

loop { 
    react { 
    case IsAuthorized(client: Client) => reply { 
     var matched = false 

     // In the "real" logic we would actually loop here until either the 
     // authorized client matches the requested client or the timeout is hit. 
     // For the sake of the demo we only take the first Authorize message. 

     receiveWithin(60*1000) { 
     // Although Authorize is send to actor it's never caught here 
     case Authorize(authorizedClient: Client) => matched = authorizedClient == client 
     case TIMEOUT => 
     } 

     IsAuthorizedResponse(client, matched) 
    } 

    case Authorize(client: Client) => // this case is hit 
    } 
} 

更新2:

我终于解决了这个问题。我认为问题在于,在前面的IsAuthorized消息的回复中尝试收到Authorize消息时,演员被阻止。

我重写了代码,以便在我们等待Authorized时启动匿名Actor。这是有兴趣的人的代码。 waitingMap[Client, Actor]

loop { 
    react { 
    case IsAuthorized(client: Client) => 
     if (authorized contains client) { 
     sender ! IsAuthorizedResponse(client, true) 
     } else { 
     val receipient = sender 
     // Start an anonymous actor that waits for an Authorize message 
     // within a given timeout and sends a reply to the consumer. 
     // The actor will be notified by the parent actor below. 
     waiting += client -> Actor.actor { 
      val cleanup =() => { 
      waiting -= client 
      exit() 
      } 

      receiveWithin(ClientAuthorizationService.AUTH_TIMEOUT) { 
      case Authorize(c) => 
       receipient ! IsAuthorizedResponse(client, true) 
       cleanup() 
      case TIMEOUT => 
       receipient ! IsAuthorizedResponse(client, false) 
       cleanup() 
      } 
     } 
     } 

    case Authorize(client: Client) => 
     authorized += client 

     waiting.get(client) match { 
     case Some(actor) => actor ! Authorize(client) 
     case None => 
     } 

    case WaitingForAuthorization => sender ! WaitingForAuthorizationResponse(immutable.Set() ++ waiting.keySet) 
    } 
} 

如果有更好的方法来解决这个问题,请让我知道!

+0

你能清理/缩短代码并发布相关部分吗? – Jus12

+0

我已更新我的帖子。 –

+0

为什么不使用'reactWithin'? – Jus12

回答

0

我终于解决了这个问题。我认为问题在于,在前面的IsAuthorized消息的回复中尝试收到Authorize消息时,演员被阻止。

我重写了代码,以便在我们等待Authorized时启动匿名Actor。这是有兴趣的人的代码。 waitingMap[Client, Actor]

loop { 
    react { 
    case IsAuthorized(client: Client) => 
     if (authorized contains client) { 
     sender ! IsAuthorizedResponse(client, true) 
     } else { 
     val receipient = sender 
     // Start an anonymous actor that waits for an Authorize message 
     // within a given timeout and sends a reply to the consumer. 
     // The actor will be notified by the parent actor below. 
     waiting += client -> Actor.actor { 
      val cleanup =() => { 
      waiting -= client 
      exit() 
      } 

      receiveWithin(ClientAuthorizationService.AUTH_TIMEOUT) { 
      case Authorize(c) => 
       receipient ! IsAuthorizedResponse(client, true) 
       cleanup() 
      case TIMEOUT => 
       receipient ! IsAuthorizedResponse(client, false) 
       cleanup() 
      } 
     } 
     } 

    case Authorize(client: Client) => 
     authorized += client 

     waiting.get(client) match { 
     case Some(actor) => actor ! Authorize(client) 
     case None => 
     } 

    case WaitingForAuthorization => sender ! WaitingForAuthorizationResponse(immutable.Set() ++ waiting.keySet) 
    } 
} 

如果有更好的方法来解决这个问题,请让我知道!

0

是不是回复问题?在

case IsAuthorized(client: Client) => reply { ... } 

所有的代码是在参数回复块,因此它被执行(包括在receiveWithing)之前实际被发送的答复。这意味着当你的客户将处理你的回复时,你将不再等待它。

在你的原代码,它可能应该像

case IsAuthorized(client: Client) => 
    if(ok) reply(AuthorizedReply(client, true)) 
    else { 
    reply(AuthorizedReply(client, false)) 
    receiveWithin(...) 
    } 
+0

“,因此它在执行回复之前被执行(包括receiveWithing)” 这正是我想要的:-)答复应该被阻止,直到客户端已被授权或超时。此代码受此示例[此处](http://www.scala-lang.org/node/242#SecondExample)的启发(请参阅“第二个示例”一章的第三个代码段)。 您的代码不正确,因为如果客户端尚未授权,您会立即发送带有“false”的回复,然后等待传入的授权消息。 –