问题标签 [apache-beam]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
google-cloud-dataflow - 水印行为不稳定,可能是由于事件时间错误(客户端时钟偏差)
我们有来自视频流客户端的事件流,大约 1k-3k/s,主要是心跳,我们将其聚合到会话中。我们使用具有 10 分钟间隔持续时间和默认触发的会话窗口(即不允许提前触发和不允许延迟)。
我们观察到的是(1)不稳定和突发的会话发射,以及(2)不稳定和“跳跃”的水印,有时可以追溯到 2 周的历史。
我们认为这可能与具有本地时钟偏差的客户端子集相关联,从而影响事件时间。
这意味着水印不会将来自“坏”客户端的事件标记为延迟,而是根据这些新(旧)时间戳进行自我调整,从而有效地阻止最近结束的会话的发射。并且只有在某个预定义的时间内没有观察到不良事件时,水印才会提前到实时并且可以发出最近的会话。
这是一个合理的假设吗?这是上述条件下的预期行为吗?鉴于我们的假设是正确的,推荐的解决方案是什么?
apache-flink - 有没有办法以编程方式使用纱线上的 flink 后端启动梁作业?
有没有办法以编程方式使用纱线上的 flink 后端启动梁作业?FlinkPipelineOptions 可以配置为指向一个纱线集群吗?
google-cloud-dataflow - 为什么 DoFnTester 不再支持 finishBundle() 中的输出?
我们有一些函数可以批量处理捆绑包中的元素。他们需要在 finishBundle() 方法中输出最后一批。
类似于上一个问题的答案中描述的内容。
在基于 Beam 0.4.0 的最新 Dataflow 2.0.0-beta1 版本中,如果被测函数在上下文中调用输出方法,则 DoFnTester 会从其 finishBundle() 方法中抛出 UnsupportedOperationException。
这种行为改变有什么原因吗?运行时不再支持 DoFn.finishBundle() 的输出吗?
avro - Google DataFlow 和读取 Parquet 文件
尝试使用 Google DataFlow Java SDK,但对于我的用例,我的输入文件是 .parquet 文件。
找不到任何开箱即用的功能来将 parquet 作为有界数据源读入 DataFlow 管道。据我了解,我可以创建一个编码器和/或接收器,有点像基于Parquet Reader的 AvroIO 。
有没有人可以建议如何以最佳方式实施它?或指向我的参考指南 \ 示例?
感谢你的帮助!
- 一种
google-cloud-dataflow - 云数据流作业无故失败
我有一个要使用 Apache Beam API (0.4.0) 执行的 Google Cloud Dataflow 作业。管道使用本地运行器成功运行。当我使用数据流运行程序提交作业时,作业已提交,但是在 32 秒后失败,而在任何地方都没有显示原因。日志似乎是空的。gcloud cli 也没有任何帮助:
如何追踪错误的原因?
java - 在 Dataflow 中覆盖 AvroIO 默认编码器
我正在尝试使用自定义编码器来处理 Dataflow 中的数据。我所做的是以下内容:
- 将数据从 BigQuery 导出到 avro 文件
- 使用 avro-tools-1.7.7.jar 从这些文件中的模式自动生成一个类
- 使用 Kryo 为该类编写了一个自定义编码器
- 用注释类
@DefaultCoder(MyCustomCoder.class)
- 使用注册我的编码器
p.getCoderRegistry().registerCoder(MyCustomClass.class, MyCustomCoder.class);
- 使用从 avro 文件中读取数据
PCollection<MyCustomClass> pc = p.apply(AvroIO.Read.named("Name").from("gs://bucket/path/to/*.avro").withSchema(MyCustomClass.class));
问题是,如果我的编码器中有错误,我的工作只会在洗牌步骤中失败。看起来 Dataflow 并没有使用我的自定义编码器从 avro 文件中加载数据。真的是这样吗?如果是这样,有没有办法覆盖用于加载数据的编码器?
python - Beam/Dataflow Python:AttributeError:'_UnwindowedValues'对象没有属性'sort'
我正在使用 Apache Beam 的 Python SDK 开发一个在 Google Cloud Dataflow 上运行的工作流程。
在本地运行时,工作流成功完成且没有错误,并且数据输出完全符合预期。
当我尝试在 Dataflow 服务上运行时,它会引发以下错误:
AttributeError: '_UnwindowedValues' object has no attribute 'sort'
这来自以下代码:
这被称为使用标准beam.ParDo
,如下所示:
元组中的数据(k, v)
如下例所示:
作为v
日期和值的对象
我尝试切换到标准 lambda 函数也会引发相同的错误。
任何想法为什么这在本地与在 Dataflow 上运行不同?或建议解决方法。
google-cloud-dataflow - 在 Google Dataflow / Apache Beam 中读取嵌套 JSON
可以通过以下方式使用 Dataflow 读取 Cloud Storage 上未嵌套的 JSON 文件:
如果我只想将这些日志以最少的过滤写入 BigQuery,我可以使用像这样的 DoFn 来做到这一点:
但是,我不知道如何以这种方式访问 JSON 文件中的嵌套字段。
- 如果 TableRow 包含一个
RECORD
namedr
,是否可以在不进一步序列化/反序列化的情况下访问其键/值? - 如果我需要使用库对自己进行序列化/反序列化,那么使用标准of代替来获得我以这种方式失去的一些性能
Jackson
是否更有意义?Coder
TextIO.Read
TableRowJsonCoder
编辑
这些文件是换行符分隔的,看起来像这样:
google-cloud-dataflow - 在侧输出上构建具有相同转换的数据流管道
我们正在构建一个流式管道,其中数据可能会在几个步骤中遇到不同的错误,例如序列化错误、验证错误和写入存储时的运行时错误。每当发生错误时,我们将数据定向到侧面输出。这些侧输出的错误处理逻辑是相同的。我们将数据写入通用错误存储以进行后期处理/报告。
构建管道至少有三个选项。(下面的伪代码)
使用转换的新实例处理每个侧输出。
/li>使用转换的单个实例处理每个侧输出。
/li>将这些侧输出的输出展平,并使用单个转换来处理所有错误。
/li>
对于使用哪一个以获得更好的可扩展性和性能,是否有任何建议?或者也许没关系?