Go并发实践-Channel
概念介绍
想必在Go的并发领域中都听说过一句话:
Don’t communicate by sharing memory, share memory by communicating.
解释就是:不要用共享内存的方式进行通信,而是用通信的方式共享内存
而这里的通信方式也就是go所提供的channel,channel 作为Go中特有的数据结构。和其他的并发原语不一样的是,可以直接使用,无需引用额外的包。
通常结合Go中著名的goroutine使用一起提供更加轻便的并发方案,同时也演进了很多并发模式
基本使用
在channel定义上可以分为只能接受、只能发送和既可以发送和接收这三种:
var xx chan struct{} // 可以发送接收
var xx chan<- struct{} // 只能接收
var xx <-chan struct{} // 只能发送
箭头的匹配遵循最左结合规则
通过make可以初始化channel,当然和切片一样,可以声明其容量。默认则为0。因为区分为buffered channel和 unbuffered channel
ch1 := make(chan int) //unbuffered channel
ch2 := make(chan int,123) //buffered channel
常用操作包含了发送、接收、关闭等
ch1 <- 100 //发送数据
x <- ch1 //接收数据
<- ch1 //丢弃处理
close(ch1) //关闭chan
以上都是一些基本使用,有兴趣或者详情的可以查阅官方文档即可,不是本次重点
底层理解
如果要想掌握channel的功能和特性,就需要了解本身的底层实现。
数据结构
首先看下基本数据结构:https://github.com/golang/go/blob/master/src/runtime/chan.go#L32
type hchan struct {
qcount uint // 循环队列的元素大小 == len()
dataqsiz uint // 循环队列的大小 == cap()
buf unsafe.Pointer // 循环队列的指针
elemsize uint16 // chan中元素大小和元素类型相关
closed uint32 // 是否close
elemtype *_type // chan中的元素类型
sendx uint // send 在buf中的索引
recvx uint // recv 在buf中的索引
recvq waitq // receiver 等待队列
sendq waitq // send 等待队列
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex //互斥锁,并发基础,资源保护
}
type _type struct {
size uintptr
ptrdata uintptr // size of memory prefix holding all pointers
hash uint32
tflag tflag
align uint8
fieldAlign uint8
kind uint8
// function for comparing objects of this type
// (ptr to object A, ptr to object B) -> ==?
equal func(unsafe.Pointer, unsafe.Pointer) bool
// gcdata stores the GC type data for the garbage collector.
// If the KindGCProg bit is set in kind, gcdata is a GC program.
// Otherwise it is a ptrmask bitmap. See mbitmap.go for details.
gcdata *byte
str nameOff
ptrToThis typeOff
}
type waitq struct {
first *sudog
last *sudog
}
初始化
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
上面的整体初始化流程可以概括为:
- channel初始化size检查
- 声明的是unbuffered channel,不创建buf。见 c.buf = c.raceaddr()
- 声明的不是指针类型,则分配一段连续的内存和buf给到chan
- 声明的为指针类型,单独申请buf:c.buf = mallocgc(mem, elem, true)
- 初始化结构参数
数据Send
这里以数据发送为例,最终编译时会调用chansend1: https://github.com/golang/go/blob/master/src/runtime/chan.go#L158
我们解析下期中的逻辑
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
......
if !block && c.closed == 0 && full(c) { return false }
}
判断chan是否为nil,如果是的话,通过gopark将调用者的groutine阻塞并休眠
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
如果本身chan没有close,并且已满时,如果不想阻塞(取决于block)当前的调用直接返回
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
这里是值得注意的一部分,经常会遇到这样的场景:如果chan已经被close了,再往里面发数据就会panic
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
如果等待的队列有等待的recevier,那么将数据交给他进行处理,这样就可以不用再放入buf中,减少操作开支
具体数据处理参考:memmove(dst, src, t.size)
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
到这里就是常见的发送数据场景:当前没有receiver,这时数据押入循环队列,返回成功
if !block {
unlock(&c.lock)
return false
}
...
最后就是处理buf满的情况,这时的goroutine就会加入到发送等待队列中,直到下次唤醒。
数据Receive
https://github.com/golang/go/blob/master/src/runtime/chan.go#L454
Chan Close
https://github.com/golang/go/blob/master/src/runtime/chan.go#L355
避坑指南
问题Case
无论是在开源的知名项目还是日常开发中,我们在引入channle这个新的概念都会遇到不少问题,最常见的无非就是panic和goroutine泄露
经过上面的底层结果分析,不难发现,panic的情况总结可以为三种
- Close 为nil的channel
- Close 已经close的channel
- Send 已经被close的channel

