Go并发(goroutine)及并发常用模型的实现

2023-11-13

前言

Go 语言最吸引人的地方是它内建的并发支持。

作为天然支持高并发的语言,写并发比java和python要简单方便的多.

在并发编程中,对共享资源的正确访问需要精确的控制,在目前的绝大多数语言中,都是通过加锁等线程同步方案来解决这一困难问题,而 Go 语言却另辟蹊径,它将共享的值通过 Channel 传递(实际上多个独立执行的线程很少主动共享资源)。在任意给定的时刻,最好只有一个 Goroutine 能够拥有该资源。数据竞争从设计层面上就被杜绝了。为了提倡这种思考方式,Go 语言将其并发编程哲学化为一句口号:

Do not communicate by sharing memory; instead, share memory by communicating.

不要通过共享内存来通信,而应通过通信来共享内存。

这是更高层次的并发编程哲学(通过管道来传值是 Go 语言推荐的做法)。虽然像引用计数这类简单的并发问题通过原子操作或互斥锁就能很好地实现,但是通过 Channel 来控制访问能够让你写出更简洁正确的程序。

goroutine基础

goroutine 语法格式

go 函数名( 参数列表 )

例如:

go f(x, y, z)

开启一个新的 goroutine:

f(x, y, z)

Go 允许使用 go 语句开启一个新的运行期线程, 即 goroutine,以一个不同的、新创建的 goroutine 来执行一个函数。 同一个程序中的所有 goroutine 共享同一个地址空间。

实例

package main

import (
        "fmt"
        "time"
)

func say(s string) {
        for i := 0; i < 5; i++ {
                time.Sleep(100 * time.Millisecond)
                fmt.Println(s)
        }
}
//你会看到输出的 hello 和 world 是没有固定先后顺序。因为它们是两个 goroutine 在执行
func main() {
        go say("world")
        say("hello")
}
注意!main函数也是也是一个协程,还是主协程,比如说上述的例子,如果给say("hello")也加上go,那么大概率终端什么都不会输出,因为主协程结束后程序就直接退出了

通道(channel)

通道(channel)是用来传递数据的一个数据结构。

通道可用于两个 goroutine 之间通过传递一个指定类型的值来同步运行和通讯。操作符 <- 用于指定通道的方向,发送或接收。如果未指定方向,则为双向通道。

ch <- v    // 把 v 发送到通道 ch
v := <-ch  // 从 ch 接收数据
           // 并把值赋给 v

声明一个通道很简单,我们使用chan关键字即可,通道在使用前必须先创建:

ch := make(chan int)

注意:默认情况下,通道是不带缓冲区的。发送端发送数据,同时必须有接收端相应的接收数据。

以下实例通过两个 goroutine 来计算数字之和,在 goroutine 完成计算后,它会计算两个结果的和:

实例

package main

import "fmt"

func sum(s []int, c chan int) {
        sum := 0
        for _, v := range s {
                sum += v
        }
        c <- sum // 把 sum 发送到通道 c
}

func main() {
        s := []int{7, 2, 8, -9, 4, 0}

        c := make(chan int)
        go sum(s[:len(s)/2], c)
        go sum(s[len(s)/2:], c)
        x, y := <-c, <-c // 从通道 c 中接收

        fmt.Println(x, y, x+y)
}
/*
输出结果为:

-5 17 12
*/

通道缓冲区

通道可以设置缓冲区,通过 make 的第二个参数指定缓冲区大小:

ch := make(chan int, 100)

带缓冲区的通道允许发送端的数据发送和接收端的数据获取处于异步状态,就是说发送端发送的数据可以放在缓冲区里面,可以等待接收端去获取数据,而不是立刻需要接收端去获取数据。

不过由于缓冲区的大小是有限的,所以还是必须有接收端来接收数据的,否则缓冲区一满,数据发送端就无法再发送数据了。

注意:如果通道不带缓冲,发送方会阻塞直到接收方从通道中接收了值。如果通道带缓冲,发送方则会阻塞直到发送的值被拷贝到缓冲区内;如果缓冲区已满,则意味着需要等待直到某个接收方获取到一个值。接收方在有值可以接收之前会一直阻塞。

实例

package main

import "fmt"

