我有,我用我的项目的一些卡夫卡频道层次:斯卡拉特质类型不匹配
我的基本特点是:
trait SendChannel[A, B] extends CommunicationChannel {
def send(data:A): B
}
现在我有一个共同的卡夫卡发送通道像
trait CommonKafkaSendChannel[A, B, Return] extends SendChannel[A, Return] {
val channelProps: KafkaSendChannelProperties
val kafkaProducer: Producer[String, B]
override def close(): Unit = kafkaProducer.close()
}
现在有2个CommanKafkaSendChannel变体,一个是回调,一个是Future:
trait KafkaSendChannelWithFuture[A, B] extends CommonKafkaSendChannel[A, B, Future[RecordMetadata]] {
override def send(data: A): Future[RecordMetadata] = Future {
kafkaProducer.send(new ProducerRecord[String, B](channelProps.topic)).get
}
}
KafkaSendChannelWithCallback
定义:
object KafkaSendChannelWithCallback {
def apply[A, B](oChannelProps: KafkaSendChannelProperties,
oKafkaProducer: Producer[String, B],
oCallback: Callback): KafkaSendChannelWithCallback[A, B] =
new KafkaSendChannelWithCallback[A,B] {
override val channelProps: KafkaSendChannelProperties = oChannelProps
override val kafkaProducer: Producer[String, B] = oKafkaProducer
override val callback: Callback = oCallback
}
}
trait KafkaSendChannelWithCallback[A, B] extends CommonKafkaSendChannel[A, B, Unit] {
val callback: Callback
override def send(data: A): Unit =
kafkaProducer.send(new ProducerRecord[String, B](channelProps.topic), callback)
}
现在基于所述配置值I选择通道的上下面等运行时适当的类型。我创建右信道类型的演员,将数据发送到卡夫卡:
val sendChannel = kafkaChannel.channel(config, actorSystem).fold(
error => {
logger.error("Exception while instantiating the KafkaSendChannel")
throw error
},
success => success
)
actor = actorSystem.actorOf(IngestionActor.props(config, sendChannel), name = ACTOR_NAME)
演员的定义:
object IngestionRouterActor {
def props[V](config: Config, sendChannel: SendChannel[V, Unit]): Props =
Props(classOf[IngestionActor[V]], config, sendChannel)
}
问题是,当我使用KafkaSendChannelWithCallback
我的代码正常编译但是当我使用KafkaSendChannelWithFuture
它给我下面的错误actor =
声明:
[error] IngestionActor.scala:32:模式类型与预期类型不兼容; [错误]实测值:KafkaSendChannelWithFuture [字符串,V] [错误]需要:SendChannel [V,单位]
由于两个信道定义是从SendChannel
延长,此代码应编译没有任何错误。我不确定它为什么不编译。谢谢
嗨@chunjef,感谢您的答复。你提到'因为任何是单元和未来的超类型,这是尝试为'任何'是超类型,但为什么SendChannel [V,单元]和SendChannel [V,未来[记录元数据]]的类型都是SendChannel [V,任何]'这是错误的? – Explorer
我做了你所建议的修改,是否有任何需要修改'props'仍然有'SendChannel [V,Unit]'因为我仍然得到相同的错误。 – Explorer
@Explorer:请仔细阅读我的回答。我在回答中解答了你的两条评论。 – chunjef