Channel

背景

在多核 CPU 机器下,为了充分利用多核计算机的资源,需要进行并发编程,提高对CPU的利用率,并发从大的角度上讲,并发分为传统并发和基于消息传递的并发。

基本概念

传统并发

1[线程1] ----\
2             >--[共享内存区域]--<
3[线程2] ----/        |         \
4                    |          \
5[线程3] --------------/          \
6[线程4] --------------------------/

即多线程开发,使用共享内存的方式实现,如 Java、C++ 等。

  • 多个线程通过读写共享内存通信
  • 需要复杂的锁机制,容易产生竞态条件

基于消息的并发模型

Actor

1[Actor A]  --(消息)--> [邮箱] --> [Actor B]
2   ↑                       |
3   |_______________________|
4             (消息)
  • 每个 Actor 有自己的状态和邮箱
  • 通过异步消息传递通信

CSP

Communicating Sequential Processes

1[进程/goroutine X] --[channel]--> [进程/goroutine Y]
2      ↑                                |
3      |________________________________|
4               [channel]
  • 通信实体通过 channel 直接连接
  • 同步/异步通信取决于 channel 类型
维度CSP (Go)Actor
通信焦点Channel(媒介为中心)Actor(实体为中心)
连接方式显式 Channel 连接通过地址/引用发送消息
同步性支持同步和异步通常是异步
状态管理无强制要求(可共享可不共享)强制封装(状态不共享)
组合方式Channel 可灵活组合通常通过消息转发链实现

Channel

Channel 是 Go 语言中实现 CSP (Communicating Sequential Processes) 并发模型的关键结构,它提供了一种安全、高效的方式让 goroutine 之间进行数据传递。S

基本操作

创建

 1type hchan struct {
 2	// chan 里元素数量
 3	qcount   uint
 4	// chan 底层循环数组的长度
 5	dataqsiz uint
 6	// 指向底层循环数组的指针
 7	// 只针对有缓冲的 channel
 8	buf      unsafe.Pointer
 9	// chan 中元素大小
10	elemsize uint16
11	// chan 是否被关闭的标志
12	closed   uint32
13	// chan 中元素类型
14	elemtype *_type // element type
15	// 已发送元素在循环数组中的索引
16	sendx    uint   // send index
17	// 已接收元素在循环数组中的索引
18	recvx    uint   // receive index
19	// 等待接收的 goroutine 队列
20	recvq    waitq  // list of recv waiters
21	// 等待发送的 goroutine 队列
22	sendq    waitq  // list of send waiters
23
24	// 保护 hchan 中所有字段
25	lock mutex
26}
1ch := make(chan int)     // 无缓冲 channel
2ch := make(chan int, 10) // 有缓冲 channel,容量为10
 1// src/runtime/chan.go
 2func makechan(t *chantype, size int) *hchan {
 3	elem := t.Elem
 4
 5	// compiler checks this but be safe.
 6	if elem.Size_ >= 1<<16 {
 7		throw("makechan: invalid channel element type")
 8	}
 9	if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
10		throw("makechan: bad alignment")
11	}
12
13	mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
14	if overflow || mem > maxAlloc-hchanSize || size < 0 {
15		panic(plainError("makechan: size out of range"))
16	}
17
18	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
19	// buf points into the same allocation, elemtype is persistent.
20	// SudoG's are referenced from their owning thread so they can't be collected.
21	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
22	var c *hchan
23	switch {
24	case mem == 0:
25		// 零内存需求(无缓冲或零大小元素)
26        // 只分配 hchan 结构体本身的内存
27		c = (*hchan)(mallocgc(hchanSize, nil, true))
28		// 竞态检测器使用这个位置进行同步
29		c.buf = c.raceaddr()
30	case elem.PtrBytes == 0:
31		// 元素不包含指针
32    	// 一次性分配 hchan 和 buf 的内存
33		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
34		c.buf = add(unsafe.Pointer(c), hchanSize)
35	default:
36		// 元素包含指针,分别分配 hchan 结构体和缓冲区内存
37		c = new(hchan)
38		c.buf = mallocgc(mem, elem, true)
39	}
40
41	c.elemsize = uint16(elem.Size_)
42	c.elemtype = elem
43	c.dataqsiz = uint(size)
44	lockInit(&c.lock, lockRankHchan)
45
46	if debugChan {
47		print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
48	}
49	return c
50}