func main() {
    // 这里我们定义了一个可以存储整数类型的带缓冲通道
        // 缓冲区大小为2
        ch := make(chan int, 2)

        // 因为 ch 是带缓冲的通道,我们可以同时发送两个数据
        // 而不用立刻需要去同步读取数据
        ch <- 1
        ch <- 2

        // 获取这两个数据
        fmt.Println(<-ch)
        fmt.Println(<-ch)
}
//执行输出结果为:
//1
//2

生产者消费者模型

并发最常见的例子

简单地说,就是生产者生产一些数据,然后放到成果队列中,同时消费者从成果队列中来取这些数据。

单向channel最典型的应用是“生产者消费者模型”。channel又分为有缓冲和无缓冲channel。channel中参数传递的时候,是作为引用传递。

直接看代码:

1、无缓冲channel

示例代码实现如下

这里使用无缓冲channel,生产者生产一次数据放入channel,然后消费者从channel读取数据,如果没有只能等待,也就是阻塞,直到管道被关闭。所以宏观是生产者消费者同步执行。用时输出:8.9805ms

package main

import (
	"fmt"
	"time"
)

//生产这函数,定义int channel chan用来获取生产者生产的数据
func producer(out chan <- int) {
	for i:=0; i<1000; i++{
		data := i*i
		fmt.Println("生产者生产数据:", data)
		out <- data  // 缓冲区写入数据
	}
	close(out)  //写完关闭管道
}

//消费者获取管道中的数据
func consumer(in <- chan int){
	// 无需同步机制,先做后做
	// 没有数据就阻塞等
	for data := range in {
		fmt.Println("消费者得到数据:", data)
	}

}

func main(){
	start := time.Now() // 获取当前时间
	// 传参的时候显式类型像隐式类型转换,双向管道向单向管道转换
	ch := make(chan int)  //无缓冲channel
	go producer(ch)  // 子go程作为生产者
	consumer(ch)  // 主go程作为消费者
	elapsed := time.Since(start)
    fmt.Println("程序完成耗时:", elapsed)
}

2、有缓冲channel

有缓冲channel,只修改ch := make(chan int, 5) // 添加缓冲一句,只要缓冲区不满,生产者可以持续向缓冲区channel放入数据,只要缓冲区不为空,消费者可以持续从channel读取数据。就有了异步,并发的特性。最终耗时:6.3572ms

package main

import (
	"fmt"
	"time"
)

func producer(out chan<- int) {
	for i := 0; i < 1000; i++ {
		data := i * i
		fmt.Println("生产者生产数据:", data)
		out <- data // 缓冲区写入数据
	}
	close(out) //写完关闭管道
}

func consumer(in <-chan int) {
	// 无需同步机制,先做后做
	// 没有数据就阻塞等
	for data := range in {
		fmt.Println("消费者得到数据:", data)
	}

}

func main() {
	start := time.Now() // 获取当前时间
	// 传参的时候显式类型像隐式类型转换,双向管道向单向管道转换
	ch := make(chan int, 5) // 添加缓冲区,5

	go producer(ch) // 子go程作为生产者
	consumer(ch)    // 主go程作为消费者
	elapsed := time.Since(start)
	fmt.Println("程序完成耗时:", elapsed)
}

 3、实际应用

实际应用中,同时访问同一个公共区域,同时进行不同的操作。都可以划分为生产者消费者模型,比如订单系统。
很多用户的订单下达之后,放入缓冲区或者队列中,然后系统从缓冲区中去读来真正处理。系统不必开辟多个线程来对应处理多个订单,减少系统并发的负担。通过生产者消费者模式,将订单系统与仓库管理系统隔离开,且用户可以随时下单(生产数据)。如果订单系统直接调用仓库系统,那么用户单击下订单按钮后,要等到仓库系统的结果返回。这样速度会很慢。
也就是:用户变成了生产者,处理订单管理系统变成了消费者。

package main

import (
	"fmt"
	"time"
)


// 模拟订单对象,使用一结构体
type OrderInfo struct {
	id int
}


// 生产订单--生产者
func producerOrder(out chan <- OrderInfo)  {
	// 业务生成订单
	for i:=0; i<1000; i++{
		order := OrderInfo{id: i+1}
		fmt.Println("生成订单,订单ID为:", order.id)
		out <- order // 写入channel
	}
	// 如果不关闭,消费者就会一直阻塞,等待读
	close(out)  // 订单生成完毕,关闭channel
}


