在 Azure 函数中,我们如何将 Kafka 生产者连接设为单例或连接池。每次触发函数时,都会创建一个新的 Kafka 连接。
import json
import logging
import os, time
import azure.functions as func
from confluent_kafka import Producer
def main(event: func.EventGridEvent):
##kafka configuration
conf = {
'bootstrap.servers': os.getenv('AZURE_EVENT_HUB_SERVER') + ':9093',
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'PLAIN',
'sasl.username': '$ConnectionString',
'sasl.password': os.getenv('AZURE_EVENT_HUB_CONN_STRING')
}
data = event.get_json()
topic = "events"
p = Producer(**conf)
if topic is not None:
try:
p.produce(topic, key=data.id, value=data)
# logging.info('Producing message %s', file_path)
except BufferError:
logging.error('%% Local producer queue is full (%d messages awaiting delivery): try again\n', len(p))
# Wait until all messages have been delivered
p.flush()
logging.info(f'Sucessfully completed the processing for the event: {event.get_json()}')
else:
logging.error(f'Failed')