
I'm new to the Pytest framework. I currently have a Python application that uses Kafka-Python to consume and produce messages. I'm trying to write integration tests that verify messages are consumed from and produced to the topics. I'm currently using pytest-docker to spin up a Kafka container with a producer topic and a consumer topic. I'm running into an intermittent error, "NoBrokersAvailable, unrecognized broker version", and I'm not sure what I'm doing wrong. Any help would be appreciated.

docker-compose.yml:

version: '2.1'

services:
  zoo1:
    image: zookeeper:3.4.9
    hostname: zoo1
    ports:
      - "2181:2181"
    environment:
        ZOO_MY_ID: 1
        ZOO_PORT: 2181
        ZOO_SERVERS: server.1=zoo1:2888:3888
    volumes:
      - ./zk-single-kafka-single/zoo1/data:/data
      - ./zk-single-kafka-single/zoo1/datalog:/datalog

  kafka1:
    image: confluentinc/cp-kafka:5.3.0
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zoo1

  create-consumer-topic:
    image: confluentinc/cp-kafka:5.3.0
    depends_on:
      - kafka1
    command: |
      bash -c 'echo Waiting for Kafka to be ready... && \
      cub kafka-ready -b kafka1:9092 1 20 && \
      kafka-topics --create --topic test-consumer --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zoo1:2181 && \
      sleep infinity'
    environment:
      KAFKA_BROKER_ID: ignored
      KAFKA_ZOOKEEPER_CONNECT: ignored

  create-producer-topic:
    image: confluentinc/cp-kafka:5.3.0
    depends_on:
      - kafka1
    command: |
      bash -c 'echo Waiting for Kafka to be ready... && \
      cub kafka-ready -b kafka1:9092 1 20 && \
      kafka-topics --create --topic test-producer --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zoo1:2181 && \
      kafka-console-producer
      sleep infinity'
    environment:
      KAFKA_BROKER_ID: ignored
      KAFKA_ZOOKEEPER_CONNECT: ignored
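
For context, with the listener map above a client running on the host (for example the Pytest process) connects through the external listener published on 127.0.0.1:9092, while containers on the compose network use kafka1:19092. A minimal kafka-python sketch of the host-side connection, for illustration only:

from kafka import KafkaProducer

# From the host: the EXTERNAL listener published as 127.0.0.1:9092.
# From another container on the compose network: kafka1:19092 instead.
producer = KafkaProducer(bootstrap_servers=["127.0.0.1:9092"])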

conftest.py:

import os
from time import sleep

import pytest


@pytest.fixture(scope="session")
def docker_compose_file(pytestconfig):
    return os.path.join(str(pytestconfig.rootdir), "docker-compose.yml")


@pytest.fixture(scope="session")
def producer(docker_services):
    producer = Producer(config=KAFKA_PRODUCER_CONFIG, key_serializer=str.encode,
                        value_serializer=str.encode, alerter=None)
    sleep(4)
    return producer
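
One way to avoid the fixed sleep(4), sketched here for illustration rather than as part of the original setup: pytest-docker's wait_until_responsive can poll until kafka-python is actually able to bootstrap. The service name "kafka1" and port 9092 come from the compose file above; the kafka_is_responsive helper and kafka_bootstrap fixture names are made up for the example.

from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable


def kafka_is_responsive(bootstrap_server):
    # Open and immediately close a consumer; NoBrokersAvailable means the
    # broker is not ready to accept connections yet.
    try:
        KafkaConsumer(bootstrap_servers=[bootstrap_server]).close()
        return True
    except NoBrokersAvailable:
        return False


@pytest.fixture(scope="session")
def kafka_bootstrap(docker_ip, docker_services):
    port = docker_services.port_for("kafka1", 9092)
    bootstrap_server = f"{docker_ip}:{port}"
    docker_services.wait_until_responsive(
        timeout=60.0, pause=1.0, check=lambda: kafka_is_responsive(bootstrap_server)
    )
    return bootstrap_server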

test.py:

def test__producer_sends_to_topic(producer):
    producer.open()
    producer.send(Payload(key="hello", value="world"))
    assert <code to assert>

Producer:

from ssl import Purpose, SSLContext, create_default_context
from typing import Dict

from kafka import KafkaProducer


class Producer:
    """
    Wrapper for kafka producer.
    This wrapper handles creating, closing and sending messages to topic.
    """

    def __init__(self, config: Dict[str, str], key_serializer, value_serializer, alerter=None):
        self.alerter = alerter
        self.config = config
        self.retries = config['retries']
        self.topic = config['topic']
        self.seconds_between_tries = config['seconds_between_tries']
        self.producer = None
        self.key_serializer = key_serializer
        self.value_serializer = value_serializer

    def _get_context(self) -> SSLContext:
        context = create_default_context(
            purpose=Purpose.CLIENT_AUTH,
            cafile=self.config['ca_file'])
        context.load_cert_chain(
            self.config['cert_file'],
            password=self.config['password'])
        return context

    def open(self) -> KafkaProducer:
        """ Create the underlying KafkaProducer """
        if self.config.get('ssl_enabled', True):
            ssl_context = self._get_context()
        else:
            ssl_context = None

        self.producer = KafkaProducer(
            ssl_context=ssl_context,
            acks=self.config['acks'],
            client_id=self.config['client_id'],
            bootstrap_servers=self.config['bootstrap_servers'],
            security_protocol=self.config['security_protocol'],
            ssl_check_hostname=self.config['ssl_check_hostname'],
            key_serializer=self.key_serializer,
            value_serializer=self.value_serializer,
            request_timeout_ms=self.config['request_timeout_ms'],
            max_block_ms=self.config['max_block_ms'],
            batch_size=self.config['batch_size'],
            linger_ms=self.config['linger_ms'],
            buffer_memory=self.config['buffer_memory']
        )
        return self.producer

1 Answer


You could check the following.

If you are running Pytest locally or in a cloud environment, your FastAPI application may not be able to connect to Kafka. One possible reason is that Kafka is hosted in the cloud on a cluster you are not connected to.

In that case, your Pytest run tries to create the producer but cannot find a broker, which results in NoBrokersAvailable.

To isolate this, you can:

  • Create a separate endpoint in your FastAPI app just for testing (one that does not use Kafka at all); a minimal sketch follows the snippet below
  • Disable creation of the producer, or fall back to producer = None, for example:
import json

from kafka import KafkaProducer

producer = None
try:
    producer = KafkaProducer(
        bootstrap_servers=[BROKER],
        value_serializer=lambda m: json.dumps(m).encode('ascii')
    )
except Exception as e:
    print("No broker, can we connect to Kafka?")
    print(f"Error: {e}")
Answered 2021-10-04T07:56:47.700