前言
Go 语言最吸引人的地方是它内建的并发支持。
作为天然支持高并发的语言,写并发比java和python要简单方便的多.
在并发编程中,对共享资源的正确访问需要精确的控制,在目前的绝大多数语言中,都是通过加锁等线程同步方案来解决这一困难问题,而 Go 语言却另辟蹊径,它将共享的值通过 Channel 传递(实际上多个独立执行的线程很少主动共享资源)。在任意给定的时刻,最好只有一个 Goroutine 能够拥有该资源。数据竞争从设计层面上就被杜绝了。为了提倡这种思考方式,Go 语言将其并发编程哲学化为一句口号:
Do not communicate by sharing memory; instead, share memory by communicating.
不要通过共享内存来通信,而应通过通信来共享内存。
这是更高层次的并发编程哲学(通过管道来传值是 Go 语言推荐的做法)。虽然像引用计数这类简单的并发问题通过原子操作或互斥锁就能很好地实现,但是通过 Channel 来控制访问能够让你写出更简洁正确的程序。
goroutine基础
goroutine 语法格式
go 函数名( 参数列表 )
例如:
go f(x, y, z)
开启一个新的 goroutine:
f(x, y, z)
Go 允许使用 go 语句开启一个新的运行期线程, 即 goroutine,以一个不同的、新创建的 goroutine 来执行一个函数。 同一个程序中的所有 goroutine 共享同一个地址空间。
实例
package main
import (
"fmt"
"time"
)
func say(s string) {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}
//你会看到输出的 hello 和 world 是没有固定先后顺序。因为它们是两个 goroutine 在执行
func main() {
go say("world")
say("hello")
}
注意!main函数也是也是一个协程,还是主协程,比如说上述的例子,如果给say("hello")也加上go,那么大概率终端什么都不会输出,因为主协程结束后程序就直接退出了
通道(channel)
通道(channel)是用来传递数据的一个数据结构。
通道可用于两个 goroutine 之间通过传递一个指定类型的值来同步运行和通讯。操作符 <-
用于指定通道的方向,发送或接收。如果未指定方向,则为双向通道。
ch <- v // 把 v 发送到通道 ch
v := <-ch // 从 ch 接收数据
// 并把值赋给 v
声明一个通道很简单,我们使用chan关键字即可,通道在使用前必须先创建:
ch := make(chan int)
注意:默认情况下,通道是不带缓冲区的。发送端发送数据,同时必须有接收端相应的接收数据。
以下实例通过两个 goroutine 来计算数字之和,在 goroutine 完成计算后,它会计算两个结果的和:
实例
package main
import "fmt"
func sum(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
c <- sum // 把 sum 发送到通道 c
}
func main() {
s := []int{7, 2, 8, -9, 4, 0}
c := make(chan int)
go sum(s[:len(s)/2], c)
go sum(s[len(s)/2:], c)
x, y := <-c, <-c // 从通道 c 中接收
fmt.Println(x, y, x+y)
}
/*
输出结果为:
-5 17 12
*/
通道缓冲区
通道可以设置缓冲区,通过 make 的第二个参数指定缓冲区大小:
ch := make(chan int, 100)
带缓冲区的通道允许发送端的数据发送和接收端的数据获取处于异步状态,就是说发送端发送的数据可以放在缓冲区里面,可以等待接收端去获取数据,而不是立刻需要接收端去获取数据。
不过由于缓冲区的大小是有限的,所以还是必须有接收端来接收数据的,否则缓冲区一满,数据发送端就无法再发送数据了。
注意:如果通道不带缓冲,发送方会阻塞直到接收方从通道中接收了值。如果通道带缓冲,发送方则会阻塞直到发送的值被拷贝到缓冲区内;如果缓冲区已满,则意味着需要等待直到某个接收方获取到一个值。接收方在有值可以接收之前会一直阻塞。
实例
package main
import "fmt"
func main() {
// 这里我们定义了一个可以存储整数类型的带缓冲通道
// 缓冲区大小为2
ch := make(chan int, 2)
// 因为 ch 是带缓冲的通道,我们可以同时发送两个数据
// 而不用立刻需要去同步读取数据
ch <- 1
ch <- 2
// 获取这两个数据
fmt.Println(<-ch)
fmt.Println(<-ch)
}
//执行输出结果为:
//1
//2
生产者消费者模型
并发最常见的例子
简单地说,就是生产者生产一些数据,然后放到成果队列中,同时消费者从成果队列中来取这些数据。
单向channel最典型的应用是“生产者消费者模型”。channel又分为有缓冲和无缓冲channel。channel中参数传递的时候,是作为引用传递。
直接看代码:
1、无缓冲channel
示例代码实现如下
这里使用无缓冲channel,生产者生产一次数据放入channel,然后消费者从channel读取数据,如果没有只能等待,也就是阻塞,直到管道被关闭。所以宏观是生产者消费者同步执行。用时输出:8.9805ms
package main
import (
"fmt"
"time"
)
//生产这函数,定义int channel chan用来获取生产者生产的数据
func producer(out chan <- int) {
for i:=0; i<1000; i++{
data := i*i
fmt.Println("生产者生产数据:", data)
out <- data // 缓冲区写入数据
}
close(out) //写完关闭管道
}
//消费者获取管道中的数据
func consumer(in <- chan int){
// 无需同步机制,先做后做
// 没有数据就阻塞等
for data := range in {
fmt.Println("消费者得到数据:", data)
}
}
func main(){
start := time.Now() // 获取当前时间
// 传参的时候显式类型像隐式类型转换,双向管道向单向管道转换
ch := make(chan int) //无缓冲channel
go producer(ch) // 子go程作为生产者
consumer(ch) // 主go程作为消费者
elapsed := time.Since(start)
fmt.Println("程序完成耗时:", elapsed)
}
2、有缓冲channel
有缓冲channel,只修改ch := make(chan int, 5) // 添加缓冲
一句,只要缓冲区不满,生产者可以持续向缓冲区channel放入数据,只要缓冲区不为空,消费者可以持续从channel读取数据。就有了异步,并发的特性。最终耗时:6.3572ms
package main
import (
"fmt"
"time"
)
func producer(out chan<- int) {
for i := 0; i < 1000; i++ {
data := i * i
fmt.Println("生产者生产数据:", data)
out <- data // 缓冲区写入数据
}
close(out) //写完关闭管道
}
func consumer(in <-chan int) {
// 无需同步机制,先做后做
// 没有数据就阻塞等
for data := range in {
fmt.Println("消费者得到数据:", data)
}
}
func main() {
start := time.Now() // 获取当前时间
// 传参的时候显式类型像隐式类型转换,双向管道向单向管道转换
ch := make(chan int, 5) // 添加缓冲区,5
go producer(ch) // 子go程作为生产者
consumer(ch) // 主go程作为消费者
elapsed := time.Since(start)
fmt.Println("程序完成耗时:", elapsed)
}
3、实际应用
实际应用中,同时访问同一个公共区域,同时进行不同的操作。都可以划分为生产者消费者模型,比如订单系统。
很多用户的订单下达之后,放入缓冲区或者队列中,然后系统从缓冲区中去读来真正处理。系统不必开辟多个线程来对应处理多个订单,减少系统并发的负担。通过生产者消费者模式,将订单系统与仓库管理系统隔离开,且用户可以随时下单(生产数据)。如果订单系统直接调用仓库系统,那么用户单击下订单按钮后,要等到仓库系统的结果返回。这样速度会很慢。
也就是:用户变成了生产者,处理订单管理系统变成了消费者。
package main
import (
"fmt"
"time"
)
// 模拟订单对象,使用一结构体
type OrderInfo struct {
id int
}
// 生产订单--生产者
func producerOrder(out chan <- OrderInfo) {
// 业务生成订单
for i:=0; i<1000; i++{
order := OrderInfo{id: i+1}
fmt.Println("生成订单,订单ID为:", order.id)
out <- order // 写入channel
}
// 如果不关闭,消费者就会一直阻塞,等待读
close(out) // 订单生成完毕,关闭channel
}
// 处理订单--消费者
func consumerOrder(in <- chan OrderInfo) {
// 从channel读取订单,并处理
for order := range in{
fmt.Println("读取订单,订单ID为:", order.id)
}
}
func main() {
ch := make(chan OrderInfo, 5)
go producerOrder(ch)
go consumerOrder(ch)
//使用time.Sleep(time.Second * 2)阻塞,否则,程序立即停止
//time.Sleep(time.Second * 2)
//推荐使用,使用时间阻塞是不确定的
// Ctrl+C 退出
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
fmt.Printf("quit (%v)\n", <-sig)
}
我们这个例子中生产者和消费者是并发的。因此,消费者输出的结果序列的顺序是不确定的,这并没有问题,生产者和消费者依然可以相互配合工作。
发布订阅模型
发布订阅(publish-and-subscribe)模型通常被简写为pub/sub模
型。在这个模型中,消息生产者成为发布者(publisher),而消息
消费者则成为订阅者(subscriber),生产者和消费者是M:N的关
系。在传统生产者和消费者模型中,是将消息发送到一个队列中,
而发布订阅模型则是将消息发布给一个主题。
为此,我们构建了一个名为pubsub的发布订阅模型支持包:
// Package pubsub implements a simple multi-topic pub-
sub library.
package pubsub
import (
"sync"
"time"
)
type (
subscriber chan interface{} // 订阅者为一个管
道
topicFunc func(v interface{}) bool // 主题为一个过滤
器
)
// 发布者对象
type Publisher struct {
m sync.RWMutex // 读写锁
buffer int // 订阅队列的缓存
大小
timeout time.Duration // 发布超时时间
subscribers map[subscriber]topicFunc // 订阅者信息
}
// 构建一个发布者对象, 可以设置发布超时时间和缓存队列的长度
func NewPublisher(publishTimeout time.Duration, buffer
int) *Publisher {
return &Publisher{
buffer: buffer,
timeout: publishTimeout,
subscribers: make(map[subscriber]topicFunc),
}
}
// 添加一个新的订阅者,订阅全部主题
func (p *Publisher) Subscribe() chan interface{} {
return p.SubscribeTopic(nil)
}
// 添加一个新的订阅者,订阅过滤器筛选后的主题
func (p *Publisher) SubscribeTopic(topic topicFunc) chan
interface{} {
ch := make(chan interface{}, p.buffer)
p.m.Lock()
p.subscribers[ch] = topic
p.m.Unlock()
return ch
}
// 退出订阅
func (p *Publisher) Evict(sub chan interface{}) {
p.m.Lock()
defer p.m.Unlock()
delete(p.subscribers, sub)
close(sub)
}
// 发布一个主题
func (p *Publisher) Publish(v interface{}) {
p.m.RLock()
defer p.m.RUnlock()
var wg sync.WaitGroup
for sub, topic := range p.subscribers {
wg.Add(1)
go p.sendTopic(sub, topic, v, &wg)
}
wg.Wait()
}
// 关闭发布者对象,同时关闭所有的订阅者管道。
func (p *Publisher) Close() {
p.m.Lock()
defer p.m.Unlock()
for sub := range p.subscribers {
delete(p.subscribers, sub)
close(sub)
}
}
// 发送主题,可以容忍一定的超时
func (p *Publisher) sendTopic(
sub subscriber, topic topicFunc, v interface{}, wg
*sync.WaitGroup,
) {
defer wg.Done()
if topic != nil && !topic(v) {
return
}
select {
case sub <- v:
case <-time.After(p.timeout):
}
}
下面的例子中,有两个订阅者分别订阅了全部主题和含有"golang"的
主题:
import "path/to/pubsub"
func main() {
p := pubsub.NewPublisher(100*time.Millisecond, 10)
defer p.Close()
all := p.Subscribe()
golang := p.SubscribeTopic(func(v interface{}) bool
{
if s, ok := v.(string); ok {
return strings.Contains(s, "golang")
}
return false
})
p.Publish("hello, world!")
p.Publish("hello, golang!")
go func() {
for msg := range all {
fmt.Println("all:", msg)
}
} ()
go func() {
for msg := range golang {
fmt.Println("golang:", msg)
}
} ()
// 运行一定时间后退出
time.Sleep(3 * time.Second)
}
在发布订阅模型中,每条消息都会传送给多个订阅者。发布者通常
不会知道、也不关心哪一个订阅者正在接收主题消息。订阅者和发
布者可以在运行时动态添加,是一种松散的耦合关系,这使得系统
的复杂性可以随时间的推移而增长。在现实生活中,像天气预报之
类的应用就可以应用这个并发模式。
竞争模型
假设我们想快速地搜索“golang”相关的主题,我们可能会同时打开
Bing、Google或百度等多个检索引擎。当某个搜索最先返回结果
后,就可以关闭其它搜索页面了。因为受网络环境和搜索引擎算法
的影响,某些搜索引擎可能很快返回搜索结果,某些搜索引擎也可
能等到他们公司倒闭也没有完成搜索。我们可以采用类似的策略来
编写这个程序
func main() {
ch := make(chan string, 32)
go func() {
ch <- searchByBing("golang")
}()
go func() {
ch <- searchByGoogle("golang")
}()
go func() {
ch <- searchByBaidu("golang")
}()
fmt.Println(<-ch)
}
并不关心是哪个搜索引擎先完成搜索,我们只取最先完成的,但是通道的缓存要足够大
Future模型
Future模型是将异步请求和代理模式结合的产物
举例:假设我们是一个电商平台,用户在网站下单。用户操作的是客户端它会向Future服务端发送数据,服务端会从后台的数据接口获取完整的订单数据,并响应用户。
我们模拟一下用户订单的行为:
A、用户挑完商品开始下单,这时客户端向服务器端发送请求1。
B、服务端根据客户端的信息,向后台获取完整的订单数据。这里做一个说明,比如用户客户端只发送了几个商品的id和数量,我们的服务端需要从后台数据库读取商家、商品、订单、库存等各种信息,最后拼成完整的一个订单返回。
C、步骤2会比较耗时,因此服务端直接返回给客户端一个伪造的数据,比如一个订单id。
D、客户端收到订单id后,开始检查订单信息,比如检查一下商品数量是否正确。
注意:
这里如果需要付款的话,就要等到最后订单数据的返回,也就是真实的数据返回。如果数据没有返回,就要一直等待,直到返回。
这时候完整的订单信息拼接完成了,返回了订单的完整数据,用户付款并完成这个订单。
客户端发送一个长时间的请求,服务端不需等待该数据处理完成便立即返回一个伪造的代理数据(相当于商品订单,不是商品本身),用户也无需等待,先去执行其他的若干操作后,再去调用服务器已经完成组装的真实数据。该模型充分利用了等待的时间片段