使用 Spring 与 RabbitMQ 集成

2024-05-26

我正在为我们的一个应用程序开发消息传递界面。该应用程序是一种服务,旨在接受“作业”,进行一些处理并返回结果(实际上以文件的形式)。

这个想法是使用 RabbitMQ 作为消息传递基础设施,并使用 Spring AMQP 来处理协议特定的细节。

我不想让我的代码与 Spring AMQP 紧密耦合,因此我想使用 Spring Integration 来隐藏消息传递 api。所以基本上我想要这个:

消息发送到 RabbitMQ ====> Spring AMQP ====> Spring Integration ====> MyService ====> 一路回复回 RabbitMQ

我正在尝试制定将其连接在一起所需的 XML 配置,但我在多个抽象级别和不同术语方面遇到了问题。事实证明,找到一个在 Spring AMQP/RabbitMQ 之上演示 Spring Integration 的工作示例非常困难,尽管事实上这种设置对我来说非常“最佳实践”。

1)所以..能否有一些聪明的人快速看看这个,也许能把我推向正确的方向?我需要什么,不需要什么? :-)

2)理想情况下,队列应该是多线程的,这意味着taskExecutor应该将多个消息传递给我的jobService以进行并行处理。需要什么配置?

 <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
    xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
    xsi:schemaLocation="
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
    http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
    http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
    http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
    ">

    <context:component-scan base-package="com.myprogram.etc" />

    <!-- Messaging infrastructure: RabbitMQ -->

    <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="${ei.messaging.amqp.servername}" />
        <property name="username" value="${ei.messaging.amqp.username}" />
        <property name="password" value="${ei.messaging.amqp.password}" />
    </bean>

    <rabbit:connection-factory id="connectionFactory" />

    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- From RabbitMQ -->

    <int-amqp:inbound-gateway request-channel="fromAMQP" reply-channel="toAMQP" queue-names="our-product-name-queue" connection-factory="connectionFactory"/>

    <!-- Spring Integration configuration -->

    <int:channel id="fromAMQP">
        <!-- Is this necessary?? -->
        <int:queue/>
    </int:channel>

    <!-- JobService is a @Service with a @ServiceActivator annotation -->
    <int:service-activator input-channel="fromAMQP" ref="jobService"/>
</beans>

我怀疑我和你一样是 spring-integration 和 spring-integration-amqp 的菜鸟,但我确实在部分基于一个示例项目的基础上得到了一些东西。

对于rabbitmq基础设施,我有以下内容:

<rabbit:connection-factory id="rabbitConnectionFactory"/>

<rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory"/>

<rabbit:admin connection-factory="rabbitConnectionFactory"/>

<!-- some attributes seemed to be ok with queue name, others required id
  -- so I used both with the same value -->
<rabbit:queue id='test.queue' name='test.queue'/>

<rabbit:direct-exchange name:"my.exchange">
    <rabbit:bindings>
        <rabbit:binding queue="test.queue" key="test.binding"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

要向rabbitmq发送消息,我有以下内容:

<!-- This is just an interface definition, no implementation required
  -- spring will generate an implementation which puts a message on the channel -->
<int:gateway id="backgroundService", 
         service-interface="com.company.BackgroundService"
             default-request-channel="toRabbit"

<int:channel id:"toRabbit"/>

<!-- used amqpTemplate to send messages on toRabbit channel to rabbitmq -->
<int-amqp:outbound-channel-adapter channel:"toRabbit" 
                               amqp-template="amqpTemplate" 
                   exchange-name="my.exchange" 
                   routing-key="test.binding"/>

为了接收消息,我有以下内容:

<int:service-activator input-channel="fromRabbit" 
                       ref="testService" 
                       method="serviceMethod"/>


// from rabbitmq to local channel
<int-amqp:inbound-channel-adapter channel="fromRabbit" 
                                  queue-names="test.queue" 
                                  connection-factory="rabbitConnectionFactory"/>

<int:channel id="fromRabbit"/>

一些警告 - spring-integration 中 amqp 集成的文档说可以同步发送和接收返回值,但我还没有弄清楚。当我的服务激活器方法返回一个值时,它会引发一个异常,将消息放回rabbitmq(并生成一个无限循环,因为它会再次接收消息并再次抛出异常)。

