多生产者多消费者问题的无锁队列实现

2023-05-16

背景

代码根据论文 Implementing Lock-Free Queues 复现。
背景知识博客:左耳朵耗子博客 https://coolshell.cn/articles/8239.html
代码地址:https://github.com/zxwsbg/lock-free-queue
b站链接:https://www.bilibili.com/video/BV1q54y1Y71W/

背景介绍

生产者和消费者问题是操作系统中老生常谈的一个问题。针对不同的场景(单生产者-单消费者、单生产者-多消费者、多生产者-多消费者)有着不同的解决方法,本文研究的是多生产者多消费者的场景。

解决多生产者多消费者的常见方法是互斥锁+信号量,而互斥锁开销过大,在高并发常见下会有很大的性能影响。而采用无锁队列的方法可以避免锁的使用来达到减小开销的目的。

无锁队列的实现的要点就是 CAS 操作,后续在考虑到内存重新分配的时候会造成安全性问题(ABA问题),故引入了 DoubleCAS 或者其他内存分配机制来解决问题。
在这里插入图片描述

无锁队列实现

数据结构介绍

按照论文中提到的两种方法,选了一种较优的方法实现。
以单向链表的形式实现这个队列,每个节点的数据结构为

class QueueNode {
  public:
    int        val;
    QueueNode* next;
    QueueNode(int val) : val(val) {
        next = NULL;
    }
};

而对于一个无锁队列对象而言,包含以下部分

  • 数据:队列的最大长度、链表头结点、链表尾节点
  • 函数:初始化(构造函数)、入队、出队
class LockFreeQueue {
  public:
    LockFreeQueue();
    bool enqueue(int val);
    int  dequeue();
    ~LockFreeQueue();

  private:
    int        queue_size; // 暂时未使用,论文里并没有提及最大资源数
    QueueNode* tail;
    QueueNode* head;
};

其数据结构包含关系如下图所示
在这里插入图片描述

入队操作

底下我们以入队操作为例

bool LockFreeQueue::enqueue(int val)
{
    QueueNode* cur_node;
    QueueNode* add_node = new QueueNode(val);
    while (1) {
        cur_node = tail;
        if (__sync_bool_compare_and_swap(&(cur_node->next), NULL, add_node)) {
            break;
        }
        else {
            __sync_bool_compare_and_swap(&tail, cur_node, cur_node->next);
        }
    }
    __sync_bool_compare_and_swap(&tail, cur_node, add_node);
    return 1;
}

这里的 __sync_bool_compare_and_swap就是 GCC 提供的 CAS 算法。
第7行的CAS首先尝试将新加的节点放到链表末尾,这里有两种结果:

  1. 加入成功,那么退出循环进入倒数第三行
  2. 加入不成功,说明这时候有其他线程抢先往里面插入了一个节点,那么就把当前节点的位置更新为尾节点,再次进入循环直到能正确更新

到了最后一个CAS的时候,只需要进行一次置尾操作,并不需要循环,原因是:如果当前线程将节点已经加进去了的话,那么其他所有线程的操作都会失败,只有当前线程更新尾节点完成后,其他线程的第二个CAS操作才能成功。

这里有一个小trick,明明第6行每次循环时都会更新节点,为什么还需要第二个CAS操作呢?因为在极端情况下,一个线程已经完成了增加节点操作,在置尾操作(第三个CAS)之前突然挂了,这时候就导致其他所有线程全部不能更新。

在左耳朵耗子的博客中,对于上述问题的解决方法描述是选用了《Implementing Lock-Free Queues》论文中提到的第二个方法,该方法采用的是一开始在head,然后不断找next直到找尾节点(通过多个线程共用fetch来减少开销)。本文以论文中提到的第一个方法为例进行描述(因为我觉得它更优雅)。

出队操作

与入队操作类似。

int LockFreeQueue::dequeue()
{
    QueueNode* cur_node;
    int        val;
    while (1) {
        cur_node = head;
        if (cur_node->next == NULL) {
            return -1;
        }

        if (__sync_bool_compare_and_swap(&head, cur_node, cur_node->next)) {
            break;
        }
    }
    val = cur_node->next->val;
    delete cur_node;
    return val;
}

多生产者-多消费者模型

