问题标签 [apache-beam]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
2693 浏览

google-cloud-dataflow - 如何为我的数据流指定工作人员数量?

我有一个 Apache Beam 管道,可以加载大约 90GB 的大型导入文件。我已经在 Apache Beam Java SDK 中编写了管道。

使用 的默认设置PipelineOptionsFactory,我的工作需要很长时间才能完成。

我如何控制并以编程方式指定我的工作的并行度,从而指定工人的数量?

0 投票
2 回答
630 浏览

java - 即使我正在传递视图,“使用未知视图调用 sideInput()”异常?

代码是这样开始的:

声明了这些侧输入,为什么 Dataflow 看不到它们?

0 投票
1 回答
922 浏览

google-cloud-dataflow - 从 BigQuery 读取 Cloud Dataflow 作业在开始前卡住

在运行任何应用程序逻辑之前,我有一个 Cloud Dataflow 作业卡在启动阶段。我通过在步骤内添加日志输出语句对此进行了测试processElement,但它没有出现在日志中,因此似乎没有达到。

我在日志中只能看到以下消息,每分钟出现一次:

logger: Starting supervisor: /etc/supervisor/supervisord_watcher.sh: line 36: /proc//oom_score_adj: Permission denied

这些每隔几秒钟循环一次:

VM is healthy? true.

http: TLS handshake error from 172.17.0.1:38335: EOF

Job is in state JOB_STATE_RUNNING, will check again in 30 seconds.

作业 ID 是2015-09-14_06_30_22-15275884222662398973,尽管我还有另外两个作业 ( 2015-09-14_05_59_30-11021392791304643671, 2015-09-14_06_08_41-3621035073455045662),它们是我早上开始的并且有相同的问题。

关于可能导致这种情况的任何想法?

0 投票
2 回答
23467 浏览

apache-beam - 什么是 Apache Beam?

我在浏览 Apache 的帖子时发现了一个名为 Beam 的新术语。谁能解释一下 Apache Beam 到底是什么?我试图用谷歌搜索,但无法得到明确的答案。

0 投票
1 回答
75 浏览

java - Dataflow JAVA SDK:以代码为输入,后台处理

请支持我了解以下场景的实现。

假设用户在前端的文本框中键入使用数据流 SDK 命令编写的代码。
我们需要获取该代码(假设为字符串)并在后端执行。
数据流 SDK 是否提供了像执行管理器这样的工具来做这样的事情?
也非常感谢一些资源来熟悉这种实现。

0 投票
2 回答
1124 浏览

google-cloud-dataflow - 如何从 Flink 运行器上的 Google Dataflow (Apache Beam) 向 Kafka 发送消息

我正在尝试编写一个概念验证,它从 Kafka 获取消息,使用 Flink 上的 Beam 对其进行转换,然后将结果推送到不同的 Kafka 主题。

我使用 KafkaWindowedWordCountExample 作为起点,这是我想做的第一部分,但它输出到文本文件而不是 Kafka。FlinkKafkaProducer08 看起来很有希望,但我不知道如何将它插入管道。我在想它会用 UnboundedFlinkSink 或类似的东西包裹起来,但这似乎不存在。

关于我想要做什么的任何建议或想法?

我正在运行最新的孵化器光束(截至昨晚来自 Github)、集群模式下的 Flink 1.0.0 和 Kafka 0.9.0.1,所有这些都在 Google Compute Engine(Debian Jessie)上。

0 投票
0 回答
1768 浏览

apache-kafka - 使用 FlinkKafkaProducer/FlinkKafkaConsumer 序列化和反序列化 avro 数据的问题

我在通过 Beam读取FlinkKafkaProducer/写入 Avro 数据时遇到了一些问题。FlinkKafkaConsumer

如果有人可以指出FlinkKafkaProducerFlinkKafkaConsumer使用 Avro 模式的工作示例(不使用 Kafka 的融合版) ,那就太好了

A)BeamKafkaFlinkAvroProducerTest(生产者)

