我们有一个 Apache Flink POC 应用程序,它在本地运行良好,但在我们部署到 Kinesis Data Analytics (KDA) 后,它不会将记录发送到接收器中。
使用的技术
当地的
- 资料来源:卡夫卡 2.7
- 1个经纪人
- 1 个分区为 1 且复制因子为 1 的主题
- 处理:Flink 1.12.1
- 接收器:托管 ElasticSearch 服务 7.9.1(与 AWS 相同的实例)
AWS
- 资料来源:亚马逊 MSK Kafka 2.8
- 3 个经纪人(但我们正在连接一个)
- 1 个分区为 1 的主题,复制因子为 3
- 处理:亚马逊 KDA Flink 1.11.1
- 并行度:2
- 每个 KPU 的并行度:2
- 接收器:托管 ElasticSearch 服务 7.9.1
应用逻辑
FlinkKafkaConsumer
从主题中读取 json 格式的消息- json 映射到域对象,称为
Telemetry
private static DataStream<Telemetry> SetupKafkaSource(StreamExecutionEnvironment environment){
Properties kafkaProperties = new Properties();
kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "BROKER1_ADDRESS.amazonaws.com:9092");
kafkaProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_consumer");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("THE_TOPIC", new SimpleStringSchema(), kafkaProperties);
consumer.setStartFromEarliest(); //Just for repeatable testing
return environment
.addSource(consumer)
.map(new MapJsonToTelemetry());
}
- 为 EventTimeStamp 选择了 Telemetry 的时间戳。
3.1。和forMonotonousTimeStamps
- 遥测
StateIso
用于keyBy
.
4.1。美国的两个字母 iso 代码 - 应用 5 秒翻滚窗口策略
private static SingleOutputStreamOperator<StateAggregatedTelemetry> SetupProcessing(DataStream<Telemetry> telemetries) {
WatermarkStrategy<Telemetry> wmStrategy =
WatermarkStrategy
.<Telemetry>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.TimeStamp);
return telemetries
.assignTimestampsAndWatermarks(wmStrategy)
.keyBy(t -> t.StateIso)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new WindowCountFunction());
}
- 调用一个自定义
ProcessWindowFunction
来执行一些基本的聚合。
6.1。我们计算单个StateAggregatedTelemetry
- ElasticSearch 被配置为接收器。
7.1。StateAggregatedTelemetry
数据被映射到 aHashMap
并推入source
.
7.2. 所有setBulkFlushXYZ
方法都设置为低值
private static void SetupElasticSearchSink(SingleOutputStreamOperator<StateAggregatedTelemetry> telemetries) {
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(HttpHost.create("https://ELKCLUSTER_ADDRESS.amazonaws.com:443"));
ElasticsearchSink.Builder<StateAggregatedTelemetry> esSinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
(ElasticsearchSinkFunction<StateAggregatedTelemetry>) (element, ctx, indexer) -> {
Map<String, Object> record = new HashMap<>();
record.put("stateIso", element.StateIso);
record.put("healthy", element.Flawless);
record.put("unhealthy", element.Faulty);
...
LOG.info("Telemetry has been added to the buffer");
indexer.add(Requests.indexRequest()
.index("INDEXPREFIX-"+ from.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")))
.source(record, XContentType.JSON));
}
);
//Using low values to make sure that the Flush will happen
esSinkBuilder.setBulkFlushMaxActions(25);
esSinkBuilder.setBulkFlushInterval(1000);
esSinkBuilder.setBulkFlushMaxSizeMb(1);
esSinkBuilder.setBulkFlushBackoff(true);
esSinkBuilder.setRestClientFactory(restClientBuilder -> {});
LOG.info("Sink has been attached to the DataStream");
telemetries.addSink(esSinkBuilder.build());
}
排除的东西
- 我们设法将 Kafka、KDA 和 ElasticSearch 放在同一个 VPC 和同一个子网下,以避免需要对每个请求进行签名
- 从日志中我们可以看到 Flink 可以到达 ES 集群。
要求
{
"locationInformation": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.verifyClientConnection(Elasticsearch7ApiCallBridge.java:135)",
"logger": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge",
"message": "Pinging Elasticsearch cluster via hosts [https://...es.amazonaws.com:443] ...",
"threadName": "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, WindowCountFunction) -> (Sink: Print to Std. Out, Sink: Unnamed, Sink: Print to Std. Out) (2/2)",
"applicationARN": "arn:aws:kinesisanalytics:...",
"applicationVersionId": "39",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
回复
{
"locationInformation": "org.elasticsearch.client.RequestLogger.logResponse(RequestLogger.java:59)",
"logger": "org.elasticsearch.client.RestClient",
"message": "request [HEAD https://...es.amazonaws.com:443/] returned [HTTP/1.1 200 OK]",
"threadName": "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, WindowCountFunction) -> (Sink: Print to Std. Out, Sink: Unnamed, Sink: Print to Std. Out) (2/2)",
"applicationARN": "arn:aws:kinesisanalytics:...",
"applicationVersionId": "39",
"messageSchemaVersion": "1",
"messageType": "DEBUG"
}
我们没有运气的尝试
- 我们已经实现了一个
RichParallelSourceFunction
发出 1_000_000 条消息然后退出 的- 这在本地环境中运行良好
- 作业在 AWS 环境中完成,但 sink 端没有数据
- 我们已经实现了另一个
RichParallelSourceFunction
每秒发出 100 条消息- 基本上我们有两个循环,一个
while(true)
外部和for
内部 - 在内部循环之后,我们调用
Thread.sleep(1000)
- 这在本地环境中运行良好
- 但是在 AWS 中,我们可以看到检查点的大小不断增长,并且在 ELK 中没有出现任何消息
- 基本上我们有两个循环,一个
- 我们尝试使用不同的并行设置运行 KDA 应用程序
- 但是没有区别
- 我们还尝试使用不同的水印策略(
forBoundedOutOfOrderness
,withIdle
,noWatermarks
)- 但是没有区别
- 我们添加了
ProcessWindowFunction
和 的日志ElasticsearchSinkFunction
- 每当我们从 IDEA 运行应用程序时,这些日志都在控制台上
- 每当我们使用 KDA 运行应用程序时,CloudWatch 中就没有此类日志
- 添加到
main
它们的那些日志确实出现在 CloudWatch 日志中
- 添加到
我们假设我们在 sink 端看不到数据,因为没有触发窗口处理逻辑。这就是为什么在 CloudWatch 中看不到处理日志的原因。
任何帮助都将受到欢迎!
更新#1
- 我们已经尝试将 Flink 版本从 1.12.1 降级到 1.11.1
- 没有变化
- 我们尝试过处理时间窗口而不是事件时间
- 它甚至不适用于本地环境
更新#2
平均消息大小约为 4kb。以下是示例消息的摘录:
{
"affiliateCode": "...",
"appVersion": "1.1.14229",
"clientId": "guid",
"clientIpAddr": "...",
"clientOriginated": true,
"connectionType": "Cable/DSL",
"countryCode": "US",
"design": "...",
"device": "...",
...
"deviceSerialNumber": "...",
"dma": "UNKNOWN",
"eventSource": "...",
"firstRunTimestamp": 1609091112818,
"friendlyDeviceName": "Comcast",
"fullDevice": "Comcast ...",
"geoInfo": {
"continent": {
"code": "NA",
"geoname_id": 120
},
"country": {
"geoname_id": 123,
"iso_code": "US"
},
"location": {
"accuracy_radius": 100,
"latitude": 37.751,
"longitude": -97.822,
"time_zone": "America/Chicago"
},
"registered_country": {
"geoname_id": 123,
"iso_code": "US"
}
},
"height": 720,
"httpUserAgent": "Mozilla/...",
"isLoggedIn": true,
"launchCount": 19,
"model": "...",
"os": "Comcast...",
"osVersion": "...",
...
"platformTenantCode": "...",
"productCode": "...",
"requestOrigin": "https://....com",
"serverTimeUtc": 1617809474787,
"serviceCode": "...",
"serviceOriginated": false,
"sessionId": "guid",
"sessionSequence": 2,
"subtype": "...",
"tEventId": "...",
...
"tRegion": "us-east-1",
"timeZoneOffset": 5,
"timestamp": 1617809473305,
"traits": {
"isp": "Comcast Cable",
"organization": "..."
},
"type": "...",
"userId": "guid",
"version": "v1",
"width": 1280,
"xb3traceId": "guid"
}
我们只ObjectMapper
用来解析 json 的一些字段。下面是这个Telemetry
类的样子:
public class Telemetry {
public String AppVersion;
public String CountryCode;
public String ClientId;
public String DeviceSerialNumber;
public String EventSource;
public String SessionId;
public TelemetrySubTypes SubType; //enum
public String TRegion;
public Long TimeStamp;
public TelemetryTypes Type; //enum
public String StateIso;
...
}
更新#3
资源
子任务选项卡
ID | 收到的字节数 | 收到的记录 | 发送的字节数 | 发送的记录 | 地位 |
---|---|---|---|---|---|
0 | 0乙 | 0 | 0乙 | 0 | 跑步 |
1 | 0乙 | 0 | 2.83 MB | 15,000 | 跑步 |
水印标签
没有数据
窗户
子任务选项卡
ID | 收到的字节数 | 收到的记录 | 发送的字节数 | 发送的记录 | 地位 |
---|---|---|---|---|---|
0 | 1.80 MB | 9,501 | 0乙 | 0 | 跑步 |
1 | 1.04 MB | 5,499 | 0乙 | 0 | 跑步 |
水印
子任务 | 水印 |
---|---|
1 | 无水印 |
2 | 无水印 |