我想使用融合的 Kafka 库“github.com/confluentinc/confluent-kafka-go/kafka”在 Golang 中配置具有 Kerberos 身份验证的 Kafka 消费者。还需要使用 Avro 类型的 Kafka 消息来读取消息。在此处获取 Java 代码。
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collections;
import java.util.Date;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
public class KerberosConsumer {
static final Logger logger = Logger.getLogger(AvroConsumer.class);
public static void main(String[] args) throws InterruptedException, FileNotFoundException, IOException {
PropertyConfigurator.configure(args[0]);
Properties prop = new Properties();
prop.load(new FileInputStream(args[1]));
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "BOOTSTRAP_SERVER_1:9093,BOOTSTRAP_SERVER_2:9093");
properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "ConsumerClient");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "<CONSUMER_GROUP_ID>");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
properties.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "<PATH>/kafka-client.truststore.jks");
properties.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.setProperty("schema.registry.url", "<SCHEMA_REGISTRY_URL>");
System.setProperty("java.security.krb5.conf", "<PATH>/krb5.conf");
System.setProperty("java.security.auth.login.config", "<PATH>/kafka_client_jaas.conf");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Collections.singletonList("<TOPIC_NAME>"));
consumer.poll(0); // without this the below statement never got any records
ObjectMapper mapper = new ObjectMapper();
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(0);
for (ConsumerRecord<String, String> record : records)
logger.info("Message received successfully - Partition:" + record.partition() + " - Offset:" + record.offset() + " - Message:" + mapper.writeValueAsString(record.value().toString()));
}
} finally {
consumer.close();
}
}
}
这应该如何在golang中完成?