GroupByKey
我正在编写一个管道,它将同时用于 Dataflow 的批处理和流模式;在批处理模式下运行时我遇到了 OOM 问题。下面的代码演示了这个问题:当输入文件很大时,GroupByKey
似乎会把所有内容都保存在内存中,直到整个输入读取完成后才发出分组结果。我尝试使用触发器来强制提前触发,但没有效果。我找不到任何能让该转换有效处理大文件的方法。
如何在 beam go 中实现一个包含分组并且可以有效处理大文件的管道?
package sisubqio_test
import (
"context"
"flag"
"fmt"
"io"
"os"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)
// TestWriter builds a small Beam pipeline that reads a temporary text
// file, assigns synthetic event timestamps 20s apart, windows the
// elements into fixed one-minute windows, groups them under a single
// fixed key, and logs what every stage emits.
//
// It exists to demonstrate the reported problem: judging by the log
// output, regardless of the trigger passed to beam.WindowInto, the
// GroupByKey output (the "[3]" lines) only appears after the entire
// input has been processed ("[0]"–"[2]" lines), which leads to OOM on
// large inputs in batch mode.
func TestWriter(t *testing.T) {
	// mustNotFail aborts the test on the first unexpected error.
	mustNotFail := func(err error) {
		if err != nil {
			t.Fatal(err)
		}
	}
	// Create a test file with a few lines of text; it is closed and
	// removed when the test finishes.
	fName := "in.tmp.txt"
	f, err := os.OpenFile(fName, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
	mustNotFail(err)
	defer func() {
		mustNotFail(f.Close())
		mustNotFail(os.Remove(fName))
	}()
	for i := 0; i < 10; i++ {
		_, err = fmt.Fprintf(f, "line %d\n", i)
		mustNotFail(err)
	}
	_, err = f.Seek(0, io.SeekStart)
	mustNotFail(err)
	flag.Parse()
	beam.Init()
	pipeline, s := beam.NewPipelineWithRoot()
	col := textio.Read(s, fName)
	// Add timestamps to messages: each message has a timestamp 20s
	// after the previous one, so the ten lines span four one-minute
	// windows. The atomic counter numbers elements in arrival order.
	now := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
	var counter int32
	col = beam.ParDo(s, func(line beam.X) (typex.EventTime, beam.X) {
		i := atomic.AddInt32(&counter, 1) - 1
		evTime := mtime.Time(mtime.FromTime(now.Add(20 * time.Duration(i) * time.Second)).Milliseconds())
		t.Logf("[0] input event, time=%v", evTime)
		return evTime, line
	}, col)
	// Assign fixed one-minute windows and inspect events when emitted.
	col = beam.WindowInto(s,
		window.NewFixedWindows(time.Minute),
		col,
		beam.Trigger(window.TriggerAlways()), // I tried all triggers here; makes no difference
	)
	col = beam.ParDo(s, func(w typex.Window, e string) string {
		t.Logf("[1] window: %v", w)
		return e
	}, col)
	// Add a single fixed key (everything lands in group 0) and group
	// by it; inspect events when emitted.
	col = beam.AddFixedKey(s, col)
	col = beam.ParDo(s, func(et typex.EventTime, group int, x beam.X) (int, beam.X) {
		t.Logf("[2] at %v got (group %d)",
			time.UnixMilli(int64(et)),
			group)
		return group, x
	}, col)
	// ISSUE IS HERE
	// No matter which trigger is used, GroupByKey appears to hold
	// everything in memory and only then emit its outputs. With large
	// files it always OOMs.
	col = beam.GroupByKey(s, col)
	// Drain each (window, key, values) group into one log line so the
	// emission order of grouped output is visible in the test log.
	beam.ParDo0(s, func(w typex.Window, group int, valIter func(*string) bool) {
		sb := strings.Builder{}
		fmt.Fprintf(&sb, "[3] win=%v out group=%d", w, group)
		var elm string
		for valIter(&elm) {
			fmt.Fprintf(&sb, " %s;", elm)
		}
		t.Log(sb.String())
	}, col)
	mustNotFail(beamx.Run(context.Background(), pipeline))
}
输出:
writer_test.go:58: [0] input event, time=1577836800000
writer_test.go:69: [1] window: [1577836800000:1577836860000)
writer_test.go:79: [2] at 2020-01-01 01:00:00 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836820000
writer_test.go:69: [1] window: [1577836800000:1577836860000)
writer_test.go:79: [2] at 2020-01-01 01:00:20 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836840000
writer_test.go:69: [1] window: [1577836800000:1577836860000)
writer_test.go:79: [2] at 2020-01-01 01:00:40 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836860000
writer_test.go:69: [1] window: [1577836860000:1577836920000)
writer_test.go:79: [2] at 2020-01-01 01:01:00 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836880000
writer_test.go:69: [1] window: [1577836860000:1577836920000)
writer_test.go:79: [2] at 2020-01-01 01:01:20 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836900000
writer_test.go:69: [1] window: [1577836860000:1577836920000)
writer_test.go:79: [2] at 2020-01-01 01:01:40 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836920000
writer_test.go:69: [1] window: [1577836920000:1577836980000)
writer_test.go:79: [2] at 2020-01-01 01:02:00 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836940000
writer_test.go:69: [1] window: [1577836920000:1577836980000)
writer_test.go:79: [2] at 2020-01-01 01:02:20 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836960000
writer_test.go:69: [1] window: [1577836920000:1577836980000)
writer_test.go:79: [2] at 2020-01-01 01:02:40 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836980000
writer_test.go:69: [1] window: [1577836980000:1577837040000)
writer_test.go:79: [2] at 2020-01-01 01:03:00 +0100 CET got (group 0)
writer_test.go:95: [3] win=[1577836920000:1577836980000) out group=0 line 6; line 7; line 8;
writer_test.go:95: [3] win=[1577836980000:1577837040000) out group=0 line 9;
writer_test.go:95: [3] win=[1577836800000:1577836860000) out group=0 line 0; line 1; line 2;
writer_test.go:95: [3] win=[1577836860000:1577836920000) out group=0 line 3; line 4; line 5;
编辑:我找到了一个与触发器和窗口相关的 Jira 票证;在撰写本文时,该票证表明触发器——尤其是触发器在各转换之间的传播——仍是开发中的功能(WIP)。