Go基础(并发)
package main
import (
"fmt"
"runtime"
"strconv"
"sync"
"time"
)
// sync包中的WaitGroup实现了一个类似任务队列的结构,你可以向队列中加入任务,
// 任务完成后就把任务从队列中移除,如果队列中的任务没有全部完成,队列就会触发阻塞以阻止程序继续运行
var wg sync.WaitGroup // 实现goroutine的同步
var lock sync.Mutex // 互斥锁
var rwlock sync.RWMutex // 读写互斥锁
// 1. 启动单个 goroutine
func hello() {
fmt.Println("Hello!")
}
func f1() {
go hello()
fmt.Println("Hello Goroutine Down!")
time.Sleep(time.Second)
/*
// 注意顺序
Hello Goroutine Down!
Hello!
*/
}
// 2. 启动多个 goroutine
func hello1(i int) {
defer wg.Done() // goroutine结束就登记-1
fmt.Println("Hello1 Goroutine: ", i)
}
func f2() {
for i := 0; i < 10; i++ {
wg.Add(1) // 启动一个goroutine就登记+1
go hello1(i)
/*
Hello1 Goroutine: 0
Hello1 Goroutine: 9
Hello1 Goroutine: 4
Hello1 Goroutine: 2
Hello1 Goroutine: 5
Hello1 Goroutine: 6
Hello1 Goroutine: 1
Hello1 Goroutine: 8
Hello1 Goroutine: 7
Hello1 Goroutine: 3
*/
}
wg.Wait() // 等待所有登记的goroutine都结束
}
// 3. GOMAXPROCS
/*
Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码。默认值是机器上的CPU核心数。
一个操作系统线程对应用户态多个goroutine。
go程序可以同时使用多个操作系统线程。
goroutine和OS线程是多对多的关系,即m:n。
*/
func a() {
for i := 1; i < 10; i++ {
fmt.Println("A:", i)
}
}
func b() {
for i := 1; i < 10; i++ {
fmt.Println("B:", i)
}
}
func f3() {
v := runtime.GOMAXPROCS(2) // 设置 1 顺序输出; 2 随机交替输出
fmt.Println("默认机器核心数是:", v)
go a()
go b()
time.Sleep(time.Second)
}
// 4. 通道 Go 提倡通过通信共享内存而不是通过共享内存而实现通信
// var ch chan int
// make(chan 元素类型, [缓冲大小])
// 无缓冲区通道,同步通道
func syncChannel(c chan int) {
ret := <-c
fmt.Println("通道取出值:", ret)
}
func f4() {
ch := make(chan int)
go syncChannel(ch) // 启用goroutine从通道接收值
ch <- 10
fmt.Println("通道发送值成功!")
}
// 5 有缓冲区的通道
func f5() {
ch := make(chan int, 2) // 创建一个容量位2的 有缓冲区通道
ch <- 10
fmt.Printf("ch 的长度:%d,ch 的容量:%d\n", len(ch), cap(ch)) // ch 的长度:1,ch 的容量:2
}
// 6. 从通道循环取值
func f6() {
ch1 := make(chan int)
ch2 := make(chan int)
// 给 ch2 赋值
go func() {
for i := 0; i < 3; i++ {
ch1 <- i
}
close(ch1)
}()
go func() {
// 从通道取值的第一种方法
for {
i, ok := <-ch1
if !ok { // 判断通道是否为空
break
}
ch2 <- i * i
}
close(ch2)
}()
// 通道 ch3 关闭会退出 for range 循环
for i := range ch2 {
// 从通道取值的第二种方法
fmt.Println("从ch2中取出值:", i)
// 从ch2中取出值: 0
// 从ch2中取出值: 1
// 从ch2中取出值: 4
}
}
// 7. 单向通道
func outCh(out <-chan int) {
for i := range out {
println("输出通道, 值:", i)
}
}
func inCh(in chan<- int) {
for i := 0; i < 3; i++ {
in <- i
}
close(in)
}
func square(in chan<- int, out <-chan int) {
for i := range out {
in <- i * i
}
close(in)
}
func f7() {
ch1 := make(chan int)
ch2 := make(chan int)
go inCh(ch1) // 限制通道在函数中只能接收
go square(ch2, ch1) // 限制一个可发送、一个可接收的通道
outCh(ch2) // 限制通道在函数中只能发送
}
// 8. 线程池 worker pool
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Printf("Worder %d start job %d\n", id, j)
time.Sleep(time.Second)
fmt.Printf("Worder %d end job %d\n", id, j)
results <- 2 * j
}
}
func f8() {
jobs := make(chan int, 100)
results := make(chan int, 100)
// 开启三个 goroutine
for w := 1; w < 4; w++ {
go worker(w, jobs, results)
}
// 5个任务
for j := 1; j < 6; j++ {
jobs <- j
}
close(jobs)
// 打印结果
for a := 1; a < 6; a++ {
<-results
}
// worker:3 start job:1
// worker:1 start job:2
// worker:2 start job:3
// worker:1 end job:2
// worker:1 start job:4
// worker:3 end job:1
// worker:3 start job:5
// worker:2 end job:3
// worker:1 end job:4
// worker:3 end job:5
}
// 9. select 多路复用
// 可处理一个或多个channel的发送/接收操作。
// 如果多个case同时满足,select会随机选择一个。
// 对于没有case的select{}会一直等待,可用于阻塞main函数。
func f9() {
ch := make(chan int, 1)
for i := 0; i < 10; i++ {
select {
// ch 能取出值
case x := <-ch:
fmt.Printf("%d\t", x) // 0 2 4 6 8
// 值能放进 ch
case ch <- i:
}
}
}
// 10. 互斥锁
// 不加锁的情况
func add(x *int64) {
for i := 0; i < 5000; i++ {
*x += 1
}
wg.Done()
}
// 加锁的情况
func add1(x *int64) {
for i := 0; i < 5000; i++ {
lock.Lock()
*x += 1
lock.Unlock()
}
wg.Done()
}
func f10() {
var x int64
wg.Add(2)
// go add(&x) // 不加锁
// go add(&x) // 不加锁
go add1(&x)
go add1(&x)
wg.Wait()
fmt.Println(x)
}
// 11. 读写互斥锁
// 读写锁非常适合读多写少的场景,如果读和写的操作差别不大,读写锁的优势就发挥不出来
func write(x *int64) {
// lock.Lock()
rwlock.Lock() // 写锁
*x += 1
rwlock.Unlock()
// lock.Unlock()
wg.Done()
}
func read() {
// lock.Lock()
rwlock.RLock()
time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
rwlock.RUnlock()
// lock.Unlock()
wg.Done()
}
func f11() {
var x int64
start := time.Now()
for i := 0; i < 500; i++ {
wg.Add(1)
go write(&x)
}
for j := 0; j < 5000; j++ {
wg.Add(1)
go read()
}
wg.Wait()
fmt.Println(x, time.Since(start))
// lock: 500 6.0364704s
// rwlock: 500 10.9918ms
}
// 12. sync.map
func f12() {
var m = sync.Map{}
for i := 0; i < 100; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
m.Store(key, n) // map 存
value, _ := m.Load(key) // map 取
fmt.Printf("n=:%d,k=:%v,v:=%v\n", n, key, value)
wg.Done()
}(i)
}
wg.Wait()
}
func main() {
f12()
}