问题标签 [flink-batch]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
36 浏览

apache-flink - Flink sink 向 DB 提交数据,然后向 Kafka 提交数据

源(从kafka读取)->keyby()-> window()-> transform(agg)-> dbSink(提交到数据库)。这工作正常。

现在我有一个用例,我需要将提交给 DB 的数据写入另一个 kafka 主题以进行一些事件处理。如果提交到数据库失败,这些记录将被删除。我需要的是:source(从kafka读取)->keyby()-> window()-> transform(agg)-> dbSink(提交到数据库)->keyby()->window()->kafkaSink() .

接收器运营商不支持 Sideouputs。我能想到的一件事:将 dbSink 转换为 flatMap()。在此 CustomDbFlatMap 中,提交到存储库,然后添加到收集器。这是实现这一目标的唯一方法吗?

0 投票
2 回答
139 浏览

java - 无法在 FileProcessing.PROCESS_CONTINUOS 模式下读取(文本)文件

我需要从特定路径连续读取文件。

意味着 flink 作业应该不断地轮询指定的位置,并以一定的时间间隔读取将到达该位置的文件。

示例:我在 Windows 机器上的位置是 C:/inputfiles 在下午 2:00 获取文件 file_1.txt,在下午 2:30 获取文件_2.txt,在下午 3:00 获取 file_3.txt。

我用下面的代码对其进行了试验。

现在为了在 flink 集群上进行测试,我使用 flink 的 1.9.2 版本启动了 flink 集群,并且我能够实现每隔一段时间连续读取文件的目标。

注意:Flink 的 1.9.2 版本可以在 windows 机器上启动集群。

但是现在我必须将 flink 的版本从 1.9.2 升级到 1.12 。我们使用 docker 在 1.12 上启动集群(与 1.9.2 不同)。

与 Windows 路径不同,我根据 docker 位置更改了文件位置,但上面的程序没有在那里运行。

此外:访问文件不是问题。意味着如果我在开始作业之前放置文件,那么该作业会正确读取这些文件,但如果我在运行时添加任何新文件,则它不会读取这些新添加的文件。

需要帮助才能找到解决方案。

提前致谢。

0 投票
1 回答
45 浏览

java - 如何在读取csv文件时停止flink读取重复数据

我想先通过下面的场景来解释我的问题陈述。

场景: 我正在使用 flink+java8 的 flink 的 PROCESS_CONTINOUS 模式进行连续文件读取。

这实际上是一种批量读取功能,其中不同的文件将在一天的不同时间收到。因此,假设 file_1.csv 在下午 3:00 到达,那么我的 flink 作业将读取此文件。file-2.csv 再次在下午 3:30 到达,然后 flink 作业也将读取此文件,并且该过程将继续以这种方式工作,直到作业停止。我们将这些数据下沉到 Kafka。

问题: 当我重新启动 flink 作业时,它开始读取所有先前读取的文件的数据。这意味着我在重新启动作业时一次又一次地获得相同的记录。

有没有办法防止数据重复?

0 投票
0 回答
50 浏览

elasticsearch - Flink 向 ES 写入数据报错:request retries exceeded max retry timeout 30000

Flink 的 Parallelism 设置为 1 时不报,但是太慢了,12 小时跑了 300 万条数据。Parallelism,设置为默认8,会报错:request retries exceeded max retry timeout 30000

0 投票
0 回答
35 浏览

java - 有什么方法可以在 flink 中获取 CSV HEADERS/COLUMNS 详细信息

我需要获取 cvs 标头详细信息,例如标头名称、标头计数、标头数据类型。

此外,这个 csv 文件在不同的时间会有所不同。所以我想制作一个通用映射器,它可以读取任何文件,提取标题并映射到通用映射器。

示例:我有一个 csv 文件说 file1.csv,其中 3 列/标题说名称、地址、国家。在读取这个 csv 文件时,这三列将被映射到一个通用映射器或 pojo,比如 CommonCsvAttribute。

现在,一旦我有另一个 csv 文件说 file2.csv,它有 5 列说部门 ID、部门名称、部门位置、部门负责人、部门计数。在阅读此 csv 文件时,这 5 列/标题应映射到相同的 pojo CommonCsvAttribute。