发送和接收

1ch <- value   // 发送值到 channel
2value := <-ch // 从 channel 接收值
特性无缓冲 Channel有缓冲 Channel
创建方式make(chan T)make(chan T, capacity)
同步性完全同步异步(缓冲区未满/非空时)
阻塞时机双方就绪才不阻塞发送方在满时阻塞,接收方在空时阻塞
内存分配无额外内存需要分配缓冲区内存
适用场景强同步需求生产消费速率不一致时缓冲
  1. 无缓冲 Channel 同步:

    1 Goroutine A       Channel      Goroutine B
    2    |               |               |
    3    |-- 发送数据 -->  |               |
    4    |               |<-- 等待接收 ---|
    5    |               |               |
    6    |               |-- 数据传递 --->|
    7    |               |               |
    
  2. 有缓冲 Channel 异步:

    1 Goroutine A       Channel      Goroutine B
    2    |               [ ]              |
    3    |-- 数据1 ----> [1]              |
    4    |               [1][ ]           |
    5    |-- 数据2 ----> [1][2]           |
    6    |               [1][2][ ]        |
    7    |               |<-- 取数据1 ----|
    8    |               [2][ ]           |
    
    发送
      1// 省略了一些内容
      2func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
      3	if c == nil {
      4        // 非阻塞模式:直接返回 false
      5		if !block {
      6			return false
      7		}
      8        // 阻塞模式:永久挂起 goroutine
      9		gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
     10		throw("unreachable")
     11	}
     12
     13    // 对于不阻塞的 send,快速检测失败场景,避免不必要的锁获取,提升性能
     14	// 非阻塞模式,channel 未关闭,channel 已满
     15    // 1. 无缓冲通道,且等待接收队列里没有 goroutine
     16    // 2. 有缓冲通道,循环数组已满
     17	if !block && c.closed == 0 && full(c) {
     18		return false
     19	}
     20
     21	var t0 int64
     22	if blockprofilerate > 0 {
     23		t0 = cputicks()
     24	}
     25
     26    // 锁住 channel,并发安全
     27	lock(&c.lock)
     28
     29 	// 向已关闭 channel 发送会 panic
     30	if c.closed != 0 {
     31		unlock(&c.lock)
     32		panic(plainError("send on closed channel"))
     33	}
     34
     35    // 场景1:有等待的接收者
     36    // 绕过缓冲区,直接将数据交给接收者,避免不必要的缓冲区操作
     37	if sg := c.recvq.dequeue(); sg != nil {
     38		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
     39		return true
     40	}
     41
     42    // 场景2:缓冲区有空间
     43    // 对于缓冲型的 channel
     44    // 计算缓冲区位置,将数据拷贝到缓冲区,更新发送索引,增加元素计数,释放锁
     45	if c.qcount < c.dataqsiz {
     46		qp := chanbuf(c, c.sendx)
     47		if raceenabled {
     48			racenotify(c, c.sendx, nil)
     49		}
     50		typedmemmove(c.elemtype, qp, ep)
     51		c.sendx++
     52		if c.sendx == c.dataqsiz {
     53			c.sendx = 0
     54		}
     55		c.qcount++
     56		unlock(&c.lock)
     57		return true
     58	}
     59
     60 	// 场景3:阻塞等待   
     61    // 非阻塞模式,直接返回false
     62	if !block {
     63		unlock(&c.lock)
     64		return false
     65	}
     66	// 阻塞模式,获取当前 goroutine,创建并初始化调度单元,加入 channel 的发送队列,挂起当前 goroutine
     67	gp := getg()
     68	mysg := acquireSudog()
     69	mysg.releasetime = 0
     70	if t0 != 0 {
     71		mysg.releasetime = -1
     72	}
     73	mysg.elem = ep
     74	mysg.waitlink = nil
     75	mysg.g = gp
     76	mysg.isSelect = false
     77	mysg.c = c
     78	gp.waiting = mysg
     79	gp.param = nil
     80	c.sendq.enqueue(mysg)
     81	gp.parkingOnChan.Store(true)
     82	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
     83	KeepAlive(ep)
     84
     85	// 唤醒后处理
     86	if mysg != gp.waiting {
     87		throw("G waiting list is corrupted")
     88	}
     89	gp.waiting = nil
     90	gp.activeStackChans = false
     91	closed := !mysg.success
     92	gp.param = nil
     93	if mysg.releasetime > 0 {
     94		blockevent(mysg.releasetime-t0, 2)
     95	}
     96	mysg.c = nil
     97	releaseSudog(mysg)
     98	if closed {
     99		if c.closed == 0 {
    100			throw("chansend: spurious wakeup")
    101		}
    102		panic(plainError("send on closed channel"))
    103	}
    104	return true
    105}
    

    发送流程总结:

    1. 检查 nil channel 和快速失败路径
    2. 加锁并检查 channel 状态
    3. 按优先级处理三种情况:
      • 有等待接收者 → 直接传递
      • 缓冲区有空间 → 缓冲数据
      • 需要阻塞等待 → 挂起 goroutine
    4. 唤醒后清理状态并返回结果
    接收
    1. 相同点
      • 都使用快速路径优化
      • 类似的加锁模式
      • 相同的阻塞/唤醒机制
      • 共享 sudog 数据结构
    2. 不同点
      • 发送 panic 关闭的 channel,接收返回零值
      • 发送关注 recvq,接收关注 sendq
      • 发送处理缓冲区满,接收处理缓冲区空

