2

我正在编写一些 Go 软件,负责下载和解析大量 JSON 文件并将解析后的数据写入 sqlite 数据库。我目前的设计有 10 个 go 例程,同时下载/解析这些 JSON 并将它们传送到另一个 go 例程,该例程的唯一工作是监听特定频道并将频道内容写入数据库。

系统会在所有写入完成后执行一些额外的读取操作,这会导致查询返回错误结果的问题,因为并非所有数据都已写入表。因为我提取的 JSON 数据是动态的,所以我没有简单的方法知道所有数据何时都已写入。

我考虑了两种解决方案的可能性,尽管我对这两种解决方案都不太满意:

  1. 在频道上收听并等待它为空。这原则上应该有效,但是,它不能确保数据已被写入,它只确保它已在通道上接收到。
  2. 同步对数据库的访问。这在原则上应该再次起作用,但是,我仍然需要将查询操作放在所有写操作之后。

我是否应该考虑其他任何设计决策来纠正这个问题?作为参考,我用来提取这些数据的库是 go-colly 和 go-sqlite3。感谢所有的帮助!

4

1 回答 1

1

你可以使用一个sync.WaitGroup

例如

package main

import "sync"

func main() {
    // Some sort of job queue for your workers to process. This job queue should be closed by the process
    // that populates it with items. Once the job channel is closed, any for loops ranging over the channel
    // will read items until there are no more items, and then break.
    jobChan := make(chan JobInfo)

    // Populate the job queue here...
    // ...
    close(jobChan)

    // We now have a full queue of jobs that can't accept new jobs because the channel is closed.

    // Number of concurrent workers.
    workerCount := 10

    // Initialize the WaitGroup.
    wg := sync.WaitGroup{}
    wg.Add(workerCount)

    // Create the worker goroutines.
    for i := 0; i < workerCount; i++ {
        go func() {
            // When the jobChan is closed, and no more jobs are available on the queue, the for loop
            // will exit, causing wg.Done() to be called, and the anonymous function to exit.
            for job := range jobChan {
                // Process job.
            }
            wg.Done()
        }()
    }

    // Wait for all workers to call wg.Done()
    wg.Wait()

    // Whatever you want to do after all queue items have been processed goes here.
    // ...
}
于 2020-06-02T18:56:29.893 回答