问题标签 [flink-statefun]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
51 浏览

apache-flink - Apache Flink 有状态函数中的负积压

Flink Statefun 中一个远程函数的 backlog 怎么可能是负数?

在我目前正在处理的管道上,当集群处于压力之下并且某些功能处于背压之下时,我会经常看到这种行为。然后积压的指标有时会变成负数并留在那里。下图显示了这种现象。

背压图

根据文档,指标 numBacklog 是“待发送消息的数量”。

我只是不明白这怎么可能是负面的。有人知道吗?

0 投票
0 回答
31 浏览

apache-flink - Flink statefun 应用程序可以运行在 Flink 会话集群上吗?

我可以在现有的 Flink 会话集群上运行 Flink statefun 应用程序吗?旧 statefun 版本(2.2)中的文档说这是可能的,但是该部分已从较新版本的文档中删除,所以想知道它是否仍然可能。

0 投票
1 回答
85 浏览

amazon-web-services - AWS 上的 Flink 有状态函数如何处理来自 Lambda 的背压/429 节流?

使用远程函数的Flink Stateful Functions涉及 Flink StateFun 集群,将计算任务的执行交给通过一些 FaaS 机制部署的远程工作者,例如 AWS Lambda。

AWS lambdas 受到扩展限制(它们可以扩展的速度和限制),如docs 中所述。请注意,并发的 Lambda 配额适用于账户,而不是每个单独的 lambda 函数。

在大规模流系统中,特别是如果单个 Lambda 调用执行的工作相对于该期间数据流中遇到的键的数量而言相对较长,可以想象 Flink StateFun 集群可能会遇到 Lambda 节流事件。换句话说,StateFun 集群在尝试通过 API 网关调用 Lambda 时会收到来自 API 网关的 429 错误,因为同时 Lambda 调用的数量已达到限制。

Flink 是如何处理这个问题的?它是否实现了退避/重试,以及如何处理数据流中事件的排序?

0 投票
1 回答
64 浏览

apache-flink - Flink Stateful Functions URL Path Template 支持哪些模板参数?

在部署 Flink Stateful Functions 时,需要指定函数的端点是什么,即 Flink 需要点击什么 URL 才能触发远程函数的执行。

文档状态:

URL 模板名称可能包含模板参数,这些参数是根据函数的特定类型填写的。例如,发送到消息类型 com.example/greeter 的消息将被发送到 http://bar.foo.com/greeter

支持哪些其他模板值urlPathTemplate?这些值来自哪里?

0 投票
1 回答
60 浏览

python - Python Flink有状态函数的Ingress上的Kafka Key访问

我一直在看 Flink Stateful Functions。它看起来非常有希望——除了一件事——我希望我只是想念它。

对于我的生活,看不到从 Python 中的 kafka 入口访问 kafka 密钥的方法。在 Java 中,我看到我可以使用反序列化器并将其有效地打包到解码的message对象中。但我找不到替代品。
在我们的例子中,键具有有价值的信息,但值中不存在。

有人遇到过这个 - 还是我错过了?

0 投票
1 回答
32 浏览

apache-flink - 使用通用 Kafka 摄取的自定义反序列化器

接下来(抱歉,有不同的用户):Kafka Key access on Ingress of a Python Flink Stateful function

我们的用例是我们使用 Kafka 标头作为跟踪和沿袭以及所需元数据的手段。看这个: https ://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/ io/kafka/binders/ingress/v1/RoutableKafkaIngressDeserializer.java#L45-L61看起来像使用标准反序列化器,标头被丢弃。

实际上,我想要的是一种注入我自己的反序列化器的方法,该反序列化器将返回一条包含此消息和记录中的任何其他元数据的消息。我想添加类似 UniversalKafkaIngress 的东西,以便我可以使用远程模块对其进行配置。

查看代码,我可以看到我可以注册一个新的 ExtensionModule,并替换反序列化器(并创建一个自定义类型)。这是推荐的吗?如果是这样 - 是否有任何文档(如果没有,我如何配置 statefun 来获取它)?

或者,还有其他首选方法吗?

再次感谢...

0 投票
0 回答
26 浏览

apache-flink - 每个环境的有状态功能模块定义/配置

我们有一个简单的状态函数作业,调用 HTTP 端点(在 AWS 上通过弹性负载均衡器)并将结果推送到 Kafka。

  • 创建了一个要部署在独立集群(它不是 docker Image)上的 jar 文件,仅包括 module.yaml 文件。我们statefun-flink distribution在该 jar 文件中添加了 3.0.0 版本作为依赖项。

  • 将我们的作业配置中的 Entry 类设置为,org.apache.flink.statefun.flink.core.StatefulFunctionsJob并在资源文件夹中简单地保留一个 module.yaml 文件,用于 jar 文件中的模块配置。

我想在运行时通过命令行或任何其他允许在启动作业时传递所需模块定义(例如,module_dev_env.yaml)的方式设置模块定义。有可能这样做还是我应该选择不同的选择?

