Golang 并发编程

并发基础

并发主流的实现模型

  • 多进程 :在操作系统层面进行并发的基本模式,同时也是开销最大的模式
  • 多线程 : 在大部分操作系统上都属于系统层面的并发模式,使用最多的最有效的一种模式。它比多进程的开销小很多,但是开销依旧比较大,且在高并发模式下,效率会有影响
  • 基于回调的非阻塞/异步IO : 通过事件驱动的方式使用异步IO,使服务器持续运转,且尽可能地少用线程,降低开销,在Node.js中得到很好的实践。但这种模式,编程比多线程要复杂
  • 协程(coroutine) :寄存于线程中,系统开销极小,可以有效提高线程的任务并发性。优点是编程简单,结构清晰;缺点是需要语言支持。目前原生支持协程的语言还很少

协程

操作系统自己掌管的进程(process)、进程内的线程(thread)以及进程内的协程(coroutine,也叫轻量级线程)。

goroutine

Go语言在支持轻量级线程叫goroutine,goroutine是Go语言中的轻量级线程实现,由Go运行时(runtime)管理

并发通信

package main

import (
    "fmt"
    "sync"
    "runtime"
)

var counter int = 0

func Count(lock *sync.Mutex) {
    lock.Lock() // 加锁
    counter++
    fmt.Println(lock)
    lock.Unlock() // 解锁
}

func main() {
    lock := &sync.Mutex{}

    // 10个goroutine是并发执行的
    for i := 0; i < 10; i++ {
        go Count(lock) // goroutine
    }

    // 用for循环来不断检查counter的值(同样需要加锁)
    for {
        lock.Lock()

        c := counter

        lock.Unlock()

        runtime.Gosched()

        if c >= 10 {
            break // 当其值达到10时,说明所有goroutine都执行完 毕了,这时主函数返回,程序退出
        }
    }

}

不要通过共享内存来通信,而应该通过通信来共享内存

channel

Go语言在语言级别提供的goroutine间的通信方式。channel是类型相关的,一个channel只能传递一种类型的值,这个类型需要在声明channel时指定

package main

import "fmt"

func Count(ch chan int) {
    ch <- 1 // 向对应的channel中写入一个数据
    fmt.Println("Counting")
}

func main() {
    chs := make([]chan int, 10)
    for i := 0; i < 10; i++ {
        chs[i] = make(chan int)
        go Count(chs[i])
    }

    for _, ch := range(chs) {
        <-ch // 读取10个channel数据\
    }
}
基本语法
  1. 声明形式
    var chanName chan ElementType
    与一般的变量声明不同的地方仅仅是在类型之前加了chan关键字。ElementType指定元素类型。例如声明一个类型为int的channel: var ch chan int
    或者声明一个map,元素是bool型的channel:
    var m map[string] chan bool
  2. 定义
    直接使用内置的函数make()即可:
    ch := make(chan int) // 声明并初始化一个int类型名为ch的 channel var ch chan int
  3. 写入
    ch <- value
    向channel写入数据通常会导致程序阻塞,直到有其他goroutine从这个channel中读取数据
  4. 读出
    value := <-ch
    如果channel之前没有写入数据,读取数据会导致阻塞,直到channel中被写入数据为止
select

监控一系列的文件句柄,一旦其中一个文件句柄发生了IO动作,该select()调用就会被返回。Go 语言直接在语言级别支持select关键字,用于处理异步IO 问题

select {
    case <-chan1:
    // 如果chan1成功读到数据,则进行该case处理语句 
    case chan2 <- 1:
    // 如果成功向chan2写入数据,则进行该case处理语句
    default:
    // 如果上面都没有成功,则进入default处理流程 
}
缓冲机制

需要持续传输大量数据的场景,带上缓冲从而达到消息队列的效果
创建一个带缓冲的channel,将缓冲区大小作为第二个参数传入即可,写入方也可以一直往channel里写入,在缓冲区被 填完之前都不会阻塞
c := make(chan int, 1024)

