
I'm trying to run some integration tests for a data stream using an embedded Kafka cluster. When I execute all the tests in an environment other than my local machine, they fail because of some internal state that isn't cleaned up properly.

I can get all the tests passing in the non-local environment if I start/stop the Kafka cluster before/after each test, but I only want to start and stop the cluster once, at the beginning and at the end of the test suite.

I tried to remove the local streams state but that didn't seem to work:

override protected def afterEach(): Unit = KStreamTestUtils.purgeLocalStreamsState(properties)

Is there a way to get my suite of tests running without having to start/stop the cluster each time?
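
One idea I'm experimenting with (just a sketch, I haven't verified it fixes the problem) is to give every scenario its own topic instead of the shared TEST_TOPIC, so no records or offsets leak between tests while the cluster keeps running; a unique consumer group.id per test would probably be needed too:

import java.util.UUID

// Sketch: per-test isolation without restarting the cluster.
// Each scenario publishes/consumes on a freshly created topic; the consumer
// config would also have to reference currentTopic instead of TEST_TOPIC.
@volatile private var currentTopic: String = _

override protected def beforeEach(): Unit = {
  currentTopic = s"test_topic_${UUID.randomUUID().toString.take(8)}"
  CLUSTER.createTopic(currentTopic, 1, 1)
}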

The relevant classes are shown below.

class TweetStreamProcessorSpec extends FeatureSpec
  with MockFactory with GivenWhenThen with Eventually with BeforeAndAfterEach with BeforeAndAfterAll {

  val CLUSTER: EmbeddedKafkaCluster = new EmbeddedKafkaCluster
  val TEST_TOPIC: String = "test_topic"
  val properties = new Properties()

  override def beforeAll(): Unit = {
    CLUSTER.start()
    CLUSTER.createTopic(TEST_TOPIC, 1, 1)
  }

  override def afterAll(): Unit = CLUSTER.stop()

  // if these lines are uncommented, the tests work
  // override def afterEach(): Unit = CLUSTER.stop() 
  // override protected def beforeEach(): Unit = CLUSTER.start()

  def createProducer: KafkaProducer[String, TweetEvent] = {
    val properties = Map(
      KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
      VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ReflectAvroSerializer[TweetEvent]].getName,
      BOOTSTRAP_SERVERS_CONFIG -> CLUSTER.bootstrapServers(),
      SCHEMA_REGISTRY_URL_CONFIG -> CLUSTER.schemaRegistryUrlForcedToLocalhost()
    )
    new KafkaProducer[String, TweetEvent](properties)
  }

  def kafkaConsumerSettings: KafkaConfig = {
    val bootstrapServers = CLUSTER.bootstrapServers()
    val schemaRegistryUrl = CLUSTER.schemaRegistryUrlForcedToLocalhost()
    val zookeeper = CLUSTER.zookeeperConnect()

    KafkaConfig(
      ConfigFactory.parseString(
        s"""
        akka.kafka.bootstrap.servers = "$bootstrapServers"
        akka.kafka.schema.registry.url = "$schemaRegistryUrl"
        akka.kafka.zookeeper.servers = "$zookeeper"
        akka.kafka.topic-name = "$TEST_TOPIC"
        akka.kafka.consumer.kafka-clients.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
        akka.kafka.consumer.kafka-clients.value.deserializer = ${classOf[ReflectAvroDeserializer[TweetEvent]].getName}
        akka.kafka.consumer.kafka-clients.client.id = client1
        akka.kafka.consumer.wakeup-timeout = 20s
        akka.kafka.consumer.max-wakeups = 10
      """).withFallback(ConfigFactory.load()).getConfig("akka.kafka")
    )
  }

  feature("Logging tweet data from kafka topic") {

    scenario("log id and payload when consuming a update tweet event") {
      publishEventsToKafka(List(upTweetEvent))
      val logger = Mockito.mock(classOf[Logger])
      val pipeline = new TweetStreamProcessor(kafkaConsumerSettings, logger)
      pipeline.start
      eventually(timeout(Span(5, Seconds))) {
        Mockito.verify(logger, Mockito.times(1)).info(s"updating tweet uuid=${upTweetEvent.getUuid}, payload=${upTweetEvent.getPayload}")
      }
      pipeline.stop
    }

    scenario("log id when consuming a delete tweet event") {
      publishEventsToKafka(List(delTweetEvent))
      val logger = Mockito.mock(classOf[Logger])
      val pipeline = new TweetStreamProcessor(kafkaConsumerSettings, logger)
      pipeline.start
      eventually(timeout(Span(5, Seconds))) {
        Mockito.verify(logger, Mockito.times(1)).info(s"deleting tweet uuid=${delTweetEvent.getUuid}")
      }
      pipeline.stop
    }
  }
}


class TweetStreamProcessor(kafkaConfig: KafkaConfig, logger: Logger)
  extends Lifecycle with TweetStreamProcessor with Logging {

  private var control: Control = _
  private val valueDeserializer: Option[Deserializer[TweetEvent]] = None

  // ...

  def tweetsSource(implicit mat: Materializer): Source[CommittableMessage[String, TweetEvent], Control] =
    Consumer.committableSource(tweetConsumerSettings, Subscriptions.topics(kafkaConfig.topicName))

  override def start: Future[Unit] = {
    control = tweetsSource(materializer)
      .mapAsync(1) { msg =>
        logTweetEvent(msg.record.value())
          .map(_ => msg.committableOffset)
      }
      .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
        batch.updated(elem)
      }
      .mapAsync(3)(_.commitScaladsl())
      .to(Sink.ignore)
      .run()

    Future.successful(())
  }


  override def stop: Future[Unit] = {
    control.shutdown()
      .map(_ => ())
  }
}
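
While writing this up I also noticed that the scenarios call pipeline.stop but never wait for the returned Future, so the consumer from one test may still be polling when the next one starts. A small helper I'm considering (sketch only, assuming blocking in test code is acceptable):

import scala.concurrent.Await
import scala.concurrent.duration._

// Sketch: block until the pipeline has really shut down before the next
// scenario starts, so its consumer no longer touches the shared cluster.
def stopAndWait(pipeline: TweetStreamProcessor): Unit =
  Await.result(pipeline.stop, 10.seconds)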

Any help with this would be much appreciated. Thanks in advance.
