问题标签 [apache-beam]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
697 浏览

google-cloud-dataflow - Dataflow 管道选项的服务帐户凭据

从 Dataflow 1.9 升级到 Beam 0.4.0。GcpOptions 上设置服务帐户名称 ( setServiceAccountName) 和密钥文件 ( setServiceAccountKeyFile) 的方法不再可用。最接近的替代方案是setGcpCredential.

要手动创建 GoogleCredential,要使用哪些合适的范围?我的管道需要访问 PubSub、Datastore 和 BigQuery,可能还有 Cloud Storage。

0 投票
0 回答
55 浏览

java - 何时使用 Datastore 数据启动 AppEngine Memcache?

我在 Google Cloud Datastore 中有大约 150000 条记录。这是一个文档数据库,最好使用称为“数据流”的东西与之交互。花了一段时间才弄清楚!这是我为了将 150000 条记录导入 Datastore 所必须使用的。

现在我想“启动内存缓存”并将所有(或尽可能多的)这些内存缓存到 AppEngine 内存缓存中。以前有没有人这样做过,如果你这样做过,最好的方法是什么?是否有任何“陷阱”?

0 投票
1 回答
617 浏览

google-cloud-dataflow - 有状态 ParDo 不适用于 Dataflow Runner

基于 Javadocs 和https://beam.apache.org/blog/2017/02/13/stateful-processing.html上的博客文章,我尝试使用使用 2.0.0-beta-2 SDK 的简单重复数据删除示例它从 GCS 读取一个文件(包含一个 json 列表,每个都有一个 user_id 字段),然后通过管道运行它,如下所述。

输入数据包含大约 146K 事件,其中只有 50 个事件是唯一的。整个输入大约是 50MB,应该可以在比 2 分钟固定窗口短得多的时间内处理。我只是在那里放置了一个窗口,以确保在不使用 GlobalWindow 的情况下保持 per-key-per-window 语义。我通过 3 个并行阶段运行窗口数据以比较结果,每个阶段都在下面解释。

  1. 只需将内容复制到 GCS 上的新文件中 - 这可确保所有事件都按预期处理,并且我验证内容与输入完全相同
  2. 在 user_id 上组合.PerKey 并仅从 Iterable 中选择第一个元素 - 这基本上应该对数据进行重复数据删除,并且它可以按预期工作。生成的文件具有原始事件列表中唯一项目的确切数量 - 50 个元素
  3. 有状态 ParDo 检查密钥是否已经被看到并仅在没有时才发出输出。理想情况下,由此产生的结果应该与 [2] 中的重复数据匹配,但我所看到的只是 3 个唯一事件。在我做的几次运行中,这 3 个唯一事件总是指向相同的 3 个 user_id。

有趣的是,当我从 DataflowRunner 切换到在本地运行整个过程的 DirectRunner 时,我看到 [3] 的输出与 [2] 匹配,只有 50 个唯一元素,如预期的那样。因此,我怀疑 Stateful ParDo 的 DataflowRunner 是否存在任何问题。

0 投票
2 回答
2172 浏览

google-cloud-storage - 谷歌云数据流到云存储

在此处输入图像描述上面的参考架构表明存在来自 Cloud Dataflow 的 Cloud Storage sink,但是似乎是当前默认 Dataflow API 的 Beam API 没有列出 Cloud Storage I/O 连接器。

在此处输入图像描述

任何人都可以帮助澄清是否存在,如果不存在将数据从 Dataflow 带到云存储的替代方法。

0 投票
1 回答
1035 浏览

python - Parallelize loop operation with Google Data Flow

I want to run an ARIMA Model with thousands of CSV files with various combination

Using Pyflux Here is some python code..

I can load these CSV's in Big Query and want to parallelize this operation of sending the data into the ARIMA model as the operation of running the data through ARIMA model with these Files or BigQuery result can run in parallel so that I can save significant time on this operation.

Is there a way to achieve this in Google Data flow?

0 投票
5 回答
2718 浏览

java - 从 ElasticsearchIO 等待 [10000] 毫秒后获取侦听器超时

我正在尝试测试一个简单的 Apache Beam 代码,其源代码为 Elasticsearch。我从git repo中找到了 ElasticsearchIO 源类。

我修改了 Beam 的 MinimalWordCount 示例,将源包含为 Elasticsearch 而不是 TextIO。下面是要点,

如果我运行代码,

我收到错误

执行 Java 类时发生异常。null:InvocationTargetException:java.io.IOException:等待[10000]毫秒后的侦听器超时

我调试了ElasticsearchIO.java并且可以看到一切正常,Elasicsearch 客户端已构建并且代码正在检索索引中的数据。但是读取转换后的 ParDo 函数根本不会执行。Elasticsearch 客户端一直在等待,最后得到一个超时错误。

我知道 Beam 的 Elasicsearch 连接器仍在开发中。但是任何人都可以帮助找出我做错了什么吗?

PS:我在本地运行 Elasticsearch 5.2.1。

0 投票
1 回答
980 浏览

python - 模块对象没有属性 BigqueryV2 - 本地 Apache Beam

我正在尝试使用 Apache Beam 为 Google BigQuery 提供的 I/O API 在本地(Sierra)运行管道。

