go

golang的sync包

Posted by Zeusro on April 17, 2019
👈🏻 Select language

并发相关

总结

type 作用
Cond 发令枪,一般预设一个条件让子任务等待,发出的信号可以是单个(Signal)也可集体广播(Broadcast)
Locker 简单接口
Mutex 互斥锁
Once 并发运行,只允许一次
RWMutex 读写锁,多读少写,同时读锁,读写互斥.
WaitGroup 分发任务,主线程等待所有任务完成

Cond

1
2
3
4
5
type Cond
    func NewCond(l Locker) *Cond
    func (c *Cond) Broadcast()
    func (c *Cond) Signal()
    func (c *Cond) Wait()

加入到通知列表 -> 解锁 L -> 等待通知 -> 锁定 L

虽然放在最前面,但我花了最长的时间去理解这玩意.

按照我的理解,Cond就好比一个发令枪.比如我同时养了5条狗,并同时准备了5份食物,但是没有我的口令,我不准它们吃.示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
func useCondBroadcast() {
	var count int = 5
	ch := make(chan struct{}, 5)

	// 新建 cond
	var l sync.Mutex
	cond := sync.NewCond(&l)

	for i := 0; i < 5; i++ {
		go func(i int) {
			// 争抢互斥锁的锁定
			cond.L.Lock()
			// 条件是否达成
			for count > i {
				cond.Wait()
				fmt.Printf("收到一个通知 goroutine%d\n", i)
			}

			fmt.Printf("goroutine%d 执行结束\n", i)
			cond.L.Unlock()
			ch <- struct{}{}

		}(i)
	}

	// 确保所有 goroutine 启动完成
	time.Sleep(time.Millisecond * 20)

	fmt.Println("broadcast...")
	cond.L.Lock()
	count = -1
	cond.Broadcast()
	cond.L.Unlock()

	for i := 0; i < 5; i++ {
		<-ch
	}
}


func useCondSignal() {
	var count int = 5
	ch := make(chan struct{}, 5)

	// 新建 cond
	var l sync.Mutex
	cond := sync.NewCond(&l)

	for i := 0; i < 5; i++ {
		go func(i int) {
			// 争抢互斥锁的锁定
			cond.L.Lock()
			// 条件是否达成
			for count > i {
				cond.Wait()
				fmt.Printf("收到一个通知 goroutine%d\n", i)
			}

			fmt.Printf("goroutine%d 执行结束\n", i)
			cond.L.Unlock()
			ch <- struct{}{}

		}(i)
	}

	// 确保所有 goroutine 启动完成
	time.Sleep(time.Millisecond * 20)

	time.Sleep(time.Second)
	fmt.Println("signal...")
	cond.L.Lock()
	count = -1
	cond.Signal()
	cond.L.Unlock()

	//fatal error: all goroutines are asleep - deadlock!
	for i := 0; i < 1; i++ {
		<-ch
	}
}

这时

场景:喂狗

goroutine:每一条狗吃饭的行为

Broadcast()方法:通知所有狗吃饭

Signal()方法:通知随机一条狗吃饭

例子中count变量: 指示狗吃饭的信号

例子中的ch变量:狗拉的便便

useCondBroadcast()useCondSignal这2个例子,差别只在于最后管道的读取游标(i).

Broadcast方法通知的对象是所有的狗,所以最后所有的狗都顺利开吃(i=4).

Signal只通知了一条狗,所以最后只有一条狗拉出了便便(i=0)

所以如果只有一条狗,那么使用Signal效果等同于Broadcast.

SignalBroadcast方法都好,如果设置了管道(ch := make(chan struct{}, 5))去接收最后的结果,要注意设置的临界值变化导致的最后出来的结果数量.

取少了没关系,取多了会出现fatal error: all goroutines are asleep - deadlock!这个panic(比如,在useCondSignal这个例子里面,把i<1改成i<2),后果不堪设想.

关于Cond实际的使用场景,我觉得把Cond应用于最优解.比如说我要爬取同一个网页,可能有ABCD四种方案,我只需要其中一个方案最快完成即可.那么只要其中一个任务完成,在主线程发起Broadcast,这样其他方案就不用白忙活了,可以退出舞台.

暂时没想到Signal的实际用法,以后有机会再补充吧.

真正理解了Cond锁的争抢方式之后,BroadcastSignal交替使用也就不再有什么问题.

Locker

只是一个简单的接口.

