如何使用 vert.x-rx 创建反应式客户端-服务器 TCP 通信

2023-12-26

我目前正在开发一个项目,该项目需要外部系统和我将编写的应用程序(用Java)之间的TCP 通信。众所周知,使用常规 NIO 可以轻松实现这一点。然而,作为我正在开发的这个新项目的一部分,我必须使用 Vert.x 来提供 TCP 通信。请参考下图:

在右侧,我的应用程序作为 TCP 服务器运行,等待来自左侧外部系统的连接。我读过要创建 TCP 并侦听连接,您只需执行以下操作:

NetServer server = vertx.createNetServer();
server.listen(1234, "localhost", res -> {
  if (res.succeeded()) {
    System.out.println("Server is now listening!");
  } else {
    System.out.println("Failed to bind!");
  }
});

然而,我不明白的是当外部系统连接到我的应用程序并通过 TCP 发送 EchoRequestMessages 时如何处理。我的应用程序必须获取接收到的字节缓冲区,将其解码为 EchoRequestMessage POJO,然后将 EchoResponseMessage 编码为字节缓冲区以发送回外部系统。

我如何使用 vert.x-rx 对 EchoRequestMessage 的接收、其解码、EchoResponseMessage 的编码执行反应式编程,然后将其发送回外部系统,所有这些都在一个构建器模式类型设置中。我读过有关 Observables 和订阅的内容,但我不知道要观察什么或订阅什么。任何帮助将不胜感激。


要从套接字读取数据,您可以使用RecordParser。在套接字连接上,数据通常由换行符分隔:

RecordParser parser = RecordParser.newDelimited("\n", sock);

A RecordParser是一个 Vert.xReadStream所以它可以转化为Flowable:

FlowableHelper.toFlowable(parser)

现在如果一个EchoRequestMessage可以从创建Buffer:

public class EchoRequestMessage {
  private String message;

  public static EchoRequestMessage fromBuffer(Buffer buffer) {
    // Deserialize
  }

  public String getMessage() {
    return message;
   }
 }

And an EchoResponseMessage转换为Buffer:

public class EchoResponseMessage {
  private final String message;

  public EchoResponseMessage(String message) {
    this.message = message;
  }

  public Buffer toBuffer() {
    // Serialize;
  }
}

您可以使用 RxJava 运算符来实现回显服务器流程:

vertx.createNetServer().connectHandler(sock -> {

  RecordParser parser = RecordParser.newDelimited("\n", sock);

  FlowableHelper.toFlowable(parser)
    .map(EchoRequestMessage::fromBuffer)
    .map(echoRequestMessage -> {
      return new EchoResponseMessage(echoRequestMessage.getMessage());
    })
    .subscribe(echoResponseMessage -> sock.write(echoResponseMessage.toBuffer()).write("\n"), throwable -> {
      throwable.printStackTrace();
      sock.close();
    }, sock::close);

}).listen(1234);

[编辑] 如果在您的协议消息中不是行分隔而是长度前缀,那么您可以创建自定义ReadStream:

class LengthPrefixedStream implements ReadStream<Buffer> {
  final RecordParser recordParser;
  boolean prefix = false;

  private LengthPrefixedStream(ReadStream<Buffer> stream) {
    recordParser = RecordParser.newFixed(4, stream);
  }

  @Override
  public ReadStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
    recordParser.exceptionHandler(handler);
    return this;
  }

  @Override
  public ReadStream<Buffer> handler(Handler<Buffer> handler) {
    if (handler == null) {
      recordParser.handler(null);
      return this;
    }
    recordParser.handler(buffer -> {
      if (prefix) {
        prefix = false;
        recordParser.fixedSizeMode(buffer.getInt(0));
      } else {
        prefix = true;
        recordParser.fixedSizeMode(4);
        handler.handle(buffer);
      }
    });
    return this;
  }

  @Override
  public ReadStream<Buffer> pause() {
    recordParser.pause();
    return this;
  }

  @Override
  public ReadStream<Buffer> resume() {
    recordParser.resume();
    return this;
  }

  @Override
  public ReadStream<Buffer> endHandler(Handler<Void> endHandler) {
    recordParser.endHandler(endHandler);
    return this;
  }
}

