我正在使用Go语言进行数据导入工作,我想将每个步骤编写为闭包,并使用通道进行通信,即每个步骤都是并发的。该问题可以通过以下结构来定义。
- 从数据源
获取小部件
- 将源 1 的翻译添加到Widgets。
- 将源 2 中的翻译添加到Widgets。
- 将来源 1 的定价添加到Widgets。
- 将WidgetRevisions添加到Widgets。
- 将源 1 中的翻译添加到WidgetRevisions
- 将源 2 中的翻译添加到WidgetRevisions
出于这个问题的目的,我只处理必须在新Widget上执行的前三个步骤。我假设在此基础上,第四步可以作为一个管道步骤来实现,它本身是根据一个子三步管道来实现的,以控制 *WidgetRevision*s
为此,我一直在编写一小段代码来为我提供以下 API:
// A Pipeline is just a list of closures, and a smart
// function to set them all off, keeping channels of
// communication between them.
p, e, d := NewPipeline()
// Add the three steps of the process
p.Add(whizWidgets)
p.Add(popWidgets)
p.Add(bangWidgets)
// Start putting things on the channel, kick off
// the pipeline, and drain the output channel
// (probably to disk, or a database somewhere)
go emit(e)
p.Execute()
drain(d)
我已经实现了它(在Gist或Go Playground上的代码),但它以 100% 的成功失败率陷入僵局
调用时出现僵局p.Execute()
,因为可能其中一个通道最终无事可做,没有任何事情可以发送,也没有工作可做......
emit()
在and中添加几行调试输出drain()
,我看到以下输出,我相信闭包调用之间的流水线是正确的,并且我看到一些小部件被省略了。
Emitting A Widget
Input Will Be Emitted On 0x420fdc80
Emitting A Widget
Emitting A Widget
Emitting A Widget
Output Will Drain From 0x420fdcd0
Pipeline reading from 0x420fdc80 writing to 0x420fdd20
Pipeline reading from 0x420fdd20 writing to 0x420fddc0
Pipeline reading from 0x420fddc0 writing to 0x42157000
以下是我对这种方法的一些了解:
- 我相信这种设计“饿死”一个或另一个协程并不少见,我相信这就是死锁的原因
- 我希望管道首先将东西输入其中(API将实现
Pipeline.Process(*Widget)
- 如果我能做到这一点,排水管可能是一个“步骤”,它没有将任何东西传递给下一个函数,这可能是一个更干净的 API
- 我知道我没有实现任何类型的梯级缓冲区,所以我完全有可能只是让机器的可用内存过载
- 我真的不相信这是好的 Go 风格......但它似乎利用了很多 Go 功能,但这并不是真正的好处
- 由于 WidgetRevisions 也需要管道,我想让我的管道更通用,也许
interface{}
类型是解决方案,我不太了解 Go 以确定这是否明智。 - 有人建议我考虑实现一个互斥锁来防止竞争条件,但我相信我会保存,因为闭包每个都将在 Widget 结构的一个特定单元上运行,但是我很乐意接受有关该主题的教育.
总结:我该如何修复这个代码,我应该修复这个代码,如果你是一个比我更有经验的围棋程序员,你会如何解决这个“顺序工作单元”问题?