1
2
3
4
type Locker interface {
        Lock()
        Unlock()
}

Mutex

1
2
3
type Mutex
    func (m *Mutex) Lock()
    func (m *Mutex) Unlock()

互斥锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func useMutex() {
	ch := make(chan struct{}, 2)

	var l sync.Mutex
	go func() {
		l.Lock()
		defer l.Unlock()
		fmt.Println("goroutine1: 我会锁定大概 2s")
		time.Sleep(time.Second * 2)
		fmt.Println("goroutine1: 我解锁了,你们去抢吧")
		ch <- struct{}{}
	}()

	go func() {
		fmt.Println("groutine2: 等待解锁")
		l.Lock()
		defer l.Unlock()
		fmt.Println("goroutine2: 哈哈,我锁定了")
		ch <- struct{}{}
	}()

	// 等待 goroutine 执行结束
	for i := 0; i < 2; i++ {
		<-ch
	}
}

Once

1
2
type Once
    func (o *Once) Do(f func())

如其名,Once里的Do函数只会运行一次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func useOnce() {
	var once sync.Once
	onceBody := func() {
		fmt.Println("Only once")
	}
	done := make(chan bool)
	for i := 0; i < 10; i++ {
		go func() {
			once.Do(onceBody)
			done <- true
		}()
	}
	for i := 0; i < 10; i++ {
		<-done
	}
}

RWMutex

1
2
3
4
5
6
type RWMutex
    func (rw *RWMutex) Lock()
    func (rw *RWMutex) RLock()
    func (rw *RWMutex) RLocker() Locker
    func (rw *RWMutex) RUnlock()
    func (rw *RWMutex) Unlock()

RWMutex是基于Mutex实现的.

读写锁,一般用在大量读操作、少量写操作的情况

  1. 同时只能有一个 goroutine 能够获得写锁定。
  2. 同时可以有任意多个 gorouinte 获得读锁定。
  3. 同时只能存在写锁定或读锁定(读和写互斥)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func useRWMutex() {
	ch := make(chan struct{}, 10)
	for i := 0; i < 5; i++ {
		go read(i, ch)
	}
	for i := 0; i < 5; i++ {
		go write(i, ch)
	}

	for i := 0; i < 10; i++ {
		<-ch
	}
}

var count int
var rw sync.RWMutex

func read(n int, ch chan struct{}) {
	rw.RLock()
	fmt.Printf("goroutine %d 进入读操作...\n", n)
	v := count
	fmt.Printf("goroutine %d 读取结束,值为:%d\n", n, v)
	rw.RUnlock()
	ch <- struct{}{}
}

func write(n int, ch chan struct{}) {
	rw.Lock()
	fmt.Printf("goroutine %d 进入写操作...\n", n)
	v := rand.Intn(1000)
	count = v
	fmt.Printf("goroutine %d 写入结束,新值为:%d\n", n, v)
	rw.Unlock()
	ch <- struct{}{}
}

WaitGroup

1
2
3
4
5
6
type WaitGroup
    func (wg *WaitGroup) Add(delta int)
    func (wg *WaitGroup) Done()
    func (wg *WaitGroup) Wait()
Examples(Expand All)

简单的多任务分发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func useWaitGroup() {
	// 通过sync包中的WaitGroup 实现并发控制

	var wg sync.WaitGroup

	wg.Add(1)
	go func(wg *sync.WaitGroup) {
		time.Sleep(5 * time.Second)
		fmt.Println("1 done")
		wg.Done()
	}(&wg)

	wg.Add(1)
	go func(wg *sync.WaitGroup) {
		time.Sleep(9 * time.Second)
		fmt.Println("2 done")
		wg.Done()
	}(&wg)
	wg.Wait()
	fmt.Println("handle2 done")

	// 在 sync 包中,提供了 WaitGroup ,它会等待它收集的所有 goroutine 任务全部完成,在主 goroutine 中 Add(delta int) 索要等待goroutine 的数量。在每一个 goroutine 完成后 Done() 表示这一个goroutine 已经完成,当所有的 goroutine 都完成后,在主 goroutine 中 WaitGroup 返回。
}

数据结构

Map

1
2
3
4
5
6
type Map
    func (m *Map) Delete(key interface{})
    func (m *Map) Load(key interface{}) (value interface{}, ok bool)
    func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)
    func (m *Map) Range(f func(key, value interface{}) bool)
    func (m *Map) Store(key, value interface{})

