1

我们有一个 Apache Flink POC 应用程序,它在本地运行良好,但在我们部署到 Kinesis Data Analytics (KDA) 后,它不会将记录发送到接收器中。

使用的技术

当地的

  • 资料来源:卡夫卡 2.7
    • 1个经纪人
    • 1 个分区为 1 且复制因子为 1 的主题
  • 处理:Flink 1.12.1
  • 接收器:托管 ElasticSearch 服务 7.9.1(与 AWS 相同的实例)

AWS

  • 资料来源:亚马逊 MSK Kafka 2.8
    • 3 个经纪人(但我们正在连接一个)
    • 1 个分区为 1 的主题,复制因子为 3
  • 处理:亚马逊 KDA Flink 1.11.1
    • 并行度:2
    • 每个 KPU 的并行度:2
  • 接收器:托管 ElasticSearch 服务 7.9.1

应用逻辑

  1. FlinkKafkaConsumer从主题中读取 json 格式的消息
  2. json 映射到域对象,称为Telemetry
private static DataStream<Telemetry> SetupKafkaSource(StreamExecutionEnvironment environment){
    Properties kafkaProperties = new Properties();
    kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "BROKER1_ADDRESS.amazonaws.com:9092");
    kafkaProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_consumer");

    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("THE_TOPIC", new SimpleStringSchema(), kafkaProperties);

    consumer.setStartFromEarliest(); //Just for repeatable testing

    return environment
            .addSource(consumer)
            .map(new MapJsonToTelemetry());
}
  1. 为 EventTimeStamp 选择了 Telemetry 的时间戳。
    3.1。和forMonotonousTimeStamps
  2. 遥测StateIso用于keyBy.
    4.1。美国的两个字母 iso 代码
  3. 应用 5 秒翻滚窗口策略
private static SingleOutputStreamOperator<StateAggregatedTelemetry> SetupProcessing(DataStream<Telemetry> telemetries) {
    WatermarkStrategy<Telemetry> wmStrategy =
            WatermarkStrategy
                    .<Telemetry>forMonotonousTimestamps()
                    .withTimestampAssigner((event, timestamp) -> event.TimeStamp);

    return telemetries
            .assignTimestampsAndWatermarks(wmStrategy)
            .keyBy(t -> t.StateIso)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .process(new WindowCountFunction());
}
  1. 调用一个自定义ProcessWindowFunction来执行一些基本的聚合。
    6.1。我们计算单个StateAggregatedTelemetry
  2. ElasticSearch 被配置为接收器。
    7.1。 StateAggregatedTelemetry数据被映射到 aHashMap并推入source.
    7.2. 所有setBulkFlushXYZ方法都设置为低值