0 投票
1 回答
247 浏览

apache-flink - Flink StateFun high availability exception: "java.lang.IllegalStateException: There is no operator for the state ....."

I have 2 questions related to high availability of a StateFun application running on Kubernetes

Here are details about my setup:

  • Using StateFun v3.1.0
  • Checkpoints are stored on HDFS (state.checkpoint-storage: filesystem)
  • Checkpointing mode is EXACTLY_ONCE
  • State backend is rocksdb and incremental checkpointing is enabled

1- I tried both Zookeeper and Kubernetes HA settings, result is the same (log below is from a Zookeeper HA env). When I kill the jobmanager pod, minikube starts another pod and this new pod fails when it tries to load last checkpoint:

I believe not being able to specify ids for Flink operators (as told here) when using StateFun is causing this. While it was working fine in the beginning, operators got some random id assigned and checkpointing went just fine. After the restart, the operators are assigned other random ids, and when the jobmanager (statefun master in this case) tries to load the state "2edd7b5dafb2c271440b25f6da5f4532" it fails to find the operator assigned to it originally.

Can someone confirm what I think is correct and / or give me directions for making my StateFun app work with high availability?

Another interesting thing to note is, after several restarts of the jobmanager pod with the above exception, it sometimes can get past the "Restoring job 00000000000000000000000000000000 from Checkpoint ..." line somehow (?), with "No master state to restore" log (link) - which makes me feel not sure about it really did recover or it just started discarding the state on last successful checkpoint. What might be causing this? Is it really recovering from the checkpoint successfully?

2- For Kubernetes deployments, on StateFun deployment documentation (link) Deployment type is used for jobmanager component. On the other hand Flink deployment documentation (Standalone / Kubernetes section) (link) uses Job type for jobmanager for high available setup (jobmanager-application-ha.yaml file)

Basically since Kubernetes will restart the pod on failures, either Job or Deployment can be used. But the thing is, when we try to stop the job with a savepoint and Deployment type is used, Kubernetes restarts the pod regardless of successful savepoint creation and success exit status (0).

Are we supposed not to stop StateFun apps with savepoint when running on Kubernetes? I am aware of the related bug (link) - but although it seems to be deprecated I can do a cancel with savepoint - are we supposed to just delete deployment as told in High availability data clean up section? (link)

UPDATE for the first question: I turned on debug logging and could capture a session with the exception and a successful startup in a row. The following is from the unsuccessful one:

and this is from the successful one:

It seems that generated hashes between two runs are computed differently.

0 投票
1 回答
84 浏览

flink-statefun - 从嵌入式 Java 函数调用远程 Python 函数时,我应该如何设置消息有效负载

我正在使用 Flink StateFun 3.1.0,并且想从嵌入式 Java 函数调用远程 Python 函数。在 StateFun 2.2 中,我可以通过我的(通过 Protobuf 生成的)Java 类的实例创建一个 ProtoBuf Any Any.pack(msg),这很有效。现在我明白了

嵌入式 Java 函数、远程 Java 函数和远程 Python 函数之间调用的例子很多,但是我没有找到从嵌入式 Java 调用远程 Python 的例子。

在远程 Java 示例中,使用Context具有send(Message)方法的函数调用该函数,我认为该方法也可用于调用远程 Python 函数,但我的嵌入式 Java 函数传递了不支持此方法的不同类型的上下文。

0 投票
1 回答
67 浏览

apache-flink - 需要关于从 Flink DataStream Job 迁移到 Flink Stateful Functions 3.1 的建议

我有一个基于 Flink Data Stream 的工作 Flink 作业。我想基于 Flink 有状态函数 3.1重写整个作业。

我目前的 Flink Job 的功能是:

  1. 阅读来自 Kafka 的消息
  2. 每个消息的格式都是一个数据包的切片,例如(s for slice):
    • s-0, s-1 用于数据包 0
    • s-4、s-5、s-6 用于数据包 1
  3. 该作业将切片合并为几个数据包,然后将数据包接收到 HBase
  4. 窗口函数用于处理切片到达的无序

我的目标

  • 目前我已经在我的 k8s 上运行了 Flink Stateful Functions 演示。我想在有状态函数上重写我的整个工作。
  • 将数据保存到 MinIO 而不是 HBase

我目前的计划

我已经阅读了文档并得到了一些想法。我的计划是:

我的问题

我对我的计划没有信心。我的理解/计划有什么问题吗?

有什么我应该参考的最佳实践吗?

更新:

窗口用于组装结果

  1. 获取一个切片,检查它的元数据并知道它是数据包的最后一个
  2. 也知道数据包应该包含 10 个切片
  3. 如果已经有 10 个切片,则合并它们
  4. 如果还没有足够的切片,请等待一段时间(例如 10 分钟),然后合并或记录数据包错误。

我想在重写期间摆脱 windows,但我不知道如何