概念介绍

想必在Go的并发领域中都听说过一句话:

Don’t communicate by sharing memory, share memory by communicating.

解释就是:不要用共享内存的方式进行通信,而是用通信的方式共享内存
而这里的通信方式也就是go所提供的channel,channel 作为Go中特有的数据结构。和其他的并发原语不一样的是,可以直接使用,无需引用额外的包。
通常结合Go中著名的goroutine使用一起提供更加轻便的并发方案,同时也演进了很多并发模式

基本使用

在channel定义上可以分为只能接受、只能发送和既可以发送和接收这三种:

var xx chan struct{}        // 可以发送接收
var xx chan<- struct{}      // 只能接收
var xx <-chan struct{}      // 只能发送

箭头的匹配遵循最左结合规则
通过make可以初始化channel,当然和切片一样,可以声明其容量。默认则为0。因为区分为buffered channel和 unbuffered channel

ch1 := make(chan int)          //unbuffered channel
ch2 := make(chan int,123)      //buffered channel

常用操作包含了发送、接收、关闭等

ch1 <- 100   //发送数据
x <- ch1     //接收数据
<- ch1       //丢弃处理
close(ch1)   //关闭chan

以上都是一些基本使用,有兴趣或者详情的可以查阅官方文档即可,不是本次重点

底层理解

如果要想掌握channel的功能和特性,就需要了解本身的底层实现。

数据结构

首先看下基本数据结构:https://github.com/golang/go/blob/master/src/runtime/chan.go#L32

type hchan struct {
        qcount   uint           // 循环队列的元素大小 == len()
        dataqsiz uint           // 循环队列的大小 == cap()
        buf      unsafe.Pointer // 循环队列的指针
        elemsize uint16         // chan中元素大小和元素类型相关
        closed   uint32         // 是否close
        elemtype *_type         // chan中的元素类型
        sendx    uint           // send 在buf中的索引
        recvx    uint           // recv 在buf中的索引
        recvq    waitq          // receiver 等待队列
        sendq    waitq          // send 等待队列

        // lock protects all fields in hchan, as well as several
        // fields in sudogs blocked on this channel.
        //
        // Do not change another G's status while holding this lock
        // (in particular, do not ready a G), as this can deadlock
        // with stack shrinking.
        lock mutex               //互斥锁,并发基础,资源保护
}

type _type struct {
   size       uintptr
   ptrdata    uintptr // size of memory prefix holding all pointers
   hash       uint32
   tflag      tflag
   align      uint8
   fieldAlign uint8
   kind       uint8
   // function for comparing objects of this type
   // (ptr to object A, ptr to object B) -> ==?
   equal func(unsafe.Pointer, unsafe.Pointer) bool
   // gcdata stores the GC type data for the garbage collector.
   // If the KindGCProg bit is set in kind, gcdata is a GC program.
   // Otherwise it is a ptrmask bitmap. See mbitmap.go for details.
   gcdata    *byte
   str       nameOff
   ptrToThis typeOff
}

type waitq struct {
   first *sudog
   last  *sudog
}

初始化
func makechan(t *chantype, size int) *hchan {
   elem := t.elem

   // compiler checks this but be safe.
   if elem.size >= 1<<16 {
      throw("makechan: invalid channel element type")
   }
   if hchanSize%maxAlign != 0 || elem.align > maxAlign {
      throw("makechan: bad alignment")
   }

   mem, overflow := math.MulUintptr(elem.size, uintptr(size))
   if overflow || mem > maxAlloc-hchanSize || size < 0 {
      panic(plainError("makechan: size out of range"))
   }

   // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
   // buf points into the same allocation, elemtype is persistent.
   // SudoG's are referenced from their owning thread so they can't be collected.
   // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
   var c *hchan
   switch {
   case mem == 0:
      // Queue or element size is zero.
      c = (*hchan)(mallocgc(hchanSize, nil, true))
      // Race detector uses this location for synchronization.
      c.buf = c.raceaddr()
   case elem.ptrdata == 0:
      // Elements do not contain pointers.
      // Allocate hchan and buf in one call.
      c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
      c.buf = add(unsafe.Pointer(c), hchanSize)
   default:
      // Elements contain pointers.
      c = new(hchan)
      c.buf = mallocgc(mem, elem, true)
   }

   c.elemsize = uint16(elem.size)
   c.elemtype = elem
   c.dataqsiz = uint(size)

   if debugChan {
      print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
   }
   return c
}

上面的整体初始化流程可以概括为:

  1. channel初始化size检查
  2. 声明的是unbuffered channel,不创建buf。见 c.buf = c.raceaddr()
  3. 声明的不是指针类型,则分配一段连续的内存和buf给到chan
  4. 声明的为指针类型,单独申请buf:c.buf = mallocgc(mem, elem, true)
  5. 初始化结构参数

