In an Azure Function, how can we make the Kafka producer connection a singleton or use a connection pool? At the moment a new Kafka connection is created every time the function is triggered.

import json
import logging
import os
import azure.functions as func
from confluent_kafka import Producer

def main(event: func.EventGridEvent):

    # Kafka configuration
    conf = {
        'bootstrap.servers': os.getenv('AZURE_EVENT_HUB_SERVER') + ':9093',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '$ConnectionString',
        'sasl.password': os.getenv('AZURE_EVENT_HUB_CONN_STRING')
    }


    data = event.get_json()
    topic = "events"

    # NOTE: this builds a new producer (and broker connection) on every invocation
    p = Producer(conf)

    if topic is not None:
        try:
            # produce() expects str/bytes for key and value, so serialize the JSON payload
            p.produce(topic, key=str(data.get('id')), value=json.dumps(data))
        except BufferError:
            logging.error('%% Local producer queue is full (%d messages awaiting delivery): try again\n', len(p))
        # Wait until all messages have been delivered
        p.flush()

        logging.info('Successfully completed the processing for the event: %s', data)
    else:
        logging.error('No topic configured; skipping event')
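
Since the question is about reusing the connection: in the Azure Functions Python worker, objects created at module scope live for the lifetime of the worker process and are shared across invocations, so a common workaround is to create the producer lazily in a module-level helper rather than inside main. Below is a minimal sketch of that pattern, assuming the same topic, environment variables, and Event Hubs SASL settings as in the question; the get_producer helper name is illustrative, not part of any library.

import json
import logging
import os

import azure.functions as func
from confluent_kafka import Producer

_producer = None  # cached at module scope, reused across invocations in the same worker


def get_producer() -> Producer:
    # Build the producer once; subsequent calls return the same instance
    global _producer
    if _producer is None:
        _producer = Producer({
            'bootstrap.servers': os.getenv('AZURE_EVENT_HUB_SERVER') + ':9093',
            'security.protocol': 'SASL_SSL',
            'sasl.mechanism': 'PLAIN',
            'sasl.username': '$ConnectionString',
            'sasl.password': os.getenv('AZURE_EVENT_HUB_CONN_STRING')
        })
    return _producer


def main(event: func.EventGridEvent):
    data = event.get_json()
    p = get_producer()
    p.produce('events', key=str(data.get('id')), value=json.dumps(data))
    # flush() still blocks until the queued message is delivered; it just avoids
    # re-establishing the TLS/SASL connection on every trigger
    p.flush()
    logging.info('Successfully produced event %s', data.get('id'))

Note that if the app scales out to multiple worker processes, each process keeps its own producer instance, which is usually acceptable; for higher throughput you could also call p.poll(0) per invocation and defer flush() instead of flushing every time.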