适用场景

线程安全集合,在2个场景做了优化

  1. 只写1次,多次读
  2. 多个goroutines读写互不相同的键(比如goroutines1读写key1,goroutines2读写key2)

方法介绍

Load 读取

LoadOrStore 读取不到则写入

Store 写入

Range 无法直接遍历,得通过回调的方式遍历

具体用法见

Go 1.9 sync.Map揭秘

Pool

1
2
3
type Pool
    func (p *Pool) Get() interface{}
    func (p *Pool) Put(x interface{})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var bufPool = sync.Pool{
	New: func() interface{} {
		return new(bytes.Buffer)
	},
}

func usePool(key, val string) {
	// 获取临时对象,没有的话会自动创建
	b := bufPool.Get().(*bytes.Buffer)
	b.Reset()
	b.WriteString(key)
	b.WriteByte('=')
	b.WriteString(val)
	os.Stdout.Write(b.Bytes())
	// 将临时对象放回到 Pool 中
	bufPool.Put(b)
}

参考链接:

  1. 浅谈 Golang sync 包的相关使用方法
  2. map性能对比

Summary

type purpose
Cond Starting gun, usually pre-sets a condition for subtasks to wait; the signal can be single (Signal) or broadcast (Broadcast)
Locker Simple interface
Mutex Mutual exclusion lock
Once Concurrent execution, only allowed once
RWMutex Read-write lock, many reads and few writes, simultaneous read locks, read-write mutual exclusion.
WaitGroup Distribute tasks, main thread waits for all tasks to complete

Cond

1
2
3
4
5
type Cond
    func NewCond(l Locker) *Cond
    func (c *Cond) Broadcast()
    func (c *Cond) Signal()
    func (c *Cond) Wait()

Add to notification list -> Unlock L -> Wait for notification -> Lock L

Although it’s placed at the very beginning, I spent the longest time trying to understand this thing.

According to my understanding, Cond is like a starting gun. For example, I have 5 dogs and 5 portions of food ready at the same time, but I won’t let them eat without my command. The example code is as follows:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
func useCondBroadcast() {
	var count int = 5
	ch := make(chan struct{}, 5)

	// Create new cond
	var l sync.Mutex
	cond := sync.NewCond(&l)

	for i := 0; i < 5; i++ {
		go func(i int) {
			// Compete for mutual exclusion lock
			cond.L.Lock()
			// Check if condition is met
			for count > i {
				cond.Wait()
				fmt.Printf("Received a notification goroutine%d\n", i)
			}

			fmt.Printf("goroutine%d execution finished\n", i)
			cond.L.Unlock()
			ch <- struct{}{}

		}(i)
	}

	// Ensure all goroutines have started
	time.Sleep(time.Millisecond * 20)

	fmt.Println("broadcast...")
	cond.L.Lock()
	count = -1
	cond.Broadcast()
	cond.L.Unlock()

	for i := 0; i < 5; i++ {
		<-ch
	}
}


func useCondSignal() {
	var count int = 5
	ch := make(chan struct{}, 5)

	// Create new cond
	var l sync.Mutex
	cond := sync.NewCond(&l)

	for i := 0; i < 5; i++ {
		go func(i int) {
			// Compete for mutual exclusion lock
			cond.L.Lock()
			// Check if condition is met
			for count > i {
				cond.Wait()
				fmt.Printf("Received a notification goroutine%d\n", i)
			}

			fmt.Printf("goroutine%d execution finished\n", i)
			cond.L.Unlock()
			ch <- struct{}{}

		}(i)
	}

	// Ensure all goroutines have started
	time.Sleep(time.Millisecond * 20)

	time.Sleep(time.Second)
	fmt.Println("signal...")
	cond.L.Lock()
	count = -1
	cond.Signal()
	cond.L.Unlock()

	//fatal error: all goroutines are asleep - deadlock!
	for i := 0; i < 1; i++ {
		<-ch
	}
}

At this point

Scenario: Feeding dogs

goroutine: The act of each dog eating

Broadcast() method: Notify all dogs to eat

Signal() method: Notify a random dog to eat

count variable in the example: Indicates the signal for dogs to eat

ch variable in the example: The poop the dogs pull

The difference between useCondBroadcast() and useCondSignal examples is only in the final pipe’s read cursor (i).

The Broadcast method notifies all dogs, so all dogs successfully start eating (i=4).