将来,如果其他文件带有不同数量的列/标题,则应将其映射到相同的映射器 pojo CommonCsvAttribute。

为了实现这一点,我正在考虑动态读取标题/列,然后对其进行映射。

那么有什么方法可以在 flink 中读取这些标题/列详细信息吗?

0 投票
1 回答
48 浏览

apache-flink - Flink:在DataStream API的批处理模式下左连接相当于Dataset API?

Flink 文档中已经提到,DataSet API 将来会被弃用。因此,我正在研究以批处理模式(我相信现在处于 Beta 版)迁移的Dataset APIDataStream API 的原型。

我们的代码库中有这个(类似的)代码,它在数据集上使用leftOuterJoin

问题是我无法在 Datastream API docs - Join中找到Left JoinLeft Outer Join等效项。

由于他们正在考虑完全弃用 DataSet API,我假设现在应该有一种方法可以在 DataStream API 中执行此 Left Outer Join。

有人可以指导我以正确的方式做到这一点吗?TIA

0 投票
1 回答
296 浏览

java - 如何在没有更新和删除更改错误的情况下在 flink 中写入 s3 表接收器?

考虑一个代码:

这会给我错误:

org.apache.flink.client.program.ProgramInvocationException:主要方法导致错误:表接收器'default_catalog.default_database.output_table'不支持消费节点GroupAggregate产生的更新更改(select = [MIN($ f0) AS id, MAX(createdDate) AS created_date, COUNT(DISTINCT $f2) AS count_value ])

有没有办法通过 flink 将聚合写入 s3?(flink 以批处理模式运行)

0 投票
0 回答
108 浏览

java - Apache Flink 1.14.0 - 提交作业时出错

我是 Flink 的新手。我已经在 AWS 服务器上完成了 Flink 1.14.0 的独立安装,并在 java 1.8 中编写了一个简单的工作。

当我运行代码时,我在 Flink 运行时收到以下错误

我的工作是多次重试提交工作,最后以另一条消息停止

注意:我检查了我的应用程序和服务器中的 flink 和 java 版本。一切都一样(Flink 1.4.0 & java 1.8)

0 投票
1 回答
30 浏览

apache-flink - Apache Flink - 匹配连续模式中具有不同值的字段

考虑一个用例,我们需要找到攻击模式,例如从同一设备和相同的用户名登录 10 次失败,然后从不同的设备但相同的用户名成功登录。这应该在 10 分钟内发生。

假设我们有 10 个登录失败的 Windows 事件,其中用户 A 作为用户名,B 作为设备名,并且我们从用户 A 使用不同的设备 C 成功登录,我们应该发出警报。请让我知道如何使用 flink CEP 来解决此案。

0 投票
0 回答
51 浏览

java - ververica 上 flink 中的 Azure blob 存储凭据配置给出 java.lang.ClassNotFoundException

我正在通过 flink 管道访问 azure blob 存储。

根据 flink 文档https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/azure/ 有两种方法可以实现这一点。

1) fs.azure.account.key.<account_name>.blob.core.windows.net: <azure_storage_key>

我实施了这种方法,但根据我们的组织安全策略,硬编码访问密钥不是建议的方式。所以这种方法没有帮助。

2)fs.azure.account.keyprovider.<account_name>.blob.core.windows.net: org.apache.flink.fs.azurefs.EnvironmentVariableKeyProvider

**a)**我们正在使用这种方法将运行作业的检查点和保存点保存在 azure blob 存储上,比如 storage1 。表示这种方法(或键值对组合)已经在使用。

**b)**现在我们需要将 csv/text/xml 文件保存在不同的 blob 存储上,例如 storage2。

为了访问这个 blob 存储帐户,我需要提供访问密钥,并且这需要通过我在第 a 点中提到的相同方式通过配置进行访问。

为此,我创建了一个特定于应用程序的类,其内部逻辑(环境变量除外)与 EnvironmentVariableKeyProvider 相同。

我在deployment.ym l 中声明了如下配置

现在,当我的应用程序尝试访问此 fs.azure.account.keyprovider.storage2.blob.core.windows.net 属性时,它给了我以下错误。

似乎 flink 无法加载用户定义的类。

有没有办法加载这个用户定义的 MyAppEnvironmentVariableKeyProvider 类。