2017-02-23

Akka cluster-aware router - sharing a Redis instance with all routees

In the context of an Akka Cluster application, I ran into a problem with one of Akka's requirements: every (case) class and every message used must be serializable. My context is the following: I want to consume data from a Redis cluster, and to do that I decided to use a cluster-aware router pool, so that adding nodes adds workers. The workers read data from Redis and store some metadata in MongoDB. In a first version, I did it like this:

object MasterWorkers {

  def props(
      awsBucket: String,
      gapMinValueMicroSec: Long,
      persistentCache: RedisCache,
      mongoURI: String,
      mongoDBName: String,
      mongoCollectioName: String
  ): Props =
    Props(MasterWorkers(awsBucket, gapMinValueMicroSec, persistentCache, mongoURI, mongoDBName, mongoCollectioName))

  case class JobRemove(deviceId: DeviceId, from: Timestamp, to: Timestamp)
}

case class MasterWorkers 
(
    awsBucket : String, 
    gapMinValueMicroSec : Long, 
    persistentCache: RedisCache, 
    mongoURI : String, 
    mongoDBName : String, 
    mongoCollectioName : String 
) extends Actor with ActorLogging { 

    val workerRouter = context.actorOf(
      FromConfig.props(Props(classOf[Worker], awsBucket, gapMinValueMicroSec, self, persistentCache, mongoURI, mongoDBName, mongoCollectioName)),
      name = "workerRouter")
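For context, `FromConfig` means the pool settings live in the configuration file; a cluster-aware pool for this router would typically be declared along these lines (a sketch: the deployment path and instance counts are assumptions, adjust them to your actual actor hierarchy):

```hocon
akka.actor.deployment {
  /masterWorkers/workerRouter {
    router = round-robin-pool
    nr-of-instances = 20
    cluster {
      enabled = on
      max-nr-of-instances-per-node = 4
      allow-local-routees = on
    }
  }
}
```

With `cluster.enabled = on`, the pool deploys routees onto every node that joins the cluster, which is exactly why the routee `Props` (and all their constructor arguments) must be serializable.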

The Worker class:

object Worker {

  def props(
      awsBucket: String,
      gapMinValueMicroSec: Long,
      replyTo: ActorRef,
      persistentCache: RedisCache,
      mongoURI: String,
      mongoDBName: String,
      mongoCollectioName: String
  ): Props =
    Props(Worker(awsBucket, gapMinValueMicroSec, replyTo, persistentCache, mongoURI, mongoDBName, mongoCollectioName))

  case class JobDumpFailed(deviceId: DeviceId, from: Timestamp, to: Timestamp)
  case class JobDumpSuccess(deviceId: DeviceId, from: Timestamp, to: Timestamp)

  case class JobRemoveFailed(deviceId: DeviceId, from: Timestamp, to: Timestamp)
}

case class Worker 
(
    awsBucket : String, 
    gapMinValueMicroSec : Long, 
    replyTo : ActorRef, 
    persistentCache: RedisCache, 
    mongoURI : String, 
    mongoDBName : String, 
    mongoCollectioName : String 
) extends Actor with ActorLogging { 

But this raised the following exception when I started two nodes:

[info] akka.remote.MessageSerializer$SerializationException: Failed to serialize remote message [class akka.remote.DaemonMsgCreate] using serializer [class akka.remote.serialization.DaemonMsgCreateSerializer]. 
[info] at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:61) 
[info] at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:895) 
[info] at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:895) 
[info] at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
[info] at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:894) 
[info] at akka.remote.EndpointWriter.writeSend(Endpoint.scala:786) 
[info] at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:761) 
[info] at akka.actor.Actor$class.aroundReceive(Actor.scala:497) 
[info] at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:452) 
[info] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) 
[info] at akka.actor.ActorCell.invoke(ActorCell.scala:495) 
[info] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) 
[info] at akka.dispatch.Mailbox.run(Mailbox.scala:224) 
[info] at akka.dispatch.Mailbox.exec(Mailbox.scala:234) 
[info] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[info] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[info] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[info] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[info] Caused by: java.io.NotSerializableException: akka.actor.ActorSystemImpl 
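The root cause is in the last line: the `Props` deployed to the remote node (as a `DaemonMsgCreate` message) drag along every constructor argument, and `RedisCache` transitively references an `ActorSystemImpl`, which is not `java.io.Serializable`. The same failure mode can be sketched without Akka (all names below are illustrative stand-ins, not the real classes):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Stand-in for ActorSystem: a class that is NOT Serializable.
class FakeActorSystem

// Stand-in for RedisCache: case classes are Serializable, but Java
// serialization still fails transitively on the non-serializable field.
case class Cache(system: FakeActorSystem)

object Demo extends App {
  val out = new ObjectOutputStream(new ByteArrayOutputStream())
  try out.writeObject(Cache(new FakeActorSystem))
  catch {
    case e: java.io.NotSerializableException =>
      // Same failure mode as the DaemonMsgCreate error above
      println(s"NotSerializableException: ${e.getMessage}")
  }
}
```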

The Redis cache is a simple case class with a companion object that implements a cache interface.


To work around this, I then created the redisCache inside the worker instead of passing it in from the master node:

case class Worker 
(
    awsBucket : String, 
    gapMinValueMicroSec : Long, 
    replyTo : ActorRef, 
    mongoURI : String, 
    mongoDBName : String, 
    mongoCollectioName : String 
) extends Actor with ActorLogging { 

// redis cache here now 
val redisCache = ... 

But with this design, every routee creates a new instance of the Redis cache, which is not the intended behavior. What I want is a single instance of my Redis cache that is shared among all my routees, but in the context of a cluster application this seems impossible to achieve, so I don't know whether this is a design flaw or just missing experience with Akka on my part. If anyone has run into a similar problem, I'm happy to take suggestions!

Answer


The problem is that your RedisCache is not so simple: it carries an ActorSystem, which cannot be serialized.

I guess that is because it contains RedisClient instances from, e.g., the rediscala library, and those require an ActorSystem.

You need to abstract away from the actor system and pass your workers only the bare details of the Redis cluster (i.e. the RedisServer objects).

The workers will then instantiate a RedisClient themselves, using their own context.system:

case class Worker 
(
    awsBucket : String, 
    gapMinValueMicroSec : Long, 
    replyTo : ActorRef, 
    redisMaster: RedisServer, 
    redisSlaves: Seq[RedisServer], 
    mongoURI : String, 
    mongoDBName : String, 
    mongoCollectioName : String 
) extends Actor with ActorLogging { 

    val masterSlaveClient = ??? //create from the RedisServer details 

} 
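If the project uses the rediscala library, the missing piece inside Worker could look roughly like this (a sketch, assuming rediscala's `RedisClientMasterSlaves` constructor, which takes the master and slave `RedisServer` details plus an implicit `ActorSystem`; check the signature in the rediscala version you depend on):

```scala
import akka.actor.ActorSystem
import redis.RedisClientMasterSlaves

// Inside Worker's body: the actor system comes from the actor's own
// context, so nothing non-serializable ever crosses the wire.
implicit val system: ActorSystem = context.system
val masterSlaveClient = RedisClientMasterSlaves(redisMaster, redisSlaves)
```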

This lets each worker set up its own connection to the Redis cluster.

Alternatively, if you want to connect only once, in the master, and share that connection with the workers, you need to pass around the RedisClientActor (source) that embeds the connection. That is an ActorRef and can be shared remotely.

It can be obtained by calling client.redisConnection.

The workers can then build an ActorRequest around it, for example:

case class Worker 
    (
     awsBucket : String, 
     gapMinValueMicroSec : Long, 
     replyTo : ActorRef, 
     redisConnection: ActorRef, 
     mongoURI : String, 
     mongoDBName : String, 
     mongoCollectioName : String 
    ) extends Actor with ActorLogging with ActorRequest { 

     // you will need to implement the execution context that ActorRequest needs as well.. 

     send(redisCommand) 

    } 
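On the master side, the shared-connection variant would look roughly like this (a sketch assuming rediscala's `RedisClient`, whose `redisConnection` member is the ActorRef of the underlying RedisClientActor; host and port are placeholders):

```scala
import akka.actor.{ActorRef, ActorSystem}
import redis.RedisClient

implicit val system: ActorSystem = ActorSystem("master")

// Connect once in the master and extract the connection actor:
// it is an ActorRef, so it can safely be passed to remote routees.
val client = RedisClient(host = "127.0.0.1", port = 6379)
val redisConnection: ActorRef = client.redisConnection
```

The `redisConnection` ActorRef is what you would then put into the Worker `Props` in place of the RedisCache.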

Regarding this solution, does it mean that every worker will instantiate a Redis client? – alifirat


Yes. I've just added an approach that shares a single connection, if that is what you are interested in. –


Yes, I hadn't thought of the alternative. I'll give it a try and let you know! – alifirat
