1

每个 Kinesis 应用程序都必须包含以下三个组件:

  1. IRecordProcessor 接口

  2. 实现 IRecordProcessor 接口的类的工厂

  3. 初始化应用程序并创建工作者的代码

现在我在这里了解到,一旦我们将生产者配置为将记录添加到 Kinesis 流。然后 KCL 应用程序可以通过以下 processRecords 实现从 Kinesis 流中读取记录。那么这个 processRecords 方法应该有某种方法将它传递给 S3 以使用连接器库进行最终存储。

public void processRecords(列出记录,IRecordProcessorCheckpointer checkpointer)

查询:我应该如何从 KCL 应用程序的 processRecords 调用连接器库来将数据记录存储在 S3 上?

我浏览了显示示例 Kinesis 应用程序的链接 https://github.com/aws/aws-sdk-java/blob/master/src/samples/AmazonKinesisApplication/SampleRecordProcessor.java

在上面的链接中,我正在粘贴方法片段,如下所示。

private void processRecordsWithRetries(List<Record> records) {
        for (Record record : records) {
            boolean processedSuccessfully = false;
            String data = null;
            for (int i = 0; i < NUM_RETRIES; i++) {
                try {
                    // For this app, we interpret the payload as UTF-8 chars.
                    data = decoder.decode(record.getData()).toString();
                    LOG.info(record.getSequenceNumber() + ", " + record.getPartitionKey() + ", " + data);
            //
                    // Logic to process record goes here.
                    //
                    processedSuccessfully = true;
                    break;
                } catch (CharacterCodingException e) {
                    LOG.error("Malformed data: " + data, e);
                    break;
                } catch (Throwable t) {
                    LOG.warn("Caught throwable while processing record " + record, t);
                }

                // backoff if we encounter an exception.
                try {
                    Thread.sleep(BACKOFF_TIME_IN_MILLIS);
                } catch (InterruptedException e) {
                    LOG.debug("Interrupted sleep", e);
                }
            }

            if (!processedSuccessfully) {
                LOG.error("Couldn't process record " + record + ". Skipping the record.");
            }
        }
    }

在上面的代码中,当我们说“处理记录的逻辑在这里”。(请参阅上面的代码)这里我的要求是将数据放在 s3 上。我知道我们有可以执行此操作的连接器库,但是我现在无法想象如何调用连接器库?请建议

4

1 回答 1

4

您应该尝试 kinesis 连接器库,它有一个您需要的示例: https ://github.com/awslabs/amazon-kinesis-connectors 。

于 2014-08-01T22:16:36.403 回答