数据Send

这里以数据发送为例,最终编译时会调用chansend1: https://github.com/golang/go/blob/master/src/runtime/chan.go#L158
我们解析下期中的逻辑

func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
      if !block {
        return false
      }
      gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
      throw("unreachable")
    }
    ......
    if !block && c.closed == 0 && full(c) { return false }
}

判断chan是否为nil,如果是的话,通过gopark将调用者的groutine阻塞并休眠

if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
   (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
   return false
}

如果本身chan没有close,并且已满时,如果不想阻塞(取决于block)当前的调用直接返回

lock(&c.lock)

if c.closed != 0 {
   unlock(&c.lock)
   panic(plainError("send on closed channel"))
}

这里是值得注意的一部分,经常会遇到这样的场景:如果chan已经被close了,再往里面发数据就会panic

if sg := c.recvq.dequeue(); sg != nil {
   // Found a waiting receiver. We pass the value we want to send
   // directly to the receiver, bypassing the channel buffer (if any).
   send(c, sg, ep, func() { unlock(&c.lock) }, 3)
   return true
}

如果等待的队列有等待的recevier,那么将数据交给他进行处理,这样就可以不用再放入buf中,减少操作开支
具体数据处理参考:memmove(dst, src, t.size)

if c.qcount < c.dataqsiz {
   // Space is available in the channel buffer. Enqueue the element to send.
   qp := chanbuf(c, c.sendx)
   if raceenabled {
      raceacquire(qp)
      racerelease(qp)
   }
   typedmemmove(c.elemtype, qp, ep)
   c.sendx++
   if c.sendx == c.dataqsiz {
      c.sendx = 0
   }
   c.qcount++
   unlock(&c.lock)
   return true
}

到这里就是常见的发送数据场景:当前没有receiver,这时数据押入循环队列,返回成功

if !block {
   unlock(&c.lock)
   return false
}
...

最后就是处理buf满的情况,这时的goroutine就会加入到发送等待队列中,直到下次唤醒。

数据Receive

https://github.com/golang/go/blob/master/src/runtime/chan.go#L454

Chan Close

https://github.com/golang/go/blob/master/src/runtime/chan.go#L355

避坑指南

问题Case

无论是在开源的知名项目还是日常开发中,我们在引入channle这个新的概念都会遇到不少问题,最常见的无非就是panic和goroutine泄露
经过上面的底层结果分析,不难发现,panic的情况总结可以为三种

  • Close 为nil的channel
  • Close 已经close的channel
  • Send 已经被close的channel

主要原因也说的比较清楚:创建了unbuffered chan:applyConfChan,这个gorountine中只有一个receiver,但是send是在业务逻辑的一个loop(循环)中,这样子gorountine就会block因为没有接收和发送同步。
如果要解决的话也很简单:修改发送逻辑,业务循环处理完成后再send即可

总结建议

channel在不同状态时我们都需要注意,具体总结如下

nil empty Buf full Not empty&full closed
send block ok block ok
receiver block block ok ok
close panic block ok&保留已有数据 ok&保留已有数据

实践运用

综合channel的基本特性和原理,通常我们可以总结为如下的场景:
1-发布订阅
最典型的应该就是生产消费,是可以当做内部消息队列的。而其设计的也是多生产者和多消费者模型
2-数据传递
作为消息的通信渠道,在通信上可以将数据进行处理传递
3-信号处理
其本身也是基于生产和发布的模式再结合chan的特性,可以很好的实现基于信号的处理
4-锁
基于基本结构实现的互斥,因此也可以实现互斥锁的机制

数据传递

这个场景其实比较常见,举例来说就是:有4个goroutine,可以为其编号。如果需要定时顺序打印各自编号,这个就是个数据传递的过程。
应用到现有的设计模式就是责任链模式,在很多上下文处理都有所使用
那怎么实现这个过程,其实可以定义变量令牌作为标识,哪个goroutine拿到令牌就做任务处理,这里我们只做打印

//定义令牌
type token struct{}

func newWorker(id int, ch chan token, nextCh chan token) {
   for {
      // 取得令牌
      token := <-ch 
      // 打印处理,这里可以针对token做一系列的业务处理,最终再下发token
      fmt.Println("data = ", id)
      time.Sleep(time.Second)
      nextCh <- token
   }
}

func TestChannel(t *testing.T) {
   chs := []chan token{make(chan token), make(chan token), make(chan token), make(chan token)}

   // 创建n个worker goroutine
   for i := 0; i < 4; i++ {
      go newWorker(i, chs[i], chs[(i+1)%4])
   }

   //令牌开始传递,不用指定顺序,这里从0开始
   chs[0] <- struct{}{}

   //常驻执行
   select {}
}

