2017-06-21 100 views
0

我正在使用分布式算法,并决定使用Akka在机器间进行缩放。机器需要非常频繁地交换消息,并且这些消息引用每台机器上存在的一些不可变对象。因此,从消息中共享的,复制的对象不应被序列化的意义上来说,“压缩”消息似乎是明智的。这不仅可以节省网络带宽,而且还可以避免在消息反序列化时在接收端创建重复对象。串行器是否是从Akka消息中移除共享状态的正确位置?

现在,我的问题是如何正确地做到这一点。到目前为止,我能想到的两个选项:

  1. 处理这种情况“业务层”上,即,将我原来的消息对象,以通过一些符号引用替换为共享,复制的对象的引用有一定的参考对象。然后,我会发送这些参考对象而不是原始消息。把它想象成用URL替换一些实际的网络资源。这样做在编码方面似乎相当直接,但它也将串行化问题拖入实际的业务逻辑。

  2. 编写自定义序列化程序,了解共享的,复制的对象。就我而言,这个解决方案可以通过序列化器将复制的共享对象作为全局状态引入参与者系统。但是,Akka文档没有描述如何以编程方式添加自定义序列化器,这对于使用序列化器编织共享对象来说是必需的。此外,我可以想象,有几个原因,为什么这样的解决方案会受到阻碍。所以,我在这里问。

非常感谢!

+0

该文档说明了如何编写自定义序列化器/反序列化器(http://doc.akka.io/docs/akka/current/java/serialization.html#customization)。去与方法2 imho。你可以通过使用静态东西“编程”来实现 –

+0

谢谢!我遇到了[SerializationSetup](http://doc.akka.io/api/akka/current/akka/serialization/SerializationSetup$.html)对象,我认为这是“以编程方式”添加串行器的正确方式在运行时。但是,在文档中找不到那个。 –

+0

我是否应该在答案中转换评论,并且关闭该问题或者是否需要更多信息? –

回答

1

它可以编写你自己的定制序列,让他们做各种奇怪的事情,那么你就可以在配置水平和往常一样将它们绑定:

class MyOwnSerializer extends Serializer { 

    // If you need logging here, introduce a constructor that takes an ExtendedActorSystem. 
    // class MyOwnSerializer(actorSystem: ExtendedActorSystem) extends Serializer 
    // Get a logger using: 
    // private val logger = Logging(actorSystem, this) 

    // This is whether "fromBinary" requires a "clazz" or not 
    def includeManifest: Boolean = true 

    // Pick a unique identifier for your Serializer, 
    // you've got a couple of billions to choose from, 
    // 0 - 40 is reserved by Akka itself 
    def identifier = 1234567 

    // "toBinary" serializes the given object to an Array of Bytes 
    def toBinary(obj: AnyRef): Array[Byte] = { 
    // Put the code that serializes the object here 
    //#... 
    Array[Byte]() 
    //#... 
    } 

    // "fromBinary" deserializes the given array, 
    // using the type hint (if any, see "includeManifest" above) 
    def fromBinary(
    bytes: Array[Byte], 
    clazz: Option[Class[_]]): AnyRef = { 
    // Put your code that deserializes here 
    //#... 
    null 
    //#... 
    } 
} 

但是这提出了一个重要的问题:如果你的消息都引用了已经在机器上共享的数据,为什么你要在消息中放入指向该对象的指针(非常糟糕!消息应该是不可变的,而指针不是!),而不是某些一些不可变的字符串objectId(有点你的选择1)?这是一个更好的选择,当谈到保护消息的不变性,并且在你的业务逻辑变化不大(只是把一个包装过的共享状态储存单元)

更多信息,请参阅the documentation

0

我终于与Diego提出的解决方案一起使用,并希望分享我的推理和解决方案的更多细节。

首先,我也赞成方案1(处理邮件的“压实”在业务层)对于这些原因的:

  1. 序列化是全局的演员系统。让它们成为有状态实际上是对阿卡非常哲学的最严重的违反,因为它违背了演员中行为和状态的封装。
  2. 无论如何,串行器必须先前创建(即使以“编程方式”添加它们)。
  3. 在设计方面,人们可能会争辩说:“消息压缩并不是串行器的责任。严格意义上说,序列化仅仅是将特定于运行时的数据转换为紧凑,可交换的表示形式。不过,更改串行化内容不是串行化程序的任务。

我已经解决了这个问题,我仍然努力在演员中明确区分“信息压缩”和实际业务逻辑。我想出了一个在斯卡拉这样做的简单方法,我想在此分享。其基本思想是让消息本身看起来像一个普通的案例类,但仍然允许这些消息“自我约束”。下面是一个抽象的例子:

class Sender extends ActorRef { 
    def context: SharedContext = ... // This is the shared data present on every node. 

    // ... 

    def someBusinessLogic(receiver: ActorRef) { 
    val someData = computeData 
    receiver ! MyMessage(someData) 
    } 
} 

class Receiver extends ActorRef { 
    implicit def context: SharedContext = ... // This is the shared data present on every node. 

    def receiver = { 
    case MyMessage(someData) => 
     // ... 
    } 
} 

object Receiver { 
    object MyMessage { 
    def apply(someData: SomeData) = MyCompactMessage(someData: SomeData) 
    def unapply(myCompactMessage: MyCompactMessage)(implicit context: SharedContext) 
    : Option[SomeData] = 
     Some(myCompactMessage.someData(context)) 
    } 
} 

正如你所看到的,发送者和接收者的代码感觉就像使用案例类,事实上,MyMessage可能是一个案例类。 但是,通过手动实现applyunapply,可以插入其自己的“压缩”逻辑,并隐式注入执行“取消整理”所需的共享数据,而不触及发送方和接收方。为了定义MyCompactMessage,我发现Protocol Buffers特别适合,因为它已经是Akka的依赖关系,并且在空间和计算方面都很高效,但是其他任何解决方案都可以。

相关问题