如果我直接使用 KafkaProducer(即调用productSimpleData),一切正常(仅用于测试)。通过以下步骤使用FlinkKafkaProduceras UnboundedSource (这是我应该做的)(即我称之为generateAvroData2 ):

  1. 首先,如果我使用AvroSerializationSchema schema = new AvroSerializationSchema(Test.class);

    即基本上使用 Avro 的org.apache.avro.specific.SpecificDatumWriter;我面临以下错误:

    /li>
  2. 接下来,如果我使用TypeInformationSerializationSchema(不管管道中的 AvroCoder 是什么),事情显然可以正常工作,因为 Kafka 测试消费者工具会打印消息:

    /li>

B)BeamKafkaFlinkAvroConsumerTest(消费者)

我知道我们应该TypeInformationSerializationSchema在消费者和生产者中使用,或者应该分别在消费者和生产者中使用AvroDeserializationSchemaAvroSerializationSchema

但是,无论使用AvroDeserializationSchemaor TypeInformationSerializationSchema,我都会收到以下异常:


可能缺少一些非常基本的东西。所有代码都在这里

0 投票
1 回答
2623 浏览

java - 如何在 Dataflow/Beam 中将流数据与大型历史数据集相结合

我正在调查通过 Google Dataflow/Apache Beam 处理来自网络用户会话的日志,并且需要将用户的日志(流式传输)与上个月的用户会话历史记录结合起来。

我研究了以下方法:

  1. 使用 30 天固定窗口:最有可能大的窗口适合内存,我不需要更新用户的历史记录,只需参考即可
  2. 使用 CoGroupByKey 连接两个数据集,但两个数据集必须具有相同的窗口大小(https://cloud.google.com/dataflow/model/group-by-key#join),这在我的案例(24 小时 vs 30 天)
  3. 使用 Side Input 检索给定elementin的用户会话历史记录processElement(ProcessContext processContext)

我的理解是通过加载的数据.withSideInputs(pCollectionView)需要适合内存。我知道我可以将单个用户的所有会话历史记录到内存中,但不能将所有会话历史记录。

我的问题是,是否有办法从仅与当前用户会话相关的侧面输入加载/流式传输数据?

我正在想象一个 parDo 函数,它将通过指定用户的 ID 从侧面输入加载用户的历史会话。但是只有当前用户的历史会话才能放入内存;通过侧面输入加载所有历史会话会太大。

一些伪代码来说明:

0 投票
1 回答
541 浏览

google-cloud-dataflow - 实时流水线反馈回路

我有一个包含可能损坏/恶意数据的数据集。数据带有时间戳。我正在使用启发式函数对数据进行评级。一段时间后,我知道所有带有一些 ID 的新数据项都需要被丢弃,它们代表了很大一部分数据(高达 40%)。

现在我有两个批处理管道:

  1. 第一个只是对数据进行评级。
  2. 第二个首先过滤掉损坏的数据并运行分析。

我想从批处理模式(例如,每天运行)切换到在线处理模式(希望延迟 < 10 分钟)。

第二个管道使用全局窗口,使处理变得容易。当检测到损坏的数据键时,所有其他记录都被简单地丢弃(也很容易使用前几天丢弃的键作为预过滤器)。此外,它可以更轻松地对输出数据做出决策,因为在处理给定键的所有历史数据期间都是可用的。

主要问题是:我可以在数据流 DAG 中创建循环吗?假设我想累积给我处理的每个会话窗口的质量率,如果速率总和超过 X,管道早期阶段的一些过滤器功能应该过滤掉恶意密钥。

我知道侧面输入,我不知道它是否可以在运行时改变。

我知道根据定义,DAG 不能有循环,但是没有它如何达到相同的结果?

我想到的想法是使用侧面输出将 ID 标记为恶意并制作假的无界输出/输入。输出会将数据转储到某个存储中,输入将每小时加载一次并流式传输,以便可以加入。

0 投票
2 回答
1423 浏览

google-cloud-dataflow - 有没有一种使用模拟作为数据流测试输入的好方法?

我正在尝试测试一个DoFn<KV<String, twitter4j.Status>, String>实现,并提供测试数据作为输入。我正在探索的一个途径是使用一个Mockito.mock对象作为输入,因为有大量的抽象方法可以实现。但是,在 my 中调用模拟方法DoFn会更改对象,因此测试框架会抱怨“输出后不得以任何方式改变值”。

有没有另一种方法来完成我在这里尝试的事情?测试代码大致为: