0

我当前的 Flink 应用程序

  • 基于 Flink Stateful Function 3.1.1,从 Kafka 读取消息,处理消息,然后下沉到 Kafka Egress
  • 应用程序已按照指南部署在 K8s 上并且运行良好:Stateful Functions Deployment
  • 基于标准部署,我开启了kubernetes HA

我的目标

我想自动扩大/缩小有状态的功能。我也想知道如何创建更多的备用作业管理器

我对医管局的观察

我尝试kubernetes.jobmanager.replicasflink-configConfigMap 中设置:

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: shadow-fn
data:
  flink-conf.yaml: |+
    kubernetes.jobmanager.replicas: 7
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory

我在 K8s 中没有看到备用作业管理器。

然后我直接调整deployment的replicas:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: statefun-master
spec:
  replicas: 7

备用作业经理出现。我查看 pod 日志,leader 选举成功。但是,当我在 Web 浏览器中访问 UI 时,它会显示:

{"errors":["Service temporarily unavailable due to an ongoing leader election. Please refresh."]}

我的方法有什么问题?

我关于缩放的问题

反应模式正是我所需要的。我试过但失败了,作业管理器有错误消息:

Exception in thread "main" org.apache.flink.configuration.IllegalConfigurationException: Reactive mode is configured for an unsupported cluster type. At the moment, reactive mode is only supported by standalone application clusters (bin/standalone-job.sh).

似乎不应该以这种方式完成有状态功能自动缩放。那么进行自动缩放的正确方法是什么?

潜在方法(可能不正确)

经过一番研究,我目前的方向是:

  1. Job Manger 与自动缩放无关。它与 K8s 上的 HA 有关。我只需要确保 Job Manager 具有正确的故障转移行为
  2. 我的有状态功能是Flink remote services,即它们是常规的k8s services。它们可以以 的形式部署KNative service以实现自动缩放。只有当 http 请求来自 Flink 的 worker 时,服务的副本才会增加
  3. 最重要的部分,Flink 的工人(或任务管理器)我还不知道如何进行自动缩放。也许我应该KNative用来部署 Flink 工作者?如果它不适用于 KNative,也许我应该完全改变 flink 运行时部署。例如,尝试原始的响应式演示。但恐怕 Stateuful 函数不打算那样工作。

最后

我一遍又一遍地阅读 Flink 文档和 Github 示例,但找不到更多信息来执行此操作。任何提示/说明/指南表示赞赏!

4

1 回答 1

0

由于反应模式是一项新的实验性功能,因此并非默认调度程序支持的所有功能也适用于反应模式(及其自适应调度程序)。Flink 社区正在努力解决这些限制。

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/elastic_scaling/

于 2022-01-28T08:54:44.443 回答