并发过高的问题

1func main() {
2    userCount := math.MaxInt64
3    for i := 0; i < userCount; i++ {
4        go func(i int) {
5            fmt.Printf("go func: %d\n", i)
6            time.Sleep(time.Second)
7        }(i)
8    }
9}

对程序运行会发现系统抛出异常

panic: too many concurrent operations on a single file or socket (max 1048575)

异常信息中描述创建了太多的协程,超过了系统协程数支持的上限1048575,由此可以看出系统并不能无限创建协程。为什么是1048575了?这其实是对单个file/socket的并发操作个数超过了系统上限,这个是标准输出造成的,具体一点,就是文件句柄数量达到限制。删掉打印后,仍然会报错 errno 1455,即Out of Memory 错误。

  • 系统资源限制:协程创建也是需要消耗几kb的内存空间,虽然并不是很多,但是一个系统资源是有限的,如果一次性创建过多的协程但没有来得及执行完并回收这样就会造成内存大的大量消耗,影响程序系统的性能和稳定性。
  • 协程调度器:在go中协程是基于GMP模型运行,在模型GMP中G就是指协程,P是指调度器,M是指系统线程,当我们创建一个协程会先写入到协程的队列中由P调度分配给M执行,过程中调度器P会根据一定的策略来分配协程的执行时间和资源。如果同时创建大量的协程,会导致调度器的负担增加,从而影响系统的性能和稳定性。
  • 任务类型和执行时间:在我们的系统中会存在多种类型的任务,而任务在运行中需要的协程数和执行行时间是不同的。例如,I/O密集型任务需要较多的协程来处理,而CPU密集型任务需要较少的协程来处理。如果同时创建大量的协程,会导致系统的负载不均衡,从而影响系统的性能和稳定性。因此,需要根据任务类型和执行时间来控制协程数,以保持系统的负载均衡。

控制协程数量

信号量模式

带缓冲的通道

channel 理解为管道,协程可以向管道中发送数据和接收数据。如果管道中有数据,接收者就可以读取数据并进行处理;如果管道中没有数据,接收者就会阻塞等待,直到管道中有数据为止。同样的,发送者向管道中发送数据,如果管道已满,发送者就会阻塞等待,直到管道有空间为止。

 1package main
 2
 3import (
 4	"fmt"
 5	"sync"
 6	"time"
 7)
 8
 9func main() {
10	const maxGoroutines = 5 // 最大协程数
11	const totalTasks = 20   // 总任务数
12
13	sem := make(chan struct{}, maxGoroutines) // 信号量通道
14	var wg sync.WaitGroup
15
16	for i := 0; i < totalTasks; i++ {
17		wg.Add(1)
18		sem <- struct{}{} // 获取信号量,如果已满则阻塞
19
20		go func(taskID int) {
21			defer func() {
22				<-sem // 释放信号量
23				wg.Done()
24			}()
25
26			// 模拟任务处理
27			fmt.Printf("任务 %d 开始执行\n", taskID)
28			time.Sleep(time.Second)
29			fmt.Printf("任务 %d 完成\n", taskID)
30		}(i)
31	}
32
33	wg.Wait()
34	fmt.Println("所有任务完成")
35}
  • make(chan struct{}, n) 创建缓冲区大小为 n 的 channel,在没有被接收的情况下,至多发送 n 个消息则被阻塞。
  • 开启协程前,调用 ch <- struct{}{},若缓存区满,则阻塞。
  • 协程任务结束,调用 <-ch 释放缓冲区。

Semaphore

