linux消息队列服务,Linux进程间通信-消息队列(mqueue)深入理解

2023-05-16

前面两篇文章分解介绍了匿名管道和命名管道方式的进程间通信,本文将介绍Linux消息队列(posix)的通信机制和特点。

1、消息队列

消息队列的实现分为两种,一种为System V的消息队列,一种是Posix消息队列;这篇文章将主要围绕Posix消息队列介绍;

消息队列可以认为是一个消息链表,某个进程往一个消息队列中写入消息之前,不需要另外某个进程在该队列上等待消息的达到,这一点与管道和FIFO相反。Posix消息队列与System V消息队列的区别如下:

(1) 对Posix消息队列的读总是返回最高优先级的最早消息,对System V消息队列的读则可以返回任意指定优先级的消息。

(2)当往一个空队列放置一个消息时,Posix消息队列允许产生一个信号或启动一个线程,System V消息队列则不提供类似的机制。

2、消息队列的基本操作

2.1 打开一个消息队列

#include   

typedef int mqd_t;

mqd_t mq_open(const char *name, int oflag, ... /* mode_t mode, struct mq_attr *attr */);

返回: 成功时为消息队列描述字,出错时为-1。

功能: 创建一个新的消息队列或打开一个已存在的消息的队列。

2.2 关闭一个消息队列

#include   

int mq_close(mqd_t mqdes);

返回: 成功时为0,出错时为-1。

功能: 关闭已打开的消息队列。

2.3 删除一个消息队列

#include   

int mq_unlink(const char *name)

返回: 成功时为0,出错时为-1

功能: 从系统中删除消息队列。

这三个函数操作的代码如下:

#include

#include

#include

#include

#include

#include

int main(int argc, char* argv[])

{

int flag = O_RDWR | O_CREAT | O_EXCL;

int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;

mqd_t mqid = mq_open("/mq_test", flag, mode,NULL);

if (mqid == -1)

{

printf("mqueue create failed!\n");

return 1;

}

else

{

printf("mqueue create success!\n");

}

mq_close(mqid);  return 0;

}

#include

#include

int main(int argc, char* argv[])

{

mq_unlink("/mq_test");

return 0;

}

注意:编译posix mqueue时,要连接运行时库(runtime library),既-lrt选项,运行结果如下:

172793886406bbf5d2d4bd3aa0e4cdd9.png

关于mqueue更多详细内容可以使用:man mq_overview命令查看,里面有一条需要注意的是,Linux下的Posix消息队列是在vfs中创建的,可以用

mount -t mqueue none /dev/mqueue

将消息队列挂在在/dev/mqueue目录下,便于查看。

2.4 mq_close()和mq_unlink()

mq_close()的功能是关闭消息队列的文件描述符,但消息队列并不从系统中删除,要删除一个消息队列,必须调用mq_unlink();这与文件系统的unlink()机制是一样的。

3、消息队列的属性

#include   

int mq_getattr(mqd_t mqdes, struct mq_attr *attr);

int mq_setattr(mqd_t mqdes, const struct mq_attr *attr, struct mq_attr *attr);

均返回:成功时为0, 出错时为-1

每个消息队列有四个属性:

struct mq_attr

{

long mq_flags;       /* message queue flag : 0, O_NONBLOCK */

long mq_maxmsg;  /* max number of messages allowed on queue*/

long mq_msgsize;  /* max size of a message (in bytes)*/

long mq_curmsgs;  /* number of messages currently on queue */

};

4、消息收发

#include   

int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio);

返回:成功时为0,出错为-1

ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *priop);

返回:成功时为消息中的字节数,出错为-1

mqsend代码如下:

#include

#include

#include

#include

#include

#include

int main(int argc, char* argv[])

{

int flag = O_RDWR;

int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;

mqd_t mqid = mq_open("/mq_test",flag,mode,NULL);

if (mqid == -1)

{

printf("open mqueue failed!\n");

return 1;

}

char *buf = "hello, i am sender!";

mq_send(mqid,buf,strlen(buf),20);

mq_close(mqid);

return 0;

}

mqrecv代码如下:

#include

#include

#include

#include

#include

#include