并将其转换为Flowable:

FlowableHelper.toFlowable(new LengthPrefixedStream(sock))
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用 vert.x-rx 创建反应式客户端-服务器 TCP 通信 的相关文章

  • Grails 3.x bootRun 失败

    我正在尝试在 grails 3 1 11 中运行一个项目 但出现错误 失败 构建失败并出现异常 什么地方出了错 任务 bootRun 执行失败 进程 命令 C Program Files Java jdk1 8 0 111 bin java
  • Java new Date() 打印

    刚刚学习 Java 我知道这可能听起来很愚蠢 但我不得不问 System out print new Date 我知道参数中的任何内容都会转换为字符串 最终值是 new Date 返回对 Date 对象的引用 那么它是如何打印这个的呢 Mo
  • Java中反射是如何实现的?

    Java 7 语言规范很早就指出 本规范没有详细描述反射 我只是想知道 反射在Java中是如何实现的 我不是问它是如何使用的 我知道可能没有我正在寻找的具体答案 但任何信息将不胜感激 我在 Stackoverflow 上发现了这个 关于 C
  • 如何在 Play java 中创建数据库线程池并使用该池进行数据库查询

    我目前正在使用 play java 并使用默认线程池进行数据库查询 但了解使用数据库线程池进行数据库查询可以使我的系统更加高效 目前我的代码是 import play libs Akka import scala concurrent Ex
  • 在 java 类和 android 活动之间传输时音频不清晰

    我有一个android活动 它连接到一个java类并以套接字的形式向它发送数据包 该类接收声音数据包并将它们扔到 PC 扬声器 该代码运行良好 但在 PC 扬声器中播放声音时会出现持续的抖动 中断 安卓活动 public class Sen
  • 在 HTTPResponse Android 中跟踪重定向

    我需要遵循 HTTPost 给我的重定向 当我发出 HTTP post 并尝试读取响应时 我得到重定向页面 html 我怎样才能解决这个问题 代码 public void parseDoc final HttpParams params n
  • 制作一个交互式Windows服务

    我希望我的 Java 应用程序成为交互式 Windows 服务 用户登录时具有 GUI 的 Windows 服务 我搜索了这个 我发现这样做的方法是有两个程序 第一个是服务 第二个是 GUI 程序并使它们进行通信 服务将从 GUI 程序获取
  • Final字段的线程安全

    假设我有一个 JavaBeanUser这是从另一个线程更新的 如下所示 public class A private final User user public A User user this user user public void
  • Android:捕获的图像未显示在图库中(媒体扫描仪意图不起作用)

    我遇到以下问题 我正在开发一个应用程序 用户可以在其中拍照 附加到帖子中 并将图片保存到外部存储中 我希望这张照片也显示在图片库中 并且我正在使用媒体扫描仪意图 但它似乎不起作用 我在编写代码时遵循官方的Android开发人员指南 所以我不
  • INSERT..RETURNING 在 JOOQ 中不起作用

    我有一个 MariaDB 数据库 我正在尝试在表中插入一行users 它有一个生成的id我想在插入后得到它 我见过this http www jooq org doc 3 8 manual sql building sql statemen
  • 控制Android的前置LED灯

    我试图在用户按下某个按钮时在前面的 LED 上实现 1 秒红色闪烁 但我很难找到有关如何访问和使用前置 LED 的文档 教程甚至代码示例 我的意思是位于 自拍 相机和触摸屏附近的 LED 我已经看到了使用手电筒和相机类 已弃用 的示例 但我
  • Liferay ClassNotFoundException:DLFileEntryImpl

    在我的 6 1 0 Portal 实例上 带有使用 ServiceBuilder 和 DL Api 的 6 1 0 SDK Portlet 这一行 DynamicQuery query DynamicQueryFactoryUtil for
  • Spring Data JPA 应用排序、分页以及 where 子句

    我目前正在使用 Spring JPA 并利用此处所述的排序和分页 如何通过Spring data JPA通过排序和可分页查询数据 https stackoverflow com questions 10527124 how to query
  • 路径中 File.separator 和斜杠之间的区别

    使用有什么区别File separator和一个正常的 在 Java 路径字符串中 与双反斜杠相反 平台独立性似乎不是原因 因为两个版本都可以在 Windows 和 Unix 下运行 public class SlashTest Test
  • 无法捆绑适用于 Mac 的 Java 应用程序 1.8

    我正在尝试将我的 Java 应用程序导出到 Mac 该应用程序基于编译器合规级别 1 7 我尝试了不同的方法来捆绑应用程序 1 日食 我可以用来在 Eclipse 上导出的最新 JVM 版本是 1 6 2 马文 看来Maven上也存在同样的
  • Java列表的线程安全

    我有一个列表 它将在线程安全上下文或非线程安全上下文中使用 究竟会是哪一个 无法提前确定 在这种特殊情况下 每当列表进入非线程安全上下文时 我都会使用它来包装它 Collections synchronizedList 但如果不进入非线程安
  • 声明的包“”与预期的包不匹配

    我可以编译并运行我的代码 但 VSCode 中始终显示错误 早些时候有一个弹出窗口 我不记得是什么了 我点击了 全局应用 从那以后一直是这样 Output is there but so is the error The declared
  • 有没有办法为Java的字符集名称添加别名

    我收到一个异常 埋藏在第 3 方库中 消息如下 java io UnsupportedEncodingException BIG 5 我认为发生这种情况是因为 Java 没有定义这个名称java nio charset Charset Ch
  • 使用 JMF 创建 RTP 流时出现问题

    我正处于一个项目的早期阶段 需要使用 RTP 广播DataStream创建自MediaLocation 我正在遵循一些示例代码 该代码目前在rptManager initalize localAddress 出现错误 无法打开本地数据端口
  • 如何实现仅当可用内存较低时才将数据交换到磁盘的写缓存

    我想将应用程序生成的数据缓存在内存中 但如果内存变得稀缺 我想将数据交换到磁盘 理想情况下 我希望虚拟机通知它需要内存并将我的数据写入磁盘并以这种方式释放一些内存 但我没有看到任何方法以通知我的方式将自己挂接到虚拟机中before an O