我按照Beam Python quickstart的建议使用 Virtualenv 设置了我的环境,我可以运行 wordcount.py 示例。我还可以使用beam.Create和正确运行自定义管道beam.ParDo

但我无法使用 BigQuery I/O 运行管道。知道我做错了什么吗?

python脚本如下。

当我运行它时,我收到以下错误。

0 投票
2 回答
1096 浏览

google-cloud-dataflow - 带有 GlobalWindow 的 Beam 中的状态垃圾收集

Apache Beam 最近通过注释引入了状态单元StateSpec@StateId部分支持 Apache Flink 和 Google Cloud Dataflow。

我找不到任何有关将其与GlobalWindow. 特别是,有没有办法有一个“状态垃圾收集”机制来摆脱根据某些配置一段时间未见的键的状态,同时仍然保持键的单一历史状态是见过的经常够吗?

或者,在这种情况下使用的状态量是否会发散,无法回收与一段时间未见的键对应的状态?

我还对 Apache Flink 或 Google Cloud Dataflow 是否支持潜在的解决方案感兴趣。

Flink 和直接运行器似乎有一些用于“状态 GC”的代码,但我不确定它的作用以及在使用全局窗口时是否相关。

0 投票
1 回答
440 浏览

google-cloud-dataflow - 是否可以将非文本文件读入谷歌数据流管道?

我想将 pdf 文件读入管道。但是,除了纯文本或 xml 之外,我还没有找到任何关于文件格式的 apache 梁示例。

0 投票
1 回答
1051 浏览

tensorflow - 如何使 tf.Transform(TensorFlow 的 Apache Beam 预处理)工作?

我正在尝试利用tf.Transform lib通过Apache Beam(Google DataFlow) 使用TensorFlow进行数据预处理。https://github.com/tensorflow/transform

这是我的设置

conda create -n tftransform python=2.7 source activate tftransform pip install tensorflow pip install tensorflow-transform pip install dill==0.2.6 git clone https://github.com/tensorflow/transform.git cd transform/ python setup.py install # for good measure ...

然后我尝试执行 simple_examplehttps://github.com/tensorflow/transform/blob/master/examples/simple_example.py): python examples/simple_example.py

我收到以下错误AttributeError: 'DType' object has no attribute 'dtype'

(导入时也有警告No handlers could be found for logger "oauth2client.contrib.multistore_file"

这是堆栈跟踪 Traceback (most recent call last): File "examples/simple_example.py", line 64, in <module> preprocessing_fn, tempfile.mkdtemp())) File "/Users/XXX/anaconda/envs/tftransform/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 439, in __ror__ result = p.apply(self, pvalueish, label) File "/Users/XXX/anaconda/envs/tftransform/lib/python2.7/site-packages/apache_beam/pipeline.py", line 249, in apply pvalueish_result = self.runner.apply(transform, pvalueish) File "/Users/XXX/anaconda/envs/tftransform/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 162, in apply return m(transform, input) File "/Users/XXX/anaconda/envs/tftransform/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 168, in apply_PTransform return transform.expand(input) File "/Users/XXX/anaconda/envs/tftransform/lib/python2.7/site-packages/tensorflow_transform/beam/impl.py", line 597, in expand self._output_dir) File "/Users/XXX/anaconda/envs/tftransform/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 439, in __ror__ result = p.apply(self, pvalueish, label) File "/Users/XXX/anaconda/envs/tftransform/lib/python2.7/site-packages/apache_beam/pipeline.py", line 249, in apply pvalueish_result = self.runner.apply(transform, pvalueish) File "/Users/XXX/anaconda/envs/tftransform/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 162, in apply return m(transform, input) File "/Users/XXX/anaconda/envs/tftransform/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 168, in apply_PTransform return transform.expand(input) File "/Users/XXX/anaconda/envs/tftransform/lib/python2.7/site-packages/tensorflow_transform/beam/impl.py", line 328, in expand self._preprocessing_fn, input_schema) File "/Users/XXX/anaconda/envs/tftransform/lib/python2.7/site-packages/tensorflow_transform/impl_helper.py", line 416, in run_preprocessing_fn inputs = _make_input_columns(schema) File "/Users/XXX/anaconda/envs/tftransform/lib/python2.7/site-packages/tensorflow_transform/impl_helper.py", line 218, in _make_input_columns placeholders = schema.as_batched_placeholders() File "/Users/XXX/anaconda/envs/tftransform/lib/python2.7/site-packages/tensorflow_transform/tf_metadata/dataset_schema.py", line 87, in as_batched_placeholders for key, column_schema in self.column_schemas.items()} File "/Users/XXX/anaconda/envs/tftransform/lib/python2.7/site-packages/tensorflow_transform/tf_metadata/dataset_schema.py", line 87, in <dictcomp> for key, column_schema in self.column_schemas.items()} File "/Users/XXX/anaconda/envs/tftransform/lib/python2.7/site-packages/tensorflow_transform/tf_metadata/dataset_schema.py", line 133, in as_batched_placeholder return self.representation.as_batched_placeholder(self) File "/Users/XXX/anaconda/envs/tftransform/lib/python2.7/site-packages/tensorflow_transform/tf_metadata/dataset_schema.py", line 330, in as_batched_placeholder return tf.placeholder(column.domain.dtype, AttributeError: 'DType' object has no attribute 'dtype'

这个库生产准备好了吗?我怎样才能使这项工作?