int main(int argc, char* argv[])

{

int flag = O_RDWR;

int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;

mqd_t mqid = mq_open("/mq_test",flag,mode,NULL);

if (mqid == -1)

{

printf("open mqueue failed!\n");

return 1;

}

struct mq_attr attr;

mq_getattr(mqid,&attr);

char buf[256] = {0};

int priority = 0;

mq_receive(mqid,buf,attr.mq_msgsize,&priority);

printf("%s\n",buf);

mq_close(mqid);

return 0;

}

运行结果如下:

70b0ae317b037d1de753dec0f15f104b.png

首先我们运行三次send,然后运行四次recv,可见recv的前三次是可以收到消息队列里的三个消息的,当运行第四次的时,系统消息队列里为空,recv就会阻塞;关于非阻塞式mqueue见下文。

5、mq_notify函数

如前文介绍,poxis消息队列运行异步通知,以告知何时有一个消息放置到某个空消息队列中,这种通知有两种方式可以选择:

(1)产生一个信号

(2)创建一个线程来执行一个指定的函数

这种通知通过mq_notify() 函数建立。该函数为指定的消息消息队列建立或删除异步事件通知,

#include

int mq_notify(mqd_t mqdes, const struct sigevent* notification);

(1)如果notification参数为非空,那么当前进程希望在有一个消息到达所指定的先前为空的对列时得到通知。

(2)如果notification参数为空,而且当前进程被注册为接收指定队列的通知,那么已存在的注册将被撤销。

(3)任意时刻只有一个进程可以被注册为接收某个给定队列的通知。

(4)当有一个消息到达先前为空的消息队列,而且已有一个进程被注册为接收该队列的通知时,只有在没有任何线程阻塞在该队列的mq_receive调用中的前提下,通知才会发出。即说明,在mq_receive调用中的阻塞比任何通知的注册都优先。

(5)当前通知被发送给它的注册进程时,其注册即被撤销。该进程必须再次调用mq_notify以重新注册。

sigevent结构如下:

union sigval{

int    sival_int;          /*integer value*/

void    *sival_ptr;        /*pointer value*/

};

struct sigevent{

int    sigev_notify;      /*SIGEV_{NONE, SIGNAL, THREAD}*/

int    sigev_signo;        /*signal number if SIGEV_SIGNAL*/

union sigval    sigev_value;

void    (*sigev_notify_function)(union sigval);

pthread_attr_t  *sigev_notify_attributes;

};

5.1 mq_notify() 使用信号处理程序

一个正确的使用非阻塞mq_receive的信号通知的例子:

#include

#include

#include

#include

#include

#include

#include

#include

void sig_usr1(int );

volatile sig_atomic_t mqflag;

int main(int argc, char* argv[])

{

mqd_t mqid = 0;

void *buff;

struct mq_attr attr;

struct sigevent sigev;

sigset_t zeromask,newmask,oldmask;

int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;

mqid = mq_open("/mq_test",O_RDONLY | O_NONBLOCK,mode,NULL);  // 非阻塞式打开mqueue

mq_getattr(mqid,&attr);

buff = malloc(attr.mq_msgsize);

sigemptyset(&zeromask);

sigemptyset(&newmask);

sigemptyset(&oldmask);

sigaddset(&newmask,SIGUSR1);    // 初始化信号集

signal(SIGUSR1,sig_usr1);      // 信号处理程序

sigev.sigev_notify = SIGEV_SIGNAL;

sigev.sigev_signo = SIGUSR1;

int n = mq_notify(mqid,&sigev);  // 启用通知

for (;;)

{

sigprocmask(SIG_BLOCK,&newmask,&oldmask);

while(mqflag == 0)

sigsuspend(&zeromask);

mqflag = 0;

ssize_t n;

mq_notify(mqid, &sigev);    // 重新注册

while( (n = mq_receive(mqid,buff,attr.mq_msgsize,NULL)) >=0)

printf("SIGUSR1 received, read %ld bytes.\n",(long)n);  //读取消息

if(errno != EAGAIN)

printf("mq_receive error\n");

sigprocmask(SIG_UNBLOCK,&newmask,NULL);

}

return 0;

}

