zeromq java 教程_ZeroMQ入门

2023-05-16

概述

ZeroMQ(也称为 ØMQ,0MQ 或 zmq)是一个可嵌入的网络通讯库(对 Socket 进行了封装)。 它提供了携带跨越多种传输协议(如:进程内,进程间,TCP 和多播)的原子消息的 sockets 。 有了ZeroMQ,我们可以通过发布-订阅、任务分发、和请求-回复等模式来建立 N-N 的 socket 连接。 ZeroMQ 的异步 I / O 模型为我们提供可扩展的基于异步消息处理任务的多核应用程序。 它有一系列语言API(几乎囊括所有编程语言),并能够在大多数操作系统上运行。

传统的 TCP Socket 的连接是1对1的,可以认为“1个 socket = 1个连接”,每一个线程独立维护一个 socket 。但是 ZMQ 摒弃了这种1对1的模式,ZMQ的 Socket 可以很轻松地实现1对N和N对N的连接模式,一个 ZMQ 的 socket 可以自动地维护一组连接,用户无法操作这些连接,用户只能操作套接字而不是连接本身。所以说在 ZMQ 的世界里,连接是私有的。

三种基本模型

ZMQ 提供了三种基本的通信模型,分别是 Request-Reply 、Publish-Subscribe 和 Parallel Pipeline ,接下来举例说明三种模型并给出相应的代码实现。

Request-Reply(请求-回复)

以 “Hello World” 为例。客户端发起请求,并等待服务端回应请求。客户端发送一个简单的 “Hello”,服务端则回应一个 “World”。可以有 N 个客户端,一个服务端,因此是 1-N 连接。

c5e191d58ae5

Request-Reply

服务端代码如下:

hwserver.java

import org.zeromq.ZMQ;

public class hwserver {

public static void main(String[] args) throws InterruptedException {

ZMQ.Context context = ZMQ.context(1);

ZMQ.Socket responder = context.socket(ZMQ.REP);

responder.bind("tcp://*:5555");

while (!Thread.currentThread().isInterrupted()) {

byte[] request = responder.recv(0);

System.out.println("Received" + new String(request));

Thread.sleep(1000);

String reply = "World";

responder.send(reply.getBytes(),0);

}

responder.close();

context.term();

}

}

hwserver.py

import time

import zmq

context = zmq.Context()

socket = context.socket(zmq.REP)

socket.bind("tcp://*:5555")

while True:

message = socket.recv()

print("Received request: %s" % message)

# Do some 'work'

time.sleep(1)

socket.send(b"World")

客户端代码如下:

hwclient.java

import org.zeromq.ZMQ;

public class hwclient {

public static void main(String[] args) {

ZMQ.Context context = ZMQ.context(1);

ZMQ.Socket requester = context.socket(ZMQ.REQ);

requester.connect("tcp://localhost:5555");

for (int requestNbr = 0; requestNbr != 10; requestNbr++) {

String request = "Hello";

System.out.println("Sending Hello" + requestNbr);

requester.send(request.getBytes(),0);

byte[] reply = requester.recv(0);

System.out.println("Reveived " + new String(reply) + " " + requestNbr);

}

requester.close();

context.term();

}

}

hwclient.py

import zmq

context = zmq.Context()

print("Connecting to hello world server...")

socket = context.socket(zmq.REQ)

socket.connect("tcp://localhost:5555")

for request in range(10):

print("Sending request %s ..." % request)

socket.send(b"Hello")

message = socket.recv()

print("Received reply %s [ %s ]" % (request,message))

从以上的过程,我们可以了解到使用 ZMQ 写基本的程序的方法,需要注意的是:

服务端和客户端无论谁先启动,效果是相同的,这点不同于 Socket。

在服务端收到信息以前,程序是阻塞的,会一直等待客户端连接上来。

服务端收到信息后,会发送一个 “World” 给客户端。值得注意的是一定是客户端连接上来以后,发消息给服务端,服务端接受消息然后响应客户端,这样一问一答。

ZMQ 的通信单元是消息,它只知道消息的长度,并不关心消息格式。因此,你可以使用任何你觉得好用的数据格式,如 Xml、Protocol Buffers、Thrift、json 等等。

Publish-Subscribe(发布-订阅)

下面以一个天气预报的例子来介绍该模式。

服务端不断地更新各个城市的天气,客户端可以订阅自己感兴趣(通过一个过滤器)的城市的天气信息。

c5e191d58ae5

Publish-Subscribe

服务端代码如下:

wuserver.java

import org.zeromq.ZMQ;

import java.util.Random;