=== RUN   TestChannel
data =  0
data =  1
data =  2
data =  3
data =  0
data =  1
...

这样的场景需要注意的是,当前的goroutine只需要关注自己的token即可,内部的处理也会是基于接收到的token进行处理。处理完成之后进行传递即可

信号处理

想象一下传统的notify/wait的场景,其实利用channnel本身的特性也可以实现。主要就是基于chan为空,那么receiver就会进行阻塞等待,当有消息来时或者关闭chan会进行响应处理这个特性
举例来说就是通常我们需要在程序退出之前执行一些连接关闭、文件close等操作。实际上很多框架的优雅退出都是基于此来实现的
可以先看一个最简单的信号通知的case:

func TestQuit(t *testing.T) {
   g := make(chan int)
   quit := make(chan struct{})

   go func() {
      for {
         select {
         case v := <-g:
            t.Log(v)
         case <-quit:
            t.Log("service 退出")
            return
         }
      }
   }()

   for i := 0; i < 3; i++ {
      g <- i
   }
   quit <- struct{}{}
   t.Log("TestQuit 退出")
}

而在实现程序的优雅退出时,我们需要考虑这样的情况:
程序在关闭时会有一些流程需要处理,这些流程中间都会花费一些时间,比如说上面的的链接关闭,文件关闭等。这个时候程序是处理一个关闭中的状态
而这个关闭的过程如果太久肯定是不可接受的,因此需要设置超时时间,到达这个时间,可以直接退出。
程序完全退出时,这个时候才是真正的closed
因此针对以上考虑,我们可以设计优雅关闭的流程如下:

func TestGracefulDown(t *testing.T) {
   var closing = make(chan struct{})
   var closed = make(chan struct{})

   go func() {
      // 模拟业务处理
      for {
         select {
         case <-closing:
            return
         default:
            // 业务处理流程
            time.Sleep(100 * time.Millisecond)
         }
      }
   }()

   // 处理CTRL+C等中断信号
   termChan := make(chan os.Signal)
   signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
   <-termChan

   close(closing)
   // 执行退出之前的清理动作
   go doCleanup(closed)

   select {
   case <-closed:
   case <-time.After(time.Second):
      t.Log("清理超时,不等了")
   }
   t.Log("graceful log out...")
}

func doCleanup(closed chan struct{}) {
   //中间处理执行一分钟
   time.Sleep(1 * time.Minute)
   close(closed)
}

锁实现

在底层结构的分析中就可以看到,chan的内部就有互斥锁的保护。因此是可以保证只有当数据进入队列后(不一定指进入buf中)才可以被消费
最常见基于channel的实现主要有两种,这两种都是基于chan的特性决定的

  • 初始化一个buffer为1的channel。上面也说了内部可以保证只有进入channel的数据才可以被receive,因此获得这个元素的就等价于获得了锁
  • 初始化一个buffer为1的channel。在发送给一个full chan的时候会block这个前提下,可以知道可以发送元素到这个chan的就代表了获得了锁

通常锁场景是需要重试和超时的,而chan结合select可以很好的实现这些特性
两种方式其实都大同小异,这里以第一个场景为例

// 定义互斥锁
type Mutex struct {
   ch chan struct{}
}

// 初始化
func NewMutex() *Mutex {
   // buffer为1
   mu := &Mutex{make(chan struct{}, 1)}
   // 放入元素
   mu.ch <- struct{}{}
   return mu
}

// 请求锁,直到获取到
func (m *Mutex) Lock() {
   <-m.ch
}

// 释放锁
func (m *Mutex) Unlock() {
   select {
   case m.ch <- struct{}{}:
   default:
      panic("unlock of unlocked mutex")
   }
}

// 尝试获取锁
func (m *Mutex) TryLock() bool {
   select {
   case <-m.ch:
      return true
   default:
   }
   return false
}

// 加入一个超时的设置
func (m *Mutex) LockTimeout(timeout time.Duration) bool {
   timer := time.NewTimer(timeout)
   select {
   case <-m.ch:
      timer.Stop()
      return true
   case <-timer.C:
   }
   return false
}

// 锁是否已被持有
func (m *Mutex) IsLocked() bool {
   return len(m.ch) == 0
}

func TestLock(t *testing.T) {
   m := NewMutex()
   ok := m.TryLock()
   t.Logf("locked %v\n", ok)
   ok = m.TryLock()
   t.Logf("locked %v\n", ok)
}

整个流程就是:

  1. 初始化锁的时候,先将一个元素推入chan,
  2. 之后成功获取这个元素的就代表了获得这个锁。
  3. 完成操作后再将这个元素推入chan即可,推入的过程就是释放锁。
    因为元素在没有再次推入chan之前是不会有任何goroutine可以拿到这个元素的。这也就实现了锁的互斥