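I am trying to write a Lambda function, triggered by an S3 event, that puts messages into a Kafka topic. My AWS Lambda function is being triggered and I am not getting any errors, but I cannot see the messages in the Kafka topic.
Here is my Lambda function: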
String srcBucket = record.getS3().getBucket().getName();
String srcKey = record.getS3().getObject().getUrlDecodedKey();
System.out.println("Bucket is " + srcBucket + " and Key is " + srcKey);
// Assign topicName to string variable
String topicName = "AWSKafkaTutorialTopic";
// create instance for properties to access producer configs
Properties props = new Properties();
props.put("bootstrap.servers",
"b-3.205147-riskaudit.rtyrty.c5.kafka.us-east-1.amazonaws.com:9092,b-4.205147-riskaudit.rtyt.c5.kafka.us-east-1.amazonaws.com:9092,b-5.205147-tryrt.xt08nj.c5.kafka.us-east-1.amazonaws.com:9092");
System.out.println("bootstrap.servers successfully");
// Set acknowledgements for producer requests.
props.put("acks", "all");
// Number of automatic retries when a request fails (0 disables retries)
props.put("retries", 0);
// Maximum size in bytes of a batch of records sent to one partition
props.put("batch.size", 16384);
// Wait up to 1 ms so that records can be grouped into fewer requests
props.put("linger.ms", 1);
// The buffer.memory controls the total amount of memory available to the
// producer for buffering.
props.put("buffer.memory", 33554432);
System.out.println("before key.serializer successfully");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
System.out.println("after key.serializer successfully");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
System.out.println("Inside loop successfully");
for (int i = 0; i < 10; i++) {
    producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i)));
}
System.out.println("Message sent successfully");
producer.close();
return "Message Pushed success fully";
My Lambda function runs up to the for loop, but I cannot see what happens after that. Please help.
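One way to narrow this down might be to shorten the producer timeouts, so that an unreachable broker raises an exception within a few seconds instead of blocking inside `send()`. The sketch below only builds the configuration and uses nothing beyond `java.util.Properties`. The class name `ProducerDiagnostics`, the method `buildProps`, and the timeout values are assumptions made for illustration; they do not come from the code above and are not tuned recommendations.

```java
import java.util.Properties;

public class ProducerDiagnostics {

    // Builds producer settings with short timeouts so that a broker that
    // cannot be reached surfaces as an exception quickly.
    // All timeout values here are illustrative assumptions.
    public static Properties buildProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("acks", "all");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Upper bound on how long send() may block, for example while
        // waiting for topic metadata (the Kafka default is 60000 ms).
        props.setProperty("max.block.ms", "5000");
        // Upper bound on waiting for a broker response to a single request.
        props.setProperty("request.timeout.ms", "5000");
        // Upper bound on reporting success or failure for a record; it must
        // be at least linger.ms + request.timeout.ms.
        props.setProperty("delivery.timeout.ms", "10000");
        props.setProperty("linger.ms", "1");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder, not a real broker address.
        Properties props = buildProps("localhost:9092");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These properties could then be passed to `new KafkaProducer<String, String>(props)`. Calling `get()` on the `Future` returned by `producer.send(...)` inside a try/catch would make any send failure appear in the Lambda logs, assuming the function's own timeout is longer than the values chosen here.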