全部。
我有一个小型解析器,可以将找到的数据写入 Postgres,作为我使用的数据库框架https://github.com/jackc/pgx。
我将解析后的数据从各种 goroutine 写入无缓冲通道。
我有特殊的 goroutine,我从这个通道读取数据并将其写入数据库。
我正在调试一个应用程序,它在某个时间后永远挂起(可能等待与池中数据库的免费连接)。
如何确定哪个 goroutine 阻塞了执行?
我听说有一个 pprof,但我从未使用过它。谢谢。
最小的例子:我有这样的结构
ParsingResults struct {
parser DataParser
data []*common.Data
err error
}
在单独的 goroutine 中,我像这样初始化无缓冲通道:
results = make(chan *ParsingResults)
然后我启动各种 goroutines,在其中运行解析器:
go fetcher.Parse(results)
每个解析器收集数据并将其传递给通道,如下所示:
var (
results chan<- *ParsingResults
pageResults *ParsingResults
)
results <- pageResults
if pageResults.err != nil {
return
}
time.Sleep(p.provider.DelayBetweenPages)
并在一个单独的 goroutine 中启动这样的函数:
func (fetcher *Fetcher) waitForResults(ctx context.Context) {
for {
select {
case results := <-fetcher.resultsChannel:
provider := results.parser.GetProvider()
if results.err != nil {
common.Logger.Errorw("failed to fetch data from provider",
"provider", provider.Url,
"error", results.err)
continue
}
data := fetcher.removeDuplicates(results.data)
common.Logger.Infow("fetched some data",
"provider", provider.Url,
"rows_count", len(results.data),
"unique_rows_count", len(data))
_, err := fetcher.Repo.SaveFetchedData(ctx, data)
if err != nil {
common.Logger.Errorw("failed to save fetched data",
"provider", provider.Url,
"error", err)
continue
}
common.Logger.Infow("fetched data were saved successfully",
"provider", provider.Url,
"rows_count", len(results.data),
"unique_rows_count", len(data))
case <-ctx.Done():
return
default:
common.Logger.Infow("for debugging's sake! waiting for some data to arrive!")
}
}
}
数据在此函数中存储在数据库中:
func (repo *Repository) SaveFetchedData(ctx context.Context, rows []*common.Data) (int64, error) {
if len(rows) == 0 {
return 0, nil
}
baseQB := sq.Insert(db.DataTableName).
Columns(saveFetchedDataCols...).
PlaceholderFormat(sq.Dollar)
batch := &pgx.Batch{}
for _, p := range rows {
curQB := baseQB.Values(p.Row1, p.Row2, sq.Expr("NOW()"))
curQuery, curArgs, err := curQB.ToSql()
if err != nil {
return 0, fmt.Errorf("failed to generate SQL query: %w", err)
}
batch.Queue(curQuery, curArgs...)
}
br := repo.pool.SendBatch(ctx, batch)
ct, err := br.Exec()
if err != nil {
return 0, fmt.Errorf("failed to run SQL query batch: %w", err)
}
return ct.RowsAffected(), nil
}