我们采用 C++ 的 Thread 库,模拟多个生产者和多个消费者的情况。多个生产者线程向无所队列中插入数据,多个消费者从无锁队列中取出数据。
在这里插入图片描述

测试部分的代码实现在 lockfreequeue_test.cpp 文件中。以生产者为例,其代码如下所示。

int            thread_number;
int            task_number; // 每个线程需要入队/出队的资源个数
LockFreeQueue* lfq;

void produce(int offset)
{
    // 算上偏移量,保证不会出现重复
    for (int i = task_number * offset; i < task_number * (offset + 1); i++) { 
        printf("produce %d\n", i);
        lfq->enqueue(i);
    }
}
int main(int argc, char** argv)
{
    lfq = new LockFreeQueue;
    std::vector<std::thread> thread_vector1;

    for (int i = 0; i < thread_number; i++) {
        thread_vector1.push_back(std::thread(produce, i));
    }

    for (auto& thr1 : thread_vector1) {
        thr1.join();
    }
}

正确性检测

对于结果,首先做正确性检验,具体实现在 check.py 文件中,对于produce,要求之前没有生产过该资源;对于consume,要求之前生产过并且没有消费过该资源。

ABA 问题

在高并发场景下会出现该问题。如上述情况,以默认值参数运行后经常不能通过正确性检测。原因根据我的判断,是因为ABA问题:当我们做CAS的之前,如果head的那块内存被回收并被重用了,而重用的内存又被EnQueue()进来了。而判断的方法也是个小 trick ,只需要将deque操作中释放内存的步骤注释掉,那么就不会出现内存重用的问题,这样做后也确实可以通过正确性测试。这个问题在论文中也提到了并给出了相应解决方法,但其需要自己实现一套内存分配系统(麻烦)。于是本文参考另一篇论文Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms
,利用Double-CAS来解决此问题,每访问一个节点,都用引用计数的方式给其+1,Double-CAS则可以一次既判断原始的节点值,又判断引用计数值,保证其未被置换出去过。这一块的具体代码实现可见github项目代码的fix_aba_problem分支。

对比检验

为了对比实验,写了个几乎差不多的正常的多生产者多消费者单链表实现(互斥锁+信号量)。经过测试,时间几乎没差(并发场景不够)。


现存问题

  1. 如何模拟高并发
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

多生产者多消费者问题的无锁队列实现 的相关文章

