7

Go 中的信号量是通过通道实现的:

一个例子是: https ://sites.google.com/site/gopatterns/concurrency/semaphores

语境:

我们有几百台服务器,并且有一些我们想要限制访问的共享资源。因此,对于给定的资源,我们希望使用信号量来限制这些服务器只能访问 5 个并发访问。为此,我们计划使用锁定服务器。当一台机器访问资源时,它会首先向锁服务器注册它正在通过密钥访问资源。然后当它完成时,它会向锁服务器发送另一个请求,说它已经完成并释放信号量。这确保我们将对这些资源的访问限制为最大数量的并发访问。

问题:如果出现问题,想要优雅地处理这个问题。

问题

您如何在信号量上实现超时?

例子:

假设我的信号量大小为 5。同时有 10 个进程试图获取信号量中的锁,因此在这种情况下只有 5 个进程会获取它。

有时,进程会在没有响应的情况下死掉(真正的原因解释起来有点复杂,但基本上有时进程可能无法解锁它)因此导致问题,因为信号量中的空间现在被永久锁定。

所以我想对此有一个超时。这里有一些问题:

这些过程将从 2 秒到 60 分钟之间的任何时间运行。

我们有一些竞争条件,因为如果它超时然后进程尝试解锁它,那么我们已经解锁了信号量两次而不是一次。反之亦然,我们先解锁,然后超时。

如何采用上面发布的建议模式并将其转换为具有超时的线程安全信号量?

4

5 回答 5

1

由于您正在创建分布式锁定服务,我假设您的锁定服务器侦听端口,并且当您接受()一个连接时,您循环,等待每个连接的 goroutine 中的命令。当套接字被丢弃时,该 goroutine 退出(即:远程节点崩溃)

所以,假设这是真的,你可以做几件事。

1)创建一个深度与并发锁数量匹配的通道 2)锁定时,向通道发送消息(如果已满,它将阻塞) 3)解锁时,只需从通道中读取消息 4)您可以“ defer release()"(如果您已经锁定,则 release 会使用一条消息)

这是一个粗略的工作示例,除了套接字之外的所有内容。希望这是有道理的。 http://play.golang.org/p/DLOX7m8m6q

package main

import "fmt"

import "time"

type Locker struct {
    ch chan int
    locked bool
}

func (l *Locker) lock(){
    l.ch <- 1
    l.locked=true
}
func (l *Locker) unlock() {
    if l.locked { // called directly or via defer, make sure we don't unlock if we don't have the lock
        l.locked = false // avoid unlocking twice if socket crashes after unlock
        <- l.ch
    }
}

func dostuff(name string, locker Locker) {
    locker.lock()
    defer locker.unlock()
    fmt.Println(name,"Doing stuff")
    time.Sleep(1 * time.Second)
}

func main() {
    ch := make(chan int, 2)
    go dostuff("1",Locker{ch,false})
    go dostuff("2",Locker{ch,false})
    go dostuff("3",Locker{ch,false})
    go dostuff("4",Locker{ch,false})
    time.Sleep(4 * time.Second)
}
于 2013-10-29T01:17:28.240 回答
1

弄清楚你想要完成什么有点困难,但据我所知,你正试图让并发 goroutine 访问共享资源并在出现问题时优雅地处理它。我有几个关于如何处理这个问题的建议。

1) 使用同步包中的 WaitGroup:http: //golang.org/pkg/sync/#example_WaitGroup

使用这种策略,您基本上在每次调用新的 goroutine 之前添加到一个计数器,并使用 defer 来确保它从计数器中删除(因此无论它超时还是由于其他原因返回,它仍然会从计数器中删除)。然后你使用一个wg.Wait()命令来确保在所有 go 例程都返回之前它不会继续前进。这是一个示例:http ://play.golang.org/p/wnm24TcBZg注意,如果没有wg.Wait()它,它将不会等待 go 例程在从 main 返回和终止之前完成。

2) 使用 time.Ticker 自动超时:http ://golang.org/pkg/time/#Ticker

这种方法基本上会设置一个计时器,该计时器将在设定的时间间隔内触发。您可以使用此计时器来控制基于时间的事件。基本上,这必须在等待通道被馈送的 for 循环中运行,就像在这个例子中一样:http ://play.golang.org/p/IHeqmiFBSS

同样,不完全确定您要完成什么,但您可以考虑将这两种方法结合起来,以便如果您的进程超时并处于循环中,则自动收报机将捕获它并在设定的时间后返回并调用 deferwg.Done()函数,以便等待它的代码部分继续运行。希望这至少有点帮助。

于 2013-10-29T00:28:22.840 回答
1
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
sem := semaphore.NewWeighted(int64(10))

if err := sem.Acquire(ctx, 1); err != nil {
 // - error means timeout else lock
}
于 2020-11-05T10:15:58.357 回答
0

一些假设:

  • 您一次需要大约5 台服务器才能通过锁定服务器。
  • 访问该资源的时间较短且长度相似。

使用配额服务器而不是锁定服务器。以平均(平均,第 75 次等)访问/锁定时间的 5 倍补充配额(一个简单的计数器)。仅在配额小于最大值时才补充配额。这样,您平均将维持大约 5 个并发访问/锁定。

一些高级功能:

  • 如果共享资源可以检测到它自己的负载,它可以告诉配额服务器它可以进行更多或更少的并发访问。
  • 服务器完成后可以 ping 配额服务器。这不是必需的,但可以更快地释放资源。
于 2013-10-29T01:13:41.677 回答
0

也许这会有所帮助,但我认为这种实现过于广泛
,我将不胜感激有关代码的任何建议。

package main

import (
   "fmt"
   "time"
   "math/rand"
   "strconv"
)

type Empty interface{}

type Semaphore struct {
    dur time.Duration
    ch  chan Empty
}

func NewSemaphore(max int, dur time.Duration) (sem *Semaphore) {
    sem = new(Semaphore)
    sem.dur = dur
    sem.ch = make(chan Empty, max)
    return
}

type Timeout struct{}

type Work struct{}

var null Empty
var timeout Timeout
var work Work

var global = time.Now()

func (sem *Semaphore) StartJob(id int, job func()) {
    sem.ch <- null
    go func() {
        ch := make(chan interface{})
        go func() {
            time.Sleep(sem.dur)
            ch <- timeout
        }()
        go func() {
            fmt.Println("Job ", strconv.Itoa(id), " is started", time.Since(global))
            job()
            ch <- work
        }()
        switch (<-ch).(type) {
        case Timeout:
            fmt.Println("Timeout for job ", strconv.Itoa(id), time.Since(global))
        case Work:
            fmt.Println("Job ", strconv.Itoa(id), " is finished", time.Since(global))
        }
        <-sem.ch
    }()
}

func main() {
    rand.Seed(time.Now().Unix())
    sem := NewSemaphore(3, 3*time.Second)
    for i := 0; i < 10; i++ {
        id := i
        go sem.StartJob(i, func() {
            seconds := 2 + rand.Intn(5)
            fmt.Println("For job ", strconv.Itoa(id), " was allocated ", seconds, " secs")
            time.Sleep(time.Duration(seconds) * time.Second)
        })
    }
    time.Sleep(30 * time.Second)
}
于 2015-03-22T12:36:27.320 回答