问题标签 [flink-batch]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-flink - Flink 短作业不导出 prometheus job_name 字段
[描述]
我在 Kubernetes 上运行 Flink 1.11.1,并使用 Prometheus 和 Grafana 设置监控堆栈。
我观察到在 Flink Cluster 上运行 WordCount 示例(通过 UI 提交)不会$(job_name)
在 prometheus 上返回。
为了排除故障,我下载了 flink 示例 WordCount 作业并使用Thread.sleep()
. 如下面的屏幕截图所示,我运行了这项工作,然后运行了更长的版本。
prometheus 上只有第二次运行(更长的作业)导出$(job_name)
字段,如下图所示 Grafana 仪表板屏幕截图 ( label_values(job_name)
)。这暗示较短的运行作业不会导出给定字段。
我还尝试使用 Flink 建议的设置运行 pushgateway 导出器,结果与上述相同。
[问题]
有没有办法从短期运行的作业中收集 job_name 指标,我的设置是否错误?还是由于普罗米修斯的刮擦间隔而无法实现?谢谢你。
apache-flink - 我可以从我的 flink 流应用程序中公开一个端点吗
我想从我的 flink 流应用程序中公开一个端点。它返回一些关于应用程序的静态元数据。有什么可能的方法来实现这一点。请帮忙
apache-flink - Flink 中使用定义的聚合函数 - 未找到函数签名的匹配项
我想在 Flink中的“ Select .. From .. GROUP BY .. ”查询中保留每个键的所有原始行。我定义了一个名为 RowToJsonAgg的AggregateFunction ,它将行聚合成一个 Json 字符串。
// 假设行看起来像“$field1_name, $field1_value, $field2_name, $field2_value, ...” // 尝试从行生成 json。然而,当我运行查询时,Flink 似乎找不到这个函数}
数据类和查询如下所示:
当我运行应用程序时,我得到了 SQL 验证异常,详细消息是“未找到函数签名 row_to_json_agg(CHARACTER, NUMERIC, CHARACTER, NUMERIC) 的匹配项”
似乎 Flink 找不到正确的累积函数来调用。
如果我将累积函数声明如下
并改变了查询
我遇到了同样的异常,消息是“找不到函数签名 row_to_json_agg(NUMERIC,NUMERIC) 的匹配项”
任何想法如何使聚合函数工作?
flink-batch - scala中的flink批处理(无输出)
我正在 flink scala 中编写一个简单的程序,它显示消费(csm)超过某个阈值(100)的人的姓名和客户的 ID。程序的编译成功完成但是我有以下异常并且我没有看到输出。我正在使用 IDE IntelliJ。
任何帮助..谢谢。
apache-flink - Flink Elasticsearch sink 成功处理程序
我使用 Flink Elasticsearch sink 将记录批量插入到 ES。
我想在记录成功同步到 Elasticsearch 后做一个操作。有一个 failureHandler ,我们可以通过它重试失败。flink elasticsearch sink中是否有successHandler?
注意:在将记录添加到批量处理器之前我无法执行此操作,因为不能保证记录与 ES 同步?我只想在记录同步到 Elasticsearch 后进行操作。
kubernetes - 应用完成后清理 Kubernetes 上的 Flink 应用集群的最佳实践
我们在 Kubernetes 上以 Application 模式运行 Flink 作业,问题是当作业完成/停止时,作业管理器容器将退出,但 1. 任务管理器的部署 2. 作业管理器服务 3. configMap 仍然存在,除非我们运行 kubectl delete 来清理它。
如果我们手动停止作业,这没什么大不了的,但是如果我们的 Flink 作业是一个稍后会完成的批处理作业,这意味着我们需要一个外部服务来持续监控作业管理器容器并在它完成时清理剩余资源完成,这不是很实用。
我想知道这里的最佳做法是什么?我们是否支持在 Kubernetes 上运行 Flink 批处理作业?如果是,那么 Flink 作业本身应该有一种方法可以在完成时清理所有内容,对吗?
scala - 如何使用 Flink 过滤具有公共字段(但不同模式)的镶木地板文件
我有一个文件夹,其中包含具有不同模式的镶木地板文件,它们都有一个保证存在的公共字段。我想根据该字段过滤行并将其写回其他镶木地板文件。
spark中的类似动作将相当简单,看起来像
问题是,如果我要扩展 ParquetInputFormat,我还必须提供可能不同的模式
或使用这样的源函数:
我对后者的问题是我无法获取原始数据
有任何想法吗 ?
apache-flink - 跨 flink 任务管理器平均分配任务槽
我有一个 flink 工作(2 个任务管理器),工作并行度为 64,任务槽为 64。
我将其中一个运算符的并行度设置为 16。此运算符(16 个并行度)插槽未均匀分布在两个任务管理器中。它通常需要更高的任务槽,例如一个任务管理器中的 10/11 和另一个任务管理器中的 5/6。
我正在使用 flink 版本 1.11.2。我尝试添加 cluster.evenly-spread-out-slots: true 但它不起作用。非常感谢任何解决方案
apache-flink - 每次提交作业超过休息后,作业管理器中的 flink 磁盘使用量都会增加
我已经在 AWS ECS 中部署了自己的 flink 设置。一项服务用于 JobManager,一项服务用于任务管理器。我正在为作业管理器运行一项 ECS 任务,为 TASK 管理器运行 3 个 ecs 任务。
我有一种批处理作业,我每天使用 flink rest 上传并更改新参数,当我每次提交磁盘内存增加约 600MB 时,我将检查点设置为 S3 。我还设置了historyserver.archive.clean-expired-jobs true 。
由于我在 ECS 上运行,因此无法找到每次 jar 上传和执行时内存增加的原因。
我应该查看哪些 flink 配置参数以确保每次上传新作业时内存都不会增加?
apache-flink - Flink SQL 流记录从 CSV 到 Kafka
我有一个将记录从 CSV 推送到 Kafka 的 flink sql 作业。csv 有 100,000 条记录。一旦移动了 95,000 条记录,该作业就会长时间处于运行状态。除此之外,剩下的 5000 条记录根本不会被推送。我不确定为什么它不起作用,我确实应用了多个任务管理器但仍然没有用。我在一个带有 VM 的单 Flink 集群上运行。它基于码头工人。
顺便说一句,在将记录从 CSV 推送到 Elasticsearch 时也会观察到相同的行为。它挂在 95,000+,没有完成 100,000。
这是查询
源表:CSV 文件系统
目标表:Kafka 主题
Flink SQL 插入查询以将记录从 CSV 推送到 Kafka。
这是来自 Apache Flink Webdashboard 的图片
这是来自 Kafka Kafdrop 的图片
这是 flink-conf.yaml