并发相关
总结
| type | 作用 |
|---|---|
| Cond | 发令枪,一般预设一个条件让子任务等待,发出的信号可以是单个(Signal)也可集体广播(Broadcast) |
| Locker | 简单接口 |
| Mutex | 互斥锁 |
| Once | 并发运行,只允许一次 |
| RWMutex | 读写锁,多读少写,同时读锁,读写互斥. |
| WaitGroup | 分发任务,主线程等待所有任务完成 |
Cond
1
2
3
4
5
type Cond
func NewCond(l Locker) *Cond
func (c *Cond) Broadcast()
func (c *Cond) Signal()
func (c *Cond) Wait()
加入到通知列表 -> 解锁 L -> 等待通知 -> 锁定 L
虽然放在最前面,但我花了最长的时间去理解这玩意.
按照我的理解,Cond就好比一个发令枪.比如我同时养了5条狗,并同时准备了5份食物,但是没有我的口令,我不准它们吃.示例代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
func useCondBroadcast() {
var count int = 5
ch := make(chan struct{}, 5)
// 新建 cond
var l sync.Mutex
cond := sync.NewCond(&l)
for i := 0; i < 5; i++ {
go func(i int) {
// 争抢互斥锁的锁定
cond.L.Lock()
// 条件是否达成
for count > i {
cond.Wait()
fmt.Printf("收到一个通知 goroutine%d\n", i)
}
fmt.Printf("goroutine%d 执行结束\n", i)
cond.L.Unlock()
ch <- struct{}{}
}(i)
}
// 确保所有 goroutine 启动完成
time.Sleep(time.Millisecond * 20)
fmt.Println("broadcast...")
cond.L.Lock()
count = -1
cond.Broadcast()
cond.L.Unlock()
for i := 0; i < 5; i++ {
<-ch
}
}
func useCondSignal() {
var count int = 5
ch := make(chan struct{}, 5)
// 新建 cond
var l sync.Mutex
cond := sync.NewCond(&l)
for i := 0; i < 5; i++ {
go func(i int) {
// 争抢互斥锁的锁定
cond.L.Lock()
// 条件是否达成
for count > i {
cond.Wait()
fmt.Printf("收到一个通知 goroutine%d\n", i)
}
fmt.Printf("goroutine%d 执行结束\n", i)
cond.L.Unlock()
ch <- struct{}{}
}(i)
}
// 确保所有 goroutine 启动完成
time.Sleep(time.Millisecond * 20)
time.Sleep(time.Second)
fmt.Println("signal...")
cond.L.Lock()
count = -1
cond.Signal()
cond.L.Unlock()
//fatal error: all goroutines are asleep - deadlock!
for i := 0; i < 1; i++ {
<-ch
}
}
这时
场景:喂狗
goroutine:每一条狗吃饭的行为
Broadcast()方法:通知所有狗吃饭
Signal()方法:通知随机一条狗吃饭
例子中count变量: 指示狗吃饭的信号
例子中的ch变量:狗拉的便便
useCondBroadcast()和useCondSignal这2个例子,差别只在于最后管道的读取游标(i).
Broadcast方法通知的对象是所有的狗,所以最后所有的狗都顺利开吃(i=4).
Signal只通知了一条狗,所以最后只有一条狗拉出了便便(i=0)
所以如果只有一条狗,那么使用Signal效果等同于Broadcast.
用Signal和Broadcast方法都好,如果设置了管道(ch := make(chan struct{}, 5))去接收最后的结果,要注意设置的临界值变化导致的最后出来的结果数量.
取少了没关系,取多了会出现fatal error: all goroutines are asleep - deadlock!这个panic(比如,在useCondSignal这个例子里面,把i<1改成i<2),后果不堪设想.
关于Cond实际的使用场景,我觉得把Cond应用于最优解.比如说我要爬取同一个网页,可能有ABCD四种方案,我只需要其中一个方案最快完成即可.那么只要其中一个任务完成,在主线程发起Broadcast,这样其他方案就不用白忙活了,可以退出舞台.
暂时没想到Signal的实际用法,以后有机会再补充吧.
真正理解了Cond锁的争抢方式之后,Broadcast和Signal交替使用也就不再有什么问题.
Locker
只是一个简单的接口.
1
2
3
4
type Locker interface {
Lock()
Unlock()
}
Mutex
1
2
3
type Mutex
func (m *Mutex) Lock()
func (m *Mutex) Unlock()
互斥锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func useMutex() {
ch := make(chan struct{}, 2)
var l sync.Mutex
go func() {
l.Lock()
defer l.Unlock()
fmt.Println("goroutine1: 我会锁定大概 2s")
time.Sleep(time.Second * 2)
fmt.Println("goroutine1: 我解锁了,你们去抢吧")
ch <- struct{}{}
}()
go func() {
fmt.Println("groutine2: 等待解锁")
l.Lock()
defer l.Unlock()
fmt.Println("goroutine2: 哈哈,我锁定了")
ch <- struct{}{}
}()
// 等待 goroutine 执行结束
for i := 0; i < 2; i++ {
<-ch
}
}
Once
1
2
type Once
func (o *Once) Do(f func())
如其名,Once里的Do函数只会运行一次
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func useOnce() {
var once sync.Once
onceBody := func() {
fmt.Println("Only once")
}
done := make(chan bool)
for i := 0; i < 10; i++ {
go func() {
once.Do(onceBody)
done <- true
}()
}
for i := 0; i < 10; i++ {
<-done
}
}
RWMutex
1
2
3
4
5
6
type RWMutex
func (rw *RWMutex) Lock()
func (rw *RWMutex) RLock()
func (rw *RWMutex) RLocker() Locker
func (rw *RWMutex) RUnlock()
func (rw *RWMutex) Unlock()
RWMutex是基于Mutex实现的.
读写锁,一般用在大量读操作、少量写操作的情况
- 同时只能有一个 goroutine 能够获得写锁定。
- 同时可以有任意多个 gorouinte 获得读锁定。
- 同时只能存在写锁定或读锁定(读和写互斥)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func useRWMutex() {
ch := make(chan struct{}, 10)
for i := 0; i < 5; i++ {
go read(i, ch)
}
for i := 0; i < 5; i++ {
go write(i, ch)
}
for i := 0; i < 10; i++ {
<-ch
}
}
var count int
var rw sync.RWMutex
func read(n int, ch chan struct{}) {
rw.RLock()
fmt.Printf("goroutine %d 进入读操作...\n", n)
v := count
fmt.Printf("goroutine %d 读取结束,值为:%d\n", n, v)
rw.RUnlock()
ch <- struct{}{}
}
func write(n int, ch chan struct{}) {
rw.Lock()
fmt.Printf("goroutine %d 进入写操作...\n", n)
v := rand.Intn(1000)
count = v
fmt.Printf("goroutine %d 写入结束,新值为:%d\n", n, v)
rw.Unlock()
ch <- struct{}{}
}
WaitGroup
1
2
3
4
5
6
type WaitGroup
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
Examples(Expand All)
简单的多任务分发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func useWaitGroup() {
// 通过sync包中的WaitGroup 实现并发控制
var wg sync.WaitGroup
wg.Add(1)
go func(wg *sync.WaitGroup) {
time.Sleep(5 * time.Second)
fmt.Println("1 done")
wg.Done()
}(&wg)
wg.Add(1)
go func(wg *sync.WaitGroup) {
time.Sleep(9 * time.Second)
fmt.Println("2 done")
wg.Done()
}(&wg)
wg.Wait()
fmt.Println("handle2 done")
// 在 sync 包中,提供了 WaitGroup ,它会等待它收集的所有 goroutine 任务全部完成,在主 goroutine 中 Add(delta int) 索要等待goroutine 的数量。在每一个 goroutine 完成后 Done() 表示这一个goroutine 已经完成,当所有的 goroutine 都完成后,在主 goroutine 中 WaitGroup 返回。
}
数据结构
Map
1
2
3
4
5
6
type Map
func (m *Map) Delete(key interface{})
func (m *Map) Load(key interface{}) (value interface{}, ok bool)
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)
func (m *Map) Range(f func(key, value interface{}) bool)
func (m *Map) Store(key, value interface{})
适用场景
线程安全集合,在2个场景做了优化
- 只写1次,多次读
- 多个goroutines读写互不相同的键(比如goroutines1读写key1,goroutines2读写key2)
方法介绍
Load 读取
LoadOrStore 读取不到则写入
Store 写入
Range 无法直接遍历,得通过回调的方式遍历
具体用法见
Pool
1
2
3
type Pool
func (p *Pool) Get() interface{}
func (p *Pool) Put(x interface{})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var bufPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
func usePool(key, val string) {
// 获取临时对象,没有的话会自动创建
b := bufPool.Get().(*bytes.Buffer)
b.Reset()
b.WriteString(key)
b.WriteByte('=')
b.WriteString(val)
os.Stdout.Write(b.Bytes())
// 将临时对象放回到 Pool 中
bufPool.Put(b)
}
参考链接:
Concurrency Related
Summary
| type | purpose |
|---|---|
| Cond | Starting gun, usually pre-sets a condition for subtasks to wait; the signal can be single (Signal) or broadcast (Broadcast) |
| Locker | Simple interface |
| Mutex | Mutual exclusion lock |
| Once | Concurrent execution, only allowed once |
| RWMutex | Read-write lock, many reads and few writes, simultaneous read locks, read-write mutual exclusion. |
| WaitGroup | Distribute tasks, main thread waits for all tasks to complete |
Cond
1
2
3
4
5
type Cond
func NewCond(l Locker) *Cond
func (c *Cond) Broadcast()
func (c *Cond) Signal()
func (c *Cond) Wait()
Add to notification list -> Unlock L -> Wait for notification -> Lock L
Although it’s placed at the very beginning, I spent the longest time trying to understand this thing.
According to my understanding, Cond is like a starting gun. For example, I have 5 dogs and 5 portions of food ready at the same time, but I won’t let them eat without my command. The example code is as follows:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
func useCondBroadcast() {
var count int = 5
ch := make(chan struct{}, 5)
// Create new cond
var l sync.Mutex
cond := sync.NewCond(&l)
for i := 0; i < 5; i++ {
go func(i int) {
// Compete for mutual exclusion lock
cond.L.Lock()
// Check if condition is met
for count > i {
cond.Wait()
fmt.Printf("Received a notification goroutine%d\n", i)
}
fmt.Printf("goroutine%d execution finished\n", i)
cond.L.Unlock()
ch <- struct{}{}
}(i)
}
// Ensure all goroutines have started
time.Sleep(time.Millisecond * 20)
fmt.Println("broadcast...")
cond.L.Lock()
count = -1
cond.Broadcast()
cond.L.Unlock()
for i := 0; i < 5; i++ {
<-ch
}
}
func useCondSignal() {
var count int = 5
ch := make(chan struct{}, 5)
// Create new cond
var l sync.Mutex
cond := sync.NewCond(&l)
for i := 0; i < 5; i++ {
go func(i int) {
// Compete for mutual exclusion lock
cond.L.Lock()
// Check if condition is met
for count > i {
cond.Wait()
fmt.Printf("Received a notification goroutine%d\n", i)
}
fmt.Printf("goroutine%d execution finished\n", i)
cond.L.Unlock()
ch <- struct{}{}
}(i)
}
// Ensure all goroutines have started
time.Sleep(time.Millisecond * 20)
time.Sleep(time.Second)
fmt.Println("signal...")
cond.L.Lock()
count = -1
cond.Signal()
cond.L.Unlock()
//fatal error: all goroutines are asleep - deadlock!
for i := 0; i < 1; i++ {
<-ch
}
}
At this point
Scenario: Feeding dogs
goroutine: The act of each dog eating
Broadcast() method: Notify all dogs to eat
Signal() method: Notify a random dog to eat
count variable in the example: Indicates the signal for dogs to eat
ch variable in the example: The poop the dogs pull
The difference between useCondBroadcast() and useCondSignal examples is only in the final pipe’s read cursor (i).
The Broadcast method notifies all dogs, so all dogs successfully start eating (i=4).
Signal only notifies one dog, so only one dog poops in the end (i=0).
So if there is only one dog, using Signal has the same effect as Broadcast.
Both Signal and Broadcast methods are fine. If a channel (ch := make(chan struct{}, 5)) is set up to receive the final result, be careful about the change in the threshold value, which affects the number of results.
It’s okay to take fewer, but taking too many will cause a fatal error: all goroutines are asleep - deadlock! panic (e.g., in the useCondSignal example, change i<1 to i<2), with unimaginable consequences.
Regarding the actual use case of Cond, I think Cond is best applied to an optimal solution. For example, if I want to crawl the same webpage, there might be four schemes A, B, C, D, and I only need one of them to finish the fastest. Then, as soon as one task completes, Broadcast is initiated in the main thread, so other schemes don’t have to work in vain and can exit the stage.
I haven’t thought of an actual use for Signal yet; I’ll add it later if I have a chance.
Once the contention mechanism of Cond lock is truly understood, alternating Broadcast and Signal no longer poses any problem.
Locker
Just a simple interface.
1
2
3
4
type Locker interface {
Lock()
Unlock()
}
Mutex
1
2
3
type Mutex
func (m *Mutex) Lock()
func (m *Mutex) Unlock()
Mutual exclusion lock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func useMutex() {
ch := make(chan struct{}, 2)
var l sync.Mutex
go func() {
l.Lock()
defer l.Unlock()
fmt.Println("goroutine1: I will lock for about 2s")
time.Sleep(time.Second * 2)
fmt.Println("goroutine1: I unlocked, go grab it")
ch <- struct{}{}
}()
go func() {
fmt.Println("groutine2: Waiting for unlock")
l.Lock()
defer l.Unlock()
fmt.Println("goroutine2: Haha, I locked it")
ch <- struct{}{}
}()
// Wait for goroutines to finish
for i := 0; i < 2; i++ {
<-ch
}
}
Once
1
2
type Once
func (o *Once) Do(f func())
As its name suggests, the Do function in Once will only run once.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func useOnce() {
var once sync.Once
onceBody := func() {
fmt.Println("Only once")
}
done := make(chan bool)
for i := 0; i < 10; i++ {
go func() {
once.Do(onceBody)
done <- true
}()
}
for i := 0; i < 10; i++ {
<-done
}
}
RWMutex
1
2
3
4
5
6
type RWMutex
func (rw *RWMutex) Lock()
func (rw *RWMutex) RLock()
func (rw *RWMutex) RLocker() Locker
func (rw *RWMutex) RUnlock()
func (rw *RWMutex) Unlock()
RWMutex is implemented based on Mutex.
Read-write lock, generally used in scenarios with a large number of read operations and a small number of write operations.
- Only one goroutine can acquire a write lock at a time.
- Any number of goroutines can acquire read locks simultaneously.
- Only a write lock or read lock can exist simultaneously (read and write are mutually exclusive).
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func useRWMutex() {
ch := make(chan struct{}, 10)
for i := 0; i < 5; i++ {
go read(i, ch)
}
for i := 0; i < 5; i++ {
go write(i, ch)
}
for i := 0; i < 10; i++ {
<-ch
}
}
var count int
var rw sync.RWMutex
func read(n int, ch chan struct{}) {
rw.RLock()
fmt.Printf("goroutine %d entering read operation...\n", n)
v := count
fmt.Printf("goroutine %d finished reading, value is: %d\n", n, v)
rw.RUnlock()
ch <- struct{}{}
}
func write(n int, ch chan struct{}) {
rw.Lock()
fmt.Printf("goroutine %d entering write operation...\n", n)
v := rand.Intn(1000)
count = v
fmt.Printf("goroutine %d finished writing, new value is: %d\n", n, v)
rw.Unlock()
ch <- struct{}{}
}
WaitGroup
1
2
3
4
5
6
type WaitGroup
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
Examples(Expand All)
Simple multi-task distribution
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func useWaitGroup() {
// Achieve concurrency control through WaitGroup in the sync package
var wg sync.WaitGroup
wg.Add(1)
go func(wg *sync.WaitGroup) {
time.Sleep(5 * time.Second)
fmt.Println("1 done")
wg.Done()
}(&wg)
wg.Add(1)
go func(wg *sync.WaitGroup) {
time.Sleep(9 * time.Second)
fmt.Println("2 done")
wg.Done()
}(&wg)
wg.Wait()
fmt.Println("handle2 done")
// The sync package provides WaitGroup, which waits for all goroutine tasks it collects to complete. In the main goroutine, Add(delta int) is used to specify the number of goroutines to wait for. After each goroutine completes, Done() indicates that this goroutine is finished. When all goroutines are completed, WaitGroup returns in the main goroutine.
}
Data Structures
Map
1
2
3
4
5
6
type Map
func (m *Map) Delete(key interface{})
func (m *Map) Load(key interface{}) (value interface{}, ok bool)
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)
func (m *Map) Range(f func(key, value interface{}) bool)
func (m *Map) Store(key, value interface{})
Applicable Scenarios
Thread-safe collection, optimized for 2 scenarios:
- Write once, read many times
- Multiple goroutines read and write different keys (e.g., goroutine1 reads/writes key1, goroutine2 reads/writes key2)
Method Introduction
Load: Read LoadOrStore: Read, if not found, then write Store: Write Range: Cannot iterate directly, must iterate through callback
Specific usage can be found in: Go 1.9 sync.Map揭秘
Pool
1
2
3
type Pool
func (p *Pool) Get() interface{}
func (p *Pool) Put(x interface{})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var bufPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
func usePool(key, val string) {
// Get temporary object, automatically create if not available
b := bufPool.Get().(*bytes.Buffer)
b.Reset()
b.WriteString(key)
b.WriteByte('=')
b.WriteString(val)
os.Stdout.Write(b.Bytes())
// Return temporary object to Pool
bufPool.Put(b)
}
Reference Links:
並行関連
まとめ
| type | 目的 |
|---|---|
| Cond | スタートガン、通常、サブタスクが待機する条件を事前に設定し、信号は単一(Signal)または集団ブロードキャスト(Broadcast)にすることができます |
| Locker | シンプルなインターフェース |
| Mutex | 相互排他ロック |
| Once | 並行実行、1回のみ許可 |
| RWMutex | 読み書きロック、多数の読み取りと少数の書き込み、同時読み取りロック、読み書き相互排他。 |
| WaitGroup | タスクを分散し、メインスレッドがすべてのタスクの完了を待機 |
Cond
1
2
3
4
5
type Cond
func NewCond(l Locker) *Cond
func (c *Cond) Broadcast()
func (c *Cond) Signal()
func (c *Cond) Wait()
通知リストに追加 -> Lをアンロック -> 通知を待機 -> Lをロック
最初に配置されていますが、このことを理解するのに最も長い時間を費やしました。
私の理解によると、Condはスタートガンのようなものです。たとえば、5匹の犬を同時に飼い、5食分の食べ物を同時に準備しましたが、私の命令なしでは食べさせません。サンプルコードは次のとおりです。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
func useCondBroadcast() {
var count int = 5
ch := make(chan struct{}, 5)
// 新しいcondを作成
var l sync.Mutex
cond := sync.NewCond(&l)
for i := 0; i < 5; i++ {
go func(i int) {
// 相互排他ロックのロックを競う
cond.L.Lock()
// 条件が満たされているか確認
for count > i {
cond.Wait()
fmt.Printf("通知を受信しました goroutine%d\n", i)
}
fmt.Printf("goroutine%d 実行終了\n", i)
cond.L.Unlock()
ch <- struct{}{}
}(i)
}
// すべてのgoroutineが起動したことを確認
time.Sleep(time.Millisecond * 20)
fmt.Println("broadcast...")
cond.L.Lock()
count = -1
cond.Broadcast()
cond.L.Unlock()
for i := 0; i < 5; i++ {
<-ch
}
}
func useCondSignal() {
var count int = 5
ch := make(chan struct{}, 5)
// 新しいcondを作成
var l sync.Mutex
cond := sync.NewCond(&l)
for i := 0; i < 5; i++ {
go func(i int) {
// 相互排他ロックのロックを競う
cond.L.Lock()
// 条件が満たされているか確認
for count > i {
cond.Wait()
fmt.Printf("通知を受信しました goroutine%d\n", i)
}
fmt.Printf("goroutine%d 実行終了\n", i)
cond.L.Unlock()
ch <- struct{}{}
}(i)
}
// すべてのgoroutineが起動したことを確認
time.Sleep(time.Millisecond * 20)
time.Sleep(time.Second)
fmt.Println("signal...")
cond.L.Lock()
count = -1
cond.Signal()
cond.L.Unlock()
//fatal error: all goroutines are asleep - deadlock!
for i := 0; i < 1; i++ {
<-ch
}
}
この時点で
シナリオ:犬に餌を与える
goroutine:各犬が食べる行為
Broadcast()メソッド:すべての犬に食べることを通知
Signal()メソッド:ランダムな1匹の犬に食べることを通知
例のcount変数:犬が食べる信号を示す
例のch変数:犬が引っ張るうんち
useCondBroadcast()とuseCondSignalの2つの例の違いは、最後のパイプの読み取りカーソル(i)だけです。
Broadcastメソッドはすべての犬に通知するため、すべての犬が正常に食べ始めます(i=4)。
Signalは1匹の犬にのみ通知するため、最後に1匹の犬だけがうんちを出します(i=0)
したがって、犬が1匹しかいない場合、Signalを使用するとBroadcastと同じ効果があります。
SignalとBroadcastの両方のメソッドは問題ありません。チャネル(ch := make(chan struct{}, 5))を設定して最終結果を受信する場合、しきい値の変化に注意してください。これは結果の数に影響します。
少なく取るのは問題ありませんが、多すぎるとfatal error: all goroutines are asleep - deadlock!パニックが発生します(たとえば、useCondSignalの例で、i<1をi<2に変更)、想像を絶する結果になります。
Condの実際の使用ケースについては、Condを最適解に適用するのが最善だと思います。たとえば、同じWebページをクロールする場合、A、B、C、Dの4つのスキームがある可能性があり、そのうちの1つが最も速く完了すれば十分です。その後、1つのタスクが完了するとすぐに、メインスレッドでBroadcastが開始されるため、他のスキームは無駄に作業する必要がなく、ステージを退出できます。
Signalの実際の使用法はまだ考えていません。機会があれば後で追加します。
Condロックの競合メカニズムを本当に理解したら、BroadcastとSignalを交互に使用しても問題はなくなります。
Locker
単純なインターフェースです。
1
2
3
4
type Locker interface {
Lock()
Unlock()
}
Mutex
1
2
3
type Mutex
func (m *Mutex) Lock()
func (m *Mutex) Unlock()
相互排他ロック
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func useMutex() {
ch := make(chan struct{}, 2)
var l sync.Mutex
go func() {
l.Lock()
defer l.Unlock()
fmt.Println("goroutine1: 約2秒間ロックします")
time.Sleep(time.Second * 2)
fmt.Println("goroutine1: ロックを解除しました。取りに行ってください")
ch <- struct{}{}
}()
go func() {
fmt.Println("groutine2: ロック解除を待機中")
l.Lock()
defer l.Unlock()
fmt.Println("goroutine2: はは、ロックしました")
ch <- struct{}{}
}()
// goroutineの実行終了を待機
for i := 0; i < 2; i++ {
<-ch
}
}
Once
1
2
type Once
func (o *Once) Do(f func())
その名のとおり、OnceのDo関数は1回だけ実行されます。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func useOnce() {
var once sync.Once
onceBody := func() {
fmt.Println("Only once")
}
done := make(chan bool)
for i := 0; i < 10; i++ {
go func() {
once.Do(onceBody)
done <- true
}()
}
for i := 0; i < 10; i++ {
<-done
}
}
RWMutex
1
2
3
4
5
6
type RWMutex
func (rw *RWMutex) Lock()
func (rw *RWMutex) RLock()
func (rw *RWMutex) RLocker() Locker
func (rw *RWMutex) RUnlock()
func (rw *RWMutex) Unlock()
RWMutexはMutexに基づいて実装されています。
読み書きロック、一般的に大量の読み取り操作と少数の書き込み操作のシナリオで使用されます。
- 同時に1つのgoroutineのみが書き込みロックを取得できます。
- 任意の数のgoroutineが同時に読み取りロックを取得できます。
- 書き込みロックまたは読み取りロックのみが同時に存在できます(読み取りと書き込みは相互排他的)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func useRWMutex() {
ch := make(chan struct{}, 10)
for i := 0; i < 5; i++ {
go read(i, ch)
}
for i := 0; i < 5; i++ {
go write(i, ch)
}
for i := 0; i < 10; i++ {
<-ch
}
}
var count int
var rw sync.RWMutex
func read(n int, ch chan struct{}) {
rw.RLock()
fmt.Printf("goroutine %d 読み取り操作に入っています...\n", n)
v := count
fmt.Printf("goroutine %d 読み取り終了、値は:%d\n", n, v)
rw.RUnlock()
ch <- struct{}{}
}
func write(n int, ch chan struct{}) {
rw.Lock()
fmt.Printf("goroutine %d 書き込み操作に入っています...\n", n)
v := rand.Intn(1000)
count = v
fmt.Printf("goroutine %d 書き込み終了、新しい値は:%d\n", n, v)
rw.Unlock()
ch <- struct{}{}
}
WaitGroup
1
2
3
4
5
6
type WaitGroup
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
Examples(Expand All)
シンプルなマルチタスク分散
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func useWaitGroup() {
// syncパッケージのWaitGroupを使用して並行制御を実現
var wg sync.WaitGroup
wg.Add(1)
go func(wg *sync.WaitGroup) {
time.Sleep(5 * time.Second)
fmt.Println("1 done")
wg.Done()
}(&wg)
wg.Add(1)
go func(wg *sync.WaitGroup) {
time.Sleep(9 * time.Second)
fmt.Println("2 done")
wg.Done()
}(&wg)
wg.Wait()
fmt.Println("handle2 done")
// syncパッケージはWaitGroupを提供し、収集したすべてのgoroutineタスクが完了するのを待機します。メインgoroutineでは、Add(delta int)を使用して待機するgoroutineの数を指定します。各goroutineが完了すると、Done()はこのgoroutineが完了したことを示します。すべてのgoroutineが完了すると、WaitGroupはメインgoroutineで返されます。
}
データ構造
Map
1
2
3
4
5
6
type Map
func (m *Map) Delete(key interface{})
func (m *Map) Load(key interface{}) (value interface{}, ok bool)
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)
func (m *Map) Range(f func(key, value interface{}) bool)
func (m *Map) Store(key, value interface{})
適用シナリオ
スレッドセーフなコレクション、2つのシナリオで最適化:
- 1回書き込み、複数回読み取り
- 複数のgoroutineが異なるキーを読み書き(たとえば、goroutine1がkey1を読み書き、goroutine2がkey2を読み書き)
メソッドの紹介
Load:読み取り LoadOrStore:読み取り、見つからない場合は書き込み Store:書き込み Range:直接反復できない、コールバック方式で反復する必要がある
具体的な使用方法は以下を参照: Go 1.9 sync.Map揭秘
Pool
1
2
3
type Pool
func (p *Pool) Get() interface{}
func (p *Pool) Put(x interface{})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var bufPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
func usePool(key, val string) {
// 一時オブジェクトを取得、利用できない場合は自動的に作成
b := bufPool.Get().(*bytes.Buffer)
b.Reset()
b.WriteString(key)
b.WriteByte('=')
b.WriteString(val)
os.Stdout.Write(b.Bytes())
// 一時オブジェクトをPoolに戻す
bufPool.Put(b)
}
参考リンク:
Связанное с параллелизмом
Резюме
| type | назначение |
|---|---|
| Cond | Стартовый пистолет, обычно предустанавливает условие для подзадач ожидания; сигнал может быть одиночным (Signal) или широковещательным (Broadcast) |
| Locker | Простой интерфейс |
| Mutex | Взаимоисключающая блокировка |
| Once | Параллельное выполнение, разрешено только один раз |
| RWMutex | Блокировка чтения-записи, много чтений и мало записей, одновременные блокировки чтения, взаимное исключение чтения-записи. |
| WaitGroup | Распределение задач, главный поток ждет завершения всех задач |
Cond
1
2
3
4
5
type Cond
func NewCond(l Locker) *Cond
func (c *Cond) Broadcast()
func (c *Cond) Signal()
func (c *Cond) Wait()
Добавить в список уведомлений -> Разблокировать L -> Ждать уведомления -> Заблокировать L
Хотя это помещено в самом начале, я потратил больше всего времени, пытаясь понять эту вещь.
Согласно моему пониманию, Cond похож на стартовый пистолет. Например, у меня есть 5 собак и 5 порций еды, готовых одновременно, но я не позволю им есть без моей команды. Примерный код выглядит следующим образом:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
func useCondBroadcast() {
var count int = 5
ch := make(chan struct{}, 5)
// Создать новый cond
var l sync.Mutex
cond := sync.NewCond(&l)
for i := 0; i < 5; i++ {
go func(i int) {
// Конкурировать за блокировку взаимного исключения
cond.L.Lock()
// Проверить, выполнено ли условие
for count > i {
cond.Wait()
fmt.Printf("Получено уведомление goroutine%d\n", i)
}
fmt.Printf("goroutine%d выполнение завершено\n", i)
cond.L.Unlock()
ch <- struct{}{}
}(i)
}
// Убедиться, что все goroutines запущены
time.Sleep(time.Millisecond * 20)
fmt.Println("broadcast...")
cond.L.Lock()
count = -1
cond.Broadcast()
cond.L.Unlock()
for i := 0; i < 5; i++ {
<-ch
}
}
func useCondSignal() {
var count int = 5
ch := make(chan struct{}, 5)
// Создать новый cond
var l sync.Mutex
cond := sync.NewCond(&l)
for i := 0; i < 5; i++ {
go func(i int) {
// Конкурировать за блокировку взаимного исключения
cond.L.Lock()
// Проверить, выполнено ли условие
for count > i {
cond.Wait()
fmt.Printf("Получено уведомление goroutine%d\n", i)
}
fmt.Printf("goroutine%d выполнение завершено\n", i)
cond.L.Unlock()
ch <- struct{}{}
}(i)
}
// Убедиться, что все goroutines запущены
time.Sleep(time.Millisecond * 20)
time.Sleep(time.Second)
fmt.Println("signal...")
cond.L.Lock()
count = -1
cond.Signal()
cond.L.Unlock()
//fatal error: all goroutines are asleep - deadlock!
for i := 0; i < 1; i++ {
<-ch
}
}
В этот момент
Сценарий: Кормление собак
goroutine: Действие каждой собаки, поедающей еду
Метод Broadcast(): Уведомить всех собак поесть
Метод Signal(): Уведомить случайную собаку поесть
Переменная count в примере: Указывает сигнал для собак поесть
Переменная ch в примере: Какашки, которые тянут собаки
Разница между примерами useCondBroadcast() и useCondSignal только в курсоре чтения финальной трубы (i).
Метод Broadcast уведомляет всех собак, поэтому все собаки успешно начинают есть (i=4).
Signal уведомляет только одну собаку, поэтому в конце только одна собака какает (i=0).
Так что если есть только одна собака, использование Signal имеет тот же эффект, что и Broadcast.
Оба метода Signal и Broadcast хороши. Если настроен канал (ch := make(chan struct{}, 5)) для получения финального результата, будьте осторожны с изменением порогового значения, которое влияет на количество результатов.
Можно взять меньше, но взять слишком много вызовет панику fatal error: all goroutines are asleep - deadlock! (например, в примере useCondSignal изменить i<1 на i<2), с непредсказуемыми последствиями.
Что касается фактического случая использования Cond, я думаю, что Cond лучше всего применять к оптимальному решению. Например, если я хочу сканировать одну и ту же веб-страницу, могут быть четыре схемы A, B, C, D, и мне нужна только одна из них, чтобы завершиться быстрее всего. Затем, как только одна задача завершится, Broadcast инициируется в главном потоке, так что другим схемам не нужно работать впустую и они могут покинуть сцену.
Я еще не придумал фактическое использование Signal; добавлю позже, если будет возможность.
Как только механизм конкуренции блокировки Cond действительно понят, чередование Broadcast и Signal больше не представляет проблемы.
Locker
Просто простой интерфейс.
1
2
3
4
type Locker interface {
Lock()
Unlock()
}
Mutex
1
2
3
type Mutex
func (m *Mutex) Lock()
func (m *Mutex) Unlock()
Взаимоисключающая блокировка
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func useMutex() {
ch := make(chan struct{}, 2)
var l sync.Mutex
go func() {
l.Lock()
defer l.Unlock()
fmt.Println("goroutine1: Я заблокирую примерно на 2s")
time.Sleep(time.Second * 2)
fmt.Println("goroutine1: Я разблокировал, идите захватывать")
ch <- struct{}{}
}()
go func() {
fmt.Println("groutine2: Ожидание разблокировки")
l.Lock()
defer l.Unlock()
fmt.Println("goroutine2: Ха-ха, я заблокировал")
ch <- struct{}{}
}()
// Ждать завершения goroutines
for i := 0; i < 2; i++ {
<-ch
}
}
Once
1
2
type Once
func (o *Once) Do(f func())
Как следует из названия, функция Do в Once будет выполняться только один раз.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func useOnce() {
var once sync.Once
onceBody := func() {
fmt.Println("Only once")
}
done := make(chan bool)
for i := 0; i < 10; i++ {
go func() {
once.Do(onceBody)
done <- true
}()
}
for i := 0; i < 10; i++ {
<-done
}
}
RWMutex
1
2
3
4
5
6
type RWMutex
func (rw *RWMutex) Lock()
func (rw *RWMutex) RLock()
func (rw *RWMutex) RLocker() Locker
func (rw *RWMutex) RUnlock()
func (rw *RWMutex) Unlock()
RWMutex реализован на основе Mutex.
Блокировка чтения-записи, обычно используется в сценариях с большим количеством операций чтения и небольшим количеством операций записи.
- Только одна goroutine может получить блокировку записи одновременно.
- Любое количество goroutines может одновременно получить блокировки чтения.
- Может существовать только блокировка записи или блокировка чтения одновременно (чтение и запись взаимно исключают друг друга).
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func useRWMutex() {
ch := make(chan struct{}, 10)
for i := 0; i < 5; i++ {
go read(i, ch)
}
for i := 0; i < 5; i++ {
go write(i, ch)
}
for i := 0; i < 10; i++ {
<-ch
}
}
var count int
var rw sync.RWMutex
func read(n int, ch chan struct{}) {
rw.RLock()
fmt.Printf("goroutine %d входит в операцию чтения...\n", n)
v := count
fmt.Printf("goroutine %d закончил чтение, значение: %d\n", n, v)
rw.RUnlock()
ch <- struct{}{}
}
func write(n int, ch chan struct{}) {
rw.Lock()
fmt.Printf("goroutine %d входит в операцию записи...\n", n)
v := rand.Intn(1000)
count = v
fmt.Printf("goroutine %d закончил запись, новое значение: %d\n", n, v)
rw.Unlock()
ch <- struct{}{}
}
WaitGroup
1
2
3
4
5
6
type WaitGroup
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
Examples(Expand All)
Простое распределение многозадачности
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func useWaitGroup() {
// Достичь управления параллелизмом через WaitGroup в пакете sync
var wg sync.WaitGroup
wg.Add(1)
go func(wg *sync.WaitGroup) {
time.Sleep(5 * time.Second)
fmt.Println("1 done")
wg.Done()
}(&wg)
wg.Add(1)
go func(wg *sync.WaitGroup) {
time.Sleep(9 * time.Second)
fmt.Println("2 done")
wg.Done()
}(&wg)
wg.Wait()
fmt.Println("handle2 done")
// Пакет sync предоставляет WaitGroup, который ждет завершения всех собранных им задач goroutine. В главной goroutine Add(delta int) используется для указания количества goroutines для ожидания. После завершения каждой goroutine Done() указывает, что эта goroutine завершена. Когда все goroutines завершены, WaitGroup возвращается в главной goroutine.
}
Структуры данных
Map
1
2
3
4
5
6
type Map
func (m *Map) Delete(key interface{})
func (m *Map) Load(key interface{}) (value interface{}, ok bool)
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)
func (m *Map) Range(f func(key, value interface{}) bool)
func (m *Map) Store(key, value interface{})
Применимые сценарии
Потокобезопасная коллекция, оптимизирована для 2 сценариев:
- Записать один раз, читать много раз
- Несколько goroutines читают и пишут разные ключи (например, goroutine1 читает/пишет key1, goroutine2 читает/пишет key2)
Введение методов
Load: Чтение LoadOrStore: Чтение, если не найдено, затем запись Store: Запись Range: Нельзя итерировать напрямую, нужно итерировать через callback
Конкретное использование можно найти в: Go 1.9 sync.Map揭秘
Pool
1
2
3
type Pool
func (p *Pool) Get() interface{}
func (p *Pool) Put(x interface{})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var bufPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
func usePool(key, val string) {
// Получить временный объект, автоматически создать, если недоступен
b := bufPool.Get().(*bytes.Buffer)
b.Reset()
b.WriteString(key)
b.WriteByte('=')
b.WriteString(val)
os.Stdout.Write(b.Bytes())
// Вернуть временный объект в Pool
bufPool.Put(b)
}
Ссылки:
💬 讨论 / Discussion
对这篇文章有想法?欢迎在 GitHub 上发起讨论。
Have thoughts on this post? Start a discussion on GitHub.