问题标签 [beam-sql]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
113 浏览

java - 数据流/光束累加器编码器

我正在开发一个数据流管道,它使用SqlTransform库以及org.apache.beam.sdk.extensions.sql.impl.transform.agg.CountIf.

这是一张代码幻灯片:

这在本地测试时工作正常(我还创建了一些测试,工作正常):

问题是当我想使用 Dataflow 在 Google 云中部署时,我有以下输出:

我搜索了这个问题,但我什么也没找到,只是一些类似的案例,他们开发了自己的 Agg 函数并且他们需要定义 CODER。在这种情况下,我不知道在哪里可以找到编码器,或者我是否必须自己创建它,因为我使用的是 Beam 函数 ( org.apache.beam.sdk.extensions.sql.impl.transform.agg.CountIf)。

所以问题是:

  1. 有没有办法使用这个Beam函数来添加需要在Dataflow中运行的编码器?
  2. 如果我需要更改功能并创建一个新功能和编码器,我该怎么做?

谢谢

0 投票
2 回答
322 浏览

google-cloud-platform - 如何转换 PCollection到 PCollection> 在 JAVA 中

我正在尝试将包含多个值的 tablerow 转换为 KV。我可以在 DoFn 中实现这一点,但这会增加我想要进一步编写的代码的复杂性,并使我的工作更加困难。(基本上我需要对tablerow的两个pcollections执行CoGroupBy操作)

有什么方法可以将 PCollection 转换为 PCollection<KV<String, String>>,其中的键和值以与 tablerow 中相同的格式存储?

我写了一个看起来像这样的片段,但这并没有给我想要的结果,有什么方法可以加载 tablerow 中的所有条目并生成具有这些值的 KV?

0 投票
2 回答
53 浏览

apache-beam - 将 Apache Beam 中的多部分空 csv 文件写入 netApp 存储网格时出现异常

问题陈述

我们正在使用多个 csv 文件到 pcollections -> 应用 beam SQL 来转换数据 -> 写入结果 pcollection。如果我们在所有源 pCollections 中都有一些数据并且梁 SQL 使用一些数据生成新集合,那么这绝对可以正常工作。当 Transform pCollection 生成空 pCollection 并且在 netApp Storage Grid 中写入它时,它会在下面抛出,

以下是示例代码

观察

  • 如果我们编写简单文件而不是多部分文件(简单将对象放入存储网格),则工作正常
  • 似乎是 Storage Grid 的已知问题,但我们想检查我们是否可以从梁管道处理这个问题。

我试过的

  • 试图查看我是否可以在写入之前检查 PCollection 的大小并将一些字符串放入输出文件,但由于 PCollection 是空的,它根本不会进入 PTransform。
  • 也尝试使用 Count.globally ,但该事件没有帮助

  • 无论如何我们可以在 Beam 中处理这个问题,就像我们可以在写入之前检查 PCollection 的大小一样?如果大小为零,即空 pcollection,那么我们可以避免写入文件以避免此问题。
  • 有没有人遇到过类似的问题并能够解决?
0 投票
0 回答
5 浏览

apache-beam - 在梁 sql cli 中使用自定义元存储

我正在尝试使用beamsql-shell来使用beamsql 创建管道。目前,我创建了一个只读的自定义元存储,其中包含许多我想在查询中使用的表。只是为了一些细节,这些表查询 pubsub,因此目的是将查询作为流式管道执行。

在管道中,我可以选择 tableProvider 并在执行SqlTransform类似操作时传递元存储:

我的问题:有没有办法在运行时加载这个元存储beamsql-shell

0 投票
0 回答
9 浏览

python - 从 PTransform 获取数据时 Python Beam SqlTransform 未知编码器异常

有这个 PTransform 将数据映射到 beam.Row:

以及使用该 beam.Row 应用 SqlTransform 的 PTransform

它通过以下方式在单元测试中被调用:

运行此代码会产生 java.lang.IllegalArgumentException: Unknown Coder URN beam:coder:pickled_python:v1. Known URNs: [...]异常

奇怪的是,如果我取消注释中的行FilterValuesInSegment,代码确实有效。为什么会遇到编码器异常?