主要原因也说的比较清楚:创建了unbuffered chan:applyConfChan,这个gorountine中只有一个receiver,但是send是在业务逻辑的一个loop(循环)中,这样子gorountine就会block因为没有接收和发送同步。
如果要解决的话也很简单:修改发送逻辑,业务循环处理完成后再send即可
总结建议
channel在不同状态时我们都需要注意,具体总结如下
nil | empty | Buf full | Not empty&full | closed |
---|---|---|---|---|
send | block | ok | block | ok |
receiver | block | block | ok | ok |
close | panic | block | ok&保留已有数据 | ok&保留已有数据 |
实践运用
综合channel的基本特性和原理,通常我们可以总结为如下的场景:
1-发布订阅
最典型的应该就是生产消费,是可以当做内部消息队列的。而其设计的也是多生产者和多消费者模型
2-数据传递
作为消息的通信渠道,在通信上可以将数据进行处理传递
3-信号处理
其本身也是基于生产和发布的模式再结合chan的特性,可以很好的实现基于信号的处理
4-锁
基于基本结构实现的互斥,因此也可以实现互斥锁的机制
数据传递
这个场景其实比较常见,举例来说就是:有4个goroutine,可以为其编号。如果需要定时顺序打印各自编号,这个就是个数据传递的过程。
应用到现有的设计模式就是责任链模式,在很多上下文处理都有所使用
那怎么实现这个过程,其实可以定义变量令牌作为标识,哪个goroutine拿到令牌就做任务处理,这里我们只做打印
//定义令牌
type token struct{}
func newWorker(id int, ch chan token, nextCh chan token) {
for {
// 取得令牌
token := <-ch
// 打印处理,这里可以针对token做一系列的业务处理,最终再下发token
fmt.Println("data = ", id)
time.Sleep(time.Second)
nextCh <- token
}
}
func TestChannel(t *testing.T) {
chs := []chan token{make(chan token), make(chan token), make(chan token), make(chan token)}
// 创建n个worker goroutine
for i := 0; i < 4; i++ {
go newWorker(i, chs[i], chs[(i+1)%4])
}
//令牌开始传递,不用指定顺序,这里从0开始
chs[0] <- struct{}{}
//常驻执行
select {}
}
=== RUN TestChannel
data = 0
data = 1
data = 2
data = 3
data = 0
data = 1
...
这样的场景需要注意的是,当前的goroutine只需要关注自己的token即可,内部的处理也会是基于接收到的token进行处理。处理完成之后进行传递即可
信号处理
想象一下传统的notify/wait的场景,其实利用channnel本身的特性也可以实现。主要就是基于chan为空,那么receiver就会进行阻塞等待,当有消息来时或者关闭chan会进行响应处理这个特性
举例来说就是通常我们需要在程序退出之前执行一些连接关闭、文件close等操作。实际上很多框架的优雅退出都是基于此来实现的
可以先看一个最简单的信号通知的case:
func TestQuit(t *testing.T) {
g := make(chan int)
quit := make(chan struct{})
go func() {
for {
select {
case v := <-g:
t.Log(v)
case <-quit:
t.Log("service 退出")
return
}
}
}()
for i := 0; i < 3; i++ {
g <- i
}
quit <- struct{}{}
t.Log("TestQuit 退出")
}
而在实现程序的优雅退出时,我们需要考虑这样的情况:
程序在关闭时会有一些流程需要处理,这些流程中间都会花费一些时间,比如说上面的的链接关闭,文件关闭等。这个时候程序是处理一个关闭中的状态
而这个关闭的过程如果太久肯定是不可接受的,因此需要设置超时时间,到达这个时间,可以直接退出。
程序完全退出时,这个时候才是真正的closed
因此针对以上考虑,我们可以设计优雅关闭的流程如下:
func TestGracefulDown(t *testing.T) {
var closing = make(chan struct{})
var closed = make(chan struct{})
go func() {
// 模拟业务处理
for {
select {
case <-closing:
return
default:
// 业务处理流程
time.Sleep(100 * time.Millisecond)
}
}
}()
// 处理CTRL+C等中断信号
termChan := make(chan os.Signal)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
<-termChan
close(closing)
// 执行退出之前的清理动作
go doCleanup(closed)
select {
case <-closed:
case <-time.After(time.Second):
t.Log("清理超时,不等了")
}
t.Log("graceful log out...")
}
func doCleanup(closed chan struct{}) {
//中间处理执行一分钟
time.Sleep(1 * time.Minute)
close(closed)
}
锁实现
在底层结构的分析中就可以看到,chan的内部就有互斥锁的保护。因此是可以保证只有当数据进入队列后(不一定指进入buf中)才可以被消费
最常见基于channel的实现主要有两种,这两种都是基于chan的特性决定的
- 初始化一个buffer为1的channel。上面也说了内部可以保证只有进入channel的数据才可以被receive,因此获得这个元素的就等价于获得了锁
- 初始化一个buffer为1的channel。在发送给一个full chan的时候会block这个前提下,可以知道可以发送元素到这个chan的就代表了获得了锁
通常锁场景是需要重试和超时的,而chan结合select可以很好的实现这些特性
两种方式其实都大同小异,这里以第一个场景为例
// 定义互斥锁
type Mutex struct {
ch chan struct{}
}
// 初始化
func NewMutex() *Mutex {
// buffer为1
mu := &Mutex{make(chan struct{}, 1)}
// 放入元素
mu.ch <- struct{}{}
return mu
}
// 请求锁,直到获取到
func (m *Mutex) Lock() {
<-m.ch
}
// 释放锁
func (m *Mutex) Unlock() {
select {
case m.ch <- struct{}{}:
default:
panic("unlock of unlocked mutex")
}
}
// 尝试获取锁
func (m *Mutex) TryLock() bool {
select {
case <-m.ch:
return true
default:
}
return false
}
// 加入一个超时的设置
func (m *Mutex) LockTimeout(timeout time.Duration) bool {
timer := time.NewTimer(timeout)
select {
case <-m.ch:
timer.Stop()
return true
case <-timer.C:
}
return false
}
// 锁是否已被持有
func (m *Mutex) IsLocked() bool {
return len(m.ch) == 0
}
func TestLock(t *testing.T) {
m := NewMutex()
ok := m.TryLock()
t.Logf("locked %v\n", ok)
ok = m.TryLock()
t.Logf("locked %v\n", ok)
}
整个流程就是:
- 初始化锁的时候,先将一个元素推入chan,
- 之后成功获取这个元素的就代表了获得这个锁。
- 完成操作后再将这个元素推入chan即可,推入的过程就是释放锁。
因为元素在没有再次推入chan之前是不会有任何goroutine可以拿到这个元素的。这也就实现了锁的互斥