我是 Pytest 框架的新手。我目前有一个 Python 应用程序,它使用 Kafka-Python 消费和生成消息。我正在尝试编写集成测试,以验证是否已从主题消费和生产。我目前正在使用pytest-docker
一个带有生产者和消费者主题的 Kafka 容器。我遇到间歇性错误“NoBrokersAvailable,无法识别的代理版本”我不确定我做错了什么。任何帮助将不胜感激。
码头工人-compose.yml:
version: '2.1'
services:
zoo1:
image: zookeeper:3.4.9
hostname: zoo1
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zoo1:2888:3888
volumes:
- ./zk-single-kafka-single/zoo1/data:/data
- ./zk-single-kafka-single/zoo1/datalog:/datalog
kafka1:
image: confluentinc/cp-kafka:5.3.0
hostname: kafka1
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
depends_on:
- zoo1
create-consumer-topic:
image: confluentinc/cp-kafka:5.3.0
depends_on:
- kafka1
command: |
bash -c 'echo Waiting for Kafka to be ready... && \
cub kafka-ready -b kafka1:9092 1 20 && \
kafka-topics --create --topic test-consumer --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zoo1:2181 && \
sleep infinity'
environment:
KAFKA_BROKER_ID: ignored
KAFKA_ZOOKEEPER_CONNECT: ignored
create-producer-topic:
image: confluentinc/cp-kafka:5.3.0
depends_on:
- kafka1
command: |
bash -c 'echo Waiting for Kafka to be ready... && \
cub kafka-ready -b kafka1:9092 1 20 && \
kafka-topics --create --topic test-producer --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zoo1:2181 && \
kafka-console-producer
sleep infinity'
environment:
KAFKA_BROKER_ID: ignored
KAFKA_ZOOKEEPER_CONNECT: ignored
conftest.yml:
@pytest.fixture(scope="session")
def docker_compose_file(pytestconfig):
return os.path.join(str(pytestconfig.rootdir), "docker-compose.yml")
@pytest.fixture(scope="session")
def producer(docker_services):
producer = Producer(config=KAFKA_PRODUCER_CONFIG, key_serializer=str.encode,
value_serializer=str.encode, alerter=None)
sleep(4)
return producer
测试.py:
def test__producer_sends_to_topic(producer):
producer.open()
producer.send(Payload(key="hello", value="world"))
assert <code to assert>
制片人:
class Producer:
"""
Wrapper for kafka producer.
This wrapper handles creating, closing and sending messages to topic.
"""
def __init__(self, config: Dict[str, str], key_serializer, value_serializer, alerter=None):
self.alerter = alerter
self.config = config
self.retries = config['retries']
self.topic = config['topic']
self.seconds_between_tries = config['seconds_between_tries']
self.producer = None
self.key_serializer = key_serializer
self.value_serializer = value_serializer
def _get_context(self) -> SSLContext:
context = create_default_context(
purpose=Purpose.CLIENT_AUTH,
cafile=self.config['ca_file'])
context.load_cert_chain(
self.config['cert_file'],
password=self.config['password'])
return context
def open(self) -> KafkaProducer:
""" Create the underlying KafkaProducer """
if self.config.get('ssl_enabled', True):
ssl_context = self._get_context()
else:
ssl_context = None
self.producer = KafkaProducer(
ssl_context=ssl_context,
acks=self.config['acks'],
client_id=self.config['client_id'],
bootstrap_servers=self.config['bootstrap_servers'],
security_protocol=self.config['security_protocol'],
ssl_check_hostname=self.config['ssl_check_hostname'],
key_serializer=self.key_serializer,
value_serializer=self.value_serializer,
request_timeout_ms=self.config['request_timeout_ms'],
max_block_ms=self.config['max_block_ms'],
batch_size=self.config['batch_size'],
linger_ms=self.config['linger_ms'],
buffer_memory=self.config['buffer_memory']
)