Signal only notifies one dog, so only one dog poops in the end (i=0).

So if there is only one dog, using Signal has the same effect as Broadcast.

Both Signal and Broadcast methods are fine. If a channel (ch := make(chan struct{}, 5)) is set up to receive the final result, be careful about the change in the threshold value, which affects the number of results.

It’s okay to take fewer, but taking too many will cause a fatal error: all goroutines are asleep - deadlock! panic (e.g., in the useCondSignal example, change i<1 to i<2), with unimaginable consequences.

Regarding the actual use case of Cond, I think Cond is best applied to an optimal solution. For example, if I want to crawl the same webpage, there might be four schemes A, B, C, D, and I only need one of them to finish the fastest. Then, as soon as one task completes, Broadcast is initiated in the main thread, so other schemes don’t have to work in vain and can exit the stage.

I haven’t thought of an actual use for Signal yet; I’ll add it later if I have a chance.

Once the contention mechanism of Cond lock is truly understood, alternating Broadcast and Signal no longer poses any problem.

Locker

Just a simple interface.

1
2
3
4
type Locker interface {
        Lock()
        Unlock()
}

Mutex

1
2
3
type Mutex
    func (m *Mutex) Lock()
    func (m *Mutex) Unlock()

Mutual exclusion lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func useMutex() {
	ch := make(chan struct{}, 2)

	var l sync.Mutex
	go func() {
		l.Lock()
		defer l.Unlock()
		fmt.Println("goroutine1: I will lock for about 2s")
		time.Sleep(time.Second * 2)
		fmt.Println("goroutine1: I unlocked, go grab it")
		ch <- struct{}{}
	}()

	go func() {
		fmt.Println("groutine2: Waiting for unlock")
		l.Lock()
		defer l.Unlock()
		fmt.Println("goroutine2: Haha, I locked it")
		ch <- struct{}{}
	}()

	// Wait for goroutines to finish
	for i := 0; i < 2; i++ {
		<-ch
	}
}

Once

1
2
type Once
    func (o *Once) Do(f func())

As its name suggests, the Do function in Once will only run once.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func useOnce() {
	var once sync.Once
	onceBody := func() {
		fmt.Println("Only once")
	}
	done := make(chan bool)
	for i := 0; i < 10; i++ {
		go func() {
			once.Do(onceBody)
			done <- true
		}()
	}
	for i := 0; i < 10; i++ {
		<-done
	}
}

RWMutex

1
2
3
4
5
6
type RWMutex
    func (rw *RWMutex) Lock()
    func (rw *RWMutex) RLock()
    func (rw *RWMutex) RLocker() Locker
    func (rw *RWMutex) RUnlock()
    func (rw *RWMutex) Unlock()

RWMutex is implemented based on Mutex.

Read-write lock, generally used in scenarios with a large number of read operations and a small number of write operations.

  1. Only one goroutine can acquire a write lock at a time.
  2. Any number of goroutines can acquire read locks simultaneously.
  3. Only a write lock or read lock can exist simultaneously (read and write are mutually exclusive).
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func useRWMutex() {
	ch := make(chan struct{}, 10)
	for i := 0; i < 5; i++ {
		go read(i, ch)
	}
	for i := 0; i < 5; i++ {
		go write(i, ch)
	}

	for i := 0; i < 10; i++ {
		<-ch
	}
}

var count int
var rw sync.RWMutex

func read(n int, ch chan struct{}) {
	rw.RLock()
	fmt.Printf("goroutine %d entering read operation...\n", n)
	v := count
	fmt.Printf("goroutine %d finished reading, value is: %d\n", n, v)
	rw.RUnlock()
	ch <- struct{}{}
}

func write(n int, ch chan struct{}) {
	rw.Lock()
	fmt.Printf("goroutine %d entering write operation...\n", n)
	v := rand.Intn(1000)
	count = v
	fmt.Printf("goroutine %d finished writing, new value is: %d\n", n, v)
	rw.Unlock()
	ch <- struct{}{}
}

WaitGroup

1
2
3
4
5
6
type WaitGroup
    func (wg *WaitGroup) Add(delta int)
    func (wg *WaitGroup) Done()
    func (wg *WaitGroup) Wait()
Examples(Expand All)