我的BackgroundService 界面如下所示:

package com.company

import org.springframework.integration.annotation.Gateway

public interface BackgroundService {

    //@Gateway(requestChannel="someOtherMessageChannel")
    public String sayHello(String toWho)

}

如果您不希望使用 spring bean 中配置的默认通道,则可以通过注释在每个方法上指定通道。

附加到服务激活器的服务如下所示:

package com.company;

class TestService {

    public void serviceMethod(String param) {
    log.info("serviceMethod received: " + param");
    //return "hello, " + param;
    }
}

当我在本地连接所有东西而不涉及rabbitmq时,调用者正确接收了返回值。当我进入rabbitmq通道时,当返回值后抛出异常时,我得到了前面提到的无限循环。这肯定是可能的,否则在不修改代码的情况下就不可能在不同的通道中连接,但我还不确定技巧是什么。如果您解决了,请回复并提供解决方案。显然,您可以根据需要在端点之间放置任何您喜欢的路由、转换和过滤。

如果我上面的 XML 摘录中有拼写错误,请不要感到惊讶。我必须从 groovy DSL 转换回 xml,所以我可能会犯错误。但意图应该足够明确。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 Spring 与 RabbitMQ 集成 的相关文章

  • 在 Celery 工作线程中捕获 Heroku SIGTERM 以优雅地关闭工作线程

    我对此进行了大量研究 令我惊讶的是我还没有在任何地方找到一个好的答案 我正在 Heroku 上运行一个大型应用程序 并且我有某些运行很长时间处理的 celery 任务 并在任务结束时保存结果 每次我在 Heroku 上重新部署时 它都会发送
  • 我是否应该将 CachingConnectionFactory 与 hornetq 2.4.1 一起使用

    根据有关在 hornetq 中使用 JMSTemplate 的长期信息 我们在连接到服务器时一直使用 CachingConnectionFactory 这是一个示例配置 与我们正在使用的配置非常相似
  • RabbitMQ Java 客户端自动重新连接

    当我的应用程序失去与 RabbitMQ 的连接时 我将其连接工厂设置为自动尝试并重新连接 ConnectionFactory factory new ConnectionFactory factory setUsername usernam
  • AMQP 消息的版本控制

    是否有关于在 AMQP 中使用版本化消息的既定最佳实践 假设我正在对消息模式进行语义版本控制 我希望支持消息的当前主要版本以及以前的主要版本 这是一个现实的期望吗 处理版本化消息的不同选项有何优缺点 我已经看到版本化路由密钥和版本消息头作为
  • AMQPRuntimeException:读取数据时出错。收到 0 而不是预期的 7 字节

    它曾经有效 但现在不再有效了 我正在使用 php amqplib 和 RabbitMQ 当我尝试创建新的 AMQP 连接时 connection new AMQPConnection localhost 5672 username pass
  • RabbitMQ 管理插件窗口呈现为空白页面

    I have installed Erlang RabbitMQ and configured the management plugin as per the instructions on the website https www r
  • 基于多线程的 RabbitMQ 消费者

    我们有一个 Windows 服务 它监听单个 RabbitMQ 队列并处理消息 我们希望扩展相同的 Windows 服务 以便它可以监听 RabbitMQ 的多个队列并处理消息 不确定使用多线程是否可以实现这一点 因为每个线程都必须侦听 阻
  • FTP 入站通道适配器的 FTP 问题

    我们的项目使用 ftp inbound channel adapter 从 FTP 服务器轮询文件 它工作正常 但在轮询之间不起作用 当我看到 FTP 服务器日志时 我看到 425 无法打开数据连接 现在 当我重新启动或停止并再次启动 ft
  • Spring Integration:SecurityContext 传播

    我对 Spring Integration 中的 SecurityContext 传播有一些困惑 这是文档的要点 http docs spring io spring integration reference htmlsingle sec
  • RabbitMQ - 无法联系统计数据库。消息速率和队列长度将不会显示

    我已经设置了一个兔子经纪人集群 并且在管理门户插件中我收到以下消息 无法联系统计数据库 消息速率和队列长度将不会显示 我已经搜索过这个错误 但谷歌并不友善 任何人都可以阐明这一点吗 我最近在旧安装的RabbitMQ 2 8 7 上遇到了同样
  • 如何让spring为JdbcMetadataStore创建相应的schema?