超时机制
// 首先,我们实现并执行一个匿名的超时等待函数 
timeout := make(chan bool, 1)
go func() {
    time.Sleep(1e9) // 等待1秒钟
    timeout <- true 
}()
// 然后我们把timeout这个channel利用起来 
select {
    case <-ch:
    // 从ch中读取到数据
    case <-timeout:
    // 一直没有从ch中读取到数据,但从timeout中读取到了数据
}
channel的传递

首先限定基本的数据结构:

type PipeData struct { 
    value int
    handler func(int) int
    next chan int 
}

然后写一个常规的处理函数。只要定义一系列PipeData的数据结构并一起传递给这个函数,就可以达到流式处理数据的目的:

func handle(queue chan *PipeData) { 
    for data := range queue {
        data.next <- data.handler(data.value)
    }
}
单向channel
  1. 单向channel变量的声明
    var ch1 chan int // ch1是一个正常的channel,不是单向的
    var ch2 chan<- float64// ch2是单向channel,只用于写float64数据
    var ch3 <-chan int // ch3是单向channel,只用于读取int数据
    
  2. 初始化
    // 基于ch4,通过类型转换初始化了两个单向channel:单向读的ch5和单向写的ch6
    ch4 := make(chan int)  
    ch5 := <-chan int(ch4) // ch5就是一个单向的读取channel 
    ch6 := chan<- int(ch4) // ch6 是一个单向的写入channel
    
    关闭channel
    直接使用Go语言内置的close()函数即可:
    close(ch)
    判断一个channel是否已经被关 闭?可以在读取的时候使用多重返回值的方式
    x, ok := <-ch
    用法与map中的按键获取value的过程比较类似,只需要看第二个bool返回值即可,如果返回值是false则表示ch已经被关闭

多核并行化

type Vector []float64

// 分配给每个CPU的计算任务
func (v Vector) DoSome(i, n int, u Vector, c chan int) {
    for i := 0; i < n; i++ {
        v[i] += u.Op(v[i])
    }
    c <- 1 //发信号告诉任务管理者我已经计算完成了
}

const NCPU = 16

func (v Vector) DoAll(u Vector) {
    c := make(chan int, NCPU) // 用于接收每个CPU的任务完成信号

    for i := 0; i < NCPU; i++ {
        go v.DoSome(i*len(v)/NCPU, (i+1)*len(v)/NCPU, u, c)
    }

    // 等待所有CPU的任务完成
    for i := 0; i < NCPU; i++ {
        <- c // 获取到一个数据,表示一个CPU计算完成了 }
    }
    // 到这里表示所有计算已经结束
}

通过设置环境变量GOMAXPROCS的值来控制使用多少个CPU核心,或者在代码中启动goroutine之前先调用runtime.GOMAXPROCS(16)来设置使用16CPU核数
最大化地利用服务器的多核计算能力方法可以利用runtime包中函数NumCPU()来获取核心数

出让时间片

使用runtime包中的Gosched()函数实现

同步

1、同步锁
sync包提供了两种锁类型:sync.Mutex和sync.RWMutex
sync.RWMutex在读锁占用的情 况下,会阻止写,但不阻止读,也就是多个goroutine可同时获取读锁(调用RLock()方法;而写锁(调用Lock()方法)会阻止任何其他goroutine(无论读和写)进来,整个锁相当于由该goroutine独占
对于两种所类型,任何一个Lock()Rlock()均需保证对应有Unlock()RUnlock()调用与之对应。否则可能导致等待该锁的所有goroutine处于饥饿状态,甚至可能导致死锁

var l sync.Mutex
func foo() {
    l.Lock()
    defer l.Unlock()
}

2、 全局唯一性操作
对于从全局的角度只需要运行一次的代码,比如全局初始化操作,Go语言提供了一个once类型来保证全局的唯一性操作

package main

import "sync"

var a string
var once sync.Once

func setup() {
    a = "hello, world"
}

func doprint() {
    once.Do(setup)
    print(a)
}

func twoprint() {
    go doprint()
    go doprint()
}

func main() {
    twoprint()
}

once的Do()方法可以保证在全局范围内只调用指定的函数一次(这里指 setup()函数),而且所有其他goroutine在调用到此语句时,将会先被阻塞,直至全局唯一的once.Do()调用结束后才继续