问题标签 [apache-beam-internals]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
944 浏览

apache-beam - TextIO.Read().From() vs TextIO.ReadFiles() over withHintMatchesManyFiles()

在我的用例中,从 Kafka 获取一组匹配的文件模式,

PCollection<String> filepatterns = p.apply(KafkaIO.read()...);

在这里,每个模式最多可以匹配 300 多个文件。

Q1。我如何使用TextIO.Read()来匹配来自 的数据PCollection,因为它withHintMatchesManyFiles()仅适用于TextIO.Read()不适用于TextIO.ReadFiles().

Q2。如果使用通过 FileIO.Match->FileIO.ReadMatch()->TextIO.ReadFiles() 的withHintMatchesManyFiles()路径,在该路径中不可用,它将如何影响读取性能?

Q3。上述用例还有其他优化路径吗?

0 投票
1 回答
727 浏览

google-cloud-dataflow - 在 Apache Beam java 中使用带有自定义 POJO Java 类的 List 时收到很多警告

我是 Apache 梁的新手,我正在使用 Apache 梁,并在 GCP 中使用 Dataflow 作为运行程序。执行管道时出现以下错误。

PCollection 类似于 PCollection< KV < String,List < Person > > > 和 PCollection< KV < String,Iterable < List < Person > > >

我已经将 Person 实现为可序列化的 POJO 类并覆盖 equals 和 hash 方法。但我认为我还需要为 person 编写自定义 ListCoder 并在管道中注册。我不知道如何解决这个问题,请帮助。

0 投票
2 回答
316 浏览

apache-beam - 知道在 apache Beam 直接运行器中运行的线程数

我有一个使用直接运行器运行的 java 中的 apache 梁程序。Apache Beam 使用线程来实现分布式处理。

  1. 在运行时如何知道 apache Beam 产生的线程数?
  2. 如何设置运行时使用的最大线程数?
0 投票
2 回答
668 浏览

apache-beam - 运行 Apache Beam Python SplittableDoFn 时出错

尝试时遇到错误pubsub io > splittable dofn

有人可以帮我检查代码中我可能做错的任何事情吗

代码:

0 投票
1 回答
167 浏览

python - 速度和内存权衡将 Apache Beam PCollection 一分为二

我有一个 PCollection ,其中每个元素都是一个键,值元组是这样的:(key, (value1,..,value_n) )

我需要将此 PCollection 拆分为两个处理分支。

与往常一样,我需要整个管道尽可能快并使用尽可能少的内存。

我想到了两个想法:

选项 1:使用具有多个输出的 DoFn 拆分 PColl

然后像这样构建管道

选项 2:在原始 PCollection 上使用两个不同的 DoFn

另一种选择是使用两个 DoFns 来读取和处理同一个 PCollection。仅将一个用于数据的“左侧”和“右侧”:

构建管道更简单......(而且您不需要跟踪您拥有哪些标记输出):

但是……更快吗?将需要比第一个更少的内存?

(我正在考虑第一个选项可能会被跑步者融合 - 而不仅仅是数据流跑步者)。

0 投票
1 回答
118 浏览

python - 与字符串列表相比,带有字典列表的 beam.Create() 非常慢

我正在使用 Dataflow 处理具有大约 400 万个特征(总共约 2GB)的 Shapefile 并将几何加载到 BigQuery 中,因此在我的管道开始之前,我将 shapefile 特征提取到一个列表中,并使用beam.Create(features). 有两种方法可以创建初始功能列表:

  1. 将每个特征导出为 json 字符串,后续DoFns 需要将其解析为 dict:
  1. 导出从 JSON 字符串预解析的 python dict

使用选项 1 时,beam.Create(features)需要一分钟左右,管道继续。使用选项 2,beam.Create(features)在 6 核 i7 上需要 3 多个小时,并且似乎在这里花费了很多时间:

这是传递字典列表时trivial_inference变慢的原因吗?beam.Create我可以配置beam.Create为不做它试图在那里做的任何事情,或者以其他方式加快它的速度,这样字典列表就不会比字符串列表慢 100 倍?

0 投票
1 回答
182 浏览

google-cloud-platform - 使用 Apache Beam 加载文件到数据库

我需要将一个文件加载到我的数据库中,但在此之前我必须根据一些文件数据验证数据库中是否存在数据。例如,假设我在一个文件中有 5 条记录,那么我必须在数据库中检查 5 次以获取单独的记录。

那么我怎样才能动态地得到这个值呢?我们必须传递动态值而不是 2 (preparedStatement.setString(1, "2");)

在这里,我们正在创建一个 Dataflow 管道,它使用 Apache Beam 将数据加载到数据库中。现在我们创建一个管道对象并创建一个管道。使用 PCollection 我们存储到数据库中。

0 投票
1 回答
87 浏览

apache-beam - 我很容易看到带有# of csv 文件的apache 光束比例,但是一个csv 中的# 行呢?

我目前正在阅读这篇文章和 apache 梁文档https://medium.com/@mohamed.t.esmat/apache-beam-bites-10b8ded90d4c

我读过的每一篇文章都是关于 N 个文件的。在我们的用例中,我们每次都会收到一个新文件的 pubsub 事件来启动工作。我不需要按文件缩放,因为我可以使用 cloudrun。我需要根据文件中的行数进行缩放。IE。一个 100 行文件和一个 100,000,000 行文件,我希望看到大约在同一时间处理。

如果我按照上面的文章,我给它一个文件而不是很多,在幕后,apache Beam 将如何扩展。它如何知道将 1 个节点用于 100 行,而 1,000,000 行文件可能使用 1000 个节点。毕竟,它不知道文件中有多少行开始。

数据流不随文件中的行数缩放吗?我在想也许节点 1 会读取 0-99 行,节点 2 会读取/丢弃 0-99,然后读取 100-199。

有谁知道幕后发生了什么,所以我最终不会浪费数小时的测试时间来试图弄清楚它是否相对于文件中的行数进行缩放?

编辑:相关问题但不是同一个问题 -如何使用 Beam 读取大型 CSV?

我认为数据流可能会受到读取整个文件的一个节点的限制,我可以在普通计算机上执行此操作,但我真的想知道它是否会比这更好。

另一种说法是在幕后,这条线实际上在做什么

可能是 1 个节点读取然后发送到一堆其他节点,但是当 csv 为大数据大小时,如果只有 1 个 csv 读取器,则存在明显的瓶颈。

关于我的想法的更多背景。我确实看到了一个“HadoopFileSystem”连接器(尽管我们与 GCP Storage 交谈)。我的猜测是 HadoopFileSystem 的操作基于 HDFS 具有代表文件的“分区文件”这一事实,因此它已经是 N 个文件。我们使用谷歌云存储,所以它只是一个 csv 文件而不是 N 个文件。虽然 HDFS 连接器可以启动与分区相同数量的节点,但 TextIO 只能看到一个 csv 文件,仅此而已。

0 投票
1 回答
329 浏览

google-cloud-dataflow - 窗格和窗口 apache 梁之间的区别

窗格和窗口有什么区别?传入的元素被分组到窗口中。那么窗格包含什么?

我从梁文档中获取了以下代码

每个元素是否属于一个窗格?还是多个窗格?需要一个简单的类比来理解窗格和窗口

0 投票
1 回答
195 浏览

apache-spark - 等效于 Apache Beam 中的重新分区

在 Spark 中,如果我们必须重新洗牌数据,我们可以使用数据帧的重新分区。在 apache Beam 中为 pcollection 做同样的事情的方法是什么?

在 pyspark 中,