概述
ZeroMQ的订阅发布模式是一种单向的数据发布,当客户端向服务端订阅消息之后,服务端便会将产生的消息源源不断的推送给订阅者。
一个发布者,多个订阅者的关系(1:n),当发布者数据变化时发布数据,所有订阅者均能够接收到数据并处理,这就是发布/订阅模式。
发布者使用PUB套接字将消息发送到队列中,订阅者使用SUB套接字从队列中源源不断的接收消息。新的订阅者可以随时加入,但之前的消息是无法收到的。已有的订阅者可以随时退出。订阅者还可以增加“过滤器”用来有选择性的接收消息。
代码
pub.cpp
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <string.h>
#include "zmq.h"
int main()
{
void* context = zmq_ctx_new();
assert(context != NULL);
void* socket = zmq_socket(context, ZMQ_PUB);
assert(socket != NULL);
int ret = zmq_bind(socket, "tcp://*:8080");
assert(ret == 0);
int i = 0;
while(1)
{
char szBuf[1024] = {0};
snprintf(szBuf, sizeof(szBuf), "server i=%d", i);
ret = zmq_send(socket, szBuf, strlen(szBuf) + 1, 0);
i++;
}
zmq_close (socket);
zmq_ctx_destroy (context);
return 0;
}
sub.cpp
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include "zmq.h"
#include <thread>
using namespace std;
#define TRUE 1
void Recv(void* arg)
{
while(TRUE)
{
void* socket = arg;
printf("sub while\n");
char szBuf[1024] = {0};
int ret = zmq_recv(socket, szBuf, sizeof(szBuf)-1,0);
if (ret > 0)
{
printf("Recv:%s\n", szBuf);
}
}
}
void Recv2(void* arg)
{
while(TRUE)
{
void* socket = arg;
printf("sub while\n");
char szBuf[1024] = {0};
int ret = zmq_recv(socket, szBuf, sizeof(szBuf) - 1, 0);
if (ret > 0)
{
printf("Recv2:%s\n", szBuf);
}
}
}
int main()
{
printf("Hello zeromq!\n");
void* context = zmq_ctx_new();
assert(context != NULL);
void* socket = zmq_socket(context, ZMQ_SUB);
assert(socket != NULL);
int ret = zmq_connect(socket, "tcp://localhost:8080");
assert(ret == 0);
ret = zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0);
assert(ret == 0);
thread t1(Recv,socket);
thread t2(Recv2,socket);
t1.join();
t2.join();
zmq_close(socket);
zmq_ctx_destroy(context);
return 0;
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)