Simple multi-task distribution

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func useWaitGroup() {
	// Achieve concurrency control through WaitGroup in the sync package

	var wg sync.WaitGroup

	wg.Add(1)
	go func(wg *sync.WaitGroup) {
		time.Sleep(5 * time.Second)
		fmt.Println("1 done")
		wg.Done()
	}(&wg)

	wg.Add(1)
	go func(wg *sync.WaitGroup) {
		time.Sleep(9 * time.Second)
		fmt.Println("2 done")
		wg.Done()
	}(&wg)
	wg.Wait()
	fmt.Println("handle2 done")

	// The sync package provides WaitGroup, which waits for all goroutine tasks it collects to complete. In the main goroutine, Add(delta int) is used to specify the number of goroutines to wait for. After each goroutine completes, Done() indicates that this goroutine is finished. When all goroutines are completed, WaitGroup returns in the main goroutine.
}

Data Structures

Map

1
2
3
4
5
6
type Map
    func (m *Map) Delete(key interface{})
    func (m *Map) Load(key interface{}) (value interface{}, ok bool)
    func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)
    func (m *Map) Range(f func(key, value interface{}) bool)
    func (m *Map) Store(key, value interface{})

Applicable Scenarios

Thread-safe collection, optimized for 2 scenarios:

  1. Write once, read many times
  2. Multiple goroutines read and write different keys (e.g., goroutine1 reads/writes key1, goroutine2 reads/writes key2)

Method Introduction

Load: Read LoadOrStore: Read, if not found, then write Store: Write Range: Cannot iterate directly, must iterate through callback

Specific usage can be found in: Go 1.9 sync.Map揭秘

Pool

1
2
3
type Pool
    func (p *Pool) Get() interface{}
    func (p *Pool) Put(x interface{})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var bufPool = sync.Pool{
	New: func() interface{} {
		return new(bytes.Buffer)
	},
}

func usePool(key, val string) {
	// Get temporary object, automatically create if not available
	b := bufPool.Get().(*bytes.Buffer)
	b.Reset()
	b.WriteString(key)
	b.WriteByte('=')
	b.WriteString(val)
	os.Stdout.Write(b.Bytes())
	// Return temporary object to Pool
	bufPool.Put(b)
}
  1. 浅谈 Golang sync 包的相关使用方法
  2. map性能对比

Связанное с параллелизмом

Резюме

type назначение
Cond Стартовый пистолет, обычно предустанавливает условие для подзадач ожидания; сигнал может быть одиночным (Signal) или широковещательным (Broadcast)
Locker Простой интерфейс
Mutex Взаимоисключающая блокировка
Once Параллельное выполнение, разрешено только один раз
RWMutex Блокировка чтения-записи, много чтений и мало записей, одновременные блокировки чтения, взаимное исключение чтения-записи.
WaitGroup Распределение задач, главный поток ждет завершения всех задач

Cond

1
2
3
4
5
type Cond
    func NewCond(l Locker) *Cond
    func (c *Cond) Broadcast()
    func (c *Cond) Signal()
    func (c *Cond) Wait()

Добавить в список уведомлений -> Разблокировать L -> Ждать уведомления -> Заблокировать L

Хотя это помещено в самом начале, я потратил больше всего времени, пытаясь понять эту вещь.

Согласно моему пониманию, Cond похож на стартовый пистолет. Например, у меня есть 5 собак и 5 порций еды, готовых одновременно, но я не позволю им есть без моей команды. Примерный код выглядит следующим образом:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
func useCondBroadcast() {
	var count int = 5
	ch := make(chan struct{}, 5)

	// Создать новый cond
	var l sync.Mutex
	cond := sync.NewCond(&l)

	for i := 0; i < 5; i++ {
		go func(i int) {
			// Конкурировать за блокировку взаимного исключения
			cond.L.Lock()
			// Проверить, выполнено ли условие
			for count > i {
				cond.Wait()
				fmt.Printf("Получено уведомление goroutine%d\n", i)
			}

			fmt.Printf("goroutine%d выполнение завершено\n", i)
			cond.L.Unlock()
			ch <- struct{}{}

		}(i)
	}

	// Убедиться, что все goroutines запущены
	time.Sleep(time.Millisecond * 20)

	fmt.Println("broadcast...")
	cond.L.Lock()
	count = -1
	cond.Broadcast()
	cond.L.Unlock()

	for i := 0; i < 5; i++ {
		<-ch
	}
}


