问题标签 [apache-beam]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
2114 浏览

google-cloud-pubsub - 创建从 Google Pub/Sub 读取的 Apache Beam 管道

我正在尝试使用 apache-beam 创建一个流管道,该管道从 google pub/sub 读取句子并将单词写入 Bigquery 表。

我正在使用0.6.0apache-beam 版本。

按照示例,我做了这个:

我在附近有一个错误:

apply(ParDo.of(new ExtractWords()))

因为前一个apply不是返回一个String而是一个Object

我想问题是从返回的类型PubsubIO.read().topic(options.getPubsubTopic())。类型是PTransform<PBegin, PCollection<T>>而不是PTransform<PBegin, PCollection<String>>

使用 apache-beam 从 google pub/sub 读取的正确方法是什么?

0 投票
2 回答
2808 浏览

java - 缓冲和刷新 Apache Beam 流数据

我有一个流式作业,初始运行必须处理大量数据。DoFn 调用支持批处理请求的远程服务之一,因此在使用有界集合时,我使用以下方法:

有没有办法窗口数据,以便可以在无界集合上使用相同的方法?

我试过以下:

但是每个元素都调用startBundleand 。finishBundle是否有机会使用 RxJava(2 分钟窗口或 100 个元素包):

0 投票
2 回答
845 浏览

tensorflow - 如何在 tf.Transform 中使用 Google DataFlow Runner 和模板?

我们正在 Google Cloud 上建立机器学习管道,利用 GC ML-Engine 进行分布式 TensorFlow 训练和模型服务,并利用 DataFlow 进行分布式预处理作业。

我们希望在 Google Cloud 上将 Apache Beam 应用程序作为 DataFlow 作业运行。查看ML-Engine 示例 ,似乎可以获得tensorflow_transform.beam.impl AnalyzeAndTransformDataset以指定要使用的PipelineRunner,如下所示:

TemplatingDataflowPipelineRunner提供了将我们的预处理开发与参数化操作分开的能力 - 请参见此处:https ://cloud.google.com/dataflow/docs/templates/overview - 基本上:

  • A)在PipelineOptions派生类型中,将选项类型更改为ValueProvider(python 方式:类型推断或类型提示???)
  • B) 将 runner 更改为TemplatingDataflowPipelineRunner
  • C) mvn archetype:generate to store template in GCS (python way: a yaml file like TF Hypertune ???)
  • D) gcloud beta 数据流作业运行 --gcs-location --parameters

问题是:你能告诉我我们如何使用tf.Transform来利用TemplatingDataflowPipelineRunner吗?

0 投票
1 回答
976 浏览

google-cloud-dataflow - 使用 Beam Python SDK 读取复杂的 XML

我如何最好地为 Python SDK 编写源代码,它应该读取嵌套的 XML 文件并将内容拆分为多行。现有的源都在行级别上工作,这不是我在 XML 上下文中需要的。

它是一堆 XML 文件,每个文件都生成一个交易,必须将其分解为多个记录(订单行、付款等)。

0 投票
2 回答
3186 浏览

java - 尝试使用 DataflowRunner 时出现 ClassNotFound 异常

我正在尝试使用 Apache Beam 0.6.0 在 GCP 上启动数据流作业。我正在使用 shade 插件编译一个 uber jar,因为我无法使用“mvn:execjava”启动该作业。我包括这个依赖:

我收到以下异常:

我还缺少其他东西吗?

0 投票
1 回答
394 浏览

apache-beam - Apache Beam:PubsubReader 因 NPE 而失败

我有一个梁管道,它在应用一些转换后从 PubSub 读取并写入 BigQuery。管道始终因 NPE 而失败。我正在使用梁 SDK 版本 0.6.0。关于我可能做错了什么的任何想法?我正在尝试使用 DirectRunner 运行管道。

0 投票
1 回答
3819 浏览

java - Eclipse 上带有 Dataflow Runner 的 Apache Beam MinimalWordcount 示例

我正在尝试使用 Eclipse 中的 DataFlowRunner 在 Windows 上使用 MinimalWordCount --> Run As Java Application from with in eclipse 运行 MinimalWordCount 示例,它与使用我的 gcs 存储桶的示例中的股票代码相同,但是我一直得到以下异常,有人可以让我知道这里有什么问题吗?

  1. 我已验证存储桶名称是否正确。
  2. 我已经在我的 Windows 机器上运行了 gcloud init。