关闭

1close(ch)     // 关闭 channel
  • 只有发送者可以关闭 channel
  • 关闭一个已关闭的 channel 会导致 panic
  • 可以从已关闭的 channel 接收数据,直到 channel 为空
 1func closechan(c *hchan) {
 2	if c == nil {
 3		panic(plainError("close of nil channel"))
 4	}
 5
 6	lock(&c.lock)
 7    // 幂等性检查:重复关闭已关闭的 channel 会 panic
 8	if c.closed != 0 {
 9		unlock(&c.lock)
10		panic(plainError("close of closed channel"))
11	}
12
13	if raceenabled {
14		callerpc := getcallerpc()
15		racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
16		racerelease(c.raceaddr())
17	}
18
19	c.closed = 1
20
21	var glist gList
22
23	// 处理接收队列 (recvq)
24	for {
25		sg := c.recvq.dequeue()
26		if sg == nil {
27			break
28		}
29		if sg.elem != nil {
30            // 清空接收数据:将接收者的 elem 置零(返回零值)
31			typedmemclr(c.elemtype, sg.elem)
32			sg.elem = nil
33		}
34		if sg.releasetime != 0 {
35			sg.releasetime = cputicks()
36		}
37		gp := sg.g
38		gp.param = unsafe.Pointer(sg)
39		sg.success = false
40		if raceenabled {
41			raceacquireg(gp, c.raceaddr())
42		}
43		glist.push(gp)
44	}
45
46	// 处理发送队列 (sendq)
47	for {
48		sg := c.sendq.dequeue()
49		if sg == nil {
50			break
51		}
52        // 清空发送数据:发送者的 elem 置 nil
53		sg.elem = nil
54		if sg.releasetime != 0 {
55			sg.releasetime = cputicks()
56		}
57		gp := sg.g
58		gp.param = unsafe.Pointer(sg)
59		sg.success = false
60		if raceenabled {
61			raceacquireg(gp, c.raceaddr())
62		}
63		glist.push(gp)
64	}
65	unlock(&c.lock)
66
67	// 批量唤醒 goroutine
68	for !glist.empty() {
69		gp := glist.pop()
70		gp.schedlink = 0
71		goready(gp, 3)
72	}
73}

关闭流程全景:

  1. 验证阶段
    • 检查 nil channel
    • 检查重复关闭
  2. 标记阶段
    • 设置关闭标志
    • 处理竞态检测
  3. 清理阶段
    • 处理所有等待接收者(返回零值)
    • 处理所有等待发送者(将 panic)
  4. 唤醒阶段
    • 释放 channel 锁
    • 批量唤醒所有等待的 goroutine

