1

我正在编写一个管道代码,它将在 DataFlow 的批处理和流模式下使用,并且在批处理模式下使用时我遇到了 OOM 问题。下面的代码显示了问题:当我有一个大文件时,GroupByKey 似乎将所有内容都保存在内存中,仅在输入完成后才发出值。我尝试使用触发器来强制提前触发,但失败了。我找不到在大文件上使用此转换的任何方法。

如何在 beam go 中实现一个包含分组并且可以有效处理大文件的管道?

package sisubqio_test

import (
    "context"
    "flag"
    "fmt"
    "io"
    "os"
    "strings"
    "sync/atomic"
    "testing"
    "time"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

// TestWriter builds a small Beam pipeline over a temporary text file to
// demonstrate that GroupByKey buffers an entire window's worth of elements
// before emitting, regardless of the trigger configured on WindowInto.
func TestWriter(t *testing.T) {
	check := func(err error) {
		if err != nil {
			t.Fatal(err)
		}
	}

	// Write a ten-line fixture file; it is closed and removed when the
	// test finishes.
	fName := "in.tmp.txt"
	f, err := os.OpenFile(fName, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
	check(err)
	defer func() {
		check(f.Close())
		check(os.Remove(fName))
	}()
	for i := 0; i < 10; i++ {
		_, werr := fmt.Fprintf(f, "line %d\n", i)
		check(werr)
	}

	_, err = f.Seek(0, io.SeekStart)
	check(err)

	flag.Parse()
	beam.Init()

	pipeline, s := beam.NewPipelineWithRoot()
	col := textio.Read(s, fName)

	// Stamp each line with a synthetic event time. Times start at a fixed
	// instant and advance 20s per element, so the windows below are
	// deterministic.
	base := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
	var seq int32
	col = beam.ParDo(s, func(line beam.X) (typex.EventTime, beam.X) {
		n := atomic.AddInt32(&seq, 1) - 1
		ts := mtime.Time(mtime.FromTime(base.Add(20 * time.Duration(n) * time.Second)).Milliseconds())
		t.Logf("[0] input event, time=%v", ts)
		return ts, line
	}, col)

	// Assign one-minute fixed windows and log the window each element
	// lands in. Every trigger was tried here; none changes the outcome.
	col = beam.WindowInto(s,
		window.NewFixedWindows(time.Minute),
		col,
		beam.Trigger(window.TriggerAlways()),
	)
	col = beam.ParDo(s, func(w typex.Window, e string) string {
		t.Logf("[1] window: %v", w)
		return e
	}, col)

	// Give every element the same fixed key, then log each keyed element
	// as it flows past.
	col = beam.AddFixedKey(s, col)
	col = beam.ParDo(s, func(et typex.EventTime, group int, x beam.X) (int, beam.X) {
		t.Logf("[2] at %v got (group %d)", time.UnixMilli(int64(et)), group)
		return group, x
	}, col)

	// ISSUE IS HERE
	// Whatever trigger is configured above, GroupByKey appears to hold
	// everything in memory and only then emit its outputs, which OOMs on
	// large files.
	col = beam.GroupByKey(s, col)
	beam.ParDo0(s, func(w typex.Window, group int, valIter func(*string) bool) {
		var sb strings.Builder
		fmt.Fprintf(&sb, "[3] win=%v out group=%d", w, group)
		for elm := ""; valIter(&elm); {
			fmt.Fprintf(&sb, " %s;", elm)
		}
		t.Log(sb.String())
	}, col)

	check(beamx.Run(context.Background(), pipeline))
}

输出:

    writer_test.go:58: [0] input event, time=1577836800000
    writer_test.go:69: [1] window: [1577836800000:1577836860000)
    writer_test.go:79: [2] at 2020-01-01 01:00:00 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836820000
    writer_test.go:69: [1] window: [1577836800000:1577836860000)
    writer_test.go:79: [2] at 2020-01-01 01:00:20 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836840000
    writer_test.go:69: [1] window: [1577836800000:1577836860000)
    writer_test.go:79: [2] at 2020-01-01 01:00:40 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836860000
    writer_test.go:69: [1] window: [1577836860000:1577836920000)
    writer_test.go:79: [2] at 2020-01-01 01:01:00 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836880000
    writer_test.go:69: [1] window: [1577836860000:1577836920000)
    writer_test.go:79: [2] at 2020-01-01 01:01:20 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836900000
    writer_test.go:69: [1] window: [1577836860000:1577836920000)
    writer_test.go:79: [2] at 2020-01-01 01:01:40 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836920000
    writer_test.go:69: [1] window: [1577836920000:1577836980000)
    writer_test.go:79: [2] at 2020-01-01 01:02:00 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836940000
    writer_test.go:69: [1] window: [1577836920000:1577836980000)
    writer_test.go:79: [2] at 2020-01-01 01:02:20 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836960000
    writer_test.go:69: [1] window: [1577836920000:1577836980000)
    writer_test.go:79: [2] at 2020-01-01 01:02:40 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836980000
    writer_test.go:69: [1] window: [1577836980000:1577837040000)
    writer_test.go:79: [2] at 2020-01-01 01:03:00 +0100 CET got (group 0)
    writer_test.go:95: [3] win=[1577836920000:1577836980000) out group=0 line 6; line 7; line 8;
    writer_test.go:95: [3] win=[1577836980000:1577837040000) out group=0 line 9;
    writer_test.go:95: [3] win=[1577836800000:1577836860000) out group=0 line 0; line 1; line 2;
    writer_test.go:95: [3] win=[1577836860000:1577836920000) out group=0 line 3; line 4; line 5;

编辑:我发现与触发器窗口相关的 Jira 票证,在撰写本文时,让人相信触发器,特别是触发器传播是 WIP。

4

1 回答 1

2

Beam 使用 map 和 reduce 操作。Map(转换)可以在不同的工作节点(worker)/虚拟机上并行完成。Reduce 需要知道所有要处理的元素,因此它会将所有元素加载到内存中,然后才执行 reduce(即 groupBy)操作。

您有 2 个解决方案:

  • 您可以创建窗口来仅处理大文件的分块。但是,这样您的 groupBy 就不是全局的,而是按窗口进行的。

  • 您也可以尝试新的 Dataflow Prime 选项。它是无服务器且完全可扩展的。其承诺是消除所有 OOM 错误(我只在 Java 中遇到过此问题,我从未使用过 Beam Go SDK)。

您还可以增加工作节点的内存,但这不是一个可扩展的解决方案(而且成本更高!)。Prime 选项是更好的选择(但仍处于预览阶段)。

于 2021-09-23T20:33:12.133 回答