线程“主”java.lang.RuntimeException 中的异常:无法从 org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod 的工厂方法 DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions) 构造实例(InstanceBuilder.java:233) 在 org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:162) 在 org.apache.beam.sdk.runners.PipelineRunner.fromOptions(PipelineRunner.java:56) 在org.apache.beam.sdk.Pipeline.create(Pipeline.java:135) at com.dynaobject.apachebeamexperiment.MinimalWordCount.main(MinimalWordCount.java:77) 原因:sun.reflect 处的 java.lang.reflect.InvocationTargetException。在 sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 在 sun.reflect 的 NativeMethodAccessorImpl.invoke0(Native Method)。DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.lang.reflect.Method.invoke(Unknown Source) at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:222) ... 4 更多原因: java.lang.IllegalArgumentException: DataflowRunner 需要 gcpTempLocation,但未能从 org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:212) 的 PipelineOptions 中检索值 ... 9 更多原因:java.lang .IllegalArgumentException:但未能从 org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:212) 的 PipelineOptions 中检索值... 9 更多原因:java.lang.IllegalArgumentException:但未能从 org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:212) 的 PipelineOptions 中检索值... 9 更多原因:java.lang.IllegalArgumentException:为 gcpTempLocation 构造默认值时出错:tempLocation 不是有效的 GCS 路径 gs://tempxyz。 在 org.apache.beam.sdk.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:219) 在 org.apache.beam.sdk.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:205) 在 org.apache .beam.sdk.options.ProxyInvocationHandler.returnDefaultHelper(ProxyInvocationHandler.java:575) 在 org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:516) 在 org.apache.beam.sdk.options.ProxyInvocationHandler .invoke(ProxyInvocationHandler.java:155) at com.sun.proxy.$Proxy15.getGcpTempLocation(Unknown Source) at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:210) ... 还有 9 个 引起:java.lang.IllegalArgumentException:输出路径不存在或不可写:gs://tempxyz at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java :191) 在 org.apache.beam.sdk.util.GcsPathValidator.validateOutputFilePrefixSupported(GcsPathValidator.java:62) 的 org.apache.beam.sdk.util.GcsPathValidator.verifyPathIsAccessible(GcsPathValidator.java:78)。 beam.sdk.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:217) ... 还有 15 个

0 投票
1 回答
341 浏览

python - 从 GAE Cronjob 触发 Apache Beam (Python)

在替换我的旧appengine-mapreduce工作时,我需要一种方法来从我的 cron 触发这个 python 数据流工作。

我已阅读https://cloud.google.com/blog/big-data/2016/04/scheduling-dataflow-pipelines-using-app-engine-cron-service-or-cloud-functions,但不清楚Python的完整翻译。

Cloud Functions 没有安装 python,我不确定是否/如何安装便携式 python。所以我假设从我的托管 VM Python 实例触发会更容易......据我所知,它将是这样的:

  • 我正在使用 GAE 灵活虚拟机(无沙盒)。
  • 我可以将 apache_beam 库(运行my_dataflow.py)包含到我的 docker 映像中。
  • 我可以通过我的项目推送上传这些文件,以便可以从 VM 磁盘访问它们:my_dataflow.pysetup.py(安装我的库依赖项)和apache-beam.tar.gz(因为我正在编写尚未在 PyPI 上发布的 0.7.0 API)
  • 我可以调用my_dataflow.run()指向 PipelineOptionssetup.pyapache-beam.tar.gz

是这样,还是我错过了任何其他步骤?希望避免在这里找错树,并担心在花了几个小时推倒重来试图让它发挥作用后遇到已知的不可逾越的路障。

0 投票
2 回答
1106 浏览

google-cloud-dataflow - 根据元素值关闭窗口

当输入元素在 DoFn 的侧面输出中具有标志值时,有没有办法关闭窗口?例如,指示会话关闭的事件会关闭窗口。

我一直在阅读文档,触发器主要是基于时间的。一个例子会很棒。

编辑: Trigger.OnElementContext.forTrigger(ExecutableTrigger trigger) 似乎很有希望,但 ExecutableTrigger 文档目前还很薄。

0 投票
4 回答
434 浏览

google-cloud-dataflow - 什么是 Maven 依赖项

我正在尝试运行 Apache Beam Cookbook 示例,有一个 import 语句。

我的 Eclipse 项目为此需要的 Maven 依赖项是什么,我找不到任何文档。

谢谢。