随机推荐

  • Spring XML配置文件详解

    spring配置文件是用于指导Spring工厂进行Bean生产 依赖关系注入 xff08 装配 xff09 及Bean实例分发的 34 图纸 34 Spring框架的配置文件是基于xml的 xff0c Spring强大的功能依赖于类型繁多的
  • 两行命令解决ubuntu22.04安装网易云音乐后点击图标无反应的问题

    感谢知乎用户 64 拉布 xff1a https zhuanlan zhihu com p 518108518 1 终端中输入以下命令 xff1a span class token builtin class name cd span op
  • 数据库MVCC多版本并发控制原理

    MVCC实现原理 频繁的加锁会带来什么问题 xff1f 读数据的时候没办法修改 修改数据的时候没办法读取 xff0c 极大的降低了数据库性能 数据库是如何解决加锁后的性能问题的 xff1f MVCC 多版本控制实现读取数据不用加锁 xff0
  • 学习笔记-----ButterKnife

    ButterKnife是一个专注于Android系统的View注入框架 ButterKnife bind this 一切findViewById Fragment Adapter中同样适用 xff0c ButterKnife bind th
  • 【Linux】vim 中批量添加注释

    本期主题 xff1a vim 中批量添加注释博客主页 xff1a 小峰同学分享小编的在Linux中学习到的知识和遇到的问题小编的能力有限 xff0c 出现错误希望大家不吝赐 此文主要介绍两种方法 xff1a 方法一 xff1a 块选择模式
  • Exported service does not require permission警告

    很久没写过应用了 xff0c 今天写一个Service时 xff0c 在manifest文件的 lt service gt 标签发现了这个警告 lt service android name 61 34 SendService 34 gt
  • Windows服务器高并发处理IOCP(完成端口)详细说明

    本系列里完成端口的代码在两年前就已经写好了 xff0c 但是由于许久没有写东西了 xff0c 不知该如何提笔 xff0c 所以这篇文档总是在酝酿之中 酝酿了两年之后 xff0c 终于决定开始动笔了 xff0c 但愿还不算晚 这篇文档我非常详
  • linux下如何测试端口通不通(四种方法)

    一般情况下使用 34 telnet ip port 34 判断端口通不通 接下来通过本文给大家分享四种方法测试端口通不通 xff0c 感兴趣的朋友一起学习吧 一般情况下使用 34 telnet ip port 34 判断端口通不通 xff0
  • 教你Vim编辑器,如何删除一行或者多行内容

    如何从Vim中删除行 xff1f 如何删除多行 xff1f 本文介绍在Vim编辑器中删除行的不同方法文内含长段代码可复制可往左滑 xff0c 希望对大家有帮助 xff01 安装Vim 在Ubuntu Debian中的安装方式 sudo ap
  • VBoxManage常用命令用法

    VBoxManage命令常用用法 系统环境 xff1a CentOS 7 3 x86 64 VirtualBox版本 xff1a 6 1 22 VirtualBox扩展版本 xff1a 6 1 22 增加一个新的扩展包 VBoxManage
  • Centos7安装Redis

    一 安装gcc依赖 由于 redis 是用 C 语言开发 xff0c 安装之前必先确认是否安装 gcc 环境 xff08 gcc v xff09 xff0c 如果没有安装 xff0c 执行以下命令进行安装 root 64 localhost
  • webRTC中的coturn服务安装

    目录 1 先准备一台云主机 2 安装coturn的依赖 2 1 依赖软件准备 2 1 安装依赖组件 2 2 安装coturn的持久化保存用户信息库 3 安装coturn 4 coturn配置 4 1 创建用户 4 2 配置说明 4 3 收集
  • Git配置.gitignore忽略文件

    git设置忽略文件和目录有两种方式 xff0c 一种是项目所有人员共用的的 xff0c 一种是开发自己使用的 第一种 xff0c 所有开发者共用的需要把设置设定在 gitignore该文件中 第二种 xff0c 开发者个人使用的忽略配置 x
  • 线程池实现原理

    创建线程有哪几种方式 一 继承Thread类创建线程类 xff08 1 xff09 定义Thread类的子类 xff0c 并重写该类的run方法 xff0c 该run方法的方法体就代表了线程要完成的任务 因此把run 方法称为执行体 xff
  • linux查看cpu与内存

    查看cpu 第一种方法 xff1a top命令法 1 首先执行top命令 xff1b 2 在top命令的显示界面 xff0c 按数据键1 xff0c 即可查看到当前系统中的总cpu数 xff1b 第二种方法 xff1a 通过proc文件系统
  • Element 布局组件el-row和el-col 详解

    1 背景 element的布局方式与bootstrap原理是一样的 xff0c 将网页划分成若干行 xff0c 然后每行等分为若干列 xff0c 基于这样的方式进行布局 xff0c 形象的成为栅栏布局 区别是element可将每行划分为24
  • netbeans中配置maven

    deploy 发布到远程maven库 本节默认maven库为nexus netbeans中按ctrl 43 1 xff0c 打开Project窗口 xff1b 在Project窗口中找到相关的project或module 在项目名上点击鼠标
  • 订阅关系一致

    订阅关系一致指的是同一个消费者Group ID下所有Consumer实例所订阅的Topic Tag必须完全一致 如果订阅关系不一致 消息消费的逻辑就会混乱 甚至导致消息丢失 本文提供订阅关系一致的正确示例代码以及订阅关系不一致的可能原因 帮
  • java之PO,VO,TO,QO,BO等

    PO persistant object 持久对象 在 o r 映射的时候出现的概念 xff0c 如果没有 o r 映射 xff0c 没有这个概念存在了 通常对应数据模型 数据库 xff0c 本身还有部分业务逻辑的处理 可以看成是与数据库中
  • 多生产者多消费者问题的无锁队列实现

    背景 代码根据论文 Implementing Lock Free Queues 复现 背景知识博客 xff1a 左耳朵耗子博客 https coolshell cn articles 8239 html 代码地址 xff1a https g