问题标签 [flink-statefun]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
java - 如何访问 Flink statefun 函数
flink playground 给出了一个演示,如下所示:
如何通过http请求访问该功能?发布?得到?
apache-flink - Flink Stateful Functions 没有重启任务管理器
我正在尝试重新部署应用程序,但在 Flink 版本:1.10.1 上出现以下问题而失败。
以下是我在部署应用程序时获得的应用程序配置和错误日志。直到有时,此配置都可以正常工作。直到最近它才开始失败。
我已经检查了 S3 目录中的检查点,它包含最新日期为 1 月 6 日的文件夹和元数据。之后它没有任何数据。
RocksDB 被配置为未恢复的后端。我们在 Kubernetes 集群上部署我们的应用程序。我相信 RockDB 是和 Flink 一起打包的,除了指定 Flink 配置外,我们真的不需要做任何事情。
向更广泛的受众投放,以便专门在 K8s 环境中遇到相同问题的人可以分享他们的经验
apache-flink - 如何在 K8s 上自动扩展/缩减 Flink 有状态函数
我当前的 Flink 应用程序
- 基于 Flink Stateful Function 3.1.1,从 Kafka 读取消息,处理消息,然后下沉到 Kafka Egress
- 应用程序已按照指南部署在 K8s 上并且运行良好:Stateful Functions Deployment
- 基于标准部署,我开启了kubernetes HA
我的目标
我想自动扩大/缩小有状态的功能。我也想知道如何创建更多的备用作业管理器
我对医管局的观察
我尝试kubernetes.jobmanager.replicas
在flink-config
ConfigMap 中设置:
我在 K8s 中没有看到备用作业管理器。
然后我直接调整deployment的replicas:
备用作业经理出现。我查看 pod 日志,leader 选举成功。但是,当我在 Web 浏览器中访问 UI 时,它会显示:
我的方法有什么问题?
我关于缩放的问题
反应模式正是我所需要的。我试过但失败了,作业管理器有错误消息:
似乎不应该以这种方式完成有状态功能自动缩放。那么进行自动缩放的正确方法是什么?
潜在方法(可能不正确)
经过一番研究,我目前的方向是:
- Job Manger 与自动缩放无关。它与 K8s 上的 HA 有关。我只需要确保 Job Manager 具有正确的故障转移行为
- 我的有状态功能是
Flink remote services
,即它们是常规的k8s services
。它们可以以 的形式部署KNative service
以实现自动缩放。只有当 http 请求来自 Flink 的 worker 时,服务的副本才会增加 - 最重要的部分,Flink 的工人(或任务管理器)我还不知道如何进行自动缩放。也许我应该
KNative
用来部署 Flink 工作者?如果它不适用于 KNative,也许我应该完全改变 flink 运行时部署。例如,尝试原始的响应式演示。但恐怕 Stateuful 函数不打算那样工作。
最后
我一遍又一遍地阅读 Flink 文档和 Github 示例,但找不到更多信息来执行此操作。任何提示/说明/指南表示赞赏!
apache-flink - 有状态函数中的时间特性
我一般理解事件时间使用水印及时取得进展。对于更多基于迭代的 Flink Statefun 来说,这可能是个问题。所以我的问题是如果我使用延迟消息(https://nightlies.apache.org/flink/flink-statefun-docs-stable/docs/sdk/java/#sending-delayed-messages),那么这是否意味着我们可以在有状态函数中仅使用处理时间概念吗?
我想更改为事件时间处理模型,但不确定它如何与有状态函数一起使用。
有人可以解释一下吗?
apache-flink - 入口配置为带有 Flink statefun 的字符串
我想要做什么
一旦按照python 演练,我正在尝试修改module.yaml
文件,因此入口和出口不是Protobuf而是String。我并没有真正修改大多数文件,只是module.yaml
尝试配置字符串入口并且greeter.py
不考虑状态或 protobuf 消息,只打印从入口接收的输入。
项目的架构没有改变:
使用的配置文件和python应用程序:
码头工人-compose.yml
模块.yaml
迎宾员.py
错误
运行docker-compose up -d --build
flink master后停止并出现下一个错误:
我不知道这个异常Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: There are no routers defined.
是否是主要问题以及为什么会发生。
kotlin - 如何聚合窗口会话 flink 中的元素?
当一段时间内没有收到元素时,我正在使用 flink Session 窗口,即;当发生不活动间隙时,它应该发出一个事件。
我在 flink 作业中将间隙配置为 10 秒。我发送了 event1 并在 5 秒后发送了 event2。这两个事件应该属于第一个窗口。输出应该是这两个事件的聚合。但我只得到第一个事件。
下面是我试过的代码:
然后 MyProcessWindowFunction 看起来像
我得到了唯一的 event1,但我想获得这两个事件的聚合(我发送了 event1 和 event2)。
我们如何获得会话中可用事件的聚合?
apache-flink - 具有远程功能的 Flink MapState
所以,我正在尝试使用远程函数来实现 flink 的 mapState。在 mapState 中,我将存储大约 1000 个用一个键映射的值。有人可以指出我为远程功能实现它的方向吗?我找到了带有嵌入式功能的文章https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/。
另外,当我从远程功能访问它时,我的整个状态会在每个请求中传输吗?如果是,有什么办法可以避免每次我请求时都进行大量数据传输?
apache-flink - 有没有办法在 Flink 有状态函数的 Kafka Ingress 中自定义一个 Kafka Deserializer?
在我的项目中,我将使用 flink statefun kafka ingress 来消耗来自 kafka 的 avro 序列化记录,但似乎没有配置参数供用户指定用于反序列化 kafka 记录密钥的反序列化器,在源代码RoutableKafkaIngressDeserializer中,我还注意到密钥直接从字节数组转换为字符串。
我尝试通过kafka消费者属性key.deserializer指定反序列化器,但是根据flink kafka附加属性不是可行的解决方案
我发现可以通过 datastream API Kafka deserializer 指定 kafka 反序列化器,不幸的是,这个配置目前在 flink statefun kafka ingress 中似乎不可用。