每个 Kinesis 应用程序都必须包含以下三个组件:
IRecordProcessor 接口
实现 IRecordProcessor 接口的类的工厂
初始化应用程序并创建工作者的代码
现在我在这里了解到,一旦我们将生产者配置为将记录添加到 Kinesis 流。然后 KCL 应用程序可以通过以下 processRecords 实现从 Kinesis 流中读取记录。那么这个 processRecords 方法应该有某种方法将它传递给 S3 以使用连接器库进行最终存储。
public void processRecords(列出记录,IRecordProcessorCheckpointer checkpointer)
查询:我应该如何从 KCL 应用程序的 processRecords 调用连接器库来将数据记录存储在 S3 上?
我浏览了显示示例 Kinesis 应用程序的链接 https://github.com/aws/aws-sdk-java/blob/master/src/samples/AmazonKinesisApplication/SampleRecordProcessor.java
在上面的链接中,我正在粘贴方法片段,如下所示。
private void processRecordsWithRetries(List<Record> records) {
for (Record record : records) {
boolean processedSuccessfully = false;
String data = null;
for (int i = 0; i < NUM_RETRIES; i++) {
try {
// For this app, we interpret the payload as UTF-8 chars.
data = decoder.decode(record.getData()).toString();
LOG.info(record.getSequenceNumber() + ", " + record.getPartitionKey() + ", " + data);
//
// Logic to process record goes here.
//
processedSuccessfully = true;
break;
} catch (CharacterCodingException e) {
LOG.error("Malformed data: " + data, e);
break;
} catch (Throwable t) {
LOG.warn("Caught throwable while processing record " + record, t);
}
// backoff if we encounter an exception.
try {
Thread.sleep(BACKOFF_TIME_IN_MILLIS);
} catch (InterruptedException e) {
LOG.debug("Interrupted sleep", e);
}
}
if (!processedSuccessfully) {
LOG.error("Couldn't process record " + record + ". Skipping the record.");
}
}
}
在上面的代码中,当我们说“处理记录的逻辑在这里”。(请参阅上面的代码)这里我的要求是将数据放在 s3 上。我知道我们有可以执行此操作的连接器库,但是我现在无法想象如何调用连接器库?请建议