Channel
背景
在多核 CPU 机器下,为了充分利用多核计算机的资源,需要进行并发编程,提高对CPU的利用率,并发从大的角度上讲,并发分为传统并发和基于消息传递的并发。
基本概念
传统并发
1[线程1] ----\
2 >--[共享内存区域]--<
3[线程2] ----/ | \
4 | \
5[线程3] --------------/ \
6[线程4] --------------------------/
即多线程开发,使用共享内存的方式实现,如 Java、C++ 等。
- 多个线程通过读写共享内存通信
- 需要复杂的锁机制,容易产生竞态条件
基于消息的并发模型
Actor
1[Actor A] --(消息)--> [邮箱] --> [Actor B]
2 ↑ |
3 |_______________________|
4 (消息)
- 每个 Actor 有自己的状态和邮箱
- 通过异步消息传递通信
CSP
Communicating Sequential Processes
1[进程/goroutine X] --[channel]--> [进程/goroutine Y]
2 ↑ |
3 |________________________________|
4 [channel]
- 通信实体通过 channel 直接连接
- 同步/异步通信取决于 channel 类型
维度 | CSP (Go) | Actor |
---|---|---|
通信焦点 | Channel(媒介为中心) | Actor(实体为中心) |
连接方式 | 显式 Channel 连接 | 通过地址/引用发送消息 |
同步性 | 支持同步和异步 | 通常是异步 |
状态管理 | 无强制要求(可共享可不共享) | 强制封装(状态不共享) |
组合方式 | Channel 可灵活组合 | 通常通过消息转发链实现 |
Channel
Channel 是 Go 语言中实现 CSP (Communicating Sequential Processes) 并发模型的关键结构,它提供了一种安全、高效的方式让 goroutine 之间进行数据传递。S
基本操作
创建
1type hchan struct {
2 // chan 里元素数量
3 qcount uint
4 // chan 底层循环数组的长度
5 dataqsiz uint
6 // 指向底层循环数组的指针
7 // 只针对有缓冲的 channel
8 buf unsafe.Pointer
9 // chan 中元素大小
10 elemsize uint16
11 // chan 是否被关闭的标志
12 closed uint32
13 // chan 中元素类型
14 elemtype *_type // element type
15 // 已发送元素在循环数组中的索引
16 sendx uint // send index
17 // 已接收元素在循环数组中的索引
18 recvx uint // receive index
19 // 等待接收的 goroutine 队列
20 recvq waitq // list of recv waiters
21 // 等待发送的 goroutine 队列
22 sendq waitq // list of send waiters
23
24 // 保护 hchan 中所有字段
25 lock mutex
26}
1ch := make(chan int) // 无缓冲 channel
2ch := make(chan int, 10) // 有缓冲 channel,容量为10
1// src/runtime/chan.go
2func makechan(t *chantype, size int) *hchan {
3 elem := t.Elem
4
5 // compiler checks this but be safe.
6 if elem.Size_ >= 1<<16 {
7 throw("makechan: invalid channel element type")
8 }
9 if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
10 throw("makechan: bad alignment")
11 }
12
13 mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
14 if overflow || mem > maxAlloc-hchanSize || size < 0 {
15 panic(plainError("makechan: size out of range"))
16 }
17
18 // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
19 // buf points into the same allocation, elemtype is persistent.
20 // SudoG's are referenced from their owning thread so they can't be collected.
21 // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
22 var c *hchan
23 switch {
24 case mem == 0:
25 // 零内存需求(无缓冲或零大小元素)
26 // 只分配 hchan 结构体本身的内存
27 c = (*hchan)(mallocgc(hchanSize, nil, true))
28 // 竞态检测器使用这个位置进行同步
29 c.buf = c.raceaddr()
30 case elem.PtrBytes == 0:
31 // 元素不包含指针
32 // 一次性分配 hchan 和 buf 的内存
33 c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
34 c.buf = add(unsafe.Pointer(c), hchanSize)
35 default:
36 // 元素包含指针,分别分配 hchan 结构体和缓冲区内存
37 c = new(hchan)
38 c.buf = mallocgc(mem, elem, true)
39 }
40
41 c.elemsize = uint16(elem.Size_)
42 c.elemtype = elem
43 c.dataqsiz = uint(size)
44 lockInit(&c.lock, lockRankHchan)
45
46 if debugChan {
47 print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
48 }
49 return c
50}
发送和接收
1ch <- value // 发送值到 channel
2value := <-ch // 从 channel 接收值
特性 | 无缓冲 Channel | 有缓冲 Channel |
---|---|---|
创建方式 | make(chan T) | make(chan T, capacity) |
同步性 | 完全同步 | 异步(缓冲区未满/非空时) |
阻塞时机 | 双方就绪才不阻塞 | 发送方在满时阻塞,接收方在空时阻塞 |
内存分配 | 无额外内存 | 需要分配缓冲区内存 |
适用场景 | 强同步需求 | 生产消费速率不一致时缓冲 |
无缓冲 Channel 同步:
1 Goroutine A Channel Goroutine B 2 | | | 3 |-- 发送数据 --> | | 4 | |<-- 等待接收 ---| 5 | | | 6 | |-- 数据传递 --->| 7 | | |
有缓冲 Channel 异步:
1 Goroutine A Channel Goroutine B 2 | [ ] | 3 |-- 数据1 ----> [1] | 4 | [1][ ] | 5 |-- 数据2 ----> [1][2] | 6 | [1][2][ ] | 7 | |<-- 取数据1 ----| 8 | [2][ ] |
发送
1// 省略了一些内容 2func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { 3 if c == nil { 4 // 非阻塞模式:直接返回 false 5 if !block { 6 return false 7 } 8 // 阻塞模式:永久挂起 goroutine 9 gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2) 10 throw("unreachable") 11 } 12 13 // 对于不阻塞的 send,快速检测失败场景,避免不必要的锁获取,提升性能 14 // 非阻塞模式,channel 未关闭,channel 已满 15 // 1. 无缓冲通道,且等待接收队列里没有 goroutine 16 // 2. 有缓冲通道,循环数组已满 17 if !block && c.closed == 0 && full(c) { 18 return false 19 } 20 21 var t0 int64 22 if blockprofilerate > 0 { 23 t0 = cputicks() 24 } 25 26 // 锁住 channel,并发安全 27 lock(&c.lock) 28 29 // 向已关闭 channel 发送会 panic 30 if c.closed != 0 { 31 unlock(&c.lock) 32 panic(plainError("send on closed channel")) 33 } 34 35 // 场景1:有等待的接收者 36 // 绕过缓冲区,直接将数据交给接收者,避免不必要的缓冲区操作 37 if sg := c.recvq.dequeue(); sg != nil { 38 send(c, sg, ep, func() { unlock(&c.lock) }, 3) 39 return true 40 } 41 42 // 场景2:缓冲区有空间 43 // 对于缓冲型的 channel 44 // 计算缓冲区位置,将数据拷贝到缓冲区,更新发送索引,增加元素计数,释放锁 45 if c.qcount < c.dataqsiz { 46 qp := chanbuf(c, c.sendx) 47 if raceenabled { 48 racenotify(c, c.sendx, nil) 49 } 50 typedmemmove(c.elemtype, qp, ep) 51 c.sendx++ 52 if c.sendx == c.dataqsiz { 53 c.sendx = 0 54 } 55 c.qcount++ 56 unlock(&c.lock) 57 return true 58 } 59 60 // 场景3:阻塞等待 61 // 非阻塞模式,直接返回false 62 if !block { 63 unlock(&c.lock) 64 return false 65 } 66 // 阻塞模式,获取当前 goroutine,创建并初始化调度单元,加入 channel 的发送队列,挂起当前 goroutine 67 gp := getg() 68 mysg := acquireSudog() 69 mysg.releasetime = 0 70 if t0 != 0 { 71 mysg.releasetime = -1 72 } 73 mysg.elem = ep 74 mysg.waitlink = nil 75 mysg.g = gp 76 mysg.isSelect = false 77 mysg.c = c 78 gp.waiting = mysg 79 gp.param = nil 80 c.sendq.enqueue(mysg) 81 gp.parkingOnChan.Store(true) 82 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2) 83 KeepAlive(ep) 84 85 // 唤醒后处理 86 if mysg != gp.waiting { 87 throw("G waiting list is corrupted") 88 } 89 gp.waiting = nil 90 gp.activeStackChans = false 91 closed := !mysg.success 92 gp.param = nil 93 if mysg.releasetime > 0 { 94 blockevent(mysg.releasetime-t0, 2) 95 } 96 mysg.c = nil 97 releaseSudog(mysg) 98 if closed { 99 if c.closed == 0 { 100 throw("chansend: spurious wakeup") 101 } 102 panic(plainError("send on closed channel")) 103 } 104 return true 105}
发送流程总结:
- 检查 nil channel 和快速失败路径
- 加锁并检查 channel 状态
- 按优先级处理三种情况:
- 有等待接收者 → 直接传递
- 缓冲区有空间 → 缓冲数据
- 需要阻塞等待 → 挂起 goroutine
- 唤醒后清理状态并返回结果
接收
- 相同点:
- 都使用快速路径优化
- 类似的加锁模式
- 相同的阻塞/唤醒机制
- 共享 sudog 数据结构
- 不同点:
- 发送 panic 关闭的 channel,接收返回零值
- 发送关注 recvq,接收关注 sendq
- 发送处理缓冲区满,接收处理缓冲区空
关闭
1close(ch) // 关闭 channel
- 只有发送者可以关闭 channel
- 关闭一个已关闭的 channel 会导致 panic
- 可以从已关闭的 channel 接收数据,直到 channel 为空
1func closechan(c *hchan) {
2 if c == nil {
3 panic(plainError("close of nil channel"))
4 }
5
6 lock(&c.lock)
7 // 幂等性检查:重复关闭已关闭的 channel 会 panic
8 if c.closed != 0 {
9 unlock(&c.lock)
10 panic(plainError("close of closed channel"))
11 }
12
13 if raceenabled {
14 callerpc := getcallerpc()
15 racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
16 racerelease(c.raceaddr())
17 }
18
19 c.closed = 1
20
21 var glist gList
22
23 // 处理接收队列 (recvq)
24 for {
25 sg := c.recvq.dequeue()
26 if sg == nil {
27 break
28 }
29 if sg.elem != nil {
30 // 清空接收数据:将接收者的 elem 置零(返回零值)
31 typedmemclr(c.elemtype, sg.elem)
32 sg.elem = nil
33 }
34 if sg.releasetime != 0 {
35 sg.releasetime = cputicks()
36 }
37 gp := sg.g
38 gp.param = unsafe.Pointer(sg)
39 sg.success = false
40 if raceenabled {
41 raceacquireg(gp, c.raceaddr())
42 }
43 glist.push(gp)
44 }
45
46 // 处理发送队列 (sendq)
47 for {
48 sg := c.sendq.dequeue()
49 if sg == nil {
50 break
51 }
52 // 清空发送数据:发送者的 elem 置 nil
53 sg.elem = nil
54 if sg.releasetime != 0 {
55 sg.releasetime = cputicks()
56 }
57 gp := sg.g
58 gp.param = unsafe.Pointer(sg)
59 sg.success = false
60 if raceenabled {
61 raceacquireg(gp, c.raceaddr())
62 }
63 glist.push(gp)
64 }
65 unlock(&c.lock)
66
67 // 批量唤醒 goroutine
68 for !glist.empty() {
69 gp := glist.pop()
70 gp.schedlink = 0
71 goready(gp, 3)
72 }
73}
关闭流程全景:
- 验证阶段:
- 检查 nil channel
- 检查重复关闭
- 标记阶段:
- 设置关闭标志
- 处理竞态检测
- 清理阶段:
- 处理所有等待接收者(返回零值)
- 处理所有等待发送者(将 panic)
- 唤醒阶段:
- 释放 channel 锁
- 批量唤醒所有等待的 goroutine
close 逻辑比较简单,对于一个 channel,recvq 和 sendq 中分别保存了阻塞的发送者和接收者。关闭 channel 后,对于等待接收者而言,会收到一个相应类型的零值。对于等待发送者,会直接 panic。所以,在不了解 channel 还有没有接收者的情况下,不能贸然关闭 channel。
close 函数先上一把大锁,接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表,再解锁。最后,再将所有的 sudog 全都唤醒。
唤醒之后,该干嘛干嘛。sender 会继续执行 chansend 函数里 goparkunlock 函数之后的代码,很不幸,检测到 channel 已经关闭了,panic。receiver 则比较幸运,进行一些扫尾工作后,返回。这里,selected 返回 true,而返回值 received 则要根据 channel 是否关闭,返回不同的值。如果 channel 关闭,received 为 false,否则为 true。这我们分析的这种情况下,received 返回 false。
从一个有缓冲的 channel 里读数据,当 channel 被关闭,仍然可以读出有效值,只有当返回的 received 为 false 时,读出的数据才是无效的。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
特性
- 线程安全:channel 操作是原子性的,不需要额外的锁
- 阻塞行为:无缓冲 channel 会阻塞直到发送和接收都准备好
- 先进先出:channel 中的数据按照发送顺序被接收
- 类型安全:channel 有明确的类型,只能发送和接收指定类型的数据
使用模式
同步通信
1func worker(done chan bool) {
2 fmt.Println("working...")
3 time.Sleep(time.Second)
4 fmt.Println("done")
5 done <- true
6}
7
8func main() {
9 done := make(chan bool)
10 go worker(done)
11 <-done // 等待worker完成
12}
我经常用于监听信号量,用于优雅退出进程
生产者-消费者模式
1func producer(ch chan<- int) {
2 for i := 0; i < 10; i++ {
3 ch <- i
4 }
5 close(ch)
6}
7
8func consumer(ch <-chan int) {
9 for num := range ch {
10 fmt.Println("Received:", num)
11 }
12}
13
14func main() {
15 ch := make(chan int, 5)
16 go producer(ch)
17 consumer(ch)
18}
多路复用
1select {
2case msg1 := <-ch1:
3 fmt.Println("received", msg1)
4case msg2 := <-ch2:
5 fmt.Println("received", msg2)
6case ch3 <- 3:
7 fmt.Println("sent 3")
8default:
9 fmt.Println("no communication")
10}
超时控制
1select {
2case res := <-ch:
3 fmt.Println(res)
4case <-time.After(1 * time.Second):
5 fmt.Println("timeout")
6}
方向性
可以指定 channel 是只发送或只接收的:
1func ping(pings chan<- string, msg string) { // 只发送channel
2 pings <- msg
3}
4
5func pong(pings <-chan string, pongs chan<- string) { // 接收和发送channel
6 msg := <-pings
7 pongs <- msg
8}
注意事项
- 不要关闭接收端的 channel:这会导致 panic
- 避免向已关闭的 channel 发送数据:这会导致 panic
- nil channel:对 nil channel 的发送和接收会永久阻塞
- 资源泄漏:不使用的 goroutine 和 channel 可能导致内存泄漏
- 死锁:不正确的 channel 使用可能导致所有 goroutine 阻塞
操作 | nil channel | closed channel | not nil, not closed channel |
---|---|---|---|
close | panic | panic | 正常关闭 |
读 <- ch | 阻塞 | 读到对应类型的零值 | 阻塞或正常读取数据。缓冲型 channel 为空或非缓冲型 channel 没有等待发送者时会阻塞 |
写 ch <- | 阻塞 | panic | 阻塞或正常写入数据。非缓冲型 channel 没有等待接收者或缓冲型 channel buf 满时会被阻塞 |
发生 panic 的情况有三种:向一个关闭的 channel 进行写操作;关闭一个 nil 的 channel;重复关闭一个 channel。
读、写一个 nil channel 都会被阻塞。
性能考虑
- 无缓冲 channel 比有缓冲 channel 性能更高,因为减少了内存分配
- channel 操作比 mutex 更重,在简单场景下 mutex 可能更高效
- 大量小消息通过 channel 传递可能影响性能