问题标签 [spotify-scio]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
616 浏览

google-cloud-dataflow - Apache Beam - 仅滑动窗口发出最早的活动窗口

我正在尝试使用 Apache Beam(通过 Scio)运行来自流式源的最后 3 天数据(处理时间)的连续聚合,并每 5 分钟从最早活动窗口输出结果。Earliest表示开始时间最早的窗口,active表示窗口结束时间还没有过去。本质上,我试图通过删除滑动窗口之间的非重叠时段来获得“滚动”聚合。

我正在尝试使用大小为 3 天和周期为 1 天的示例滑动窗口来实现的可视化:

我尝试过使用滑动窗口(大小=3 天,周期=5 分钟),但它们会在未来每 3 天/5 分钟组合生成一个新窗口,并且会为每个窗口发出早期结果。我尝试使用trigger = AfterWatermark.pastEndOfWindow(),但在工作刚开始时我需要早期结果。我尝试比较窗口之间的pane数据(isLast,timestamp等),但它们看起来相同。

我最近的尝试,这似乎有点 hack,包括将窗口信息附加到 DoFn 中的每个键,重新窗口到固定窗口,并尝试从附加数据中分组并减少到最旧的窗口,但最后reduceByKey没有'似乎没有输出任何东西。

DoFn 附加窗口信息

有什么建议么?

0 投票
1 回答
391 浏览

apache-flink - Apache Beam 作业有时在 Apache Flink 集群上使用 OptimizerPlanEnvironment$ProgramAbortException 失败

我一直在运行一个基于 Apache Beam 的数据摄取作业,它解析输入 CSV 文件并将数据写入接收器。当一次提交一个作业时(在正常负载情况下),此作业可以正常工作。

但最近当我开始负载测试时,我开始在一个循环中按顺序安排多个作业,我观察到一些异常和作业失败。目前,我正在使用脚本通过 Flink REST API 为RUNNING. 安排这些作业时,有时所有作业都在执行而没有任何问题,但大多数情况下,9 个作业中有 2 或 3 个失败,但有以下例外。我已经尝试过使用多个输入 CSV 文件的工作,但它显示出类似的行为。

例外一:

例外 2:

作业 jar 文件是使用 Spotify 的 Scio 库 (0.8.0) 在 Scala 中开发的。Flink 集群有以下规格:

  • Flink 版本 1.8.3
  • 1 个主节点和 2 个工作节点
  • 32 个任务槽和 2 个任务执行器
  • 作业管理器堆大小 = 48Gb
  • 任务执行器堆大小 = 24Gb
0 投票
1 回答
186 浏览

scala - 在 flink 集群上远程调试 apache Beam 作业

我在一个 flink 集群上运行一个流束作业,我得到了以下异常。

流式作业是从 apache pulsar 源获取数据,并将输出数据以 parquet 文件格式写入 Alluxio 数据湖。我正在使用 Spotify 的 scio 在 Scala 中编写这份工作。一个小代码块来强调我想要实现的目标:

从异常中,我可以看到源路径和输出路径应该具有相同的 URI 方案,但我不知道它是如何发生的,因为我使用 alluxio 路径作为输出目录。在 alluxio 输出目录上创建了一些临时目录,但是在WindowDuration创建输出文件时,会发生此异常。我怀疑临时位置可能默认配置为本地文件系统,所以我确实将其设置为输出目录路径(alluxio dir 路径),但它没有改变任何东西。

我想进行远程调试以找出问题所在。我已尝试使用此文档在任务执行程序节点上进行远程调试,但是一旦我的 IntelliJ IDE 与该节点连接,我的断点就不会受到影响。

有人可以建议,我如何调试或获取有关此问题的更多信息。谢谢

0 投票
0 回答
248 浏览

google-cloud-platform - 如何从云中执行的 Dataflow 管道中的 Google Drive 文件夹中读取数据?

我正在尝试使用 Google Drive API 从 GDrive 文件夹中读取文件作为 Cloud Dataflow 管道(包装在 Spotify Scio Scala SDK 中)的输入。

当我在本地运行管道时(使用 DirectRunner),在将GOOGLE_APPLICATION_CREDENTIALSenv var 设置为与我共享目标 GDrive 文件夹的服务帐户文件后,它可以很好地读取 GDrive 文件。但是,当我尝试使用以下命令在云中运行时:

我一次又一次地看到以下权限错误(我的管道没有失败,我必须手动取消它,因为没有任何进展):

