type Stat struct {
    counters     map[string]*int64
    countersLock sync.RWMutex
    averages     map[string]*int64
    averagesLock sync.RWMutex
}

It is then called like this:

func (s *Stat) Count(name string) {
    s.countersLock.RLock()
    counter := s.counters[name]
    s.countersLock.RUnlock()
    if counter != nil {
        atomic.AddInt64(counter, int64(1))
        return
    }
}

My understanding is that we first lock the receiver s (which is of type Stat), and then add to the counter if it actually exists.

Questions:

Q1: Why do we need to lock it at all? What does RWMutex even mean?

Q2: Does s.countersLock.RLock() lock the entire receiver, or only the counters field of the Stat type?

Q3: Does s.countersLock.RLock() lock the averages field?

Q4: Why do we use RWMutex at all? I thought channels were the preferred way to handle concurrency in Go?

Q5: What is this atomic.AddInt64? Why do we need atomic in this case?

Q6: Why do we unlock immediately before adding?


2 Answers


When more than one thread* needs to mutate the same value, a locking mechanism is needed to synchronize access. Without it, two or more threads* could write to the same value at the same time, corrupting memory and typically causing a crash.

The atomic package provides a fast and easy way to synchronize access to primitive values. For a counter it is the fastest synchronization method. It provides functions with well-defined use cases, such as incrementing, decrementing, swapping, and so on.
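For instance, a standalone counter that only ever gets incremented and read can be synchronized with atomic alone. A minimal sketch, assuming the usual "sync/atomic" import (the names hits, recordHit and currentHits are made up for illustration):

var hits int64

func recordHit() {
    atomic.AddInt64(&hits, 1) // safe to call from any number of goroutines
}

func currentHits() int64 {
    return atomic.LoadInt64(&hits) // use atomic for reads too while writers may be active
}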

The sync package provides a way to synchronize access to more complicated values, such as maps, slices, arrays, or groups of values. You use it for cases that atomic does not cover.

In either case, locking is only required when writing. Multiple threads* can safely read the same value without a locking mechanism, as long as nothing is writing to it at the same time.

Let's take a look at the code you provided.

type Stat struct {
    counters     map[string]*int64
    countersLock sync.RWMutex
    averages     map[string]*int64
    averagesLock sync.RWMutex
}

func (s *Stat) Count(name string) {
    s.countersLock.RLock()
    counter := s.counters[name]
    s.countersLock.RUnlock()
    if counter != nil {
        atomic.AddInt64(counter, int64(1))
        return
    }
}

What's missing here is how the maps themselves are initialized, and so far the maps are not being mutated. If the counter names are predetermined and cannot be added to later, you don't need the RWMutex. That code might look something like this:

type Stat struct {
    counters map[string]*int64
}

func InitStat(names... string) Stat {
    counters := make(map[string]*int64)
    for _, name := range names {
        counter := int64(0)
        counters[name] = &counter
    }
    return Stat{counters}
}

func (s *Stat) Count(name string) int64 {
    counter := s.counters[name]
    if counter == nil {
        return -1 // (int64, error) instead?
    }
    return atomic.AddInt64(counter, 1)
}

(Note: I removed averages because it wasn't being used in the original example.)

Now, let's say you didn't want your counters to be predetermined. In that case you would need a mutex to synchronize access.

Let's try it with just a Mutex. It's simple because only one thread* can hold the lock at a time. If a second thread* tries to Lock before the first releases it with Unlock, it waits (or blocks)** until then.

type Stat struct {
    counters map[string]*int64
    mutex    sync.Mutex
}

func InitStat() Stat {
    return Stat{counters: make(map[string]*int64)}
}

func (s *Stat) Count(name string) int64 {
    s.mutex.Lock()
    counter := s.counters[name]
    if counter == nil {
        value := int64(0)
        counter = &value
        s.counters[name] = counter
    }
    s.mutex.Unlock()
    return atomic.AddInt64(counter, 1)
}

The code above will work just fine. But there are two problems.

  1. If there is a panic between Lock() and Unlock() the mutex will be locked forever, even if you were to recover from the panic. This code probably won't panic, but in general it's better practice to assume it might.
  2. An exclusive lock is taken while fetching the counter. Only one thread* can read from the counter at one time.

Problem #1 is easy to solve. Use defer:

func (s *Stat) Count(name string) int64 {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    counter := s.counters[name]
    if counter == nil {
        value := int64(0)
        counter = &value
        s.counters[name] = counter
    }
    return atomic.AddInt64(counter, 1)
}

This ensures that Unlock() is always called. And if for some reason you have more than one return, you only need to specify Unlock() once at the head of the function.

Problem #2 can be solved with RWMutex. How does it work exactly, and why is it useful?

RWMutex is an extension of Mutex and adds two methods: RLock and RUnlock. There are a few points that are important to note about RWMutex:

  • RLock is a shared read lock. When a lock is taken with it, other threads* can also take their own lock with RLock. This means multiple threads* can read at the same time. It's semi-exclusive.

  • If the mutex is read locked, a call to Lock is blocked**. If one or more readers hold a lock, you cannot write.

  • If the mutex is write locked (with Lock), RLock will block**.

A good way to think about RWMutex is as a Mutex with a reader counter. RLock increments the counter while RUnlock decrements it. A call to Lock will block as long as that counter is > 0.

You may be thinking: If my application is read heavy, would that mean a writer could be blocked indefinitely? No. There is one more useful property of RWMutex:

  • If the reader counter is > 0 and Lock is called, future calls to RLock will also block until the existing readers have released their locks and the writer has obtained and then released its lock.

Think of it as the light above a register at the grocery store that says a cashier is open or not. The people in line get to stay there and they will be helped, but new people cannot get in line. As soon as the last remaining customer is helped the cashier goes on break, and that register either remains closed until they come back or they are replaced with a different cashier.
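If you want to see this behavior for yourself, here is a small standalone sketch (not part of the code being discussed) in which two readers hold RLock at the same time while the writer's Lock call waits for them:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var mu sync.RWMutex
    var readers sync.WaitGroup

    // Two readers take shared locks; both can hold them at the same time.
    for i := 0; i < 2; i++ {
        readers.Add(1)
        go func(id int) {
            defer readers.Done()
            mu.RLock()
            fmt.Println("reader", id, "holds RLock")
            time.Sleep(100 * time.Millisecond) // the two readers overlap here
            mu.RUnlock()
        }(i)
    }

    time.Sleep(10 * time.Millisecond) // give the readers a head start

    mu.Lock() // blocks until both readers have called RUnlock
    fmt.Println("writer got Lock after the readers released")
    mu.Unlock()

    readers.Wait()
}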

Let's modify the earlier example with an RWMutex:

type Stat struct {
    counters map[string]*int64
    mutex    sync.RWMutex
}

func InitStat() Stat {
    return Stat{counters: make(map[string]*int64)}
}

func (s *Stat) Count(name string) int64 {
    var counter *int64
    if counter = s.getCounter(name); counter == nil {
        counter = s.initCounter(name)
    }
    return atomic.AddInt64(counter, 1)
}

func (s *Stat) getCounter(name string) *int64 {
    s.mutex.RLock()
    defer s.mutex.RUnlock()
    return s.counters[name]
}

func (s *Stat) initCounter(name string) *int64 {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    counter := s.counters[name]
    if counter == nil {
        value := int64(0)
        counter = &value
        s.counters[name] = counter    
    }
    return counter
}

With the code above I've separated the logic out into getCounter and initCounter functions to:

  • Keep the code simple to understand. It would be difficult to manage RLock() and Lock() correctly in the same function.
  • Release the locks as early as possible while using defer.

The code above, unlike the Mutex example, allows you to increment different counters simultaneously.
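As a quick usage sketch (assuming everything lives in the same package and the usual fmt, sync and sync/atomic imports; the "hits" and "misses" names are arbitrary), several goroutines can bump different counters at once:

func main() {
    stat := InitStat()
    var wg sync.WaitGroup

    // Ten goroutines increment two different counters concurrently.
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            if i%2 == 0 {
                stat.Count("hits")
            } else {
                stat.Count("misses")
            }
        }(i)
    }
    wg.Wait()

    fmt.Println("hits:", atomic.LoadInt64(stat.getCounter("hits")))     // 5
    fmt.Println("misses:", atomic.LoadInt64(stat.getCounter("misses"))) // 5
}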

Another thing I wanted to point out is that in all the examples above, the map map[string]*int64 contains pointers to the counters, not the counters themselves. If you were to store the counters in the map as map[string]int64, you would need to use Mutex without atomic. That code would look something like this:

type Stat struct {
    counters map[string]int64
    mutex    sync.Mutex
}

func InitStat() Stat {
    return Stat{counters: make(map[string]int64)}
}

func (s *Stat) Count(name string) int64 {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    s.counters[name]++
    return s.counters[name]
}

You may want to do this to reduce garbage collection - but that would only matter if you had thousands of counters - and even then the counters themselves don't take up a whole lot of space (compared to something like a byte buffer).

* When I say thread I mean go-routine. A thread in other languages is a mechanism for running one or more sets of code simultaneously. A thread is expensive to create and tear-down. A go-routine is built on top of threads, but re-uses them. When a go-routine sleeps the underlying thread can be used by another go-routine. When a go-routine wakes up, it might be on a different thread. Go handles all this behind the scenes. -- But for all intents and purposes you would treat a go-routine like a thread when it comes to memory access. However, you don't have to be as conservative when using go-routines as you do threads.

** When a go-routine is blocked by Lock, RLock, a channel, or Sleep, the underlying thread might be re-used. No cpu is used by that go-routine - think of it as waiting in line. Like other languages an infinite loop like for {} would block while keeping the cpu and go-routine busy - think of that as running around in a circle - you'll get dizzy, throw up, and the people around you won't be very happy.

Answered 2013-10-03T20:03:28.343

Questions:

Q1: Why do we need to lock it at all? What does RWMutex even mean?

RW stands for Read/Write. See the documentation: http://golang.org/pkg/sync/#RWMutex

We need to lock it to prevent other routines/threads from changing the value while we are working with it.

Q2: Does s.countersLock.RLock() lock the entire receiver, or only the counters field of the Stat type?

Being a mutex, locking only happens when RLock() is called. If another goroutine already holds the write lock (Lock()), then it blocks. Any number of goroutines can hold RLock() at the same time; read locks do not block each other.

So it does not lock any other field, not even s.counters. In your example, you are locking the map lookup that finds the right counter.

Q3: Does s.countersLock.RLock() lock the averages field?

No. As said in Q2, an RLock only locks itself.

Q4: Why do we use RWMutex at all? I thought channels were the preferred way to handle concurrency in Go?

Channels are very useful, but sometimes they are not enough, and sometimes they don't make sense.

Here, since you are locking access to a map, a mutex makes sense. With a chan, you would have to use a buffered chan of capacity 1, sending before and receiving after. Not very intuitive.
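To make that concrete, here is roughly what such a channel-based version could look like. This is only a sketch, not code from the linked example; the NewStat name and the lock field are made up, and since the whole update happens while the token is held, no atomic is needed:

type Stat struct {
    counters map[string]int64
    lock     chan struct{} // buffered with capacity 1, so it holds at most one "token"
}

func NewStat() *Stat {
    return &Stat{
        counters: make(map[string]int64),
        lock:     make(chan struct{}, 1),
    }
}

func (s *Stat) Count(name string) int64 {
    s.lock <- struct{}{}        // "send before": acquire the lock
    defer func() { <-s.lock }() // "receive after": release the lock
    s.counters[name]++
    return s.counters[name]
}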

Q5: What is this atomic.AddInt64? Why do we need atomic in this case?

This function increments the given variable atomically. In your case, you have a race condition: counter is a pointer, and the actual variable it points to could be changed between the moment the lock is released and the moment atomic.AddInt64 is called. If you are not familiar with this kind of thing, I suggest you stick to mutexes and do all the processing you need between lock and unlock.

Q6: Why do we unlock immediately before adding?

You shouldn't.

I don't know exactly what you are trying to do, but here is a (simple) example: https://play.golang.org/p/cVFPB-05dw

Answered 2013-10-03T01:00:12.597