24

Go 有一个适用于通道的 select 语句。从文档中:

select 语句让 goroutine 等待多个通信操作。

一个 select 阻塞,直到它的一个 case 可以运行,然后它执行那个 case。如果多个都准备好了,它会随机选择一个。

是否有与以下代码等效的 Python:

package main

import "fmt"

func main() {
    c1 := make(chan int)
    c2 := make(chan int)
    quit := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            c1 <- i
        }
        quit <- 0
    }()

    go func() {
        for i := 0; i < 2; i++ {
            c2 <- i
        }
    }()

    for {
        select {
        case <-c1:
            fmt.Println("Received value from c1")
        case <-c2:
            fmt.Println("Received value from c2")
        case <-quit:
            fmt.Println("quit")
            return
        }
    }
}

该程序的输出:

Received value from c1
Received value from c1
Received value from c2
Received value from c1
Received value from c2
Received value from c1
Received value from c1
Received value from c1
Received value from c1
Received value from c1
Received value from c1
Received value from c1
quit
4

7 回答 7

16

这是一个非常直接的翻译,但是“如果多个已准备好,则选择哪个”部分的工作方式有所不同-它只是采用首先出现的内容。这也就像使用gomaxprocs(1).

import threading
import Queue

def main():
    c1 = Queue.Queue(maxsize=0)
    c2 = Queue.Queue(maxsize=0)
    quit = Queue.Queue(maxsize=0)

    def func1():
        for i in range(10):
            c1.put(i)
        quit.put(0)

    threading.Thread(target=func1).start()

    def func2():
        for i in range(2):
            c2.put(i)

    threading.Thread(target=func2).start()

    combined = Queue.Queue(maxsize=0)

    def listen_and_forward(queue):
        while True:
            combined.put((queue, queue.get()))

    t = threading.Thread(target=listen_and_forward, args=(c1,))
    t.daemon = True
    t.start()
    t = threading.Thread(target=listen_and_forward, args=(c2,))
    t.daemon = True
    t.start()
    t = threading.Thread(target=listen_and_forward, args=(quit,))
    t.daemon = True
    t.start()

    while True:
        which, message = combined.get()
        if which is c1:
            print 'Received value from c1'
        elif which is c2:
            print 'Received value from c2'
        elif which is quit:
            print 'Received value from quit'
            return
main()

基本的变化是使用组合消息的线程来模拟选择。如果你打算经常使用这种模式,你可能会写一些选择代码:

import threading
import Queue

def select(*queues):
    combined = Queue.Queue(maxsize=0)
    def listen_and_forward(queue):
        while True:
            combined.put((queue, queue.get()))
    for queue in queues:
        t = threading.Thread(target=listen_and_forward, args=(queue,))
        t.daemon = True
        t.start()
    while True:
        yield combined.get()

def main():

    c1 = Queue.Queue(maxsize=0)
    c2 = Queue.Queue(maxsize=0)
    quit = Queue.Queue(maxsize=0)

    def func1():
        for i in range(10):
            c1.put(i)
        quit.put(0)

    threading.Thread(target=func1).start()

    def func2():
        for i in range(2):
            c2.put(i)

    threading.Thread(target=func2).start()

    for which, msg in select(c1, c2, quit):
        if which is c1:
            print 'Received value from c1'
        elif which is c2:
            print 'Received value from c2'
        elif which is quit:
            print 'Received value from quit'
            return
main()

但...

请注意,这个选择并不完全是 go ,尽管它对您的程序无关紧要 - 一个 goroutine 可以在一个通道上发送一个结果,如果我们不总是迭代选择完成!

于 2013-10-02T06:57:00.930 回答
12

还要考虑 Benoit Chesneau 的偏移库。它是 Go 并发模型到 Python 的一个端口,在底层使用了 Fiber。

他在 PyCon APAC 2013 上对此做了一个演讲:

于 2013-10-02T18:04:47.867 回答
9

您可以使用multiprocessing.Pipe代替chanthreading.Thread代替goselect.select代替select

这是使用这种方法在 Python 中重新实现您的 go 示例:

import random
from multiprocessing import Pipe
from select import select
from threading import Thread