    我想使用此处描述的 jdbc 元数据存储 https docs spring io spring integration docs 5 2 0 BUILD SNAPSHOT reference html jdbc html jdbc met
  • 从 RabbitMQ 迁移到 Amazon SQS [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我们的初创公司目前正在使用RabbitMQ with Python Django 对于消息队列 现在我们计划转移到Amazon SQS其高可用性
  • 当没有更多消息时退出 Spring Integration

    我使用 Spring Integration 4 1 配置从数据库中批量检索消息 而不是作为服务 我知道每天要处理十几条消息 因此我需要每天运行一次批处理 我的 jdbc inbound channel adapter 配置为检索 max
  • Celery 广播 vs RabbitMQ 扇出

    我最近一直在使用 Celery 但我不喜欢它 它的配置很混乱 过于复杂并且文档记录很少 我想用 Celery 从单个生产者向多个消费者发送广播消息 让我困惑的是 Celery 术语和底层传输 RabbitMQ 术语之间的差异 在 Rabbi
  • 启动时加载 RabbitMQ 配置

    如何在启动时加载 RabbitMQ 配置以确认已创建代理对象 队列 交换 绑定 用户 虚拟主机 权限和参数 根据 RabbitMQ 文档 可以通过以下方式完成load definitions http www rabbitmq com ma
  • 像 AMQP 这样的面向消息的中间件在哪些领域有用?

    MOM 面向消息的中间件 解决什么问题 可扩展性 一体化 它们通常在哪些领域使用以及它们通常在哪些领域not used 例如 Google 是否将此类解决方案用于其主要搜索引擎或为 GMail 提供支持 沃尔玛 eBay FedEx 几乎是
  • 如何注册 org.springframework.integration.monitor.IntegrationMBeanExporter

    根据http www ibm com support knowledgecenter en SS7K4U 8 5 5 com ibm websphere nd multiplatform doc ae cspr data access tr
  • RabbitMQ + Windows + LDAP 无需发送密码

    我正在尝试在 Windows 7 上使用 RabbitMQ 3 6 2 进行 LDAP 身份验证 授权 我已经在应用程序发送用户名 密码的情况下进行了基本身份验证 但密码位于我需要弄清楚如何进行的代码中避免 有没有人在不提供密码的情况下成功
  • rabbitmq和spring-rabbitmq中的DLX——拒绝消息的一些注意事项

    我确实读过这个参考资料 https www rabbitmq com dlx html https www rabbitmq com dlx html 但这并不能解决我的疑问 即 如果接受消息没有问题 spring rabbitmq发送 a
  • EasyNetQ 模型关闭

    我使用 EasyNetQ 实现了一个简单的 RabbitMQ 客户端 连接后 我收到一条通知 队列模型关闭 这是我的代码 var bus RabbitHutch CreateBus String Format host 0 hostName

随机推荐

  • Spring Boot 删除 Whitelabel 错误页面

    我正在尝试删除白色标签错误页面 所以我所做的是为 error 创建了一个控制器映射 RestController public class IndexController RequestMapping value error public
  • 如何在状态栏显示通知?

    所以我在我的活动中创建了此通知 Notification n new Notification Builder getApplicationContext setContentTitle New mail from sender setCo
  • 对 FileList 对象进行排序

    我正在尝试使用新的文件 API 对输入文件进行排序 它返回的列表似乎是不可变的 var x var files e target files FileList object Loop through the FileList and ren
  • 在 Facebook 上分享时如何更改 Play 商店应用程序的预览?

    我已在 Google Play 开发者控制台中更改了游戏的图标和屏幕截图 并且它在 Play 商店中正确显示 但当我尝试在 Facebook 上分享 Play 商店链接时 预览会带有旧图标和屏幕截图 如何将此预览更新到应用程序的最新版本 你
  • GIT:无效路径“.editorconfig”

    从 master 克隆项目时出现以下错误 错误 无效路径 editorconfig 致命 无法签出工作树 警告 克隆成功 但结账失败 您可以使用 git status 检查签出的内容 并使用 git Restore source HEAD
  • 如何使 Javascript focus() 方法在输入文本框的 onBlur 事件中工作?

    对于我来说focus 如果我将它与按钮和 onClick 事件一起使用 则该方法工作正常 但使用onBlur从文本框中 它不起作用 有人可以指导我吗
  • Yii2 - 检查用户是否登录视图

    我试图检查用户是否登录到我的视图文件中 但我不断收到此错误 Call to undefined method Yii app 我尝试添加 before app但错误仍然存 在 这次是未定义的变量 应用程序 这可能是观点吗 这是我用来检查用户
  • Angular4 Material md-table 列宽度像普通表一样自动调整大小

    我在 Angular 4 应用程序中使用 md table 因为我将 Material 用于 UI 格式的其他部分 当我使用基本上没有 CSS 的常规表格时 列会自动设置格式以适合最宽的 td 元素 使用 md table 除了太宽的单元格
  • 最佳实践:在 PHP 中导入 mySQL 文件;分割查询

    我遇到了一种情况 我必须更新共享托管提供商上的网站 该网站有一个 CMS 使用 FTP 上传 CMS 文件非常简单 我还必须导入一个大的 相对于 PHP 脚本的范围 数据库文件 未压缩时大约 2 3 MB Mysql 已关闭 无法从外部访问
  • 如何使用 AWS SAM 为 HttpApi 配置自定义域?

    我正在使用 AWS Lambda AWS API Gateway 和 aws sam 开发 API 我已经实现了 firebase 身份验证 我也使用嵌套堆栈 我正在尝试为我的 API 端点使用自定义域 因此我可以像这样调用api mydo
  • 如何使用 HBASE Shell 创建具有预分割和压缩或其他选项的表

    在 HBase shell 中 帮助文件向我们展示了几种允许的创建表的语法 create tableName NAME gt colFamily VERSIONS gt 5 create tableName NAME gt cf1 NAME
  • flutter 聊天应用程序上的消息顺序不正确

    我刚刚根据教程完成了一个基本的聊天应用程序 新消息应该显示在底部 但这种情况没有发生 当我删除 保存并再次添加时reversed排队 final messages snapshot data documents reversed 然后它似乎
  • Web API 2 和 ASP Identity - 处理锁定的用户

    我刚刚将我的 Web 应用程序 ASP NET MVC 迁移到 ASP Identity 经过相当多的工作后 除了 Web 应用程序提供的 API 之外 一切都工作正常 这是一个 WEB API 2 它使用不记名令牌机制来验证用户身份 身份
  • 如何在 postgresql 中使用“时间”字段按小时分组?

    我有一张带有一列的桌子ctime类型的time without time zone cdate ctime 2016 12 24 12 02 17 2016 12 24 12 02 32 2016 12 24 12 03 00 2016 1
  • Bootstrap 3 将文本与 Div 底部对齐

    我正在尝试在 Bootstrap 中进行如下设置 其中文本与图像底部对齐 THIS IS AN IMAGE And some text
  • Boost Python:多态容器?

    我有一个方法 或函数 它返回对多态对象列表的引用 class A class B public A std list
  • 使用 MYSQL 创建随机数

    我想知道是否有一种方法可以选择 100 到 500 之间随机生成的数字以及选择查询 Eg SELECT name address random number FROM users 我不必将此数字存储在数据库中 而只需使用它来显示目的 我尝试
  • 更新图像而不闪烁 ASP.NET C#

    我正在编写一个网站 该网站的页面必须显示图像 该图像是由 HttpHandler 使用查询字符串命令创建的 如何才能使其正常工作而不会出现任何闪烁 提前致谢 如果您需要一些代码 我很乐意分享 您可以使用 2 个 UpdatePanel 并在
  • 反序列化之前从 JSON 中删除奇怪的隐藏字符

    我有一些 JSON 发送给我 当它尝试反序列化时会中断 它似乎含有一颗黑色钻石 上面有 在里面 我看不到该角色 但它显然在那里 但在我的系统上失败了 如何摆脱这个问题并保持 JSON 完整以进行反序列化 UPDATE 以下是 JSON 中间
  • 使用 Spring 与 RabbitMQ 集成

    我正在为我们的一个应用程序开发消息传递界面 该应用程序是一种服务 旨在接受 作业 进行一些处理并返回结果 实际上以文件的形式 这个想法是使用 RabbitMQ 作为消息传递基础设施 并使用 Spring AMQP 来处理协议特定的细节 我不