官方扩展包为我们提供了一个基于权重的信号量 Semaphore,并给出了示例

 1func Example_workerPool() {
 2	ctx := context.TODO()
 3
 4	var (
 5		maxWorkers = runtime.GOMAXPROCS(0)
 6		sem        = semaphore.NewWeighted(int64(maxWorkers))
 7		out        = make([]int, 32)
 8	)
 9
10	for i := range out {
11		if err := sem.Acquire(ctx, 1); err != nil {
12			log.Printf("Failed to acquire semaphore: %v", err)
13			break
14		}
15
16		go func(i int) {
17			defer sem.Release(1)
18			out[i] = collatzSteps(i + 1)
19		}(i)
20	}
21
22	if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil {
23		log.Printf("Failed to acquire semaphore: %v", err)
24	}
25
26	fmt.Println(out)
27}
  • Acquire 方法尝试获取权重为 n 的信号量资源,如果资源不足会阻塞,直到满足以下条件之一:
    1. 成功获取资源
    2. 上下文被取消
    3. 请求的资源数超过信号量总容量
  • 每次循环尝试获取 1 个信号量资源,获取成功后启动一个 goroutine,goroutine 完成后释放资源。
  • sem.Acquire(ctx, int64(maxWorkers))尝试获取所有资源(等于总容量),这回阻塞直到所有 worker goroutine 都完成并释放了资源

工作池模式

 1package main
 2
 3import (
 4	"fmt"
 5	"sync"
 6	"time"
 7)
 8
 9func worker(id int, jobs <-chan int, results chan<- int) {
10	for job := range jobs {
11		fmt.Printf("Worker %d 处理任务 %d\n", id, job)
12		time.Sleep(time.Second) // 模拟工作
13		results <- job * 2      // 返回结果
14	}
15}
16
17func main() {
18	const numJobs = 20
19	const numWorkers = 5 // 工作协程数量
20
21	jobs := make(chan int, numJobs)
22	results := make(chan int, numJobs)
23
24	// 创建工作池
25	for w := 1; w <= numWorkers; w++ {
26		go worker(w, jobs, results)
27	}
28
29	// 发送任务
30	for j := 1; j <= numJobs; j++ {
31		jobs <- j
32	}
33	close(jobs)
34
35	// 收集结果
36	for r := 1; r <= numJobs; r++ {
37		fmt.Printf("任务结果: %d\n", <-results)
38	}
39}
特性信号量模式工人池模式
协程生命周期按需创建,任务完成即退出预先创建,长期存在
并发控制通过计数信号量限制通过固定worker数量限制
实现复杂度简单相对复杂
适用场景任务执行时间不固定任务执行时间较短且均匀
资源利用率可能频繁创建/销毁协程协程复用,资源利用率高

协程池

ants

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/panjf2000/ants/v2"
 8)
 9
10func task(i interface{}) {
11	n := i.(int)
12	fmt.Printf("任务 %d 开始执行\n", n)
13	time.Sleep(time.Second)
14	fmt.Printf("任务 %d 完成\n", n)
15}
16
17func main() {
18	defer ants.Release()
19
20	pool, _ := ants.NewPool(5) // 创建容量为5的协程池
21
22	for i := 0; i < 20; i++ {
23		_ = pool.Submit(func() {
24			task(i)
25		})
26	}
27
28	// 等待所有任务完成
29	time.Sleep(3 * time.Second)
30	fmt.Println("所有任务完成")
31}
  • 调用 ants.NewPool()ants.NewPoolWithFunc() 创建协程池;
  • 调用 p.Submit()p.Invoke() 提交任务,流程参考上图;
  • 调用 p.Release() 安全关闭协程池,释放所有资源,并确保所有正在等待的调用者不会被永久阻塞。

调整系统资源的上限

ulimit

有些场景下,即使我们有效地限制了协程的并发数量,但是仍旧出现了某一类资源不足的问题,例如:

  • too many open files
  • out of memory

操作系统通常会限制同时打开文件数量、栈空间大小等,ulimit -a 可以看到系统当前的设置:

使用 ulimit -n 9999,将同时打开的文件句柄数量调整为 9999 来解决这个问题,其他的参数也可以按需调整。

虚拟内存

在内存不足时,将磁盘映射为内存使用,但是磁盘的 I/O 读写性能和内存条相差是非常大的,当应用程序长期高频度读写大量内存,那么虚拟内存对性能的影响就比较明显了。