def main():
    c1_r, c1_w = Pipe(duplex=False)
    c2_r, c2_w = Pipe(duplex=False)
    quit_r, quit_w = Pipe(duplex=False)

    def func1():
        for i in range(10):
            c1_w.send(i)
        quit_w.send(0)

    Thread(target=func1).start()

    def func2():
        for i in range(2):
            c2_w.send(i)

    Thread(target=func2).start()

    while True:
        ready, _, _ = select([c1_r, c2_r, quit_r], [], [])
        which = random.choice(ready)
        if which == c1_r:
            c1_r.recv()
            print 'Received value from c1'
        elif which == c2_r:
            c2_r.recv()
            print 'Received value from c2'
        elif which == quit_r and len(ready) == 1:
            quit_r.recv()
            print 'Received value from quit'
            return

if __name__ == '__main__':
    main()

此实现基于@Thomas 的实现,但与@Thomas 不同的是,它不会产生额外的线程来执行选择。

使用 Python 2.7.13 在 Linux 上测试。Windows 的行为可能会有所不同,因为 select 是 Unixy 的东西。

编辑:我添加了len(ready) == 1条件,因此仅在其他管道排空后才处理退出。这在 Go 中不是必需的,因为通道的大小为零,因此收到发送到的消息之前func1不能发送消息。感谢@Sean Perry 的评论。quit_wc1_w

于 2017-03-16T01:24:46.563 回答
4

在 Python 3.5 中,有一些关键字可以让函数在执行中暂停asyncawait从而能够在 evenloop 而不是线程上运行。asyncio标准库提供了一个。

为了更直接地映射 Go 阻塞通道的行为,select您可以使用这个小库,然后您的示例代码在 Python 中看起来非常相似。

于 2016-02-22T00:37:02.810 回答
4

是的,使用goless一切皆有可能。你可以试试看。

玩得开心 ;-)

这是一个例子:

c1 = goless.chan()
c2 = goless.chan()

def func1():
    time.sleep(1)
    c1.send('one')
goless.go(func1)

def func2():
    time.sleep(2)
    c2.send('two')
goless.go(func2)

for i in range(2):
    case, val = goless.select([goless.rcase(c1), goless.rcase(c2)])
    print(val)
于 2016-09-01T10:52:38.727 回答
2

这是另一个模仿 go 语法的尝试:

from threading import Thread
from Queue import Queue

def main():

    c1 = Queue.Queue(maxsize=0)
    c2 = Queue.Queue(maxsize=0)
    quit = Queue.Queue(maxsize=0)

    Thread(target=lambda: [c1.put(i) for i in range(10)] or quit.put(0)).start()
    Thread(target=lambda: [c2.put(i) for i in range(2)]).start()

    for which, msg in select(c1, c2, quit):
        if which is c1:
            print 'Received value from c1'
        elif which is c2:
            print 'Received value from c2'
        elif which is quit:
            print 'Received value from quit'
            return

def select(*queues):
    combined = Queue.Queue(maxsize=0)
    def listen_and_forward(queue):
        while True:
            combined.put((queue, queue.get()))
    for queue in queues:
        t = Thread(target=listen_and_forward, args=(queue,))
        t.daemon = True
        t.start()
    while True:
        yield combined.get()

main()
于 2013-10-02T07:44:50.323 回答
2

为了完整性:Go 风格的频道,包括工作选择可作为pygolang的一部分:

ch1 = chan()    # synchronous channel
ch2 = chan(3)   # channel with buffer of size 3

def _():
    ch1.send('a')
    ch2.send('b')
go(_)

ch1.recv()      # will give 'a'
ch2.recv_()     # will give ('b', True)

_, _rx = select(
    ch1.recv,           # 0
    ch2.recv_,          # 1
    (ch2.send, obj2),   # 2
    default,            # 3
)
if _ == 0:
    # _rx is what was received from ch1
    ...
if _ == 1:
    # _rx is (rx, ok) of what was received from ch2
    ...
if _ == 2:
    # we know obj2 was sent to ch2
    ...
if _ == 3:
    # default case
    ...

偏移量(参见https://stackoverflow.com/a/19143696/9456786)似乎也很有趣。

不幸的是, goless(参见https://stackoverflow.com/a/39269599/9456786)的选择实现很弱,这在设计上无法在同步通道上正常工作

于 2018-12-07T14:27:20.690 回答