1

我一直在按照 fs2-kafka 的示例进行操作。但是,我在消费者(consumer)示例上卡住了。我遇到的问题是 fs2.Stream 和 cats.effect.IO 之间的类型不匹配(错误见下文)。

代码:注意:现在根据@AlexeyNovakov 的建议进行了更新,以提供一个工作示例

package pb.streams

import cats.effect.{ContextShift, Timer}

import fs2.kafka._
import fs2.kafka.{AutoOffsetReset, ConsumerSettings}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization._

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.concurrent.forkjoin.ForkJoinPool
import scala.concurrent.duration._

import cats.implicits._
import cats.effect.IO

/** Minimal fs2-kafka consumer example.
  *
  * Subscribes to the `topic-inbox` topic, prints every record and commits the
  * consumed offsets in batches.
  */
object Consumer {

  // Pool backing both the ContextShift and the Timer used by the IO runtime.
  implicit val ec: ExecutionContextExecutor =
    ExecutionContext.fromExecutor(new ForkJoinPool(4))

  implicit val contextShift: ContextShift[IO] = IO.contextShift(ec)
  implicit val timer: Timer[IO] = IO.timer(ec)

  /** Program entry point.
    *
    * `consumeFeed()` only *describes* the stream; nothing happens until the
    * stream is compiled to an `IO` and that `IO` is run. Blocking here with
    * `unsafeRunSync()` keeps the JVM alive for as long as the consumer runs.
    */
  def main(args: Array[String]): Unit =
    consumeFeed().compile.drain.unsafeRunSync()

  /** Prints a record as `key => value`.
    *
    * The `println` is suspended inside the returned `IO`, so it executes when
    * the effect is evaluated by the stream rather than when this method is
    * called.
    *
    * @param record the Kafka record to print
    * @return an effect that prints the record when run
    */
  def processRecord(record: ConsumerRecord[String, String]): IO[Unit] =
    IO(println(s"${record.key()} => ${record.value()}"))

  /** Builds the consumer stream.
    *
    * Every step of the for-comprehension must be an `fs2.Stream`; plain `IO`
    * values (such as the subscription) are lifted with `fs2.Stream.eval`.
    *
    * @return a stream that, once run, consumes `topic-inbox`, processes up to
    *         4 records concurrently and commits offsets grouped by
    *         `groupWithin(500, 15.seconds)`
    */
  def consumeFeed(): fs2.Stream[IO, Unit] = {

    // Settings depend on the blocking execution context provided by
    // consumerExecutionContextStream, hence the function.
    val consumerSettings = (executionContext: ExecutionContext) =>
      ConsumerSettings(
        keyDeserializer   = new StringDeserializer,
        valueDeserializer = new StringDeserializer,
        executionContext  = executionContext
      )
        .withAutoOffsetReset(AutoOffsetReset.Earliest)
        .withBootstrapServers("localhost:9092")
        .withPollTimeout(250.milliseconds)
        .withGroupId("group")

    for {
      executionContext <- consumerExecutionContextStream[IO]
      consumer         <- consumerStream[IO].using(consumerSettings(executionContext))
      _                <- fs2.Stream.eval(consumer.subscribeTo("topic-inbox"))
      _                <- consumer.stream
                            .mapAsync(4) { message =>
                              // Process first, then hand the offset downstream for committing.
                              processRecord(message.record)
                                .as(message.committableOffset)
                            }
                            .groupWithin(500, 15.seconds)
                            .map(_.foldLeft(CommittableOffsetBatch.empty[IO])(_ updated _))
                            .evalMap(_.commit)
    } yield ()
  }
}

我似乎在编译时遇到的错误是:

Error:(55, 29) type mismatch;
 found   : fs2.Stream[[x]cats.effect.IO[x],Unit]
 required: cats.effect.IO[?]
        _                   ← consumer.stream

Error:(54, 29) type mismatch;
 found   : cats.effect.IO[Nothing]
 required: fs2.Stream[?,?]
        _                   ← consumer.subscribeTo("topic-inbox")

Error:(55, 9) parameter value consumer in value $anonfun is never used
        consumer            ← consumerStream[IO].using(consumerSettings(executionContext))

谁能提供一些见解,帮助我理解并修复这个令人费解的错误?我尝试了各种方法来解决这个问题,但都无济于事。由于我在谷歌上搜索不到类似的情况,任何帮助我都将不胜感激。

4

0 回答 0