void sig_usr1(int signo)

{

mqflag = 1;

}

��里为什么使用的是非阻塞式mq_receive,为什么不在信号处理程序中打印接收到的字符请参阅《unp 第二卷》

5.2 mq_notify() 使用线程处理程序

异步事件通知的另一种方式是把sigev_notify设置成SIGEV_THREAD,这会创建一个新线程,该线程调用由sigev_notify_function指定的函数,所用的参数由sigev_value指定,新线程的属性由sigev_notify_attributes指定,要指定线程的默认属性的话,传空指针。新线程是作为脱离线程创建的。

#include

#include

#include

#include

#include

#include

#include

#include

#include

mqd_t mqid = 0;

struct mq_attr attr;

struct sigevent sigev;

static void notify_thread(union sigval);

int main(int argc, char* argv[])

{

int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;

mqid = mq_open("/mq_test",O_RDONLY | O_NONBLOCK,mode,NULL);

mq_getattr(mqid,&attr);

sigev.sigev_notify = SIGEV_THREAD;

sigev.sigev_notify_function = notify_thread;

sigev.sigev_value.sival_ptr = NULL;

sigev.sigev_notify_attributes = NULL;

int n = mq_notify(mqid,&sigev);

for (;;)

pause();

return 0;

}

static void notify_thread(union sigval arg)

{

ssize_t n;

char* buff;

printf("notify_thread_started!\n");

buff = malloc(attr.mq_msgsize);

mq_notify(mqid, &sigev);

while( (n = mq_receive(mqid,buff,attr.mq_msgsize,NULL)) >=0)

printf("SIGUSR1 received, read %ld bytes.\n",(long)n);

if(errno != EAGAIN)

printf("mq_receive error\n");

free(buff);

pthread_exit(0);

}

0b1331709591d260c1c78e86d0c51c18.png

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