// 处理订单--消费者
func consumerOrder(in <- chan OrderInfo)  {
	// 从channel读取订单,并处理
	for order := range in{
		fmt.Println("读取订单,订单ID为:", order.id)
	}
}

func main()  {
	ch := make(chan OrderInfo, 5)
	go producerOrder(ch)
	go consumerOrder(ch)
    //使用time.Sleep(time.Second * 2)阻塞,否则,程序立即停止
	//time.Sleep(time.Second * 2)
    //推荐使用,使用时间阻塞是不确定的
    // Ctrl+C 退出
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
    fmt.Printf("quit (%v)\n", <-sig)
    
}

我们这个例子中生产者和消费者是并发的。因此,消费者输出的结果序列的顺序是不确定的,这并没有问题,生产者和消费者依然可以相互配合工作。 

发布订阅模型


发布订阅(publish-and-subscribe)模型通常被简写为pub/sub模
型。在这个模型中,消息生产者成为发布者(publisher),而消息
消费者则成为订阅者(subscriber),生产者和消费者是M:N的关
系。在传统生产者和消费者模型中,是将消息发送到一个队列中,
而发布订阅模型则是将消息发布给一个主题。
为此,我们构建了一个名为pubsub的发布订阅模型支持包:

// Package pubsub implements a simple multi-topic pub-
sub library.
package pubsub
import (
"sync"
"time"
)
type (
subscriber chan interface{} // 订阅者为一个管
道
topicFunc func(v interface{}) bool // 主题为一个过滤
器
)
// 发布者对象
type Publisher struct {
m sync.RWMutex // 读写锁
buffer int // 订阅队列的缓存
大小
 timeout time.Duration // 发布超时时间
subscribers map[subscriber]topicFunc // 订阅者信息
}
// 构建一个发布者对象, 可以设置发布超时时间和缓存队列的长度
func NewPublisher(publishTimeout time.Duration, buffer
int) *Publisher {
return &Publisher{
buffer: buffer,
timeout: publishTimeout,
subscribers: make(map[subscriber]topicFunc),
}
}
// 添加一个新的订阅者,订阅全部主题
func (p *Publisher) Subscribe() chan interface{} {
return p.SubscribeTopic(nil)
}
// 添加一个新的订阅者,订阅过滤器筛选后的主题
func (p *Publisher) SubscribeTopic(topic topicFunc) chan
interface{} {
ch := make(chan interface{}, p.buffer)
p.m.Lock()
p.subscribers[ch] = topic
p.m.Unlock()
return ch
}
// 退出订阅
func (p *Publisher) Evict(sub chan interface{}) {
p.m.Lock()
defer p.m.Unlock()
 delete(p.subscribers, sub)
close(sub)
}
// 发布一个主题
func (p *Publisher) Publish(v interface{}) {
p.m.RLock()
defer p.m.RUnlock()
var wg sync.WaitGroup
for sub, topic := range p.subscribers {
wg.Add(1)
go p.sendTopic(sub, topic, v, &wg)
}
wg.Wait()
}
// 关闭发布者对象,同时关闭所有的订阅者管道。
func (p *Publisher) Close() {
p.m.Lock()
defer p.m.Unlock()
for sub := range p.subscribers {
delete(p.subscribers, sub)
close(sub)
}
}
// 发送主题,可以容忍一定的超时
func (p *Publisher) sendTopic(
sub subscriber, topic topicFunc, v interface{}, wg
*sync.WaitGroup,
) {
defer wg.Done()
if topic != nil && !topic(v) {
return
}
select {
case sub <- v:
case <-time.After(p.timeout):
}
}


下面的例子中,有两个订阅者分别订阅了全部主题和含有"golang"的
主题:

import "path/to/pubsub"
func main() {
p := pubsub.NewPublisher(100*time.Millisecond, 10)
defer p.Close()
all := p.Subscribe()
golang := p.SubscribeTopic(func(v interface{}) bool
{
if s, ok := v.(string); ok {
return strings.Contains(s, "golang")
}
return false
})
p.Publish("hello, world!")
p.Publish("hello, golang!")
 go func() {
for msg := range all {
fmt.Println("all:", msg)
}
} ()
go func() {
for msg := range golang {
fmt.Println("golang:", msg)
}
} ()
// 运行一定时间后退出
time.Sleep(3 * time.Second)
}


