问题标签 [apache-beam]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
176 浏览

google-cloud-dataflow - 水印行为不稳定,可能是由于事件时间错误(客户端时钟偏差)

我们有来自视频流客户端的事件流,大约 1k-3k/s,主要是心跳,我们将其聚合到会话中。我们使用具有 10 分钟间隔持续时间和默认触发的会话窗口(即不允许提前触发和不允许延迟)。

我们观察到的是(1)不稳定和突发的会话发射,以及(2)不稳定和“跳跃”的水印,有时可以追溯到 2 周的历史。

我们认为这可能与具有本地时钟偏差的客户端子集相关联,从而影响事件时间。

这意味着水印不会将来自“坏”客户端的事件标记为延迟,而是根据这些新(旧)时间戳进行自我调整,从而有效地阻止最近结束的会话的发射。并且只有在某个预定义的时间内没有观察到不良事件时,水印才会提前到实时并且可以发出最近的会话。

这是一个合理的假设吗?这是上述条件下的预期行为吗?鉴于我们的假设是正确的,推荐的解决方案是什么?

0 投票
0 回答
231 浏览

apache-flink - 有没有办法以编程方式使用纱线上的 flink 后端启动梁作业?

有没有办法以编程方式使用纱线上的 flink 后端启动梁作业?FlinkPipelineOptions 可以配置为指向一个纱线集群吗?

0 投票
1 回答
133 浏览

google-cloud-dataflow - 为什么 DoFnTester 不再支持 finishBundle() 中的输出?

我们有一些函数可以批量处理捆绑包中的元素。他们需要在 finishBundle() 方法中输出最后一批。

类似于上一个问题的答案中描述的内容。

在基于 Beam 0.4.0 的最新 Dataflow 2.0.0-beta1 版本中,如果被测函数在上下文中调用输出方法,则 DoFnTester 会从其 finishBundle() 方法中抛出 UnsupportedOperationException。

这种行为改变有什么原因吗?运行时不再支持 DoFn.finishBundle() 的输出吗?

0 投票
1 回答
1653 浏览

avro - Google DataFlow 和读取 Parquet 文件

尝试使用 Google DataFlow Java SDK,但对于我的用例,我的输入文件是 .parquet 文件。

找不到任何开箱即用的功能来将 parquet 作为有界数据源读入 DataFlow 管道。据我了解,我可以创建一个编码器和/或接收器,有点像基于Parquet Reader的 AvroIO 。

有没有人可以建议如何以最佳方式实施它?或指向我的参考指南 \ 示例?

感谢你的帮助!

- 一种

0 投票
1 回答
221 浏览

google-cloud-ml - KeyError: u"FALSE [while running 'Extract label ids']"

我正在使用 cloudml-samples 中的花卉教程代码,试图在一组餐厅照片上实现多标签分类。

我有 dict.txt 和相应更新的输入,这里是示例行。


字典.txt


eval_set.csv


预处理作业开始运行良好,然后我看到这个特定的错误不断出现,直到作业失败。

作业日志快照


作业日志 - KeyError: u"FALSE [while running 'Extract label ids']"


作业日志 - 工作流失败

0 投票
1 回答
349 浏览

google-cloud-dataflow - 云数据流作业无故失败

我有一个要使用 Apache Beam API (0.4.0) 执行的 Google Cloud Dataflow 作业。管道使用本地运行器成功运行。当我使用数据流运行程序提交作业时,作业已提交,但是在 32 秒后失败,而在任何地方都没有显示原因。日志似乎是空的。gcloud cli 也没有任何帮助:

如何追踪错误的原因?

0 投票
1 回答
383 浏览

java - 在 Dataflow 中覆盖 AvroIO 默认编码器

我正在尝试使用自定义编码器来处理 Dataflow 中的数据。我所做的是以下内容:

  • 将数据从 BigQuery 导出到 avro 文件
  • 使用 avro-tools-1.7.7.jar 从这些文件中的模式自动生成一个类
  • 使用 Kryo 为该类编写了一个自定义编码器
  • 用注释类@DefaultCoder(MyCustomCoder.class)
  • 使用注册我的编码器 p.getCoderRegistry().registerCoder(MyCustomClass.class, MyCustomCoder.class);
  • 使用从 avro 文件中读取数据PCollection<MyCustomClass> pc = p.apply(AvroIO.Read.named("Name").from("gs://bucket/path/to/*.avro").withSchema(MyCustomClass.class));

问题是,如果我的编码器中有错误,我的工作只会在洗牌步骤中失败。看起来 Dataflow 并没有使用我的自定义编码器从 avro 文件中加载数据。真的是这样吗?如果是这样,有没有办法覆盖用于加载数据的编码器?

0 投票
1 回答
2997 浏览

python - Beam/Dataflow Python:AttributeError:'_UnwindowedValues'对象没有属性'sort'

我正在使用 Apache Beam 的 Python SDK 开发一个在 Google Cloud Dataflow 上运行的工作流程。

在本地运行时,工作流成功完成且没有错误,并且数据输出完全符合预期。

当我尝试在 Dataflow 服务上运行时,它会引发以下错误: AttributeError: '_UnwindowedValues' object has no attribute 'sort'

这来自以下代码:

这被称为使用标准beam.ParDo,如下所示:

元组中的数据(k, v)如下例所示:

作为v日期和值的对象

我尝试切换到标准 lambda 函数也会引发相同的错误。

任何想法为什么这在本地与在 Dataflow 上运行不同?或建议解决方法。

0 投票
1 回答
4756 浏览

google-cloud-dataflow - 在 Google Dataflow / Apache Beam 中读取嵌套 JSON

可以通过以下方式使用 Dataflow 读取 Cloud Storage 上未嵌套的 JSON 文件:

如果我只想将这些日志以最少的过滤写入 BigQuery,我可以使用像这样的 DoFn 来做到这一点:

但是,我不知道如何以这种方式访问​​ JSON 文件中的嵌套字段。

  1. 如果 TableRow 包含一个RECORDnamed r,是否可以在不进一步序列化/反序列化的情况下访问其键/值?
  2. 如果我需要使用库对自己进行序列化/反序列化,那么使用标准of代替来获得我以这种方式失去的一些性能Jackson是否更有意义?CoderTextIO.ReadTableRowJsonCoder

编辑

这些文件是换行符分隔的,看起来像这样:

0 投票
1 回答
312 浏览

google-cloud-dataflow - 在侧输出上构建具有相同转换的数据流管道

我们正在构建一个流式管道,其中数据可能会在几个步骤中遇到不同的错误,例如序列化错误、验证错误和写入存储时的运行时错误。每当发生错误时,我们将数据定向到侧面输出。这些侧输出的错误处理逻辑是相同的。我们将数据写入通用错误存储以进行后期处理/报告。

构建管道至少有三个选项。(下面的伪代码)

  1. 使用转换的新实例处理每个侧输出。

    /li>
  2. 使用转换的单个实例处理每个侧输出。

    /li>
  3. 将这些侧输出的输出展平,并使用单个转换来处理所有错误。

    /li>

对于使用哪一个以获得更好的可扩展性和性能,是否有任何建议?或者也许没关系?