public class wuserver {

public static void main(String[] args) {

ZMQ.Context context = ZMQ.context(1);

ZMQ.Socket publisher = context.socket(ZMQ.PUB);

publisher.bind("tcp://*:5556");

publisher.bind("icp://weather");

Random srandom = new Random(System.currentTimeMillis());

while (!Thread.currentThread().isInterrupted()) {

int zipcode, temperature, relhumidity;

zipcode = 10000 + srandom.nextInt(10000);

temperature = srandom.nextInt(215) - 80 + 1;

relhumidity = srandom.nextInt(50) + 10 + 1;

String update = String.format("%05d %d %d", zipcode, temperature, relhumidity);

}

publisher.close();

context.term();

}

}

wuserver.py

from random import randrange

import zmq

context = zmq.Context()

socket = context.socket(zmq.PUB)

socket.bind("tcp://*:5556")

while True:

zipcode = randrange(1, 100000)

temperature = randrange(-80, 135)

relhumidity = randrange(10, 60)

socket.send_string("%i %i %i" % (zipcode, temperature, relhumidity))

客户端代码如下:

wuclient.java

import org.zeromq.ZMQ;

import java.util.StringTokenizer;

public class wuclient {

public static void main(String[] args) {

ZMQ.Context context = ZMQ.context(1);

ZMQ.Socket suscriber = context.socket(ZMQ.SUB);

suscriber.connect("tcp://localhost:5556");

String filter = (args.length > 0) ? args[0] : "10001";

suscriber.suscribe(filter.getBytes()); //过滤条件

int update_nbr;

long total_temp = 0;

for (update_nbr = 0; update_nbr < 100; update_nbr++) {

String string = suscriber.recvStr(0).trim();

StringTokenizer sscanf = new StringTokenizer(string, " ");

int zipcode = Integer.valueOf(sscanf.nextToken());

int temperature = Integer.valueOf(sscanf.nextToken());

int relhumidity = Integer.valueOf(sscanf.nextToken());

total_temp += temperature;

}

System.out.println("Average temperature for zipcode '" + filter + "' was " + (int) (total_temp / update_nbr));

suscriber.close();

context.term();

}

}

wuclient.py

import sys

import zmq

context = zmq.Context()

socket = context.socket(zmq.SUB)

print("Collecting updates from weather server...")

socket.connect("tcp://localhost:5556")

zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10001"

if isinstance(zip_filter, bytes):

zip_filter = zip_filter.decode('ascii')

socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter)

total_temp = 0

for update_nbr in range(5):

string = socket.recv_string()

zipcode, temperature, relhumidity = string.split()

total_temp += int(temperature)

print("Average temperature for zipcode '%s' was %dF" % (zip_filter, total_temp / (update_nbr + 1)))

服务器端生成随机数 zipcode、temperature、relhumidity 分别代表城市代码、温度值和湿度值,然后不断地广播信息。而客户端通过设置过滤参数,接受特定城市代码的信息,最终将收集到的温度求平均值。

需要注意的是:

与 “Hello World” 例子不同的是,Socket 的类型变成 ZMQ.PUB 和 ZMQ.SUB 。

客户端需要设置一个过滤条件,接收自己感兴趣的消息。

发布者一直不断地发布新消息,如果中途有订阅者退出,其他均不受影响。当订阅者再连接上来的时候,收到的就是后来发送的消息了。这样比较晚加入的或者是中途离开的订阅者必然会丢失掉一部分信息。这是该模式的一个问题,即所谓的 "Slow joiner" 。

Parallel Pipeline

Parallel Pipeline 处理模式如下:

ventilator 分发任务到各个 worker

每个 worker 执行分配到的任务

最后由 sink 收集从 worker 发来的结果

c5e191d58ae5

Parallel Pipeline

taskvent.java

import org.zeromq.ZMQ;

import java.io.IOException;

import java.util.Random;

import java.util.StringTokenizer;

public class taskvent {

public static void main(String[] args) throws IOException {

ZMQ.Context context = new ZMQ.context(1);

ZMQ.Socket sender = context.socket(ZMQ.PUSH);

sender.bind("tcp://*:5557");

ZMQ.Socket sink = context.socket(ZMQ.PUSH);

sink.connect("tcp://localhost:5558");

System.out.println("Please enter when the workers are ready: ");

System.in.read();

System.out.println("Sending task to workes\n");

sink.send("0",0);

Random srandom = new Random(System.currentTimeMillis());

int task_nbr;

int total_msec = 0;

for (task_nbr = 0; task_nbr < 100; task_nbr++) {

int workload;

workload = srandom.nextInt(100) + 1;

total_msec += workload;

System.out.print(workload + ".");

String string = String.format("%d", workload);

sender.send(string, 0);

}

System.out.println("Total expected cost: " + total_msec + " msec");

sink.close();

sender.close();

context.term();

}

}

taskvent.py

import zmq

