1

I have some Kafka Channel hierarchy that I am using in my project:

My base trait is:

trait SendChannel[A, B] extends CommunicationChannel {
  def send(data:A): B
}

Now I have a common kafka send Channel like

trait CommonKafkaSendChannel[A, B, Return] extends SendChannel[A, Return] {
val channelProps: KafkaSendChannelProperties
val kafkaProducer: Producer[String, B]
 override def close(): Unit = kafkaProducer.close()
}

Now there are 2 variants of CommanKafkaSendChannel, one is with callback and one is with Future:

trait KafkaSendChannelWithFuture[A, B] extends CommonKafkaSendChannel[A, B, Future[RecordMetadata]] {
override def send(data: A): Future[RecordMetadata] = Future {
  kafkaProducer.send(new ProducerRecord[String, B](channelProps.topic)).get
}
}

KafkaSendChannelWithCallback definition:

object KafkaSendChannelWithCallback {

def apply[A, B](oChannelProps: KafkaSendChannelProperties,
              oKafkaProducer: Producer[String, B],
              oCallback: Callback): KafkaSendChannelWithCallback[A, B] =
new KafkaSendChannelWithCallback[A,B] {
  override val channelProps: KafkaSendChannelProperties = oChannelProps
  override val kafkaProducer: Producer[String, B] = oKafkaProducer
  override val callback: Callback = oCallback
}
}

trait KafkaSendChannelWithCallback[A, B] extends CommonKafkaSendChannel[A, B, Unit] {

  val callback: Callback

override def send(data: A): Unit =
kafkaProducer.send(new ProducerRecord[String, B](channelProps.topic), callback)
}

Now based on the configuration value I select the proper type of channel on run time like below. I am creating actor with right type of channel which will send the data to kafka:

  val sendChannel = kafkaChannel.channel(config, actorSystem).fold(
    error => {
      logger.error("Exception while instantiating the KafkaSendChannel")
      throw error
    },
    success => success
  )

actor = actorSystem.actorOf(IngestionActor.props(config, sendChannel), name = ACTOR_NAME)

Actor definition:

object IngestionRouterActor {
  def props[V](config: Config, sendChannel: SendChannel[V, Unit]): Props =
Props(classOf[IngestionActor[V]], config, sendChannel)
}

The problem is when I use KafkaSendChannelWithCallback my code compiles properly however when I use KafkaSendChannelWithFuture it gives me below error on actor = declaration:

[error]IngestionActor.scala:32: pattern type is incompatible with expected type; [error] found : KafkaSendChannelWithFuture[String,V] [error] required: SendChannel[V,Unit]

As both the channel definitions are extended from SendChannel, this code should have compiled without any error. I am not sure why it is not compiling. Thanks

4

1 回答 1

1

PropsforIngestionActor需要SendChannel[V, Unit]一个. 将 a 传递KafkaSendChannelWithCallback给此参数有效,因为它是 a SendChannel[V, Unit]

另一方面,KafkaSendChannelWithFuture是一个SendChannel[V, Future[RecordMetadata]]。ASendChannel[V, Future[RecordMetadata]]不是. _ _SendChannel[V, Unit]

一种选择是重新定义Props以采用 a SendChannel[V, Any],因为Any它是Unitand的超类型Future

def props[V](config: Config, sendChannel: SendChannel[V, Any]): Props = ???

此时,编译器仍然不满意SendChannel,因为作为泛型类型,默认情况下是不变的。换句话说,既不是SendChannel[V, Unit]也不SendChannel[V, Future[RecordMetadata]]是类型SendChannel[V, Any]

要改变这一点,SendChannel请对第二个类型参数进行协变(通过+在 前面添加 a B):

trait SendChannel[A, +B] extends CommunicationChannel {
  def send(data: A): B
}
于 2017-09-25T18:22:27.137 回答