12、Golang 教程 - channel

channel

  • 不要通过共享内存来通信,要通过通信来共享内存
  • channel是用来协程通信的
     
  • 错误做法

 package main

import "fmt"

func chanDemo() {

    //  var c chan int // chan  通道里面是int类型  c == nil
    c := make(chan int) // 这里定义没有缓存,所以接收到一个他就会阻塞的到这个值被取走

    c <- 1 // 把 1 送到c中
    c <- 2
    c <- 3
    n:= <-c
    //fatal error: all goroutines are asleep - deadlock!
    //这样因为没人其他的协程接这个通道的数据 所以报错 死锁
    fmt.Println(n)
}

func main() {

    chanDemo()
}
  • 开一个协程接收数据

func chanDemo() {

    c := make(chan int)
    go func() {

        for {

            n := <-c
            fmt.Println(n)
            //1
            //2
            //3

        }
    }()
    c <- 1 // 把 1 送到c中
    c <- 2
    c <- 3
    time.Sleep(time.Millisecond)
}

func main() {

    chanDemo()
}

// chan 作为参数
func worker(c chan int) {

    for {

        //n := <-c
        //fmt.Println(n)
        fmt.Printf("%c",<-c)

    }
}
func chanDemo() {

    var channels [10]chan int // 数组
    for i:=0;i<10;i++{

        channels[i] = make(chan int)
        go worker(channels[i])
    }

    for i:=0;i<10;i++{

        channels[i] <- 'a'+i
    }
    time.Sleep(time.Millisecond)
}

//func createWorker(id int) <-chan int { //这样写表示只用来发送数据 
func createWorker(id int) chan<- int {

      //这样写表示只用来接收数据 用于发送数据会报错
    c := make(chan int)
    go func() {

        for {

            fmt.Printf("worker %d receiver:%c\n", id, <-c)

        }
    }()
    return c
}
func chanDemo() {

    var channels [10]chan<- int // 数组
    for i := 0; i < 10; i++ {

        channels[i] = createWorker(i)
    }

    for i := 0; i < 10; i++ {

        channels[i] <- 'a' + i
    }
    time.Sleep(time.Millisecond)
}
/、
func bufferedChannel() {

    c := make(chan int, 3) //定义缓冲区
    //因为有大小为3的缓冲区 所以就算没有协程来接,传入数据不大于3个是不会报错的
    c <- 1
    c <- 2
    c <- 3
}

func main() {

    bufferedChannel()
    //chanDemo()
}

//
close(c) // 通道也可以关闭 通道关闭之后,另一边还是可以接收到数据,是这个类型的默认0值
    // 可以用两个数来接收 n, ok := <-c ok表示是否有值 然后用if ok 做判断
    // 也可以用for n:= range c 用range接收数据 等到c发完就跳出来
    // 如果c不close 会永远接收下去,等到main退出,他也就退出,chan可以不关闭
  • channel
  • buffered channel
  • range
  • 发送数据c`<-1
  • 接收数据n:=`<-c
  • chan<-int只能发数据给channel<-chan int只能从channel接收数据
  • close(c):关闭channel,发送方一旦关闭channel,接收方一直能接收到数据,但是收到的是channel中具体类型的零值
  • 只有发送方才能close channel,range会自动检查channel是否close
  • 如果发送方没有使用close方法,接收方进入循环后会阻塞,也就是说,一个chan如果有close方法,那就是非阻塞的,没有则是阻塞的

使用Channel等待任务结束

方法一

 // done用于接收,这样来替换  time.Sleep(time.Millisecond)
type workers struct {

    in   chan int
    done chan bool
}

func createWorker(id int, w workers) {

    go func() {

        for {

            fmt.Printf("worker %d receiver:%c\n", id, <-w.in)
            w.done <- true //打印完就送一个true给done

        }
    }()
}

func chanDemo() {

    var work [10]workers // 数组
    for i := 0; i < 10; i++ {

        work[i] = workers{

            in:   make(chan int),
            done: make(chan bool),
        }
        createWorker(i, work[i])
    }
    for i := 0; i < 10; i++ {

        work[i].in <- 'a' + i
        <- work[i].done // 如果done没有数据 这里会阻塞
    }

}

func main() {

    chanDemo()
}

缺点:顺序打印,不如不用

 worker 0 receiver:a
worker 1 receiver:b
worker 2 receiver:c
worker 3 receiver:d
worker 4 receiver:e
worker 5 receiver:f
worker 6 receiver:g
worker 7 receiver:h
worker 8 receiver:i
worker 9 receiver:j
  • 解决方式
 func chanDemo() {

    var work [10]workers // 数组
    for i := 0; i < 10; i++ {

        work[i] = workers{

            in:   make(chan int),
            done: make(chan bool),
        }
        createWorker(i, work[i])
    }
    for i := 0; i < 10; i++ {

        work[i].in <- 'a' + i
    }
    for _, worker := range work {

        <-worker.done
    }

}

通过waitGroup等待

 type workers struct {

    in   chan int
    wg *sync.WaitGroup //指针才能使用同一个wg
}

func createWorker(id int, wg *sync.WaitGroup) workers{

    w := workers{

        in : make(chan int),
        wg : wg,
    }
    go func() {

        for {

            fmt.Printf("worker %d receiver:%c\n", id, <-w.in)
            wg.Done()
        }
    }()
    return w
}