close 逻辑比较简单,对于一个 channel,recvq 和 sendq 中分别保存了阻塞的发送者和接收者。关闭 channel 后,对于等待接收者而言,会收到一个相应类型的零值。对于等待发送者,会直接 panic。所以,在不了解 channel 还有没有接收者的情况下,不能贸然关闭 channel。

close 函数先上一把大锁,接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表,再解锁。最后,再将所有的 sudog 全都唤醒。

唤醒之后,该干嘛干嘛。sender 会继续执行 chansend 函数里 goparkunlock 函数之后的代码,很不幸,检测到 channel 已经关闭了,panic。receiver 则比较幸运,进行一些扫尾工作后,返回。这里,selected 返回 true,而返回值 received 则要根据 channel 是否关闭,返回不同的值。如果 channel 关闭,received 为 false,否则为 true。这我们分析的这种情况下,received 返回 false。

从一个有缓冲的 channel 里读数据,当 channel 被关闭,仍然可以读出有效值,只有当返回的 received 为 false 时,读出的数据才是无效的。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)

特性

  1. 线程安全:channel 操作是原子性的,不需要额外的锁
  2. 阻塞行为:无缓冲 channel 会阻塞直到发送和接收都准备好
  3. 先进先出:channel 中的数据按照发送顺序被接收
  4. 类型安全:channel 有明确的类型,只能发送和接收指定类型的数据

使用模式

同步通信

 1func worker(done chan bool) {
 2    fmt.Println("working...")
 3    time.Sleep(time.Second)
 4    fmt.Println("done")
 5    done <- true
 6}
 7
 8func main() {
 9    done := make(chan bool)
10    go worker(done)
11    <-done // 等待worker完成
12}

我经常用于监听信号量,用于优雅退出进程

生产者-消费者模式

 1func producer(ch chan<- int) {
 2    for i := 0; i < 10; i++ {
 3        ch <- i
 4    }
 5    close(ch)
 6}
 7
 8func consumer(ch <-chan int) {
 9    for num := range ch {
10        fmt.Println("Received:", num)
11    }
12}
13
14func main() {
15    ch := make(chan int, 5)
16    go producer(ch)
17    consumer(ch)
18}

多路复用

 1select {
 2case msg1 := <-ch1:
 3    fmt.Println("received", msg1)
 4case msg2 := <-ch2:
 5    fmt.Println("received", msg2)
 6case ch3 <- 3:
 7    fmt.Println("sent 3")
 8default:
 9    fmt.Println("no communication")
10}

超时控制

1select {
2case res := <-ch:
3    fmt.Println(res)
4case <-time.After(1 * time.Second):
5    fmt.Println("timeout")
6}

方向性

可以指定 channel 是只发送或只接收的:

1func ping(pings chan<- string, msg string) { // 只发送channel
2    pings <- msg
3}
4
5func pong(pings <-chan string, pongs chan<- string) { // 接收和发送channel
6    msg := <-pings
7    pongs <- msg
8}

注意事项

  1. 不要关闭接收端的 channel:这会导致 panic
  2. 避免向已关闭的 channel 发送数据:这会导致 panic
  3. nil channel:对 nil channel 的发送和接收会永久阻塞
  4. 资源泄漏:不使用的 goroutine 和 channel 可能导致内存泄漏
  5. 死锁:不正确的 channel 使用可能导致所有 goroutine 阻塞
操作nil channelclosed channelnot nil, not closed channel
closepanicpanic正常关闭
读 <- ch阻塞读到对应类型的零值阻塞或正常读取数据。缓冲型 channel 为空或非缓冲型 channel 没有等待发送者时会阻塞
写 ch <-阻塞panic阻塞或正常写入数据。非缓冲型 channel 没有等待接收者或缓冲型 channel buf 满时会被阻塞

发生 panic 的情况有三种:向一个关闭的 channel 进行写操作;关闭一个 nil 的 channel;重复关闭一个 channel。

读、写一个 nil channel 都会被阻塞。

性能考虑

  1. 无缓冲 channel 比有缓冲 channel 性能更高,因为减少了内存分配
  2. channel 操作比 mutex 更重,在简单场景下 mutex 可能更高效
  3. 大量小消息通过 channel 传递可能影响性能