问题标签 [flink-statefun]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
41 浏览

java - 如何访问 Flink statefun 函数

flink playground 给出了一个演示,如下所示:

如何通过http请求访问该功能?发布?得到?

0 投票
0 回答
34 浏览

apache-flink - Flink Stateful Functions 没有重启任务管理器

我正在尝试重新部署应用程序,但在 Flink 版本:1.10.1 上出现以下问题而失败。

以下是我在部署应用程序时获得的应用程序配置和错误日志。直到有时,此配置都可以正常工作。直到最近它才开始失败。

我已经检查了 S3 目录中的检查点,它包含最新日期为 1 月 6 日的文件夹和元数据。之后它没有任何数据。

RocksDB 被配置为未恢复的后端。我们在 Kubernetes 集群上部署我们的应用程序。我相信 RockDB 是和 Flink 一起打包的,除了指定 Flink 配置外,我们真的不需要做任何事情。

向更广泛的受众投放,以便专门在 K8s 环境中遇到相同问题的人可以分享他们的经验

0 投票
1 回答
47 浏览

apache-flink - 如何在 K8s 上自动扩展/缩减 Flink 有状态函数

我当前的 Flink 应用程序

  • 基于 Flink Stateful Function 3.1.1,从 Kafka 读取消息,处理消息,然后下沉到 Kafka Egress
  • 应用程序已按照指南部署在 K8s 上并且运行良好:Stateful Functions Deployment
  • 基于标准部署,我开启了kubernetes HA

我的目标

我想自动扩大/缩小有状态的功能。我也想知道如何创建更多的备用作业管理器

我对医管局的观察

我尝试kubernetes.jobmanager.replicasflink-configConfigMap 中设置:

我在 K8s 中没有看到备用作业管理器。

然后我直接调整deployment的replicas:

备用作业经理出现。我查看 pod 日志,leader 选举成功。但是,当我在 Web 浏览器中访问 UI 时,它会显示:

我的方法有什么问题?

我关于缩放的问题

反应模式正是我所需要的。我试过但失败了,作业管理器有错误消息:

似乎不应该以这种方式完成有状态功能自动缩放。那么进行自动缩放的正确方法是什么?

潜在方法(可能不正确)

经过一番研究,我目前的方向是:

  1. Job Manger 与自动缩放无关。它与 K8s 上的 HA 有关。我只需要确保 Job Manager 具有正确的故障转移行为
  2. 我的有状态功能是Flink remote services,即它们是常规的k8s services。它们可以以 的形式部署KNative service以实现自动缩放。只有当 http 请求来自 Flink 的 worker 时,服务的副本才会增加
  3. 最重要的部分,Flink 的工人(或任务管理器)我还不知道如何进行自动缩放。也许我应该KNative用来部署 Flink 工作者?如果它不适用于 KNative,也许我应该完全改变 flink 运行时部署。例如,尝试原始的响应式演示。但恐怕 Stateuful 函数不打算那样工作。

最后

我一遍又一遍地阅读 Flink 文档和 Github 示例,但找不到更多信息来执行此操作。任何提示/说明/指南表示赞赏!

0 投票
1 回答
42 浏览

apache-flink - 有状态函数中的时间特性

我一般理解事件时间使用水印及时取得进展。对于更多基于迭代的 Flink Statefun 来说,这可能是个问题。所以我的问题是如果我使用延迟消息(https://nightlies.apache.org/flink/flink-statefun-docs-stable/docs/sdk/java/#sending-delayed-messages),那么这是否意味着我们可以在有状态函数中仅使用处理时间概念吗?

我想更改为事件时间处理模型,但不确定它如何与有状态函数一起使用。

有人可以解释一下吗?

0 投票
0 回答
36 浏览

apache-flink - 入口配置为带有 Flink statefun 的字符串

我想要做什么

一旦按照python 演练,我正在尝试修改module.yaml文件,因此入口和出口不是Protobuf而是String。我并没有真正修改大多数文件,只是module.yaml尝试配置字符串入口并且greeter.py不考虑状态或 protobuf 消息,只打印从入口接收的输入。

项目的架构没有改变:

使用的配置文件和python应用程序:

码头工人-compose.yml

模块.yaml

迎宾员.py

错误

运行docker-compose up -d --build flink master后停止并出现下一个错误:

我不知道这个异常Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: There are no routers defined.是否是主要问题以及为什么会发生。

0 投票
1 回答
17 浏览

kotlin - 如何聚合窗口会话 flink 中的元素?

当一段时间内没有收到元素时,我正在使用 flink Session 窗口,即;当发生不活动间隙时,它应该发出一个事件。

我在 flink 作业中将间隙配置为 10 秒。我发送了 event1 并在 5 秒后发送了 event2。这两个事件应该属于第一个窗口。输出应该是这两个事件的聚合。但我只得到第一个事件。

下面是我试过的代码:

然后 MyProcessWindowFunction 看起来像

我得到了唯一的 event1,但我想获得这两个事件的聚合(我发送了 event1 和 event2)。

我们如何获得会话中可用事件的聚合?

0 投票
0 回答
25 浏览

apache-flink - 具有远程功能的 Flink MapState

所以,我正在尝试使用远程函数来实现 flink 的 mapState。在 mapState 中,我将存储大约 1000 个用一个键映射的值。有人可以指出我为远程功能实现它的方向吗?我找到了带有嵌入式功能的文章https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/

另外,当我从远程功能访问它时,我的整个状态会在每个请求中传输吗?如果是,有什么办法可以避免每次我请求时都进行大量数据传输?

0 投票
0 回答
13 浏览

apache-flink - 有没有办法在 Flink 有状态函数的 Kafka Ingress 中自定义一个 Kafka Deserializer?

在我的项目中,我将使用 flink statefun kafka ingress 来消耗来自 kafka 的 avro 序列化记录,但似乎没有配置参数供用户指定用于反序列化 kafka 记录密钥的反序列化器,在源代码RoutableKafkaIngressDeserializer中,我还注意到密钥直接从字节数组转换为字符串。

我尝试通过kafka消费者属性key.deserializer指定反序列化器,但是根据flink kafka附加属性不是可行的解决方案

我发现可以通过 datastream API Kafka deserializer 指定 kafka 反序列化器,不幸的是,这个配置目前在 flink statefun kafka ingress 中似乎不可用。