2

我正在使用Go语言进行数据导入工作,我想将每个步骤编写为闭包,并使用通道进行通信,即每个步骤都是并发的。该问题可以通过以下结构来定义。

  1. 从数据源 获取小部件
    1. 将源 1 的翻译添加到Widgets
    2. 将源 2 中的翻译添加到Widgets
    3. 将来源 1 的定价添加到Widgets
    4. WidgetRevisions添加到Widgets
      1. 将源 1 中的翻译添加到WidgetRevisions
      2. 将源 2 中的翻译添加到WidgetRevisions

出于这个问题的目的,我只处理必须在新Widget上执行的前三个步骤。我假设在此基础上,第四步可以作为一个管道步骤来实现,它本身是根据一个子三步管道来实现的,以控制 *WidgetRevision*s

为此,我一直在编写一小段代码来为我提供以下 API:

// A Pipeline is just a list of closures, and a smart 
// function to set them all off, keeping channels of
// communication between them.
p, e, d := NewPipeline()

// Add the three steps of the process
p.Add(whizWidgets)
p.Add(popWidgets)
p.Add(bangWidgets)

// Start putting things on the channel, kick off
// the pipeline, and drain the output channel
// (probably to disk, or a database somewhere)
go emit(e)
p.Execute()
drain(d)

我已经实现了它(在GistGo Playground上的代码),但它以 100% 的成功失败率陷入僵局

调用时出现僵局p.Execute(),因为可能其中一个通道最终无事可做,没有任何事情可以发送,也没有工作可做......

emit()在and中添加几行调试输出drain(),我看到以下输出,我相信闭包调用之间的流水线是正确的,并且我看到一些小部件被省略了。

Emitting A Widget
Input Will Be Emitted On 0x420fdc80
Emitting A Widget
Emitting A Widget
Emitting A Widget
Output Will Drain From 0x420fdcd0
Pipeline reading from 0x420fdc80 writing to 0x420fdd20
Pipeline reading from 0x420fdd20 writing to 0x420fddc0
Pipeline reading from 0x420fddc0 writing to 0x42157000

以下是我对这种方法的一些了解:

  • 我相信这种设计“饿死”一个或另一个协程并不少见,我相信这就是死锁的原因
  • 我希望管道首先将东西输入其中(API将实现Pipeline.Process(*Widget)
    • 如果我能做到这一点,排水管可能是一个“步骤”,它没有将任何东西传递给下一个函数,这可能是一个更干净的 API
  • 我知道我没有实现任何类型的梯级缓冲区,所以我完全有可能只是让机器的可用内存过载
  • 我真的不相信这是好的 Go 风格......但它似乎利用了很多 Go 功能,但这并不是真正的好处
  • 由于 WidgetRevisions 也需要管道,我想让我的管道更通用,也许interface{}类型是解决方案,我不太了解 Go 以确定这是否明智。
  • 有人建议我考虑实现一个互斥锁来防止竞争条件,但我相信我会保存,因为闭包每个都将在 Widget 结构的一个特定单元上运行,但是我很乐意接受有关该主题的教育.

总结:我该如何修复这个代码,我应该修复这个代码,如果你是一个比我更有经验的围棋程序员,你会如何解决这个“顺序工作单元”问题?

4

1 回答 1

2

我只是不认为我会构建远离渠道的抽象。显式管道。

你可以很容易地为所有实际的管道操作创建一个函数,看起来像这样:

type StageMangler func(*Widget)

func stage(f StageMangler, chi <-chan *Widget, cho chan<- *Widget) {
    for widget := range chi {
                f(widget)
                cho <- widget
    }
    close(cho)
}

然后你可以传入func(w *Widget) { w.Whiz = true}或类似的阶段构建器。

那时您add可以收集这些及其工人数量,因此特定阶段可以更轻松地拥有n 个工人。

我只是不确定这是否比直接拼接通道更容易,除非您在运行时构建这些管道。

于 2012-12-11T05:43:10.083 回答