我当前的 Flink 应用程序
- 基于 Flink Stateful Function 3.1.1,从 Kafka 读取消息,处理消息,然后下沉到 Kafka Egress
- 应用程序已按照指南部署在 K8s 上并且运行良好:Stateful Functions Deployment
- 基于标准部署,我开启了kubernetes HA
我的目标
我想自动扩大/缩小有状态的功能。我也想知道如何创建更多的备用作业管理器
我对医管局的观察
我尝试kubernetes.jobmanager.replicas
在flink-config
ConfigMap 中设置:
---
apiVersion: v1
kind: ConfigMap
metadata:
name: flink-config
labels:
app: shadow-fn
data:
flink-conf.yaml: |+
kubernetes.jobmanager.replicas: 7
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
我在 K8s 中没有看到备用作业管理器。
然后我直接调整deployment的replicas:
apiVersion: apps/v1
kind: Deployment
metadata:
name: statefun-master
spec:
replicas: 7
备用作业经理出现。我查看 pod 日志,leader 选举成功。但是,当我在 Web 浏览器中访问 UI 时,它会显示:
{"errors":["Service temporarily unavailable due to an ongoing leader election. Please refresh."]}
我的方法有什么问题?
我关于缩放的问题
反应模式正是我所需要的。我试过但失败了,作业管理器有错误消息:
Exception in thread "main" org.apache.flink.configuration.IllegalConfigurationException: Reactive mode is configured for an unsupported cluster type. At the moment, reactive mode is only supported by standalone application clusters (bin/standalone-job.sh).
似乎不应该以这种方式完成有状态功能自动缩放。那么进行自动缩放的正确方法是什么?
潜在方法(可能不正确)
经过一番研究,我目前的方向是:
- Job Manger 与自动缩放无关。它与 K8s 上的 HA 有关。我只需要确保 Job Manager 具有正确的故障转移行为
- 我的有状态功能是
Flink remote services
,即它们是常规的k8s services
。它们可以以 的形式部署KNative service
以实现自动缩放。只有当 http 请求来自 Flink 的 worker 时,服务的副本才会增加 - 最重要的部分,Flink 的工人(或任务管理器)我还不知道如何进行自动缩放。也许我应该
KNative
用来部署 Flink 工作者?如果它不适用于 KNative,也许我应该完全改变 flink 运行时部署。例如,尝试原始的响应式演示。但恐怕 Stateuful 函数不打算那样工作。
最后
我一遍又一遍地阅读 Flink 文档和 Github 示例,但找不到更多信息来执行此操作。任何提示/说明/指南表示赞赏!