在发布订阅模型中,每条消息都会传送给多个订阅者。发布者通常
不会知道、也不关心哪一个订阅者正在接收主题消息。订阅者和发
布者可以在运行时动态添加,是一种松散的耦合关系,这使得系统
的复杂性可以随时间的推移而增长。在现实生活中,像天气预报之
类的应用就可以应用这个并发模式。

竞争模型

假设我们想快速地搜索“golang”相关的主题,我们可能会同时打开
Bing、Google或百度等多个检索引擎。当某个搜索最先返回结果
后,就可以关闭其它搜索页面了。因为受网络环境和搜索引擎算法
的影响,某些搜索引擎可能很快返回搜索结果,某些搜索引擎也可
能等到他们公司倒闭也没有完成搜索。我们可以采用类似的策略来
编写这个程序

func main() {
ch := make(chan string, 32)
go func() {
ch <- searchByBing("golang")
}()
go func() {
ch <- searchByGoogle("golang")
}()
go func() {
ch <- searchByBaidu("golang")
}()
fmt.Println(<-ch)
}

并不关心是哪个搜索引擎先完成搜索,我们只取最先完成的,但是通道的缓存要足够大 

Future模型

Future模型是将异步请求和代理模式结合的产物
举例:假设我们是一个电商平台,用户在网站下单。用户操作的是客户端它会向Future服务端发送数据,服务端会从后台的数据接口获取完整的订单数据,并响应用户。
我们模拟一下用户订单的行为:

A、用户挑完商品开始下单,这时客户端向服务器端发送请求1。

B、服务端根据客户端的信息,向后台获取完整的订单数据。这里做一个说明,比如用户客户端只发送了几个商品的id和数量,我们的服务端需要从后台数据库读取商家、商品、订单、库存等各种信息,最后拼成完整的一个订单返回。

C、步骤2会比较耗时,因此服务端直接返回给客户端一个伪造的数据,比如一个订单id。

D、客户端收到订单id后,开始检查订单信息,比如检查一下商品数量是否正确。
注意:
这里如果需要付款的话,就要等到最后订单数据的返回,也就是真实的数据返回。如果数据没有返回,就要一直等待,直到返回。

这时候完整的订单信息拼接完成了,返回了订单的完整数据,用户付款并完成这个订单。

客户端发送一个长时间的请求,服务端不需等待该数据处理完成便立即返回一个伪造的代理数据(相当于商品订单,不是商品本身),用户也无需等待,先去执行其他的若干操作后,再去调用服务器已经完成组装的真实数据。该模型充分利用了等待的时间片段

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Go并发(goroutine)及并发常用模型的实现 的相关文章