import time

import random

try:

raw_input

except NameError:

raw_input = input

context = zmq.Context()

sender = context.socket(zmq.PUSH)

sender.bind("tcp://*:5557")

sink = context.socket(zmq.PUSH)

sink.connect("tcp://localhost:5558")

print("Please enter when workers are ready: ")

_ = raw_input()

print("Sending tasks to workers...")

sink.send(b'0')

random.seed()

total_msec = 0

for task_nbr in range(100):

workload = random.randint(1, 100)

total_msec += workload

sender.send_string(u'%i' % workload)

print("Total expected cost: %s msec" % total_msec)

time.sleep(1)

taskwork.java

import org.zeromq.ZMQ;

public class taskwork {

public static void main(String[] args) {

ZMQ.Context context = ZMQ.context(1);

ZMQ.Socket receiver = context.socket(ZMQ.PULL);

receiver.connect("tcp://localhost:5557");

ZMQ.Socket sender = context.socket(ZMQ.PUSH);

sender.connect("tcp://localhost:5558");

while (!Thread.currentThread().isInterrupted()) {

String string = receiver.recv(0).trim();

Long mesc = Long.parseLong(string);

System.out.flush();

System.out.print(string + ".");

Sleep(mesc);

sender.send("".getBytes(), 0);

}

sender.close();

receiver.close();

context.term();

}

}

taskwork.py

import zmq

import time

import sys

context = zmq.Context()

receiver = context.socket(zmq.PULL)

receiver.connect("tcp://localhost:5557")

sender = context.socket(zmq.PUSH)

sender.connect("tcp://localhost:5558")

while True:

s = receiver.recv()

sys.stdout.write('.')

sys.stdout.flush()

time.sleep(int(s) * 0.001)

sender.send(b'')

tasksink.java

import org.zeromq.ZMQ;

public class tasksink {

public static void main(String[] args) {

ZMQ.Context context = ZMQ.context(1);

ZMQ.Socket receiver = context.socket(ZMQ.PULL);

receiver.bind("tcp://*:5558");

String string = new String(receiver.recv(0));

long tstart = System.currentTimeMillis();

int task_nbr;

int total_mesc = 0;

for (task_nbr = 0; task_nbr < 100; task_nbr++) {

string = new String(receiver.recv(0).trim());

if ((task_nbr / 10) * 10 == task_nbr) {

System.out.print(":");

} else {

System.out.print(".");

}

}

long tend = System.currentTimeMillis();

System.out.println("\nTotal elapsed time: " + (tend - tstart) + "msec");

receiver.close();

context.term();

}

}

tasksink.py

import time

import zmq

import sys

context = zmq.Context()

receiver = context.socket(zmq.PULL)

receiver.bind("tcp://*:5558")

s = receiver.recv()

tstart = time.time()

for task_nbr in range(1, 100):

s = receiver.recv()

if task_nbr % 10 == 0:

sys.stdout.write(':')

else:

sys.stdout.write('.')

sys.stdout.flush()

tend = time.time()

print("Total elapsed time: %d msec" % ((tend - tstart) * 1000))

以下两点需要注意:

ventilator 使用 ZMQ.PUSH 来分发任务;worker 用 ZMQ.PULL 来接收任务,用 ZMQ.PUSH 来发送结果;sink 用 ZMQ.PULL 来接收 worker 发来的结果。

ventilator 既是服务端,也是客户端(此时服务端是 sink);worker 在两个过程中都是客户端;sink 也一直都是服务端。

参考资料

c5e191d58ae5

linbingdong.com

欢迎进入博客 :linbingdong.com 获取最新文章哦~

c5e191d58ae5

FullStackPlan

欢迎关注公众号: FullStackPlan 获取更多干货哦~

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