linux消息队列服务,Linux进程间通信-消息队列(mqueue)深入理解 的相关文章

  • 无法在 Linux 的 NetBeans 中编译 C++ 和 OpenGL (GLFW) 的简单源代码

    我开始学习 OpenGL glfw 我从教程中复制源代码并尝试编译它 但出现了错误 我想我已经正确安装了所有头文件 glm glfw 等 这是我的来源 我没有在头文件中使用这些字符 include iostream include stdi
  • 段错误...关于你好世界

    这段代码非常简单 但我在 x86 64 Linux 系统上遇到了段错误 这让我很烦恼 刚开始接触asm 请耐心等待 与 NASM 组装nasm f elf64 test asm 与连接ld o test test o SECTION tex
  • Pthreads - 高内存使用率

    我正在用 C 编写一些东西 在 256Mb 系统上的 Linux 中创建大量 Pthread 我通常有 200Mb 的免费空间 当我使用少量线程运行该程序时 它可以工作 但是一旦我让它创建大约 100 个线程 它就会出现错误 因为系统内存不
  • 何时用引号将 shell 变量括起来?

    我应该或不应该在 shell 脚本中用引号括住变量吗 例如 下列说法正确的是 xdg open URL eq 2 or xdg open URL eq 2 如果是这样 为什么 一般规则 如果它可以为空或包含空格 或实际上任何空格 或特殊字符
  • 如何通过 makefile 在 Linux 上安装程序? [复制]

    这个问题在这里已经有答案了 可能的重复 Linux Unix make install 应该包含什么 https stackoverflow com questions 528399 what should linux unix make
  • PIL 的 Image.show() 带来*两个*不同的查看器

    在 python shell 中处理图像时 我使用 image show 其中 image 是 Image 的实例 很久以前什么也没发生 但在定义了一个名为 xv 的 Mirage 符号链接后 我很高兴 最近几天 show 将显示 Imag
  • 任何退出 bash 脚本但不退出终端的方法

    当我使用exitshell 脚本中的命令 该脚本将终止终端 提示符 有什么方法可以终止脚本然后停留在终端中吗 我的剧本run sh预计通过直接获取或从另一个脚本获取来执行 编辑 更具体地说 有两个脚本run2 sh as run sh ec
  • 如何在C(Linux utf8终端)中打印“盒子抽屉”Unicode字符?

    我正在尝试显示 方框图范围 2500 257F 中的 Unicode 字符 它应该是标准 utf8 Unicode 标准 版本 6 2 我根本做不到 我首先尝试使用旧的 ASCII 字符 但 Linux 终端以 utf8 显示 并且没有显示
  • 使用脚本检查 git 分支是否领先于另一个分支

    I have branch1 and branch2我想要某种 git branch1 isahead branch2 这将显示如果branch1已承诺branch2没有 也可能指定这些提交 我无法检查差异原因branch2 is在之前br
  • Vagrant 遇到问题 - “404 - 未找到”

    我正在尝试使用 Vagrant 制作一个 LAMP 盒子 有人告诉我它使用起来非常简单 我对网络和虚拟机完全陌生 对 Linux Ubuntu 的经验也很少 我目前已尝试按照官方文档页面上的教程进行操作 http docs vagrantu
  • 在 Linux 中重新启动时,新创建的文件变为 0 kb(数据被覆盖为空)

    我遇到了一个奇怪的问题 这让我发疯 当前的任务是在 root 用户第一次登录时启动一组文件 并在同一用户第二次登录时启动另一组文件 我决定使用 profile 和 bashrc 文件 并在第一次登录期间发生的任务结束时重新加载 bashrc
  • bash 将输出重定向到文件,但结果不完整

    重定向命令输出的问题已经被问过很多次了 但是我有一个奇怪的行为 我使用的是 bash shell debian 版本 4 3 30 1 release 并尝试将输出重定向到文件 但并非所有内容都记录在文件中 我尝试运行的 bin 文件是 l
  • 为什么 OS X 和 Linux 之间的 UTF-8 文本排序顺序不同?

    我有一个包含 UTF 8 编码文本行的文本文件 mac os x cat unsorted txt foo foo 津 如果它有助于重现问题 这里是文件中确切字节的校验和和转储 以及如何自己生成文件 在 Linux 上 使用base64 d
  • BASH:输入期间按 Ctrl+C 会中断当前终端

    我的 Bash 版本是 GNU bash version 4 3 11 1 release x86 64 pc linux gnu 我有一段这样的代码 while true do echo n Set password read s pas
  • Crontab 每 5 分钟一次 [关闭]

    Closed 这个问题是无关 help closed questions 目前不接受答案 我如何告诉 crontab 每 5 分钟运行一次 但从每小时的第二分钟开始 换句话说 我想在以下时间执行我的脚本minute 5 2 例如 我的脚本应
  • “git add”返回“致命:外部存储库”错误

    我刚刚进入 git 的奇妙世界 我必须提交我对程序所做的一系列更改 位于名为的目录中 var www myapp 我创建了一个新目录 home mylogin gitclone 从这个目录中 我做了一个git clone针对公共回购 我能够
  • 正则表达式删除块注释也删除 * 选择器

    我正在尝试使用 bash 从 css 文件中删除所有块注释 我有以下 sed 命令的正则表达式 sed r s w s w d 这可以很好地去除块注释 例如 This is a comment this is another comment
  • LINUX:如何锁定内存中进程的页面

    我有一个 LINUX 服务器 运行一个具有大量内存占用的进程 某种数据库引擎 该进程分配的内存太大 需要将其中一部分换出 换出 我想做的是将所有其他进程 或正在运行的进程的子集 的内存页面锁定在内存中 以便只有数据库进程的页面被换出 例如
  • awk 在循环中使用时不打印任何内容[重复]

    这个问题在这里已经有答案了 我有一堆使用 file 1 a 1 txt 格式的文件 如下所示 A 1 B 2 C 3 D 4 并使用以下命令添加包含每个文件名称的新列 awk print FILENAME NF t 0 file 1 a 1
  • 相当于Linux中的导入库

    在 Windows C 中 当您想要链接 DLL 时 您必须提供导入库 但是在 GNU 构建系统中 当您想要链接 so 文件 相当于 dll 时 您就不需要链接 为什么是这样 是否有等效的 Windows 导入库 注意 我不会谈论在 Win

随机推荐