我们有一个消息调度程序,它在将消息属性放入带有该键的 Kafka 主题队列之前从消息属性生成一个哈希键。
这样做是出于重复数据删除的目的。但是,我不确定如何在不实际设置本地集群并检查它是否按预期执行的情况下测试这种重复数据删除。
在线搜索用于模拟 Kafka 主题队列的工具并没有帮助,我担心我可能以错误的方式思考这个问题。
最终,任何用于模拟 Kafka 队列的行为都应该与本地集群的行为方式相同 - 即通过向主题队列插入密钥来提供重复数据删除。
有没有这样的工具?
我们有一个消息调度程序,它在将消息属性放入带有该键的 Kafka 主题队列之前从消息属性生成一个哈希键。
这样做是出于重复数据删除的目的。但是,我不确定如何在不实际设置本地集群并检查它是否按预期执行的情况下测试这种重复数据删除。
在线搜索用于模拟 Kafka 主题队列的工具并没有帮助,我担心我可能以错误的方式思考这个问题。
最终,任何用于模拟 Kafka 队列的行为都应该与本地集群的行为方式相同 - 即通过向主题队列插入密钥来提供重复数据删除。
有没有这样的工具?
如果您需要验证 Kafka 特定功能,或使用 Kafka 特定功能实现,那么唯一的方法就是使用 Kafka!
Kafka 是否对其重复数据删除逻辑进行了任何测试?如果是这样,以下组合可能足以减轻您的组织感知到的失败风险:
如果 Kafka 没有围绕其主题重复数据删除进行任何类型的测试,或者您担心破坏性更改,那么围绕 Kafka 特定功能进行自动检查很重要。这可以通过集成测试来完成。我最近在基于 Docker 的集成测试管道上取得了很大的成功。在创建 Kafka docker 镜像(社区可能已经提供了一个)的初步工作之后,设置集成测试管道变得微不足道。管道可能如下所示:
我认为重要的是确保将 Kafka 集成测试最小化为仅包含绝对依赖于 Kafka 特定功能的测试。即使使用 docker-compose,它们也可能比单元测试慢几个数量级,~1 毫秒 vs 1 秒?要考虑的另一件事是维护集成管道的开销可能值得冒险相信 Kakfa 将提供它声称的主题重复数据删除。
为了在 Python单元测试下使用 SBT测试任务模拟Kafka,我做了如下操作。应该安装Pyspark 。
在build.sbt中定义应该与测试一起运行的任务:
val testPythonTask = TaskKey[Unit]("testPython", "Run python tests.")
val command = "python3 -m unittest app_test.py"
val workingDirectory = new File("./project/src/main/python")
testPythonTask := {
val s: TaskStreams = streams.value
s.log.info("Executing task testPython")
Process(command,
workingDirectory,
// arguments for using org.apache.spark.streaming.kafka.KafkaTestUtils in Python
"PYSPARK_SUBMIT_ARGS" -> "--jars %s pyspark-shell"
// collect all jar paths from project
.format((fullClasspath in Runtime value)
.map(_.data.getCanonicalPath)
.filter(_.contains(".jar"))
.mkString(",")),
"PYSPARK_PYTHON" -> "python3") ! s.log
}
//attach custom test task to default test tasks
test in Test := {
testPythonTask.value
(test in Test).value
}
testOnly in Test := {
testPythonTask.value
(testOnly in Test).value
}
在python 测试用例(app_test.py)中:
import random
import unittest
from itertools import chain
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming.tests import PySparkStreamingTestCase
class KafkaStreamTests(PySparkStreamingTestCase):
timeout = 20 # seconds
duration = 1
def setUp(self):
super(KafkaStreamTests, self).setUp()
kafkaTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
.loadClass("org.apache.spark.streaming.kafka.KafkaTestUtils")
self._kafkaTestUtils = kafkaTestUtilsClz.newInstance()
self._kafkaTestUtils.setup()
def tearDown(self):
if self._kafkaTestUtils is not None:
self._kafkaTestUtils.teardown()
self._kafkaTestUtils = None
super(KafkaStreamTests, self).tearDown()
def _randomTopic(self):
return "topic-%d" % random.randint(0, 10000)
def _validateStreamResult(self, sendData, stream):
result = {}
for i in chain.from_iterable(self._collect(stream.map(lambda x: x[1]),
sum(sendData.values()))):
result[i] = result.get(i, 0) + 1
self.assertEqual(sendData, result)
def test_kafka_stream(self):
"""Test the Python Kafka stream API."""
topic = self._randomTopic()
sendData = {"a": 3, "b": 5, "c": 10}
self._kafkaTestUtils.createTopic(topic)
self._kafkaTestUtils.sendMessages(topic, sendData)
stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
"test-streaming-consumer", {topic: 1},
{"auto.offset.reset": "smallest"})
self._validateStreamResult(sendData, stream)
Flume、Kinesis和其他pyspark.streaming.tests
模块的更多示例。
这是一个在 Python 中针对 Kafka 相关功能进行自动化测试的示例:https ://github.com/up9inc/async-ms-demo/blob/main/grayscaler/tests.py
它使用http://mockintosh.io项目的“Kafka Mock”功能。
免责声明:我隶属于该项目。