
I'd like to take advantage of the new BigQuery feature of time-partitioned tables, but I'm not sure whether this is possible in the 1.6 release of the Dataflow SDK.

Looking at the BigQuery JSON API, creating a day-partitioned table requires passing in a

"timePartitioning": { "type": "DAY" }

option, but the com.google.cloud.dataflow.sdk.io.BigQueryIO interface only allows specifying a TableReference.

I thought perhaps I could pre-create the table and then sneak a partition decorator in via a BigQueryIO.Write.toTableReference lambda...? Has anyone else had success creating/writing to partitioned tables via Dataflow?

This seems similar to setting the table expiration time, which also isn't currently available.
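
For reference, pre-creating the day-partitioned table outside the pipeline might look roughly like this (a minimal sketch using the low-level com.google.api.services.bigquery client; the bigquery client instance, the project/dataset/table identifiers, and schema are placeholders, and an expiration time could be set on the same Table object):

// Hedged sketch: pre-create a day-partitioned table via the BigQuery API
// client library. "bigquery", the identifiers and "schema" are assumptions,
// not part of the original question.
Table table = new Table()
    .setTableReference(new TableReference()
        .setProjectId("project")
        .setDatasetId("dataset")
        .setTableId("table"))
    .setTimePartitioning(new TimePartitioning().setType("DAY"))
    .setSchema(schema); // a com.google.api.services.bigquery.model.TableSchema
bigquery.tables().insert("project", "dataset", table).execute();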


6 Answers


As Pavan says, it is definitely possible to write to partitioned tables with Dataflow. Are you using the DataflowPipelineRunner in streaming mode or batch mode?

Your proposed solution should work. Specifically, if you pre-create a table with date partitioning set up, then you can use a BigQueryIO.Write.toTableReference lambda to write to a date partition. For example:

/**
 * A Joda-time formatter that prints a date in format like {@code "20160101"}.
 * Threadsafe.
 */
private static final DateTimeFormatter FORMATTER =
    DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC);

// This code generates a valid BigQuery partition name:
Instant instant = Instant.now(); // any Joda instant in a reasonable time range
String baseTableName = "project:dataset.table"; // a valid BigQuery table name
String partitionName =
    String.format("%s$%s", baseTableName, FORMATTER.print(instant));

Answered 2016-07-01T04:43:30.777

The approach I took (which also works in streaming mode):

  • Define a custom window for the incoming records
  • Convert the window into the table/partition name

    p.apply(PubsubIO.Read
                .subscription(subscription)
                .withCoder(TableRowJsonCoder.of())
            )
            .apply(Window.into(new TablePartitionWindowFn()))
            .apply(BigQueryIO.Write
                           .to(new DayPartitionFunc(dataset, table))
                           .withSchema(schema)
                           .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            );
    

The window is set based on the incoming data; the end Instant can be ignored, since the start value is what determines the partition:

public class TablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> {

    private IntervalWindow assignWindow(AssignContext context) {
        TableRow source = (TableRow) context.element();
        String dttm_str = (String) source.get("DTTM");

        DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd").withZoneUTC();

        Instant start_point = Instant.parse(dttm_str, formatter);
        // Only the window start matters for partitioning; the end is arbitrary (+1 second).
        Instant end_point = start_point.withDurationAdded(1000, 1);

        return new IntervalWindow(start_point, end_point);
    }

    @Override
    public Coder<IntervalWindow> windowCoder() {
        return IntervalWindow.getCoder();
    }

    @Override
    public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
        return Arrays.asList(assignWindow(c));
    }

    @Override
    public boolean isCompatible(WindowFn<?, ?> other) {
        return false;
    }

    @Override
    public IntervalWindow getSideInputWindow(BoundedWindow window) {
        // Side inputs are not used with this WindowFn.
        if (window instanceof GlobalWindow) {
            throw new IllegalArgumentException(
                    "Attempted to get side input window for GlobalWindow from non-global WindowFn");
        }
        return null;
    }
}

And setting the table partition dynamically:

public class DayPartitionFunc implements SerializableFunction<BoundedWindow, String> {

    private final String destination;

    public DayPartitionFunc(String dataset, String table) {
        this.destination = dataset + "." + table + "$";
    }

    @Override
    public String apply(BoundedWindow boundedWindow) {
        // The cast below is safe because TablePartitionWindowFn produces IntervalWindows.
        String dayString = DateTimeFormat.forPattern("yyyyMMdd")
                                         .withZone(DateTimeZone.UTC)
                                         .print(((IntervalWindow) boundedWindow).start());
        return destination + dayString;
    }
}

Is there a better way of achieving the same outcome?

Answered 2016-11-29T10:35:11.690

I believe it should be possible to use the partition decorator when you are not using streaming. We are actively working on supporting partition decorators via streaming. Please let us know if you are seeing any errors today with non-streaming mode.

Answered 2016-06-30T14:12:04.447

Apache Beam version 2.0 supports sharding BigQuery output tables out of the box.
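
A minimal sketch of what that can look like in Beam 2.x, routing each element to its day partition via a per-element TableDestination (the rows PCollection<TableRow>, the schema, and the table spec are placeholders, not from the original answer):

// Hedged sketch for Beam 2.x: compute a TableDestination per element from
// its window timestamp. "rows" and "schema" are assumed to exist elsewhere.
rows.apply(BigQueryIO.writeTableRows()
    .to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
      @Override
      public TableDestination apply(ValueInSingleWindow<TableRow> value) {
        String day = DateTimeFormat.forPattern("yyyyMMdd")
            .withZoneUTC()
            .print(value.getTimestamp());
        return new TableDestination("project:dataset.table$" + day, null);
      }
    })
    .withSchema(schema)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));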

Answered 2017-06-15T14:46:46.293

If you pass the table name in table_name_YYYYMMDD format, BigQuery will treat it as a sharded table, which can simulate the functionality of a partitioned table. See the documentation: https://cloud.google.com/bigquery/docs/partitioned-tables

Answered 2018-03-31T15:01:50.523

I have written data into BigQuery partitioned tables through Dataflow. These writes are dynamic: if data already exists in a given partition, I can either append to it or overwrite it.

I wrote the code in Python. It's a batch-mode write operation into BigQuery.

from google.cloud import bigquery

# projectName, datasetName, bqTableName, gcsFileName and the various flags
# below are assumed to be supplied by the surrounding code.
client = bigquery.Client(project=projectName)
dataset_ref = client.dataset(datasetName)
table_ref = dataset_ref.table(bqTableName)
job_config = bigquery.LoadJobConfig()
job_config.skip_leading_rows = skipLeadingRows
job_config.source_format = bigquery.SourceFormat.CSV
if tableExists(client, table_ref):  # tableExists is a helper defined elsewhere
    job_config.autodetect = autoDetect
    previous_rows = client.get_table(table_ref).num_rows
    #assert previous_rows > 0
    if allowJaggedRows is True:
        job_config.allow_jagged_rows = True
    if allowFieldAddition is True:
        job_config.schema_update_options = [
            bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
        ]
    if isPartitioned is True:
        job_config.time_partitioning = bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY)
    if schemaList is not None:
        job_config.schema = schemaList
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
else:
    job_config.autodetect = autoDetect
    job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
    if isPartitioned is True:
        job_config.time_partitioning = bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY)
    if schemaList is not None:
        job_config.schema = schemaList
load_job = client.load_table_from_uri(gcsFileName, table_ref, job_config=job_config)
assert load_job.job_type == 'load'
load_job.result()  # wait for the load to complete
assert load_job.state == 'DONE'

It works fine.

Answered 2018-06-29T07:32:41.440