zeromq java 教程_ZeroMQ入门 的相关文章

  • Java new Date() 打印

    刚刚学习 Java 我知道这可能听起来很愚蠢 但我不得不问 System out print new Date 我知道参数中的任何内容都会转换为字符串 最终值是 new Date 返回对 Date 对象的引用 那么它是如何打印这个的呢 Mo
  • 如何默认将 Maven 插件附加到阶段?

    我有一个 Maven 插件应该在编译阶段运行 所以在项目中consumes我的插件 我必须做这样的事情
  • Java中反射是如何实现的?

    Java 7 语言规范很早就指出 本规范没有详细描述反射 我只是想知道 反射在Java中是如何实现的 我不是问它是如何使用的 我知道可能没有我正在寻找的具体答案 但任何信息将不胜感激 我在 Stackoverflow 上发现了这个 关于 C
  • Play框架运行应用程序问题

    每当我尝试运行使用以下命令创建的新 Web 应用程序时 我都会收到以下错误Play http www playframework org Error occurred during initialization of VM Could no
  • 如何找到给定字符串的最长重复子串

    我是java新手 我被分配寻找字符串的最长子字符串 我在网上研究 似乎解决这个问题的好方法是实现后缀树 请告诉我如何做到这一点或者您是否有任何其他解决方案 请记住 这应该是在 Java 知识水平较低的情况下完成的 提前致谢 附 测试仪字符串
  • JavaMail 只获取新邮件

    我想知道是否有一种方法可以在javamail中只获取新消息 例如 在初始加载时 获取收件箱中的所有消息并存储它们 然后 每当应用程序再次加载时 仅获取新消息 而不是再次重新加载它们 javamail 可以做到这一点吗 它是如何工作的 一些背
  • 如何将 pfx 文件转换为 jks,然后通过使用 wsdl 生成的类来使用它来签署传出的肥皂请求

    我正在寻找一个代码示例 该示例演示如何使用 PFX 证书通过 SSL 访问安全 Web 服务 我有证书及其密码 我首先使用下面提到的命令创建一个 KeyStore 实例 keytool importkeystore destkeystore
  • 如何在 javadoc 中使用“<”和“>”而不进行格式化?

    如果我写
  • Eclipse Java 远程调试器通过 VPN 速度极慢

    我有时被迫离开办公室工作 这意味着我需要通过 VPN 进入我的实验室 我注意到在这种情况下使用 Eclipse 进行远程调试速度非常慢 速度慢到调试器需要 5 7 分钟才能连接到远程 jvm 连接后 每次单步执行断点 行可能需要 20 30
  • Java执行器服务线程池[关闭]

    很难说出这里问的是什么 这个问题是含糊的 模糊的 不完整的 过于宽泛的或修辞性的 无法以目前的形式得到合理的回答 如需帮助澄清此问题以便重新打开 访问帮助中心 help reopen questions 如果我使用 Executor 框架在
  • 如何从终端运行处理应用程序

    我目前正在使用加工 http processing org对于一个小项目 但是我不喜欢它附带的文本编辑器 我使用 vim 编写所有代码 我找到了 pde 文件的位置 并且我一直在从 vim 中编辑它们 然后重新打开它们并运行它们 重新加载脚
  • Android 中麦克风的后台访问

    是否可以通过 Android 手机上的后台应用程序 服务 持续监控麦克风 我想做的一些想法 不断聆听背景中的声音信号 收到 有趣的 音频信号后 执行一些网络操作 如果前台应用程序需要的话 后台应用程序必须能够智能地放弃对麦克风的访问 除非可
  • 如何从指定日期获取上周五的日期? [复制]

    这个问题在这里已经有答案了 如何找出上一个 上一个 星期五 或指定日期的任何其他日期的日期 public getDateOnDay Date date String dayName 我不会给出答案 先自己尝试一下 但是 也许这些提示可以帮助
  • 编译器抱怨“缺少返回语句”,即使不可能达到缺少返回语句的条件

    在下面的方法中 编译器抱怨缺少退货声明即使该方法只有一条路径 并且它包含一个return陈述 抑制错误需要另一个return陈述 public int foo if true return 5 鉴于Java编译器可以识别无限循环 https
  • 在 Maven 依赖项中指定 jar 和 test-jar 类型

    我有一个名为 commons 的项目 其中包含运行时和测试的常见内容 在主项目中 我添加了公共资源的依赖项
  • 有没有办法为Java的字符集名称添加别名

    我收到一个异常 埋藏在第 3 方库中 消息如下 java io UnsupportedEncodingException BIG 5 我认为发生这种情况是因为 Java 没有定义这个名称java nio charset Charset Ch
  • 使用 JMF 创建 RTP 流时出现问题

    我正处于一个项目的早期阶段 需要使用 RTP 广播DataStream创建自MediaLocation 我正在遵循一些示例代码 该代码目前在rptManager initalize localAddress 出现错误 无法打开本地数据端口
  • JGit 检查分支是否已签出

    我正在使用 JGit 开发一个项目 我设法删除了一个分支 但我还想检查该分支是否已签出 我发现了一个变量CheckoutCommand但它是私有的 private boolean isCheckoutIndex return startCo
  • 如何修复 JNLP 应用程序中的“缺少代码库、权限和应用程序名称清单属性”?

    随着最近的 Java 更新 许多人都遇到了缺少 Java Web Start 应用程序的问题Codebase Permissions and Application name体现属性 尽管有资源可以帮助您完成此任务 但我找不到任何资源综合的
  • 节拍匹配算法

    我最近开始尝试创建一个移动应用程序 iOS Android 它将自动击败比赛 http en wikipedia org wiki Beatmatching http en wikipedia org wiki Beatmatching 两

随机推荐