0

所以我有一些管道。我必须逐行比较 2 个文件,例如加载到数据库。我想有一些超时来处理单行,如果它达到了就中断管道。输入端点是函数Run()。在这里,我们进行一些验证并使用命令创建结构

之后在 fuction 中run(),我们开始在单个 goroutine 中处理 stdout,并comm在 ubuntu 中运行主进程命令。好吧,它工作正常,我在比赛中检查了它: go build --racego run --race。没有问题和输出。但我有测试,检查简单的逻辑。它工作得几乎很好,1 次前 10 次运行它失败了。在我对此进行调查之后,我在这个测试中发现了竞争条件。有人可以解释一下,出了什么问题?

进入

func (cmWk *CommWorker) Run(path1, path2, category, subcategory string) error {
    err := cmWk.opt.validate()
    if err != nil {
        return err
    }
    if notExistFile(path1) {
        return &AESCommError{fmt.Sprintf(
            "Check your file path [%s].It is invalid", path1),
        }
    }
    if isDir(path1) {
        return &AESCommError{fmt.Sprintf(
            "Check your file path [%s].It is invalid", path1),
        }
    }
    // If path2 does not exist, we create this file
    if notExistFile(path2) {
        err := createIfNotExist(path2)
        if err != nil {
            return err
        }
    }
    cmWk.category = category
    cmWk.subcategory = subcategory
    cmWk.aesComm = NewAESComm(path1, path2, cmWk.opt)
    return cmWk.run()
}
// NewAESComm returns AESComm structure with described options
func NewAESComm(path1, path2 string, options *AESCommOptions) *AESComm {
    arguments := append(getArguments(options), path1, path2)
    return &AESComm{
        FilePath1: path1,
        FilePath2: path2,
        cmd:       exec.Command("comm", arguments...),
    }
}
// Run starts comm process with options
func (cmWk *CommWorker) run() error {
    stdout, err := cmWk.aesComm.StdoutPipe()
    if err != nil {
        return err
    }
    defer stdout.Close()

    go cmWk.processStdout(stdout)

    return cmWk.aesComm.Run()
}
// Run start comm task
func (comm AESComm) Run() error {
    if err := comm.cmd.Start(); err != nil {
        return err
    }
    return comm.cmd.Wait()
}
func (cmWk *CommWorker) processStdout(stdout io.ReadCloser) {
    scanner := bufio.NewScanner(stdout)
    timer := time.NewTimer(cmWk.pTimeout)
    erCh := make(chan error, 1)
    for scanner.Scan() {
        line := scanner.Text()
        if strings.HasPrefix(line, "#") {
            continue
        }
        if err := cmWk.process(line, timer, erCh); err != nil {
            cmWk.interrupt(err)
            return
        }

        timer.Reset(cmWk.pTimeout)
    }
    if err := cmWk.clear(erCh); err != nil {
        cmWk.interrupt(err)
    }
}
func (cmWk *CommWorker) process(line string, timer *time.Timer, erCh chan error) error {
    erCh <- cmWk.processor.ProcessLine(line, cmWk.category, cmWk.subcategory)
    select {
    case <-timer.C:
        return fmt.Errorf("got timeout error handling line")
    default:
        return <-erCh
    }
}
func (cmWk *CommWorker) interrupt(err error) {
    cmWk.Error = err
    _ = cmWk.aesComm.cmd.Process.Signal(signal)
}

破损测试

func TestCommWorker_Run5(t *testing.T) {
    proc := &testUtils.TestProcessor{Fail: true, Lines: []string{}} // simple test processor that fails
    createWriteFile(srs, fp1)
    wk := NewCommWorker(&AESCommOptions{CheckOrder: true, PTimeOut: pTimeout}, proc)
    err := wk.Run(fp1, fp2, testCategory, testSubCategory)
    assert.Nil(t, err)
    assert.Error(t, wk.Err()) // race condition
}

输出

==================
WARNING: DATA RACE
Read at 0x00c00048e3a8 by goroutine 15:
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).Err()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:144 +0x4b9
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.TestCommWorker_Run5()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker_test.go:103 +0x4ca
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1123 +0x202

Previous write at 0x00c00048e3a8 by goroutine 19:
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).interrupt()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:119 +0x22e
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).processStdout()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:107 +0x219

