0

所以我正在尝试在 S3 事件上编写一个 lambda 函数,它将消息放入 kafka 主题。我的 aws lambda 函数正在触发并且也没有收到任何错误。但我无法在 Kafka 主题中看到这些消息。

这是我的 lambda 函数

String srcBucket = record.getS3().getBucket().getName();

        String srcKey = record.getS3().getObject().getUrlDecodedKey();

        System.out.println("Bucket is " + srcBucket + "  and Key is " + srcKey);
        // Assign topicName to string variable
        String topicName = "AWSKafkaTutorialTopic";

        // create instance for properties to access producer configs
        Properties props = new Properties();

        props.put("bootstrap.servers",
                "b-3.205147-riskaudit.rtyrty.c5.kafka.us-east-1.amazonaws.com:9092,b-4.205147-riskaudit.rtyt.c5.kafka.us-east-1.amazonaws.com:9092,b-5.205147-tryrt.xt08nj.c5.kafka.us-east-1.amazonaws.com:9092");
        System.out.println("bootstrap.servers successfully");
        // Set acknowledgements for producer requests.
        props.put("acks", "all");

        // If the request fails, the producer can automatically retry,
        props.put("retries", 0);

        // Specify buffer size in config
        props.put("batch.size", 16384);

        // Reduce the no of requests less than 0
        props.put("linger.ms", 1);

        // The buffer.memory controls the total amount of memory available to the
        // producer for buffering.
        props.put("buffer.memory", 33554432);

        System.out.println("before key.serializer successfully");

        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        System.out.println("after  key.serializer successfully");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        System.out.println("Inside loop successfully");
        for (int i = 0; i < 10; i++)

            producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i)));
        System.out.println("Message sent successfully");
        producer.close();

        return "Message Pushed success fully";

我的 lambda 函数一直运行到 for 循环,但无法看到之后会发生什么。请帮忙

4

1 回答 1

0

对我来说一切都很好只需添加props.put("producer.type", "async"); 并且您可能没有从启动 MSK 的 vpc 运行您的 lmbda 功能。另外请注意 IAM 政策。试试这个AWSLambdaVPCAccessExecutionRole和安全组。

如果您设置所有这些代码将开始工作。

于 2019-12-13T17:49:58.027 回答