func useCondSignal() {
	var count int = 5
	ch := make(chan struct{}, 5)

	// Создать новый cond
	var l sync.Mutex
	cond := sync.NewCond(&l)

	for i := 0; i < 5; i++ {
		go func(i int) {
			// Конкурировать за блокировку взаимного исключения
			cond.L.Lock()
			// Проверить, выполнено ли условие
			for count > i {
				cond.Wait()
				fmt.Printf("Получено уведомление goroutine%d\n", i)
			}

			fmt.Printf("goroutine%d выполнение завершено\n", i)
			cond.L.Unlock()
			ch <- struct{}{}

		}(i)
	}

	// Убедиться, что все goroutines запущены
	time.Sleep(time.Millisecond * 20)

	time.Sleep(time.Second)
	fmt.Println("signal...")
	cond.L.Lock()
	count = -1
	cond.Signal()
	cond.L.Unlock()

	//fatal error: all goroutines are asleep - deadlock!
	for i := 0; i < 1; i++ {
		<-ch
	}
}

В этот момент

Сценарий: Кормление собак

goroutine: Действие каждой собаки, поедающей еду

Метод Broadcast(): Уведомить всех собак поесть

Метод Signal(): Уведомить случайную собаку поесть

Переменная count в примере: Указывает сигнал для собак поесть

Переменная ch в примере: Какашки, которые тянут собаки

Разница между примерами useCondBroadcast() и useCondSignal только в курсоре чтения финальной трубы (i).

Метод Broadcast уведомляет всех собак, поэтому все собаки успешно начинают есть (i=4).

Signal уведомляет только одну собаку, поэтому в конце только одна собака какает (i=0).

Так что если есть только одна собака, использование Signal имеет тот же эффект, что и Broadcast.

Оба метода Signal и Broadcast хороши. Если настроен канал (ch := make(chan struct{}, 5)) для получения финального результата, будьте осторожны с изменением порогового значения, которое влияет на количество результатов.

Можно взять меньше, но взять слишком много вызовет панику fatal error: all goroutines are asleep - deadlock! (например, в примере useCondSignal изменить i<1 на i<2), с непредсказуемыми последствиями.

Что касается фактического случая использования Cond, я думаю, что Cond лучше всего применять к оптимальному решению. Например, если я хочу сканировать одну и ту же веб-страницу, могут быть четыре схемы A, B, C, D, и мне нужна только одна из них, чтобы завершиться быстрее всего. Затем, как только одна задача завершится, Broadcast инициируется в главном потоке, так что другим схемам не нужно работать впустую и они могут покинуть сцену.

Я еще не придумал фактическое использование Signal; добавлю позже, если будет возможность.

Как только механизм конкуренции блокировки Cond действительно понят, чередование Broadcast и Signal больше не представляет проблемы.

Locker

Просто простой интерфейс.

1
2
3
4
type Locker interface {
        Lock()
        Unlock()
}

Mutex

1
2
3
type Mutex
    func (m *Mutex) Lock()
    func (m *Mutex) Unlock()

Взаимоисключающая блокировка

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func useMutex() {
	ch := make(chan struct{}, 2)

	var l sync.Mutex
	go func() {
		l.Lock()
		defer l.Unlock()
		fmt.Println("goroutine1: Я заблокирую примерно на 2s")
		time.Sleep(time.Second * 2)
		fmt.Println("goroutine1: Я разблокировал, идите захватывать")
		ch <- struct{}{}
	}()

	go func() {
		fmt.Println("groutine2: Ожидание разблокировки")
		l.Lock()
		defer l.Unlock()
		fmt.Println("goroutine2: Ха-ха, я заблокировал")
		ch <- struct{}{}
	}()

	// Ждать завершения goroutines
	for i := 0; i < 2; i++ {
		<-ch
	}
}

Once

1
2
type Once
    func (o *Once) Do(f func())

Как следует из названия, функция Do в Once будет выполняться только один раз.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func useOnce() {
	var once sync.Once
	onceBody := func() {
		fmt.Println("Only once")
	}
	done := make(chan bool)
	for i := 0; i < 10; i++ {
		go func() {
			once.Do(onceBody)
			done <- true
		}()
	}
	for i := 0; i < 10; i++ {
		<-done
	}
}

RWMutex

1
2
3
4
5
6
type RWMutex
    func (rw *RWMutex) Lock()
    func (rw *RWMutex) RLock()
    func (rw *RWMutex) RLocker() Locker
    func (rw *RWMutex) RUnlock()
    func (rw *RWMutex) Unlock()

