0

在此处输入图像描述

上图是一个大查询表的表模式,它是在 spotify 的 scio 上运行的 apache Beam 数据流作业的输入。如果您不熟悉 scio,它是 Apache Beam Java SDK 的 Scala 包装器。特别是,“SCollection 包装 PCollection”。我在 BigQuery 磁盘上的输入表是 136 GB,但在数据流 UI 中查看我的 SCollection 的大小时,它是 504.91 GB。 在此处输入图像描述

我知道 BigQuery 在数据压缩和表示方面可能要好得多,但大小增加 3 倍以上似乎相当高。非常清楚,我使用的是类型安全大查询案例类(我们称之为 Clazz)表示,所以我的 SCollection 是 SCollection[Clazz] 类型而不是 SCollection[TableRow]。TableRow 是 Java JDK 中的原生表示。关于如何降低内存分配的任何提示?它与我输入中的特定列类型有关:字节、字符串、记录、浮点数等?

4

2 回答 2

4

这可能是由于 TableRow 格式包含列的字符串名称,这增加了大小。

考虑使用以下内容来创建对象的 PCollection 而不是 TableRows。这允许您直接读入与架构匹配的对象,这应该会稍微减少数据大小。

  /**
   * Reads from a BigQuery table or query and returns a {@link PCollection} with one element per
   * each row of the table or query result, parsed from the BigQuery AVRO format using the specified
   * function.
   *
   * <p>Each {@link SchemaAndRecord} contains a BigQuery {@link TableSchema} and a
   * {@link GenericRecord} representing the row, indexed by column name. Here is a
   * sample parse function that parses click events from a table.
   *
   * <pre>{@code
   * class ClickEvent { long userId; String url; ... }
   *
   * p.apply(BigQueryIO.read(new SerializableFunction<SchemaAndRecord, ClickEvent>() {
   *   public ClickEvent apply(SchemaAndRecord record) {
   *     GenericRecord r = record.getRecord();
   *     return new ClickEvent((Long) r.get("userId"), (String) r.get("url"));
   *   }
   * }).from("...");
   * }</pre>
   */
  public static <T> TypedRead<T> read(
      SerializableFunction<SchemaAndRecord, T> parseFn) {
于 2018-05-12T00:02:17.723 回答
0

使用 Kryo 进行案例类序列化并不是最有效的,因为 Kryo 会序列化每个对象的完整类名。您可以通过注册经过扩展洗牌的类来解决此问题。

https://github.com/spotify/scio/wiki/FAQ#how-do-i-use-custom-kryo-serializers

这将在 Scio 0.7.x 中通过新的基于 Magnolia 宏的编码器派生来解决。

https://github.com/spotify/scio/wiki/Coders

于 2018-09-21T09:54:47.543 回答