10

我们有一个消息调度程序,它在将消息属性放入带有该键的 Kafka 主题队列之前从消息属性生成一个哈希键。

这样做是出于重复数据删除的目的。但是,我不确定如何在不实际设置本地集群并检查它是否按预期执行的情况下测试这种重复数据删除。

在线搜索用于模拟 Kafka 主题队列的工具并没有帮助,我担心我可能以错误的方式思考这个问题。

最终,任何用于模拟 Kafka 队列的行为都应该与本地集群的行为方式相同 - 即通过向主题队列插入密钥来提供重复数据删除。

有没有这样的工具?

4

3 回答 3

3

如果您需要验证 Kafka 特定功能,或使用 Kafka 特定功能实现,那么唯一的方法就是使用 Kafka!

Kafka 是否对其重复数据删除逻辑进行了任何测试?如果是这样,以下组合可能足以减轻您的组织感知到的失败风险:

  • 哈希逻辑的单元测试(确保相同的对象确实生成相同的哈希)
  • Kafka 主题重复数据删除测试(Kafka 项目内部)
  • 飞行前烟雾测试验证您的应用程序与 Kafka 的集成

如果 Kafka 没有围绕其主题重复数据删除进行任何类型的测试,或者您担心破坏性更改,那么围绕 Kafka 特定功能进行自动检查很重要。这可以通过集成测试来完成。我最近在基于 Docker 的集成测试管道上取得了很大的成功。在创建 Kafka docker 镜像(社区可能已经提供了一个)的初步工作之后,设置集成测试管道变得微不足道。管道可能如下所示:

  • 执行基于应用程序的单元测试(哈希逻辑)
  • 一旦通过,您的 CI 服务器就会启动 Kafka
  • 执行集成测试,验证重复写入仅向主题发出一条消息。

我认为重要的是确保将 Kafka 集成测试最小化为仅包含绝对依赖于 Kafka 特定功能的测试。即使使用 docker-compose,它们也可能比单元测试慢几个数量级,~1 毫秒 vs 1 秒?要考虑的另一件事是维护集成管道的开销可能值得冒险相信 Kakfa 将提供它声称的主题重复数据删除。

于 2016-10-31T12:36:20.323 回答
0

为了在 Python单元测试下使用 SBT测试任务模拟Kafka,我做了如下操作。应该安装Pyspark 。

build.sbt中定义应该与测试一起运行的任务:

val testPythonTask = TaskKey[Unit]("testPython", "Run python tests.")

val command = "python3 -m unittest app_test.py"
val workingDirectory = new File("./project/src/main/python")

testPythonTask := {
  val s: TaskStreams = streams.value
  s.log.info("Executing task testPython")
  Process(command,
    workingDirectory,
    // arguments for using org.apache.spark.streaming.kafka.KafkaTestUtils in Python
    "PYSPARK_SUBMIT_ARGS" -> "--jars %s pyspark-shell"
      // collect all jar paths from project
      .format((fullClasspath in Runtime value)
      .map(_.data.getCanonicalPath)
        .filter(_.contains(".jar"))
        .mkString(",")),
    "PYSPARK_PYTHON" -> "python3") ! s.log
}

//attach custom test task to default test tasks
test in Test := {
  testPythonTask.value
  (test in Test).value
}

testOnly in Test := {
  testPythonTask.value
  (testOnly in Test).value
}

python 测试用例(app_test.py)中:

import random
import unittest
from itertools import chain

from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming.tests import PySparkStreamingTestCase

class KafkaStreamTests(PySparkStreamingTestCase):
    timeout = 20  # seconds
    duration = 1

    def setUp(self):
        super(KafkaStreamTests, self).setUp()

        kafkaTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
            .loadClass("org.apache.spark.streaming.kafka.KafkaTestUtils")
        self._kafkaTestUtils = kafkaTestUtilsClz.newInstance()
        self._kafkaTestUtils.setup()

    def tearDown(self):
        if self._kafkaTestUtils is not None:
            self._kafkaTestUtils.teardown()
            self._kafkaTestUtils = None

        super(KafkaStreamTests, self).tearDown()

    def _randomTopic(self):
        return "topic-%d" % random.randint(0, 10000)

    def _validateStreamResult(self, sendData, stream):
        result = {}
        for i in chain.from_iterable(self._collect(stream.map(lambda x: x[1]),
                                                   sum(sendData.values()))):
            result[i] = result.get(i, 0) + 1

        self.assertEqual(sendData, result)

    def test_kafka_stream(self):
        """Test the Python Kafka stream API."""
        topic = self._randomTopic()
        sendData = {"a": 3, "b": 5, "c": 10}

        self._kafkaTestUtils.createTopic(topic)
        self._kafkaTestUtils.sendMessages(topic, sendData)

        stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
                                         "test-streaming-consumer", {topic: 1},
                                         {"auto.offset.reset": "smallest"})
        self._validateStreamResult(sendData, stream)

Flume、Kinesis和其他pyspark.streaming.tests模块的更多示例。

于 2018-01-24T06:52:37.207 回答
0

这是一个在 Python 中针对 Kafka 相关功能进行自动化测试的示例:https ://github.com/up9inc/async-ms-demo/blob/main/grayscaler/tests.py

它使用http://mockintosh.io项目的“Kafka Mock”功能。

免责声明:我隶属于该项目。

于 2021-06-21T14:16:48.823 回答