随机推荐

  • Java 脱敏工具类总结

    开发过程中 会遇到很多敏感数据的显示 这样不太安全 就需要用到相关的脱敏工具进行脱敏之后再显示 总结部分脱敏工具 package com li info import org apache commons lang3 StringUtils
  • pycharm的setting里面找不到我们用conda配置好的环境

    原因 电脑里面有多个python版本导致 我们目前添加的python并不是我们anaconda中python的版本 解决办法 在pycharm里面 使用setting里面的Add选项里 可以找到我们配置的所有环境 然后添加进去 具体如下图所
  • sql server将字符串转换为 uniqueidentifier 时失败

    sql server将字符串转换为 uniqueidentifier 时失败 sql server查询中出现 将字符串转换为 uniqueidentifier 时失败异常 原因为id设置为uniqueidentifier 字段 在where
  • shell day3

    思维导图
  • Unity错误记录——UIDocument重新激活后,脚本内编写的UI交互失效

    如题 本人使用UITookit编写了一个UXML的UI界面后 将其搭载到了一个UIDocument对象上 并将该对象作为UI相机的子对象以实现UI界面与相机绑定 在为UIDocument编写交互用的C 时 采用了下面的错误做法 导致UIDo
  • 正确获取硬盘序列号源码

    参考 http www winsim com diskid32 diskid32 cpp diskid32 cpp for displaying the details of hard drives in a command window
  • BES2300x笔记----TWS组对与蓝牙配对

    https me csdn net zhanghuaishu0 一 前言 看到有 道友 在评论区留言 对TWS组对 BT配对以及回连流程部分很迷糊 那这第二篇我们就来说说BES平台的相关流程和接口 PS 蓝牙基础部分就不再赘述了 网上有很多
  • jdbc mysql 重连_mysql重连的问题

    应用在长时间不连mysql后会与mysql断开 再次链接mysql时会报无法连接数据库的异常 所以连接的配置需要稍微改一下 factory org apache naming factory BeanFactory driverClass
  • LABVIEW连接MySQL进行读写更新查询操作并仿真

    相关软件的准备 欢迎访问我的小站 我的软件环境是LabVIEW 2018 32位 的 这个很重要 因为不同位数的labview需要安装不同位数的Connector odbc 还需要安装visio的运行环境 这个需要提前准备 Mysql的安装
  • 华为数字化转型之道 平台篇 第十三章 变革治理体系

    第十三章 变革治理体系 约翰 科特在 领导变革 一书中说 变革的领导团队既需要管理能力 也需要领导能力 他们必须结合起来 前面我们也谈到 数字化转型不仅是技术的创新 更是一项系统工程和企业真正的变革 企业要转型成功 既需要各个组织的积极参与
  • python---matplotlib详细教程(完结)

    文章每个图都带有案例 欢迎访问 目录 如何选择合适的图表 绘制简单的折线图 图表常用设置 颜色设置 线条样式和标记样式 画布设置 设置坐标轴标题 plt rcParams font sans serif SimHei 解决缺失字体 设置坐标
  • 【三】springboot整合token(超详细)

    springboot篇章整体栏目 一 springboot整合swagger 超详细 二 springboot整合swagger 自定义 超详细 三 springboot整合token 超详细 四 springboot整合mybatis p
  • 【华为OD机试真题 python】组装新的数组【2023 Q1

    题目描述 组装新的数组 给你一个整数M和数组N N中的元素为连续整数 要求根据N中的元素组装成新的数组R 组装规则 1 R中元素总和加起来等于M 2 R中的元素可以从N中重复选取 3 R中的元素最多只能有1个不在N中 且比N中的数字都要小
  • python格式化输出,format,数据类型转换。

    输出 计算机给用户输出的内容 是一个由里到外的一个过程 例如python语言中的print函数 输入 则相反 例如input函数 一 输出有普通的输出 也有格式化输出 普通输出 类似于 print hello word 这样直接打印 格式化
  • 为高尔夫比赛砍树

    为高尔夫比赛砍树 你被请来给一个要举办高尔夫比赛的树林砍树 树林由一个 m x n 的矩阵表示 在这个矩阵中 0 表示障碍 无法触碰 1 表示地面 可以行走 比 1 大的数 表示有树的单元格 可以行走 数值表示树的高度 每一步 你都可以向上
  • 系统篇: squashfs 文件系统

    一 squashfs简介 Squashfs是一套基于Linux内核使用的压缩只读文件系统 该文件系统能够压缩系统内的文档 inode以及目录 文件最大支持2 64字节 特点 数据 data 节点 inode 和目录 directories
  • 虚幻C++ http请求

    直接上代码 Fill out your copyright notice in the Description page of Project Settings pragma once include CoreMinimal h inclu
  • 测试岗?从功能测试进阶自动化测试开发,测试之路不迷茫...

    目录 导读 前言 一 Python编程入门到精通 二 接口自动化项目实战 三 Web自动化项目实战 四 App自动化项目实战 五 一线大厂简历 六 测试开发DevOps体系 七 常用自动化测试工具 八 JMeter性能测试 九 总结 尾部小
  • Mock框架应用(四)-Mock 重定向请求

    例一 先新建json配置文件重定向到www baidu com 启动mock服务 description 实现重定向的请求 request uri redirect redirectTo https www baidu com respon
  • Go并发(goroutine)及并发常用模型的实现

    前言 Go 语言最吸引人的地方是它内建的并发支持 作为天然支持高并发的语言 写并发比java和python要简单方便的多 在并发编程中 对共享资源的正确访问需要精确的控制 在目前的绝大多数语言中 都是通过加锁等线程同步方案来解决这一困难问题