随机推荐

  • Visual Studio 不会编译带有 *.inl 实现的模板类

    我正在关注一本关于 SFML 游戏开发的书 但我被困在第二章 因为我无法编译我刚刚编写的代码 它几乎是从书中逐字复制 除了成员变量名称和异常文本 我有使用 C 和模板的经验 但我以前从未见过这个错误 而且我已经盯着它看了几个小时了 我没有发
  • cv2.waitKey(1) & 0xff == ord('q') 如何工作?

    这条线如何运作 据我所知 到目前为止 输出cv2 waitKey number 对于所有的每一个int数字是 1 and 0xff是一个十六进制数 等于255以十进制数字表示 1 0xff等于255以十进制数字表示 Also ord q 等
  • 在后台线程上构建 UIView

    我知道 UI 应该只在主线程上更新 但是是否可以在单独的线程上创建和添加子视图 只要它们不添加到可见视图中 会导致内存和性能问题吗 这是一些示例代码 NSOperationQueue queue NSOperationQueue alloc
  • 使用 Spring MVC RequestMappingHandlerMapping 和 Spring Websocket 的 ServletWebSocketHandlerRegistry 处理相同的 URL

    我想要拥有什么 客户端发送GET HTTP 1 1 没有Connection upgrade 该请求应由RequestMappingHandlerMapping 客户端发送Connection upgrade与 GET 请求一起 该请求应该
  • Oracle SQL:为什么我的函数输出 null?

    CREATE OR REPLACE FUNCTION get status by member id p member id NUMBER RETURN CHAR AS v status CHAR 1 BEGIN select status
  • C++ 指针和指向引用的指针

    我正在尝试创建一个二叉搜索树 我使用递归过程将节点插入树中 代码如下 void BST insertRoot Node node int data if node NULL this gt root new Node data else i
  • 使用依赖规则匹配的方面意见提取中的命名实体识别

    使用 Spacy 我根据我定义的语法规则从文本中提取方面意见对 规则基于 POS 标签和依赖标签 通过以下方式获得token pos and token dep 下面是其中一项语法规则的示例 如果我通过了判决Japan is cool 它返
  • 部署到 SharePoint 2010 网站的 WCF 服务出现“EndPoint Not Found”错误

    我正在尝试利用自动完成扩展器 questions tagged autocompleteextender来自ajax控制工具包 questions tagged ajaxcontroltoolkit在我的一个共享点 questions ta
  • 如何从页脚中删除“订单和退货”?

    我已将新的 Magento 1 5 0 1 安装更新为 Magento 1 6 0 0 现在页脚中有一个链接 订单和退货 我 尚 不知道如何删除它 我无法从核心文件中删除它 我已经尝试过 XML 方法 但似乎不起作用 可能是我的错 目前 我
  • 如果未选择下拉列表,则不会在提交时发送 GET 变量

    我有一个表单 用于过滤仅包含下拉列表的搜索结果 我使用 GET 而不是 post 以便可以轻松地通过 URL 共享结果
  • 将字符串转换为Java代码

    我遇到了一个奇怪的案例 在给定的数据库中 我得到了一条记录VARCHAR字段 所以在我的实体模型中我添加了这个字段 以及 getter 和 setter 现在是乐趣开始的时刻 下面的字符串实际上是方法的主体 它看起来像这样 if score
  • 尝试修改不可创建的数组值

    我没有 Perl 经验 并且很难解决此错误 任何帮助表示赞赏 脚本如下 第40行报告问题 粗体 usr bin perl print Please enter filename without extension input lt gt c
  • 在设备上构建失败,退出代码为 1

    我成功构建了一个应用程序 进行一些更改后 在模拟器上构建和运行仍然按预期正常工作 但在我的物理设备上构建和运行却意外失败 PhaseScriptExecution CP Embed Pods Frameworks Users olivera
  • 如何获取 numpy / scipy 中特定百分位的索引?

    我看过这个答案 https stackoverflow com a 2374662 391161它解释了如何计算特定百分位数的值 以及这个答案 https stackoverflow com a 12414469 391161它解释了如何计
  • 在没有 GLUT 的情况下初始化 OpenGL

    我能找到的每个介绍和示例似乎都使用 GLUT 或其他一些框架来 初始化 OpenGL 有没有一种方法可以仅使用 GL 和 GLU 中可用的内容来初始化 OpenGL 如果不是 那么 GLUT 正在做什么 如果没有它就不可能实现 正如卢克所指
  • 标准化矩阵 python 的行

    给定 python 中的二维数组 我想用以下规范标准化每一行 Norm 1 L 1 Norm 2 L 2 标准信息 L Inf 我已经开始这段代码 from numpy import linalg as LA X np array 1 2
  • Elastic Search版本冲突问题

    我正在使用弹性搜索来进行搜索 但最近我观察到在将数据添加到弹性搜索时出现一些随机错误 版本冲突 需要 seqNo 113789 主要术语 19 当前文档有 seqNo 113797 和主要术语 19 上述类型错误是随机出现的 我无法在弹性搜
  • ADO 和 DAO 的区别

    这不是一个更好的问题 而是一个关于为什么它们在功能上不同的问题 我遇到的问题已得到解决 但我很好奇为什么会发生这种行为 背景 使用 Excel vba 从 Access 数据库中提取数据 当用户单击按钮时 将从 Access 中提取记录集
  • 每个函数名称后面的@n(“at 符号”)是什么?

    我正在尝试使用 Netwide Assembler 学习汇编语言 在教程中 我看到有一个 n在每个函数名称的末尾 例如 CALL GetStdHandle 4 CALL WriteFile 20 CALL ExitProcess 4 这是做
  • 如何使用 vert.x-rx 创建反应式客户端-服务器 TCP 通信

    我目前正在开发一个项目 该项目需要外部系统和我将编写的应用程序 用Java 之间的TCP 通信 众所周知 使用常规 NIO 可以轻松实现这一点 然而 作为我正在开发的这个新项目的一部分 我必须使用 Vert x 来提供 TCP 通信 请参考