private static void SetupElasticSearchSink(SingleOutputStreamOperator<StateAggregatedTelemetry> telemetries) {
    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(HttpHost.create("https://ELKCLUSTER_ADDRESS.amazonaws.com:443"));

    ElasticsearchSink.Builder<StateAggregatedTelemetry> esSinkBuilder = new ElasticsearchSink.Builder<>(
            httpHosts,
            (ElasticsearchSinkFunction<StateAggregatedTelemetry>) (element, ctx, indexer) -> {
                Map<String, Object> record = new HashMap<>();

                record.put("stateIso", element.StateIso);
                record.put("healthy", element.Flawless);
                record.put("unhealthy", element.Faulty);
                ...

                LOG.info("Telemetry has been added to the buffer");
                indexer.add(Requests.indexRequest()
                        .index("INDEXPREFIX-"+ from.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")))
                        .source(record, XContentType.JSON));
            }
    );

    //Using low values to make sure that the Flush will happen
    esSinkBuilder.setBulkFlushMaxActions(25);
    esSinkBuilder.setBulkFlushInterval(1000);
    esSinkBuilder.setBulkFlushMaxSizeMb(1);
    esSinkBuilder.setBulkFlushBackoff(true);
    esSinkBuilder.setRestClientFactory(restClientBuilder -> {});

    LOG.info("Sink has been attached to the DataStream");
    telemetries.addSink(esSinkBuilder.build());
}

排除的东西

  • 我们设法将 Kafka、KDA 和 ElasticSearch 放在同一个 VPC 和同一个子网下,以避免需要对每个请求进行签名
  • 从日志中我们可以看到 Flink 可以到达 ES 集群。
    要求
{
    "locationInformation": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.verifyClientConnection(Elasticsearch7ApiCallBridge.java:135)",
    "logger": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge",
    "message": "Pinging Elasticsearch cluster via hosts [https://...es.amazonaws.com:443] ...",
    "threadName": "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, WindowCountFunction) -> (Sink: Print to Std. Out, Sink: Unnamed, Sink: Print to Std. Out) (2/2)",
    "applicationARN": "arn:aws:kinesisanalytics:...",
    "applicationVersionId": "39",
    "messageSchemaVersion": "1",
    "messageType": "INFO"
}

回复

{
    "locationInformation": "org.elasticsearch.client.RequestLogger.logResponse(RequestLogger.java:59)",
    "logger": "org.elasticsearch.client.RestClient",
    "message": "request [HEAD https://...es.amazonaws.com:443/] returned [HTTP/1.1 200 OK]",
    "threadName": "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, WindowCountFunction) -> (Sink: Print to Std. Out, Sink: Unnamed, Sink: Print to Std. Out) (2/2)",
    "applicationARN": "arn:aws:kinesisanalytics:...",
    "applicationVersionId": "39",
    "messageSchemaVersion": "1",
    "messageType": "DEBUG"
}
  • 我们还可以通过查看 Flink Dashboard 来验证消息是否已从 Kafka 主题中读取并发送以进行处理 任务之间发送和接收数据

我们没有运气的尝试

  • 我们已经实现了一个RichParallelSourceFunction发出 1_000_000 条消息然后退出 的
    • 这在本地环境中运行良好
    • 作业在 AWS 环境中完成,但 sink 端没有数据
  • 我们已经实现了另一个RichParallelSourceFunction每秒发出 100 条消息
    • 基本上我们有两个循环,一个while(true)外部和for内部
    • 在内部循环之后,我们调用Thread.sleep(1000)
    • 这在本地环境中运行良好
    • 但是在 AWS 中,我们可以看到检查点的大小不断增长,并且在 ELK 中没有出现任何消​​息
  • 我们尝试使用不同的并行设置运行 KDA 应用程序
    • 但是没有区别
  • 我们还尝试使用不同的水印策略(forBoundedOutOfOrderness, withIdle, noWatermarks
    • 但是没有区别
  • 我们添加了ProcessWindowFunction和 的日志ElasticsearchSinkFunction
    • 每当我们从 IDEA 运行应用程序时,这些日志都在控制台上
    • 每当我们使用 KDA 运行应用程序时,CloudWatch 中就没有此类日志
      • 添加到main它们的那些日志确实出现在 CloudWatch 日志中

我们假设我们在 sink 端看不到数据,因为没有触发窗口处理逻辑。这就是为什么在 CloudWatch 中看不到处理日志的原因。

任何帮助都将受到欢迎!


更新#1

  • 我们已经尝试将 Flink 版本从 1.12.1 降级到 1.11.1
    • 没有变化
  • 我们尝试过处理时间窗口而不是事件时间
    • 它甚至不适用于本地环境

更新#2

平均消息大小约为 4kb。以下是示例消息的摘录:

{
  "affiliateCode": "...",
  "appVersion": "1.1.14229",
  "clientId": "guid",
  "clientIpAddr": "...",
  "clientOriginated": true,
  "connectionType": "Cable/DSL",
  "countryCode": "US",
  "design": "...",
  "device": "...",
  ...
  "deviceSerialNumber": "...",
  "dma": "UNKNOWN",
  "eventSource": "...",
  "firstRunTimestamp": 1609091112818,
  "friendlyDeviceName": "Comcast",
  "fullDevice": "Comcast ...",
  "geoInfo": {
    "continent": {
      "code": "NA",
      "geoname_id": 120
    },
    "country": {
      "geoname_id": 123,
      "iso_code": "US"
    },
    "location": {
      "accuracy_radius": 100,
      "latitude": 37.751,
      "longitude": -97.822,
      "time_zone": "America/Chicago"
    },
    "registered_country": {
      "geoname_id": 123,
      "iso_code": "US"
    }
  },
  "height": 720,
  "httpUserAgent": "Mozilla/...",
  "isLoggedIn": true,
  "launchCount": 19,
  "model": "...",
  "os": "Comcast...",
  "osVersion": "...",
  ...
  "platformTenantCode": "...",
  "productCode": "...",
  "requestOrigin": "https://....com",
  "serverTimeUtc": 1617809474787,
  "serviceCode": "...",
  "serviceOriginated": false,
  "sessionId": "guid",
  "sessionSequence": 2,
  "subtype": "...",
  "tEventId": "...",
  ...
  "tRegion": "us-east-1",
  "timeZoneOffset": 5,
  "timestamp": 1617809473305,
  "traits": {
    "isp": "Comcast Cable",
    "organization": "..."
  },
  "type": "...",
  "userId": "guid",
  "version": "v1",
  "width": 1280,
  "xb3traceId": "guid"
}

我们只ObjectMapper用来解析 json 的一些字段。下面是这个Telemetry类的样子:

public class Telemetry {
    public String AppVersion;
    public String CountryCode;
    public String ClientId;
    public String DeviceSerialNumber;
    public String EventSource;
    public String SessionId;
    public TelemetrySubTypes SubType; //enum
    public String TRegion;
    public Long TimeStamp;
    public TelemetryTypes Type; //enum
    public String StateIso;
    
    ...
}

更新#3

资源

子任务选项卡

ID 收到的字节数 收到的记录 发送的字节数 发送的记录 地位
0 0乙 0 0乙 0 跑步
1 0乙 0 2.83 MB 15,000 跑步

水印标签

没有数据

窗户

子任务选项卡

ID 收到的字节数 收到的记录 发送的字节数 发送的记录 地位
0 1.80 MB 9,501 0乙 0 跑步
1 1.04 MB 5,499 0乙 0 跑步

水印

子任务 水印
1 无水印
2 无水印
4

2 回答 2

1

根据您提供的评论和更多信息,问题似乎在于两个 Flink 消费者不能从同一个分区消费。因此,在您的情况下,只有一个运算符的并行实例将从 kafka 分区消耗,而另一个将处于空闲状态。

通常 Flink 操作员会选择MIN([all_downstream_parallel_watermarks]),因此在您的情况下,一个 Kafka 消费者将产生正常的水印,而另一个将永远不会产生任何东西(flinkLong.Min在这种情况下假设),所以 Flink 将选择较低的一个Long.Min。因此,window 永远不会被触发,因为当数据流动时,永远不会生成水印之一。好的做法是在使用 Kafka 时使用与 Kafka 分区数量相同的并行性。

于 2021-05-18T17:24:59.773 回答
1

在与 AWS 人员进行了一次支持会议后,我们发现我们错过了在流媒体环境中设置时间特征。

  • TimeCharacteristic在 1.11.1中,默认值为IngestionTime.
  • 自 1.12.1(参见相关发行说明)以来,默认值为EventTime

在 Flink 1.12 中,默认的流时间特性已更改为 EventTime,因此您不再需要调用此方法来启用事件时间支持。

因此,在我们EventTime明确设置之后,它开始像魅力一样生成水印:

streamingEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
于 2021-05-19T08:45:20.533 回答