我已验证我与用于运行管道的服务帐户共享目标文件夹及其父文件夹(以及计算引擎默认 SA,数据流工作人员根据默认情况使用docs),具有编辑权限。

这是导致抛出异常的代码(特别是该setFields行):

https://www.googleapis.com/auth/driveDrive 客户端是使用具有范围的应用程序默认凭据设置的。

我正在考虑设置一个云功能来将 GDrive 中的数据通过管道传输到 PubSub 主题中,但我想我会先在这里问一下,在从云数据流管道。

0 投票
1 回答
227 浏览

flutter - 如何在没有身份验证的情况下获取 Spotify 音乐列表

我想知道,是否可以在移动颤振 SDK 或 Web API 中获取没有访问令牌或 Aothuentication 的 Spotify 音乐列表。因为我只想在我的颤振应用程序中获取 Spotify 音乐列表。提前致谢。

0 投票
1 回答
156 浏览

google-cloud-dataflow - 如何尽快限制 Apache Beam 中的 PCollection?

我在 Google Cloud DataFlow(带有 Scio SDK)上使用 Apache Beam 2.28.0。我有一个很大的输入PCollection(有界),我想将它限制/采样到固定数量的元素,但我想尽快开始下游处理。

目前,当我的输入PCollection有例如 20M 元素并且我想通过使用https://beam.apache.org/releases/javadoc/2.28.0/org/apache/beam/sdk/transforms/Sample.html将其限制为 1M #any-long-

它一直等到所有 20M 元素都被读取,这需要很长时间。

如何有效地将元素数量限制为固定大小并在达到限制后立即开始下游处理,丢弃其余的输入处理?

0 投票
0 回答
59 浏览

dataflow - 如何使用 scio 和数据流从 Scollection [TableRow] 获取记录

scio用来读取带有嵌套字段的 json 文件。

我的 json 看起来像这样{"context":"mycontext","value":{"foo":"bar", "foo1":"bar1"}}

我正在使用下面的代码,它将返回一个 Scollection[TableRow]。

val data = sc.tableRowJsonFile(jsonFile)

现在我需要从数据中获取记录让我们说“价值”并创建一个新的 Scollection[TableRow]。

我试过代码 sc.tableRowJsonFile(jsonFile).map(_.getRecord("value").asInstanceOf[TableRow])

但是当在数据流中执行时,它会在运行时抛出 ClassCastException

Java.lang.ClassCastException: Java.util.Arraylist cannot be cast to Java.Util.Map

我正在从云作曲家作为 dataflowRunner 运行这项工作。有什么方法可以从 scollection[TableRow] 获取记录作为新的 Scollection[TableRow]。我是 scio 和数据流的新手

0 投票
0 回答
68 浏览

scala - 升级到版本 0.10.0 后,scio 无法写入 BigQuery 中的 BYTES 字段

我正在使用 scio 使用 Scala 将数据写入 BigQuery,升级到 0.10.0 版后遇到奇怪的错误。

这是我的简单示例:

该示例适用于 scio 版本 0.9.2 (build.gradle):

当我升级到 0.10.0 版时。我一直在关注scio 团队的迁移指南

我得到了错误:

"message" : "读取数据时出错,错误消息:从位置 0 开始的行中的 JSON 解析错误:为非重复字段指定的数组:blob。",

我也尝试了最新版本(0.11.3),但仍然是同样的错误。你知道如何解决这个问题吗?

0 投票
0 回答
22 浏览

apache-beam - Apache Beam:发送 Pub/Sub 消息并接收超时响应

我计划使用 Apache Beam,但不确定如何实现下一个用例。

  1. 对于每个元素,Pub/Sub 消息都会发送到外部系统。
  2. 外部系统应以另一个 Pub/Sub 消息进行响应。原始元素应丰富响应消息的详细信息。
  3. 如果 30 分钟内没有响应,则执行一些其他逻辑并将元素传递给下一个转换。

scio 项目中有非常有趣的BaseAsyncDoFn,但我认为它不适合这么长时间运行的请求。

0 投票
0 回答
29 浏览

google-cloud-dataflow - PubsubIO 的作业测试

我正在为流式数据流作业编写 JobTest。

现在我可以启动数据流作业,但它不会使用来自 pubsub 的消息测试消息。如何改进测试以读取通过 JobTest 发送的自定义消息?

另外我有一个连续运行的流式数据流作业,如何确保我触发的测试作业在运行测试后耗尽?