2

全部。

我有一个小型解析器,可以将找到的数据写入 Postgres,作为我使用的数据库框架https://github.com/jackc/pgx

我将解析后的数据从各种 goroutine 写入无缓冲通道。

我有特殊的 goroutine,我从这个通道读取数据并将其写入数据库。

我正在调试一个应用程序,它在某个时间后永远挂起(可能等待与池中数据库的免费连接)。

如何确定哪个 goroutine 阻塞了执行?

我听说有一个 pprof,但我从未使用过它。谢谢。

最小的例子:我有这样的结构

ParsingResults struct {
    parser  DataParser
    data []*common.Data
    err     error
}

在单独的 goroutine 中,我像这样初始化无缓冲通道:

results = make(chan *ParsingResults)

然后我启动各种 goroutines,在其中运行解析器:

go fetcher.Parse(results)

每个解析器收集数据并将其传递给通道,如下所示:

var (
    results chan<- *ParsingResults
    pageResults *ParsingResults
)
results <- pageResults
if pageResults.err != nil {
    return
}

time.Sleep(p.provider.DelayBetweenPages)

并在一个单独的 goroutine 中启动这样的函数:

func (fetcher *Fetcher) waitForResults(ctx context.Context) {
    for {
        select {
        case results := <-fetcher.resultsChannel:
            provider := results.parser.GetProvider()
            if results.err != nil {
                common.Logger.Errorw("failed to fetch data from provider",
                    "provider", provider.Url,
                    "error", results.err)
                continue
            }
            data := fetcher.removeDuplicates(results.data)
            common.Logger.Infow("fetched some data",
                "provider", provider.Url,
                "rows_count", len(results.data),
                "unique_rows_count", len(data))
            _, err := fetcher.Repo.SaveFetchedData(ctx, data)
            if err != nil {
                common.Logger.Errorw("failed to save fetched data",
                    "provider", provider.Url,
                    "error", err)
                continue
            }
            common.Logger.Infow("fetched data were saved successfully",
                "provider", provider.Url,
                "rows_count", len(results.data),
                "unique_rows_count", len(data))
        case <-ctx.Done():
            return
        default:
            common.Logger.Infow("for debugging's sake! waiting for some data to arrive!")
        }
    }
}

数据在此函数中存储在数据库中:

func (repo *Repository) SaveFetchedData(ctx context.Context, rows []*common.Data) (int64, error) {
    if len(rows) == 0 {
        return 0, nil
    }

    baseQB := sq.Insert(db.DataTableName).
        Columns(saveFetchedDataCols...).
        PlaceholderFormat(sq.Dollar)

    batch := &pgx.Batch{}
    for _, p := range rows {
        curQB := baseQB.Values(p.Row1, p.Row2, sq.Expr("NOW()"))
        curQuery, curArgs, err := curQB.ToSql()

        if err != nil {
            return 0, fmt.Errorf("failed to generate SQL query: %w", err)
        }
        batch.Queue(curQuery, curArgs...)
    }

    br := repo.pool.SendBatch(ctx, batch)
    ct, err := br.Exec()
    if err != nil {
        return 0, fmt.Errorf("failed to run SQL query batch: %w", err)
    }

    return ct.RowsAffected(), nil
}
4

1 回答 1

0

我在 pprof 中检查了完整的 goroutine 堆栈。所以错误是我在处理完批处理请求的结果后没有从池中释放连接。因此,10 个请求通过,池被完全填满,执行线程被阻塞。伙计们,你们都是最棒的。谢谢您的帮助。

于 2022-02-27T16:33:32.523 回答