前言

本文介绍 conc - 一个更好的 go 并发库, sourcegraph 在日常开发中使用go原生并发出现了问题,由此开发了 conc ,相比标准并发代码更优雅,代码更少,下面展示一个例子,可以看出代码减少了许多.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
type propagatedPanic struct {
    val   any
    stack []byte
}
 
func main() {
    done := make(chan *propagatedPanic)
    go func() {
        defer func() {
            if v := recover(); v != nil {
                done <- &propagatedPanic{
                    val:   v,
                    stack: debug.Stack(),
                }
            } else {
                done <- nil
            }
        }()
        doSomethingThatMightPanic()
    }()
    if val := <-done; val != nil {
        panic(val)
    }
}

// conc

func main() {
    var wg conc.WaitGroup
    wg.Go(doSomethingThatMightPanic)
    // panics with a nice stacktrace
    wg.Wait()
}

安装

使用以下命令进行安装: go get github.com/sourcegraph/conc

接下来对 conc 如何使用进行介绍。

介绍

conc.WaitGroup

conc.WaitGroup 与标准库 sync.WaitGroup 的区别在于 conc.WaitGroup中子goroutine中的panic会被传递给Wait方法的调用方,避免去 recover goroutine的panic.

例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func main() {
	var count atomic.Int64

	var wg conc.WaitGroup
	for i := 1; i < 100; i++ {
		wg.Go(func() {
			count.Add(1)
		})
	}
	wg.Wait()

	fmt.Println(count.Load())
}

如果想要 recover 某个 goroutine 发生的panic,可以使用 WaitAndRecover 方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func main() {
	var count atomic.Int64

	var wg conc.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Go(func() {
			if i == 7 {
				panic("bad thing")
			}
			count.Add(1)
		})
	}
	wg.WaitAndRecover()

	fmt.Println(count.Load())
}

goroutine 池

Pool 是用于处理并发任务的 goroutine 池,Pool中的goroutine是延迟启动的,所以创建一个新的Pool是廉价的。产生的 goroutine 永远不会多于提交的任务。池是高效的,但不是零成本。它不应该用于非常短的任务。启动和拆卸的开销约为 1μs,每个任务的开销约为 300ns。

例子:

1
2
3
4
5
6
7
8
9
func main() {
	p := pool.New().WithMaxGoroutines(3)
	for i := 0; i < 5; i++ {
		p.Go(func() {
			fmt.Println("conc")
		})
	}
	p.Wait()
}

使用WithContext可以创建一个传递Context的Pool,通过这个父Context来控制池中的goroutine。默认情况下,在取消父Context之前,Pool中的Context不会取消。如果需要在出现 panic 或错误时取消 context 可以通过配置 WithCancelOnError来实现,例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func main() {
	p := pool.New().
		WithMaxGoroutines(4).
		WithContext(context.Background()).
		WithCancelOnError()
	for i := 0; i < 3; i++ {
		i := i
		p.Go(func(ctx context.Context) error {
			if i == 2 {
				return errors.New("I will cancel all other tasks!")
			}
			<-ctx.Done()
			return nil
		})
	}
	err := p.Wait()
	fmt.Println(err)
}

ResultPool是一个执行返回泛型结果的任务池。使用Go()在池中执行任务,然后由Wait()返回任务的结果。例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
p := pool.NewWithResults[int]()
	for i := 0; i < 10; i++ {
		i := i
		p.Go(func() int {
			return i * 2
		})
	}
	res := p.Wait()
	// Result order is nondeterministic, so sort them first
	sort.Ints(res)
	fmt.Println(res)

Stream

Pool 执行任务返回的顺序是无序的,想要有序的结果可以使用 Stream 。要使用Stream,您需要提交一定数量的 Task,每个任务都会返回一个回调。每个任务都将在任务池中同时执行,并且回调将按照任务提交的顺序顺序执行。 任务提交完需使用 Wait() 方法等待任务执行完并传递 panic.

同Pool一样,Stream也不适用于非常短的任务。启动和拆卸会增加几微秒的开销,每个任务的开销大约是500ns。

例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func main() {
	times := []int{20, 52, 16, 45, 4, 80}

	stream := stream2.New()
	for _, millis := range times {
		dur := time.Duration(millis) * time.Millisecond
		stream.Go(func() stream2.Callback {
			time.Sleep(dur)
			// This will print in the order the tasks were submitted
			return func() { fmt.Println(dur) }
		})
	}
	stream.Wait()
}

小结

conc 相较于标准库的并发处理库,代码更加简洁,使用更加方便,是一套非常适合初学者的并发工具。

参考