问题标签 [apache-beam]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
google-cloud-dataflow - 如何为我的数据流指定工作人员数量?
我有一个 Apache Beam 管道,可以加载大约 90GB 的大型导入文件。我已经在 Apache Beam Java SDK 中编写了管道。
使用 的默认设置PipelineOptionsFactory
,我的工作需要很长时间才能完成。
我如何控制并以编程方式指定我的工作的并行度,从而指定工人的数量?
java - 即使我正在传递视图,“使用未知视图调用 sideInput()”异常?
代码是这样开始的:
声明了这些侧输入,为什么 Dataflow 看不到它们?
google-cloud-dataflow - 从 BigQuery 读取 Cloud Dataflow 作业在开始前卡住
在运行任何应用程序逻辑之前,我有一个 Cloud Dataflow 作业卡在启动阶段。我通过在步骤内添加日志输出语句对此进行了测试processElement
,但它没有出现在日志中,因此似乎没有达到。
我在日志中只能看到以下消息,每分钟出现一次:
logger: Starting supervisor: /etc/supervisor/supervisord_watcher.sh: line 36: /proc//oom_score_adj: Permission denied
这些每隔几秒钟循环一次:
VM is healthy? true.
http: TLS handshake error from 172.17.0.1:38335: EOF
Job is in state JOB_STATE_RUNNING, will check again in 30 seconds.
作业 ID 是2015-09-14_06_30_22-15275884222662398973
,尽管我还有另外两个作业 ( 2015-09-14_05_59_30-11021392791304643671
, 2015-09-14_06_08_41-3621035073455045662
),它们是我早上开始的并且有相同的问题。
关于可能导致这种情况的任何想法?
apache-beam - 什么是 Apache Beam?
我在浏览 Apache 的帖子时发现了一个名为 Beam 的新术语。谁能解释一下 Apache Beam 到底是什么?我试图用谷歌搜索,但无法得到明确的答案。
java - Dataflow JAVA SDK:以代码为输入,后台处理
请支持我了解以下场景的实现。
假设用户在前端的文本框中键入使用数据流 SDK 命令编写的代码。
我们需要获取该代码(假设为字符串)并在后端执行。
数据流 SDK 是否提供了像执行管理器这样的工具来做这样的事情?
也非常感谢一些资源来熟悉这种实现。
google-cloud-dataflow - 如何从 Flink 运行器上的 Google Dataflow (Apache Beam) 向 Kafka 发送消息
我正在尝试编写一个概念验证,它从 Kafka 获取消息,使用 Flink 上的 Beam 对其进行转换,然后将结果推送到不同的 Kafka 主题。
我使用 KafkaWindowedWordCountExample 作为起点,这是我想做的第一部分,但它输出到文本文件而不是 Kafka。FlinkKafkaProducer08 看起来很有希望,但我不知道如何将它插入管道。我在想它会用 UnboundedFlinkSink 或类似的东西包裹起来,但这似乎不存在。
关于我想要做什么的任何建议或想法?
我正在运行最新的孵化器光束(截至昨晚来自 Github)、集群模式下的 Flink 1.0.0 和 Kafka 0.9.0.1,所有这些都在 Google Compute Engine(Debian Jessie)上。
apache-kafka - 使用 FlinkKafkaProducer/FlinkKafkaConsumer 序列化和反序列化 avro 数据的问题
我在通过 Beam读取FlinkKafkaProducer
/写入 Avro 数据时遇到了一些问题。FlinkKafkaConsumer
如果有人可以指出FlinkKafkaProducer
并FlinkKafkaConsumer
使用 Avro 模式的工作示例(不使用 Kafka 的融合版) ,那就太好了
A)BeamKafkaFlinkAvroProducerTest(生产者)
如果我直接使用 KafkaProducer(即调用productSimpleData),一切正常(仅用于测试)。通过以下步骤使用FlinkKafkaProducer
as UnboundedSource (这是我应该做的)(即我称之为generateAvroData2 ):
首先,如果我使用
AvroSerializationSchema schema = new AvroSerializationSchema(Test.class);
即基本上使用 Avro 的
/li>org.apache.avro.specific.SpecificDatumWriter
;我面临以下错误:接下来,如果我使用
/li>TypeInformationSerializationSchema
(不管管道中的 AvroCoder 是什么),事情显然可以正常工作,因为 Kafka 测试消费者工具会打印消息:
B)BeamKafkaFlinkAvroConsumerTest(消费者)
我知道我们应该TypeInformationSerializationSchema
在消费者和生产者中使用,或者应该分别在消费者和生产者中使用AvroDeserializationSchema
和AvroSerializationSchema
。
但是,无论使用AvroDeserializationSchema
or TypeInformationSerializationSchema
,我都会收到以下异常:
可能缺少一些非常基本的东西。所有代码都在这里。
java - 如何在 Dataflow/Beam 中将流数据与大型历史数据集相结合
我正在调查通过 Google Dataflow/Apache Beam 处理来自网络用户会话的日志,并且需要将用户的日志(流式传输)与上个月的用户会话历史记录结合起来。
我研究了以下方法:
- 使用 30 天固定窗口:最有可能大的窗口适合内存,我不需要更新用户的历史记录,只需参考即可
- 使用 CoGroupByKey 连接两个数据集,但两个数据集必须具有相同的窗口大小(https://cloud.google.com/dataflow/model/group-by-key#join),这在我的案例(24 小时 vs 30 天)
- 使用 Side Input 检索给定
element
in的用户会话历史记录processElement(ProcessContext processContext)
我的理解是通过加载的数据.withSideInputs(pCollectionView)
需要适合内存。我知道我可以将单个用户的所有会话历史记录到内存中,但不能将所有会话历史记录。
我的问题是,是否有办法从仅与当前用户会话相关的侧面输入加载/流式传输数据?
我正在想象一个 parDo 函数,它将通过指定用户的 ID 从侧面输入加载用户的历史会话。但是只有当前用户的历史会话才能放入内存;通过侧面输入加载所有历史会话会太大。
一些伪代码来说明:
google-cloud-dataflow - 实时流水线反馈回路
我有一个包含可能损坏/恶意数据的数据集。数据带有时间戳。我正在使用启发式函数对数据进行评级。一段时间后,我知道所有带有一些 ID 的新数据项都需要被丢弃,它们代表了很大一部分数据(高达 40%)。
现在我有两个批处理管道:
- 第一个只是对数据进行评级。
- 第二个首先过滤掉损坏的数据并运行分析。
我想从批处理模式(例如,每天运行)切换到在线处理模式(希望延迟 < 10 分钟)。
第二个管道使用全局窗口,使处理变得容易。当检测到损坏的数据键时,所有其他记录都被简单地丢弃(也很容易使用前几天丢弃的键作为预过滤器)。此外,它可以更轻松地对输出数据做出决策,因为在处理给定键的所有历史数据期间都是可用的。
主要问题是:我可以在数据流 DAG 中创建循环吗?假设我想累积给我处理的每个会话窗口的质量率,如果速率总和超过 X,管道早期阶段的一些过滤器功能应该过滤掉恶意密钥。
我知道侧面输入,我不知道它是否可以在运行时改变。
我知道根据定义,DAG 不能有循环,但是没有它如何达到相同的结果?
我想到的想法是使用侧面输出将 ID 标记为恶意并制作假的无界输出/输入。输出会将数据转储到某个存储中,输入将每小时加载一次并流式传输,以便可以加入。
google-cloud-dataflow - 有没有一种使用模拟作为数据流测试输入的好方法?
我正在尝试测试一个DoFn<KV<String, twitter4j.Status>, String>
实现,并提供测试数据作为输入。我正在探索的一个途径是使用一个Mockito.mock
对象作为输入,因为有大量的抽象方法可以实现。但是,在 my 中调用模拟方法DoFn
会更改对象,因此测试框架会抱怨“输出后不得以任何方式改变值”。
有没有另一种方法来完成我在这里尝试的事情?测试代码大致为: