go语言之并发

haoqirui haoqirui     2023-02-23     341

关键词:

Go语言直接支持内置支持并发。当一个函数创建为goroutine时,Go会将其视为一个独立的工作单元。这个单元会被调度到可用的逻辑处理器上执行。

Go语言运行时的调度器是一个复杂的软件,这个调度器在操作系统之上。操作系统的线程与语言运行时的逻辑处理器绑定,并在逻辑处理器上运行goroutine。

Go语言的并发同步逻辑来自一个叫做通信顺讯进程(CSP)的范型。CSP是一种消息传递模型,通过在goroutine之间传递数据来传递消息,而不是通过对数据进行加锁来实现同步访问。这种数据的类型叫做通道(channel) 。

并发与并行

在操作系统中,一个应用程序就可以看作一个进程,而每个进程至少包含一个线程。每个进程的初始线程被称为主线程。

操作系统会在物理处理器(CPU)上调度线程来运行,而Go语言会在逻辑处理器来调度goroutine来运行。1.5版本之上,Go语言的运行时默认会为每个可用的物理处理器分配一个逻辑处理器。1.5之前,默认给整个应用程序只分配一个逻辑处理器。

如下图,在运行时把goroutine调度到逻辑处理器上运行,逻辑处理器绑定到唯一的操作系统线程。

技术分享图片

当goroutine执行了一个阻塞的系统调用(就是一个非纯CPU的任务)时,调度器会将这个线程与处理器分离,并创建一个新线程来运行这个处理器上提供的服务。

技术分享图片

语言运行默认限制每个程序最多创建10000个线程。

注意并发≠并行!并行需要至少2个逻辑处理器。

goroutine

以并发的形式分别显示大写和小写的英文字母

  1: package main
  2: 
  3: import (
  4: 	"fmt"
  5: 	"runtime"
  6: 	"sync"
  7: )
  8: 
  9: func main() 
 10: 	// 分配一个逻辑处理器给调度器使用
 11: 	runtime.GOMAXPROCS(1)
 12: 	// wg用来等待程序完成
 13: 	var wg sync.WaitGroup
 14: 	// 计数器加2,表示要等待两个goroutine
 15: 	wg.Add(2)
 16: 	fmt.Println("Start!")
 17: 	// 声明一个匿名函数,并创建一个goroutime
 18: 	go func() 
 19: 		// 通知main函数工作已经完成
 20: 		defer wg.Done()
 21: 		// 显示字母表3次
 22: 		for count:=0; count<3;count++ 
 23: 			for char:=‘a‘;char<‘a‘+26;char++ 
 24: 				fmt.Printf("%c ", char)
 25: 			
 26: 		
 27: 	()
 28: 	// 同上
 29: 	go func() 
 30: 		// 通知main函数工作已经完成
 31: 		defer wg.Done()
 32: 		// 显示字母表3次
 33: 		for count:=0; count<3;count++ 
 34: 			for char:=‘A‘;char<‘A‘+26;char++ 
 35: 				fmt.Printf("%c ", char)
 36: 			
 37: 		
 38: 	()
 39: 	// 等待goroutine结束
 40: 	fmt.Println("Waiting!")
 41: 	wg.Wait()
 42: 	fmt.Println("
Finish!")
 43: 

运行结果后,可以看到先输出的是所有的大写字母,最后才是小写字母。是因为第一个goroutine完成所有显示需要花时间太短了,以至于在调度器切换到第二个goroutine之前,就完成了所有任务。

调度器为了防止某个goroutine长时间占用逻辑处理器,会停止当前正运行的goroutine,运行其他可运行的goroutine运行的机会。

创建两个相同的长时间才能完成其工作的goroutine就可以看到,比如说显示5000以内的素数值。

代码结构如下

  1: go printPrime("A")
  2: go printPrime("B")
  3: 
  4: func printPrime(prefix string) 
  5: 	...
  6: 

结果类似

  1: B:2
  2: B:3
  3: ...
  4: B:4591
  5: A:3
  6: A:5
  7: ...
  8: A:4561
  9: A:4567
 10: B:4603
 11: B:4621
 12: ...
 13: // Completed B
 14: A:4457
 15: ...
 16: // Completed A

如何修改逻辑处理器的数量

  1: runtime.GOMAXPROCS(runtime.NUMCPU())

稍微改动下上面的代码,结果就会大不同

  1: package main
  2: 
  3: import (
  4: "fmt"
  5: "runtime"
  6: "sync"
  7: )
  8: 
  9: func main() 
 10: 	// 分配两个逻辑处理器给调度器使用
 11: 	runtime.GOMAXPROCS(2)
 12: 	// wg用来等待程序完成
 13: 	var wg sync.WaitGroup
 14: 	// 计数器加2,表示要等待两个goroutine
 15: 	wg.Add(2)
 16: 	fmt.Println("Start!")
 17: 	// 声明一个匿名函数,并创建一个goroutime
 18: 	go func() 
 19: 		// 通知main函数工作已经完成
 20: 		defer wg.Done()
 21: 		// 显示字母表3次
 22: 		for count:=0; count<10;count++ 
 23: 			for char:=‘a‘;char<‘a‘+26;char++ 
 24: 				fmt.Printf("%c ", char)
 25: 			
 26: 		
 27: 	()
 28: 	// 同上
 29: 	go func() 
 30: 		// 通知main函数工作已经完成
 31: 		defer wg.Done()
 32: 		// 显示字母表3次
 33: 		for count:=0; count<10;count++ 
 34: 			for char:=‘A‘;char<‘A‘+26;char++ 
 35: 				fmt.Printf("%c ", char)
 36: 			
 37: 		
 38: 	()
 39: 	// 等待goroutine结束
 40: 	fmt.Println("Waiting!")
 41: 	wg.Wait()
 42: 	fmt.Println("
Finish!")
 43: 

结果类似下面的(根据CPU单核的性能结果可能结果稍微不一样)

  1: Start!
  2: Waiting!
  3: a b c d e f g h i j k l m n o A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g
  4: h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z
  5: a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s
  6: t u v w x y z M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X
  7: Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q
  8: R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z
  9: Finish!
可以发现,goroutine是并行运行的。

只有在有多个逻辑处理器且可以同时让每个goroutine运行在一个可用的物理处理器上的时候,goroutine才会并行运行。

竞争状态

如果两个或者多个goroutine在没有互相同步的情况下,访问某个共享的资源,并且试图同时读和写这个资源,就处于相互竞争的状态。

在竞争状态,每个goroutine都会覆盖另一个goroutine的工作。这种覆盖发生在goroutine发生切换的时候。

每个goroutien都会创造自己的共享变量副本。当切换到领另一个goroutine时,如果这个变量的值在上一个goroutine发生改变,这个goroutine再次运行时,虽然变量的值改变了,但是由于这个goroutine没有更新自己的那个副本的值,而是继续使用,并且将其存回变量的值,从而覆盖上一个goroutine 的工作。

go build –race用来竞争检测器标志来编译程序

锁住共享资源

原子函数

原子函数能够以底层的枷锁机制来同步访问整型变量和指针。省略部分代码如下:

  1: var counter int64
  2: go incCounter(1)
  3: go incCounter(2)
  4: func incCounter(id int) 
  5: 	for count:=0;count<2;count++
  6: 		//安全地对counter加1
  7: 		atomic.AddInt64(&counter, 1)
  8: 		//当前goroutine从线程退出,并放回队列
  9: 		runtime.Gosched()
 10: 	
 11: 

使用atmoic包的AddInt64函数。这些goroutine都会自动根据所引用的变量做同步处理。

另外两个原子函数是LoadInt64和StoreInt64。用法如下:

  1: // shutdown是通知正在执行的goroutine停止工作的标志
  2: var shutdown int64
  3: var wg sync.WaitGroup
  4: // 该停止工作了,安全地设置shutdown标志
  5: atomic.StoreInt64(&shutdown, 1)
  6: // 等待goroutine结束
  7: wg.Wait()
  8: // 检测是否停止工作,如果shutdown==1那么goroutine就会终止
  9: if atomic.LoadInt64(&shutdown) == 1 
 10: 	break
 11: 
 12: 

互斥锁

另一种同步访问共享资源的方式是互斥锁。主要代码如下:

  1: var (
  2: 	// counter是所有goroutine都要增加其值的变量
  3: 	counter int
  4: 	wg sync.WaitGroup
  5: 	// mutex用来定义一段代码临界区
  6: 	mutex sync.Mutex
  7: )
  8: func main...
  9: // 业务代码
 10: func incCounter(id int) 
 11: 	defer wg.Done()
 12: 	for i:=0;i<2;i++ 
 13: 		//同一时期只允许一个goroutine进入
 14: 		mutex.Lock()
 15: 		//大括号并不是必须的
 16: 		
 17: 			//捕获counter的值
 18: 			value := counter
 19: 			//当前goroutine从线程退出,并返回到队列
 20: 			runtime.Gosched()
 21: 			//增加本地value变量的值
 22: 			value++
 23: 			//将该值保存回counter
 24: 			counter = value
 25: 		
 26: 		// 释放锁,允许其他正在等待的goroutine
 27: 		mutex.Unlock()
 28: 	
 29: 

通道

通道在goroutine之间架起了一个管道,并提供了确保同步交换数据的机制。声明通道时,需要指定将要被共享的数据的类型。

可以通过通道共享内置类型,命名类型,结构类型和引用类型的值或者指针。

go语言需要使用make来创建一个通道,chan是关键字:

  1: // 无缓冲的整型通道
  2: unbuffered := make(chan int)
  3: // 有缓冲的字符串通道
  4: buffered := make(chan string, 10)
向通道发送值
  1: buffered := make(chan string, 10)
  2: // 通过通道发送一个字符串
  3: buffered <- "Gopher"
  4: // 从通道接收一个字符串
  5: value := <-buffered

无缓冲的通道是指在接收前没有能力保存任何值的通道。发送goroutine和接收goroutine同时准备好,才能完成发送和接收操作。如果没有准备好,通道会导致goroutine阻塞等待。所以无缓冲通道保证了goroutine之间同一时间进行数据交换。

  1: // 四个goroutine间的接力比赛
  2: package main
  3: 
  4: import (
  5: 	"fmt"
  6: 	"sync"
  7: 	"time"
  8: )
  9: 
 10: var wg sync.WaitGroup
 11: 
 12: func main()  
 13: 	//创建一个无缓冲的通道
 14: 	baton := make(chan int)
 15: 	wg.Add(1)
 16: 	// 第一步跑步者持有接力棒
 17: 	go Runner(baton)
 18: 	// 开始比赛
 19: 	baton <- 1
 20: 	// 等待比赛结束
 21: 	wg.Wait()
 22: 
 23: 
 24: // Ruuner模拟接力比赛中的一位跑步者
 25: func Runner(baton chan int) 
 26: 	var newRunner int
 27: 	// 等待接力棒
 28: 	runner := <-baton
 29: 	// 开始跑步
 30: 	fmt.Printf("运动员%d带着Baton跑
", runner)
 31: 	// 创建下一步跑步者
 32: 	if runner != 4
 33: 		newRunner = runner + 1
 34: 		fmt.Printf("运动员%d上线
", newRunner)
 35: 		go Runner(baton)
 36: 	
 37: 	// 围绕跑到跑
 38: 	time.Sleep(100 * time.Millisecond)
 39: 	// 比赛结束了吗?
 40: 	if runner == 4
 41: 		fmt.Printf("运动员%d完成,比赛结束
", runner)
 42: 		wg.Done()
 43: 		return
 44: 	
 45: 	// 将接力棒交给下一位跑步者
 46: 	fmt.Printf("运动员%d与运动员%d交换
", runner, newRunner)
 47: 	baton <- newRunner
 48: 

结果

  1: 运动员1带着Baton跑
  2: 运动员2上线
  3: 运动员1与运动员2交换
  4: 运动员2带着Baton跑
  5: 运动员3上线
  6: 运动员2与运动员3交换
  7: 运动员3带着Baton跑
  8: 运动员4上线
  9: 运动员3与运动员4交换
 10: 运动员4带着Baton跑
 11: 运动员4完成,比赛结束

有缓冲的通道则能在接收前能存储一个或者多个值的通道。这种类型的通道并不强制要求goroutine之间必须同时完成发送和接收。只有在通道没有可用缓冲区或者没有要接收的值时,发送或者接收才会阻塞。

  1: package main
  2: 
  3: import (
  4: 	"fmt"
  5: 	"math/rand"
  6: 	"sync"
  7: 	"time"
  8: )
  9: 
 10: const (
 11: 	// goroutine的数量
 12: 	numberGoroutines = 4
 13: 	// 工作的数量
 14: 	taskLoad = 10
 15: )
 16: 
 17: var wg sync.WaitGroup
 18: 
 19: // 初始化随机数种子
 20: func init() 
 21: 	rand.Seed(time.Now().Unix())
 22: 
 23: func main() 
 24: 	// 创建一个有缓冲的通道来管理工作
 25: 	tasks := make(chan string, taskLoad)
 26: 	wg.Add(numberGoroutines)
 27: 	// 增加一组要完成的工作
 28: 	for post:=1;post<taskLoad;post++ 
 29: 		tasks <- fmt.Sprintf("Task:%d", post)
 30: 	
 31: 	// 启动goroutine来处理工作
 32: 	for i:=1;i<numberGoroutines+1;i++ 
 33: 		go worker(tasks, i)
 34: 	
 35: 	// 所有工作处理完时关闭通道
 36: 	close(tasks)
 37: 
 38: 	wg.Wait()
 39: 	fmt.Printf("all finished!")
 40: 
 41: 
 42: 
 43: func worker(tasks chan string, worker_id int) 
 44: 	defer wg.Done()
 45: 
 46: 	for 
 47: 		//等待分配工作
 48: 		task, ok := <-tasks
 49: 		if !ok 
 50: 			//通道变空
 51: 			fmt.Printf("Worker%d shut down
", worker_id)
 52: 			return
 53: 		
 54: 		// 开始工作
 55: 		fmt.Printf("Worker%d start %s
", worker_id, task)
 56: 
 57: 		// 随机等待一段时间
 58: 		sleep := rand.Int63n(100)
 59: 		time.Sleep(time.Duration(sleep)*time.Millisecond)
 60: 		// 显示完成了工作
 61: 		fmt.Printf("Worker%d Completed %s
", worker_id, task)
 62: 	
 63: 
输出结果:
  1: Worker4 start Task:1
  2: Worker1 start Task:2
  3: Worker2 start Task:3
  4: Worker3 start Task:4
  5: Worker3 Completed Task:4
  6: Worker3 start Task:5
  7: Worker4 Completed Task:1
  8: Worker4 start Task:6
  9: Worker2 Completed Task:3
 10: Worker2 start Task:7
 11: Worker3 Completed Task:5
 12: Worker3 start Task:8
 13: Worker2 Completed Task:7
 14: Worker2 start Task:9
 15: Worker3 Completed Task:8
 16: Worker3 shut down
 17: Worker4 Completed Task:6
 18: Worker4 shut down
 19: Worker1 Completed Task:2
 20: Worker1 shut down
 21: Worker2 Completed Task:9
 22: Worker2 shut down
 23: all finished!

由于程序和Go语言的调度器有随机的成分,结果每次都会不一样。不过总流程不会大变。

当通道关闭后,goroutine依旧从通道里的缓冲区获取数据,但是不能再向通道里发送数据。从一个已经关闭且没有数据的通道里获取数据,总会立刻返回,兵返回一个通道类型的零值。

关于实际工程里的并发模式,下一篇再讲。

go语言系列之并发编程(代码片段)

Go语言中的并发编程并发与并行并发:同一时间段内执行多个任务(你在用微信和两个女朋友聊天)。并行:同一时刻执行多个任务(你和你朋友都在用微信和女朋友聊天)。Go语言的并发通过goroutine实现。goroutine类似于线程,... 查看详情

go语言学习之旅--并发编程

Go语言学习之旅--并发编程golang并发编程之协程golang并发编程之通道golang并发编程之WaitGroup实现同步golang并发编程之runtime包golang并发编程之Mutex互斥锁实现同步golang并发编程之channel的遍历golang并发编程之selectswitchgolang并发编程之T... 查看详情

go_11:go语言基础之并发concurrency

...称的高并发的根本原因。另外,goroutine的简单易用,也在语言层面上给予了开发者巨大的遍历。  高并发当中一定要注意:并发可不是并行。   查看详情

go语言基础之并发concurrency

...称的高并发的根本原因。另外,goroutine的简单易用,也在语言层面上给予了开发者巨大的遍历。  高并发当中一定要注意:并发可不是并行。   查看详情

go语言基础之并发(代码片段)

并发是编程里面一个非常重要的概念,Go语言在语言层面天生支持并发,这也是Go语言流行的一个很重要的原因。Go语言中的并发编程并发与并行并发:同一时间段内执行多个任务并行:同一时刻执行多个任务Go语言的并发通过goro... 查看详情

go语言之并发资源竞争

并发本身并不复杂,但是因为有了资源竞争的问题,就使得我们开发出好的并发程序变得复杂起来,因为会引起很多莫名其妙的问题。package mainimport (    "fmt"    "runtime"    "sync")... 查看详情

go语言基础之并发(代码片段)

Go语言中的并发编程——并发是编程里面一个非常重要的概念,Go语言在语言层面天生支持并发,这也是Go语言流行的一个很重要的原因。并发与并行并发:同一时间段内执行多个任务(你在用微信和两个女朋友聊天)。并... 查看详情

go语言基础之并发和网络

1、goroutine在这章中将展示Go使用channel和goroutine开发并行程序的能力。goroutine是Go并发能力的核心要素。但是,goroutine到底是什么?叫做goroutine是因为已有的短语——线程、协程、进程等等——传递了不准确的含义。goroutine有简单... 查看详情

19.go语言基础之并发(代码片段)

...执行多个任务(windows中360在杀毒,同时你也在写代码)Go语言的并发通过goroutine实现。goroutine类似于线程,属于用户态的线程,我们可以根据需要创建成千上万个goroutine并发工作。goroutine是由Go语言的运行时(runtime)调度完成,而... 查看详情

go语言之并发(代码片段)

一:并发基础1并发和并行并发和并行是两个不同的概念:1并行意味着程序在任意时刻都是同时运行的:2并发意味着程序在单位时间内是同时运行的详解:  并行就是在任一粒度的时间内都具备同时执行的能力:最简单的并行... 查看详情

go语言之并发示例-pool

针对这个资源池管理的一步步都实现了,而且做了详细的讲解,下面就看下整个示例代码,方便理解。package commonimport (    "errors"    "io"    "sync"    "log")//一 查看详情

go语言之并发示例-pool

这篇文章演示使用有缓冲的通道实现一个资源池,这个资源池可以管理在任意多个goroutine之间共享的资源,比如网络连接、数据库连接等,我们在数据库操作的时候,比较常见的就是数据连接池,也可以基于我们实现的资源池来... 查看详情

go语言之并发编程channel

单向channel:单向通道可分为发送通道和接收通道。但是无论哪一种单向通道,都不应该出现在变量的声明中,假如初始化了这样一个变量varuselessChanchan<-int=make(chan<-int,10)这样一个变量该如何使用呢,这样一个只进不出的通道... 查看详情

go语言之context

控制并发有两种经典的方式,一种是WaitGroup,另外一种就是Context,今天我就谈谈Context。什么是WaitGroupWaitGroup以前我们在并发的时候介绍过,它是一种控制并发的方式,它的这种方式是控制多个goroutine同时完成。 funcmain(){ ... 查看详情

go语言之并发示例(runner)

这篇通过一个例子,演示使用通道来监控程序的执行时间,生命周期,甚至终止程序等。我们这个程序叫runner,我们可以称之为执行者,它可以在后台执行任何任务,而且我们还可以控制这个执行者,比如强制终止它等。现在开... 查看详情

融云开发漫谈:你是否了解go语言并发编程的第一要义?

2007年诞生的Go语言,凭借其近C的执行性能和近解析型语言的开发效率,以及近乎完美的编译速度,席卷全球。Go语言相关书籍也如雨后春笋般涌现,前不久,一本名为《Go语言并发之道》的书籍被翻译引进国内,并迅速引起广泛... 查看详情

go语言之并发

Go语言直接支持内置支持并发。当一个函数创建为goroutine时,Go会将其视为一个独立的工作单元。这个单元会被调度到可用的逻辑处理器上执行。Go语言运行时的调度器是一个复杂的软件,这个调度器在操作系统之上。操作系统的线... 查看详情

go语言之路—博客目录

Go语言介绍为什么你应该学习Go语言?开发环境准备从零开始搭建Go语言开发环境VSCode配置Go语言开发环境Go语言基础Go语言基础之变量和常量Go语言基础之基本数据类型Go语言基础之运算符Go语言基础之流程控制Go语言基础之数组Go... 查看详情