我将 KPL 与 AWS lambda (Java) 一起用于生成 Kinesis 流。
我添加消息的代码是这样的:
ListenableFuture<UserRecordResult> f = KP.addUserRecord(Stream, partitionKey, ByteBuffer.wrap(data.getBytes()));
Futures.addCallback(f, new FutureCallback<UserRecordResult>() {
private Logger LG = Logger.getLogger(this.getClass());
@Override
public void onSuccess(UserRecordResult result) {
LG.info("Successfully sent "+result.getSequenceNumber()+" to stream shard #"+result.getShardId());
}
@Override
public void onFailure(Throwable t) {
LG.debug("Something wrong happend while sending to stream , "+t.getMessage());
}
});
问题有时是在执行期间Lambda
,生产者没有将消息提交给 Kinesis。因此,如果我想强制执行它来推送消息,我必须稍后调用flushSync()
导致 Lambda 出现其他错误的方法。
我的 KPL 配置也是:
AggregationEnabled = true
AggregationMaxCount = 4294967295
AggregationMaxSize = 51200
CollectionMaxCount = 500
CollectionMaxSize = 5242880
ConnectTimeout = 6000
FailIfThrottled = false
MaxConnections = 24
MetricsGranularity = shard
MinConnections = 1
RateLimit = 150
RecordMaxBufferedTime = 3000
RecordTtl = 30000
RequestTimeout = 60000
VerifyCertificate = true
CredentialsRefreshDelay = 100