func chanDemo() {

    var wg sync.WaitGroup
    wg.Add(20) //20个任务
    var work [10]workers // 数组
    for i := 0; i < 10; i++ {

        work[i] = createWorker(i, &wg) //所有的要用同一个wg 所以传指针
    }
    for i := 0; i < 10; i++ {

        work[i].in <- 'a' + i
    }
    for i := 0; i < 10; i++ {

        work[i].in <- 'A' + i
    }
    wg.Wait() //等待20个任务完成再往下执行
}

func main() {

    chanDemo()
}

使用Select来进行调度

 func main() {

    var c1, c2 chan int // nil 
    select {

      //哪一个条件满足就走哪个条件
    case n := <-c1:
        fmt.Println("received from c1:", n)
    case n := <-c2:
        fmt.Println("received from c2:", n)
    default:
        fmt.Println("no value") //这里 
    }
}
/
func generator() chan int {

    c := make(chan int)
    i := 0
    go func() {

        for {

            time.Sleep(time.Millisecond * time.Duration(rand.Intn(1500)))
            c <- i
            i++
        }
    }()
    return c
}

func main() {

    var c1, c2  = generator(), generator()
    for  {

    select {

      //哪一个条件满足就走哪个条件
    case n := <-c1:
        fmt.Println("received from c1:", n)
    case n := <-c2:
        fmt.Println("received from c2:", n)
    }

    }
}
//
func main() {

    var c1, c2 = generator(), generator()
    w := createWorker(0)
    var n int
    for {

        select {

      //哪一个条件满足就走哪个条件
        case n = <-c1:
            fmt.Println("received from c1:", n)
        case n = <-c2:
            fmt.Println("received from c2:", n)
        case w <- n: //也可以发 但是因为w不是nil 如果n没有值他会输出零值 这里就会无限循环走这个逻辑
        }
    }
}
///解决办法利用chan的nil值

func createWorker(id int) chan int {

    w := make(chan int)
    go func() {

        for {

            fmt.Printf("worker %d receiver:%d\n", id, <-w)
        }
    }()
    return w
}
func generator() chan int {

    c := make(chan int)
    i := 0
    go func() {

        for {

            time.Sleep(time.Millisecond * time.Duration(rand.Intn(1500)))
            c <- i
            i++
        }
    }()
    return c
}

func main() {

    var c1, c2 = generator(), generator()
    w := createWorker(0)
    var n int
    hasValue := false
    for {

        var activeWork chan int
        if hasValue {

            activeWork = w
        }
        select {

      //哪一个条件满足就走哪个条件
        case n = <-c1:
            hasValue = true
            fmt.Println("received from c1:", n)
        case n = <-c2:
            hasValue = true
            fmt.Println("received from c2:", n)
        case activeWork <- n: //这样的话如果hasValue为false 这里activeWork为nil这样就会被阻塞住
            hasValue = false // 但是这样数据会丢失因为接收和发送的速度不匹配,可以弄一个队列然后存到队列中去 然后hasValue的判断规则为队列长度是否大于0
        }
    }
}
//

func main() {

    var c1, c2 = generator(), generator()
    w := createWorker(0)
    var n int
    var values []int
    for {

        var activeWork chan int
        if len(values) > 0 {

            activeWork = w
        }
        select {

      //哪一个条件满足就走哪个条件
        case n = <-c1:
            values = append(values, n)
            fmt.Println("received from c1:", n)
        case n = <-c2:
            values = append(values, n)
            fmt.Println("received from c2:", n)
        case activeWork <- n: 
            values = values[1:] //弄一个队列然后存到队列中去 然后hasValue的判断规则为队列长度是否大于0
        }
    }
}

计时器的使用

 func main() {

    var c1, c2 = generator(), generator()
    w := createWorker(0)
    var n int
    var values []int
    tm := time.After(10 * time.Second) //10s过后,会往tm通道发送数据
    for {

        var activeWork chan int
        if len(values) > 0 {

            activeWork = w
        }
        select {

      //哪一个条件满足就走哪个条件
        case n = <-c1:
            values = append(values, n)
            fmt.Println("received from c1:", n)
        case n = <-c2:
            values = append(values, n)
            fmt.Println("received from c2:", n)
        case activeWork <- n:
            values = values[1:] //弄一个队列然后存到队列中去 然后hasValue的判断规则为队列长度是否大于0
        case <-tm:
            fmt.Println("bye") //10s过后bye
            return
        }
    }
}
  • Select的使用
  • 定时器的使用
  • 在Select中使用nil channel

传统的同步机制(尽量少用)

  • waitGroup
  • mutex
  • Cond
 type atomicInt struct {

    value int
    lock sync.Mutex
}
func (a *atomicInt) get() int{

    a.lock.Lock()
    defer a.lock.Unlock()
    return a.value
}
func (a *atomicInt) increment() {

    a.lock.Lock()
    defer a.lock.Unlock() //defer的正确用法
    a.value++
}
func main() {

    var a atomicInt
    a.increment()
    go func() {

        a.increment()
    }()
    time.Sleep(time.Millisecond)
    fmt.Println(a.get())
}

// 如果要在一段代码端同步,那就变成一个匿名函数就好了
func (a *atomicInt) increment() {

    func() {

        a.lock.Lock()
        defer a.lock.Unlock() //defer的正确用法
        //............
        a.value++
        //.........
    }()
}