RWMutex реализован на основе Mutex.

Блокировка чтения-записи, обычно используется в сценариях с большим количеством операций чтения и небольшим количеством операций записи.

  1. Только одна goroutine может получить блокировку записи одновременно.
  2. Любое количество goroutines может одновременно получить блокировки чтения.
  3. Может существовать только блокировка записи или блокировка чтения одновременно (чтение и запись взаимно исключают друг друга).
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func useRWMutex() {
	ch := make(chan struct{}, 10)
	for i := 0; i < 5; i++ {
		go read(i, ch)
	}
	for i := 0; i < 5; i++ {
		go write(i, ch)
	}

	for i := 0; i < 10; i++ {
		<-ch
	}
}

var count int
var rw sync.RWMutex

func read(n int, ch chan struct{}) {
	rw.RLock()
	fmt.Printf("goroutine %d входит в операцию чтения...\n", n)
	v := count
	fmt.Printf("goroutine %d закончил чтение, значение: %d\n", n, v)
	rw.RUnlock()
	ch <- struct{}{}
}

func write(n int, ch chan struct{}) {
	rw.Lock()
	fmt.Printf("goroutine %d входит в операцию записи...\n", n)
	v := rand.Intn(1000)
	count = v
	fmt.Printf("goroutine %d закончил запись, новое значение: %d\n", n, v)
	rw.Unlock()
	ch <- struct{}{}
}

WaitGroup

1
2
3
4
5
6
type WaitGroup
    func (wg *WaitGroup) Add(delta int)
    func (wg *WaitGroup) Done()
    func (wg *WaitGroup) Wait()
Examples(Expand All)

Простое распределение многозадачности

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func useWaitGroup() {
	// Достичь управления параллелизмом через WaitGroup в пакете sync

	var wg sync.WaitGroup

	wg.Add(1)
	go func(wg *sync.WaitGroup) {
		time.Sleep(5 * time.Second)
		fmt.Println("1 done")
		wg.Done()
	}(&wg)

	wg.Add(1)
	go func(wg *sync.WaitGroup) {
		time.Sleep(9 * time.Second)
		fmt.Println("2 done")
		wg.Done()
	}(&wg)
	wg.Wait()
	fmt.Println("handle2 done")

	// Пакет sync предоставляет WaitGroup, который ждет завершения всех собранных им задач goroutine. В главной goroutine Add(delta int) используется для указания количества goroutines для ожидания. После завершения каждой goroutine Done() указывает, что эта goroutine завершена. Когда все goroutines завершены, WaitGroup возвращается в главной goroutine.
}

Структуры данных

Map

1
2
3
4
5
6
type Map
    func (m *Map) Delete(key interface{})
    func (m *Map) Load(key interface{}) (value interface{}, ok bool)
    func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)
    func (m *Map) Range(f func(key, value interface{}) bool)
    func (m *Map) Store(key, value interface{})

Применимые сценарии

Потокобезопасная коллекция, оптимизирована для 2 сценариев:

  1. Записать один раз, читать много раз
  2. Несколько goroutines читают и пишут разные ключи (например, goroutine1 читает/пишет key1, goroutine2 читает/пишет key2)

Введение методов

Load: Чтение LoadOrStore: Чтение, если не найдено, затем запись Store: Запись Range: Нельзя итерировать напрямую, нужно итерировать через callback

Конкретное использование можно найти в: Go 1.9 sync.Map揭秘

Pool

1
2
3
type Pool
    func (p *Pool) Get() interface{}
    func (p *Pool) Put(x interface{})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var bufPool = sync.Pool{
	New: func() interface{} {
		return new(bytes.Buffer)
	},
}

func usePool(key, val string) {
	// Получить временный объект, автоматически создать, если недоступен
	b := bufPool.Get().(*bytes.Buffer)
	b.Reset()
	b.WriteString(key)
	b.WriteByte('=')
	b.WriteString(val)
	os.Stdout.Write(b.Bytes())
	// Вернуть временный объект в Pool
	bufPool.Put(b)
}

Ссылки:

  1. Кратко о связанных методах использования пакета Golang sync
  2. Сравнение производительности map


💬 讨论 / Discussion

对这篇文章有想法?欢迎在 GitHub 上发起讨论。
Have thoughts on this post? Start a discussion on GitHub.

在 GitHub 参与讨论 / Discuss on GitHub