Goroutine 15 (running) created at:
  testing.(*T).Run()
      /usr/local/go/src/testing/testing.go:1168 +0x5bb
  testing.runTests.func1()
      /usr/local/go/src/testing/testing.go:1439 +0xa6
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1123 +0x202
  testing.runTests()
      /usr/local/go/src/testing/testing.go:1437 +0x612
  testing.(*M).Run()
      /usr/local/go/src/testing/testing.go:1345 +0x3b3
  main.main()
      _testmain.go:73 +0x236

Goroutine 19 (running) created at:
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).run()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:54 +0x169
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).Run()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:94 +0x4f7
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.TestCommWorker_Run5()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker_test.go:101 +0x454
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1123 +0x202
==================
--- FAIL: TestCommWorker_Run5 (0.08s)
    testing.go:1038: race detected during execution of test
==================
WARNING: DATA RACE
Read at 0x00c0005160a0 by goroutine 19:
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).interrupt()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:120 +0x2ad
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).processStdout()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:107 +0x219

Previous write at 0x00c0005160a0 by goroutine 15:
  os/exec.(*Cmd).Start()
      /usr/local/go/src/os/exec/exec.go:422 +0x8e4
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.AESComm.Run()
      /home/konstantin/go/src/SophosRS/commWrapper/commWrapper.go:93 +0x3c
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).run()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:56 +0x1fa
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).Run()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:94 +0x4f7
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.TestCommWorker_Run5()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker_test.go:101 +0x454
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1123 +0x202

Goroutine 19 (running) created at:
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).run()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:54 +0x169
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).Run()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:94 +0x4f7
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.TestCommWorker_Run5()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker_test.go:101 +0x454
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1123 +0x202

Goroutine 15 (running) created at:
  testing.(*T).Run()
      /usr/local/go/src/testing/testing.go:1168 +0x5bb
  testing.runTests.func1()
      /usr/local/go/src/testing/testing.go:1439 +0xa6
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1123 +0x202
  testing.runTests()
      /usr/local/go/src/testing/testing.go:1437 +0x612
  testing.(*M).Run()
      /usr/local/go/src/testing/testing.go:1345 +0x3b3
  main.main()
      _testmain.go:73 +0x236
==================
==================
WARNING: DATA RACE
Read at 0x00c0004b5890 by goroutine 19:
  os.(*Process).signal()
      /usr/local/go/src/os/exec_unix.go:65 +0x64
  os.(*Process).Signal()
      /usr/local/go/src/os/exec.go:131 +0x2f7
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).interrupt()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:120 +0x256
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).processStdout()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:107 +0x219

Previous write at 0x00c0004b5890 by goroutine 15:
  os.newProcess()
      /usr/local/go/src/os/exec.go:25 +0x5ee
  os.startProcess()
      /usr/local/go/src/os/exec_posix.go:62 +0x668
  os.StartProcess()
      /usr/local/go/src/os/exec.go:102 +0x92
  os/exec.(*Cmd).Start()
      /usr/local/go/src/os/exec/exec.go:422 +0x8af
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.AESComm.Run()
      /home/konstantin/go/src/SophosRS/commWrapper/commWrapper.go:93 +0x3c
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).run()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:56 +0x1fa
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).Run()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:94 +0x4f7
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.TestCommWorker_Run5()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker_test.go:101 +0x454
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1123 +0x202

Goroutine 19 (running) created at:
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).run()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:54 +0x169
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).Run()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:94 +0x4f7
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.TestCommWorker_Run5()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker_test.go:101 +0x454
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1123 +0x202

Goroutine 15 (running) created at:
  testing.(*T).Run()
      /usr/local/go/src/testing/testing.go:1168 +0x5bb
  testing.runTests.func1()
      /usr/local/go/src/testing/testing.go:1439 +0xa6
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1123 +0x202
  testing.runTests()
      /usr/local/go/src/testing/testing.go:1437 +0x612
  testing.(*M).Run()
      /usr/local/go/src/testing/testing.go:1345 +0x3b3
  main.main()
      _testmain.go:73 +0x236
==================
FAIL
exit status 1

4

1 回答 1

0

我认为问题存在于从run. 它最终调用interruptwhich 写入CommWorker'sError属性。在您的测试中,您生成 goroutine,然后读取该Error属性 ( wk.Err()),但不能保证生成的子程序goroutine做了什么。

在检查结果之前,您需要做一些事情来确保cmWk.processStdout(stdout)(goroutine 中的东西)绝对完整。您可以在没有 goroutine 的情况下按顺序运行它,或者您可以将 a 引入WaitGroupgoroutineCommWorker用来指示完成并等待测试。

于 2021-03-29T12:33:31.437 回答