在rabbitmq配置spring boot中在AMQP中配置多个Vhost

2024-04-08

我正在实现一个项目,我必须在rabbitmq中的不同虚拟主机之间发送消息。使用 SimpleRoutingConnectionFactory 但得到 java.lang.IllegalStateException: 无法确定查找键的目标 ConnectionFactory [null]。 任何知道如何实现以下内容的人都是我的配置类代码。

@Configuration
@EnableRabbit
public class RabbitMQConfiguration {

@Autowired
ConnectionProperties connect;

// client1 exchanges
@Bean
public TopicExchange client1Exchange() {
    TopicExchange ex = new TopicExchange("ex_client1");
    ex.setAdminsThatShouldDeclare(client1());
    return ex;
}

// client2 exchange
@Bean
public TopicExchange client2Exchange() {
    TopicExchange ex = new TopicExchange("ex_client2");
    ex.setAdminsThatShouldDeclare(client2Admin());
    return ex;
}

@Bean
public Queue client1Queue() {
    Queue queue = new Queue("client1_queue");
    queue.setAdminsThatShouldDeclare(client1());
    return queue;
}

@Bean
public Binding client1Binding() {
    Binding binding = BindingBuilder.bind(client1Queue())
            .to(client1Exchange())
            .with("client1_key");
    binding.setAdminsThatShouldDeclare(client1());
    return binding;
}


@Bean
public Queue client2Queue() {
    Queue queue = new Queue("client2_queue");
    queue.setAdminsThatShouldDeclare(client2());
    return queue;
}

@Bean
public Binding client2Binding() {
    Binding binding = BindingBuilder.bind(client2Queue())
            .to(client2Exchange())
            .with("client2_key");
    binding.setAdminsThatShouldDeclare(client2());
    return binding;
}

@Bean
@Primary
public ConnectionFactory connectionFactory() {
    SimpleRoutingConnectionFactory connectionFactory = new SimpleRoutingConnectionFactory();
    Map<Object, ConnectionFactory> targetConnectionFactories = new HashMap<>();
    targetConnectionFactories.put("client1", client1ConnectionFactory());
    targetConnectionFactories.put("client2", client2ConnectionFactory());
    connectionFactory.setTargetConnectionFactories(targetConnectionFactories);
    return connectionFactory;
}

@Bean
public ConnectionFactory client1ConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(connect.getRabbitMQHost());
    connectionFactory.setVirtualHost(connect.getRabbitMQClient1VHost());
    connectionFactory.setUsername(connect.getRabbitMQClient1User());
    connectionFactory.setPassword(connect.getRabbitMQClient1Pass());
    return connectionFactory;
}

@Bean
public ConnectionFactory client2ConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(connect.getRabbitMQHost());
    connectionFactory.setVirtualHost(connect.getRabbitMQClient2VHost());
    connectionFactory.setUsername(connect.getRabbitClient2User());
    connectionFactory.setPassword(connect.getRabbitClient2Pass());
    return connectionFactory;
}

// You can comment all methods below and remove interface's implementation to use the default serialization / deserialization
@Bean
public RabbitTemplate rabbitTemplate() {
    final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
    return rabbitTemplate;
}

@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
    return new Jackson2JsonMessageConverter();
}

@Bean
public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
    return new MappingJackson2MessageConverter();
}

@Bean
public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
    DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
    factory.setMessageConverter(consumerJackson2MessageConverter());
    return factory;
}

@Bean
public TaskExecutor rabbitListenerExecutor() {
    int threads = Integer.valueOf(connect.getMinConsumers()) * 2; // threads = min consumers* no of queues
    final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(threads);
    executor.setMaxPoolSize(threads);
    executor.setThreadNamePrefix("RabbitThreadListener");
    executor.afterPropertiesSet();
    return executor;
}

@Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setConcurrentConsumers(Integer.valueOf(connect.getMinConsumers()));
    factory.setPrefetchCount(Integer.valueOf(connect.getPrefetchCount()));
    factory.setTaskExecutor(rabbitListenerExecutor());
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return factory;
}

@Bean
public RabbitAdmin client1() {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(client1ConnectionFactory());
    rabbitAdmin.afterPropertiesSet();
    return rabbitAdmin;
}

@Bean
public RabbitAdmin client2() {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(client2ConnectionFactory());
    rabbitAdmin.afterPropertiesSet();
    return rabbitAdmin;
}

}

我得到这个堆栈跟踪

o.s.a.r.l.SimpleMessageListenerContainer - Consumer raised exception, 
processing can restart if the connection factory supports it
java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]
    at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.determineTargetConnectionFactory(AbstractRoutingConnectionFactory.java:119)
    at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.createConnection(AbstractRoutingConnectionFactory.java:97)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createConnection(ConnectionFactoryUtils.java:90)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:140)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:76)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:505)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1335)
    at java.lang.Thread.run(Thread.java:748)

The RoutingConnectionFactory一般用于发布消息。

在侦听器容器中使用路由工厂时,必须配置查找键以匹配容器中配置的队列名称。

From 文档 https://docs.spring.io/spring-amqp//reference/html/_reference.html#routing-connection-factory:

同样从版本 1.4 开始,您可以在侦听器容器中配置路由连接工厂。在这种情况下,队列名称列表将用作查找键。例如,如果您将容器配置为setQueueNames("foo", "bar"),查找键将是"[foo,bar]"(没有空间)。

所以;如果一个RabbitListener监听队列foo路由查找键必须是[foo]。 (您可以使用不同的密钥多次添加相同的 CF)。

或者您可以简单地创建多个容器工厂,每个容器工厂都有一个具体的 CF 而不是路由 CF。

EDIT

假设你有

@RabbitListener(queues = "myQueue", connectionFactory = "myRabbitListenerContainerFactory")
public void listen(...) {
    ...
}

If myQueue is in client1的虚拟主机,那么您需要在路由器 CF 映射中添加一个条目,因此...

targetConnectionFactories.put("[myQueue]", client1ConnectionFactory());

...因为为侦听器生成的侦听器容器将在其查找键中使用队列名称。

或者,创建2个集装箱工厂;每个都直接与 client1 和 client2 CF 连接,而不是与路由 CF...

@Bean
public SimpleRabbitListenerContainerFactory client1ListenerContainerFactory() {

@Bean
public SimpleRabbitListenerContainerFactory client2ListenerContainerFactory() {

and

@RabbitListener(queues = "myQueue", connectionFactory = "client1ListenerContainerFactory")
public void listen(...) {
    ...
}

即根本不要对侦听器使用路由 CF - 容器只有一个连接。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在rabbitmq配置spring boot中在AMQP中配置多个Vhost 的相关文章

  • JavaFX Platform.runLater 的使用以及从不同线程访问 UI

    我有几个问题Platform runLater 我有一个 JavaFX 应用程序类 在这个类中 我运行一个线程 该线程从网络套接字读取数据 现在当我创建一个新的Stage在线程内部 系统抛出异常 JavaFX 事件调度程序线程和我的网络读取
  • RSA SignatureException:签名长度不正确

    我在签署 rsa 签名时遇到问题 我有一个用私钥加密的签名 然而 当我尝试使用公钥验证它时遇到问题 我得到以下异常 java security SignatureException Signature length not correct
  • 由于键更改而尝试插入时外键约束失败

    我有一个 Content 对象 它引用多对多关系中的一组 Tag 对象 作为持久化新内容对象的一部分 我在 PostgreSQL 中查看标签是否已存在 如果存在 则将对其的引用添加到内容对象并尝试保存内容对象 我遇到的问题是 当我这样做时
  • 正确使用 JDBC 连接池 (Glassfish)

    我需要在 Java Web 服务中作为会话 bean 实现数据库连接 但我不确定我这样做是否正确 我创建了一个类 public final class SQLUtils private static DataSource m ds null
  • 用 java 编写解释器时的 switch 或 if 语句

    当前的作业需要我编写一个程序 以一种非常微小且基本的编程语言 行为有点像 FORTRAN 来读取包含指令的文件并执行这些指令 基本上它是我猜的语言的简单解释器 它是完全线性的 所有语句都是按顺序定义的 并且只有字符串和整数变量 我需要查找和
  • 如何将自定义日志处理程序添加到 Google App Engine?

    我正在尝试向我的 java 应用程序添加自定义日志处理程序 我已经实现了一个扩展 java util Logging Handler 类的 InnerLogger 类 在我的logging properties中声明为处理程序 handle
  • 以编程方式设置 Logback Appender 路径

    我正在尝试以编程方式设置 Logback 附加程序路径 滚动文件附加器 http logback qos ch apidocs ch qos logback core rolling RollingFileAppender html准确地说
  • C# 中的协变和逆变

    首先我要说的是 我是一名正在学习 C 编程的 Java 开发人员 因此 我会将我所知道的与我正在学习的进行比较 我已经使用 C 泛型几个小时了 我已经能够在 C 中重现我在 Java 中知道的相同内容 除了几个使用协变和逆变的示例 我正在读
  • 不要模拟值对象:过于通用的规则,没有解释

    以下是 Mockito 单元测试框架的引用 不要模拟值对象 为什么有人会想要这样做呢 因为实例化对象太痛苦了 gt 无效 原因 如果创造新的装置太困难 那就是一个迹象 代码可能需要一些认真的重构 另一种方法是创建 价值对象的构建者 有一些工
  • Java:使用 Java.util.concurrent 线程访问读取线程串行端口

    我正在尝试编写一个 Java 串行设备驱动程序并想使用 对我来说是新的 java util concurrent包裹 我有一种发送数据包然后等待 ACK 的方法 我打算有炭 接收在不同的线程中运行 如果接收线程收到 ACK 它应该使用发送数
  • 在 eclipse 之外将 Spring MVC 应用程序部署到 tomcat 的幕后会发生什么?

    我猜想使用像 eclipse 这样很棒的 IDE 的一个缺点是你会忽略应用程序幕后发生的事情 我是一名 Ruby 开发人员 所以不是一名 Java 老手 所以我一直在用 java 编写一个项目 并使用 spring 框架进行 IOC 和 M
  • 如何从 Google Custom Search API 获取超过 100 个结果

    我正在尝试使用 Google Custom Search API 在 Java 中进行研究 因此 我需要为每个查询提供一个大的结果集 然而 我似乎仅限于前 100 个结果 这比我需要的要少得多 我使用这样的列表方法 list setStar
  • 如何通过子 POJO 的属性过滤复合 ManyToMany POJO?

    我有两个像这样的房间实体 Entity public class Teacher implements Serializable PrimaryKey autoGenerate true public int id ColumnInfo n
  • 抽象类或接口。哪种方式是正确的?

    有两种方法可以选择抽象类或接口 微软解决方案和Oracle解决方案 微软 设计指南 请使用抽象 在 Visual Basic 中为 MustInherit 类而不是接口来将协定与实现分离 http msdn microsoft com en
  • 为什么 spring-boot 应用程序不需要 @EnableWebMvc

    所以我写了一个小应用程序 为了熟悉基础知识 我使其尽可能简单 我使用 Config java 文件制作了一个简单的 mvc 应用程序 当我认为现在应用程序应该抛出错误时 它实际上可以工作 这是我的 pom xml
  • 从命令行运行 Maven 插件的语法是什么。

    我看到这里已经有人问过这个问题 如何从命令行执行maven插件 https stackoverflow com questions 12930656 how to execute maven plugin from command line
  • Selenium - 等待网络流量

    我们将 Selenium 与 Java API 和一些 Javascript 用户扩展一起使用 我们在应用程序中使用了大量 AJAX 调用 我们的许多测试随机失败 因为有时 AJAX 调用完成得比其他时候慢 因此页面未完全加载 我们通过等待
  • 编写自定义 Eclipse 调试器

    EDIT 一定有某种方法可以解决这个问题 而无需编写全新的调试器 我目前正在研究在现有 java 调试器之上构建的方法 如果有人对如何获取 Java 调试器已有的信息 有关堆栈帧 变量 原始数据等 有任何想法 那将非常有帮助 我想要做的是我
  • H2 用户定义的聚合函数 ListAgg 不能在第一个参数上使用 DISTINCT 或 TRIM()

    所以我有一个 DB2 生产数据库 我需要在其中使用可用的函数 ListAgg 我希望使用 H2 的单元测试能够正确测试此功能 不幸的是H2不直接支持ListAgg 但是 我可以创建一个用户定义的聚合函数 import java sql Co
  • 如何在java 1.8中从org.jboss.jca.adapters.jdbc.jdk8.WrappedConnectionJDK8转换为oracle.jdbc.OracleConnection

    如何在 java 1 8 中从 org jboss jca adapters jdbc jdk8 WrappedConnectionJDK8 转换为 oracle jdbc OracleConnection 目前我正在这样使用并得到以下异常

随机推荐

  • 创建/渲染 scene2d 舞台后重置视口

    在我的游戏中 我正在绘制一个 scene2dStage使用自定义世界坐标系 然后 我想绘制一个调试 UI 上面包含一些文本 例如 FPS 但只需使用屏幕坐标 即文本位于屏幕右上角的位置 我的主要渲染方法如下所示 Override publi
  • 为什么在 Firefox 开发工具中传输的字节大于 size 字节?

    我正在衡量网站的性能 当查看 firefox developer tools 时 我注意到一个奇怪的行为 有一个特定的JS文件 传输的大小为2 831 54 KB 但实际大小为1280kb 根据 Mozilla 的说法 https deve
  • 设置 HTML 文本框的最大字数

    我想设置一个最大数量用户可以在文本框中输入的单词数 not字符数但是words 这个有可能 我做了一些挖掘 发现如何获取用户使用正则表达式输入的单词数 但我不确定如何阻止用户在达到最大值后输入更多字母 var jobValue docume
  • 使用 JSP 时如何使用 Struts 2 ModelDriven 接口访问 POJO 中的属性?

    我有一个实现的动作类ModelDriven界面 这ModelDriven是一个常规 POJO 问题在于它的属性之一是另一个对象 想象一下我的模型驱动is a object calledPersonand my person has an a
  • 如何声明一个返回函数指针的函数?

    想象一个带有参数 double 和 int 的函数 myFunctionA myFunctionA double int 这个函数应该返回一个函数指针 char myPointer 如何在 C 中声明这个函数 typedef是你的朋友 ty
  • jQuery DataTables:如何按自定义参数值而不是单元格内容排序?

    我有一个非常常见的用例 我在价格列中显示格式化的价格 例如 20 000 00 因此 当我尝试对它进行排序时 它会将其视为字符串并且排序效果不佳 10 00 20 000 00 5 000 00 我可以使它按数据参数值 非格式化浮点数 排序
  • mail() 超时问题

    当我通过浏览器执行电子邮件脚本时 会返回超时致命错误 除非我大幅增加执行时间 否则它将正常运行 而不是我正在寻找的解决方案 电子邮件已发送 但需要很长时间 平均 5 分钟 才能到达 我的收件箱 考虑到通过命令行它可以完美地工作 我认为 ph
  • 反应本机视频类型错误:未定义不是对象

    所以我尝试使用 npm 中的 React native video 包来播放 YouTube 视频 export default class App extends Component render return
  • 如何让 NSTextField 在自动布局中随着文本增长?

    Lion 中的自动布局应该可以非常简单地让文本字段 以及标签 随其包含的文本一起增长 文本字段在 Interface Builder 中设置为换行 有什么简单而可靠的方法可以做到这一点 方法intrinsicContentSize in N
  • 如何在 ASP.NET Core 5 中使用 IdentityUser 和 IdentityRole 之间的隐式多对多

    我有这两个实体 public class ApplicationUser IdentityUser
  • AF网络。检查所有操作队列的下载进度

    我有一个使用 AFNetworking 制作的 iOS 应用程序 我有一个单例自定义 httpclient 子类 我用它来使用我的 api 并从服务器下载二进制文件 我需要下载大约 100 个文件 迭代我的 url 数组并为每个 url 创
  • 将多个散景 HTML 图嵌入到 Flask 中

    我在 bokeh 网站和 stack Overflow 上搜索了过去 3 个小时 但没有一个是我真正想要的 我已经生成了我的图 并将它们放在 html 文件中 我想要做的就是将绘图嵌入到仪表板中 形成下图白色区域中的多网格状结构 然而 仅添
  • 如何启用或禁用键盘返回键[重复]

    这个问题在这里已经有答案了 当我们在文本字段中输入字符时 它会启用返回键 但是在我们的需求中 当长度大于5时 我们需要启用返回键 但是现在只要我们输入字符 它就会启用返回键 我们是否需要自定义键盘或者是否有其他可用的解决方案 提前致谢 好吧
  • 如何编写正则表达式从字符串中提取数值?

    我有一根绳子str 9 0 hr or str 9 0 hr 在这种情况下我只想要整数值9 0 语言是 Ruby 1 9 2 I d use number str d d 或者 如果小于 1 0 的值需要前导 0 number str d
  • 光栅化 matplotlib 轴内容(但不包括框架、标签)

    在一篇文章中 我正在生成变形有限元网格图 并使用 matplotlib 的 polycollection 对其进行可视化 图像保存为 pdf 高密度网格会出现问题 这种简单的方法会导致文件太大且渲染过于密集而无法实用 对于这些网格 将每个元
  • 在 R 中使用具有不均匀长度变量的熔化/铸造

    我正在处理一个想要旋转的大型数据框 以便列中的变量成为顶部的行 我发现 reshape 包在这种情况下非常有用 除了强制转换函数默认为 fun aggregate length 之外 大概这是因为我是按 案例 执行这些操作 并且测量的变量数
  • 当返回类型为 Option[Error] 时处理快速失败

    我已经发布了很多关于 Scala 中的故障处理的问题 我真的感谢大家的回答 我理解在处理 Either 和 Scalaz 或 a 时的选择 以便理解 我还有另一个 最后一个 问题 当操作处理外部非功能世界 例如数据库 时 如何执行快速失败的
  • 在 SQL 数据库中保持 RSS 提要唯一的最佳实践

    我正在开发一个项目 该项目显示来自不同站点的 RSS 提要 我将它们保存在数据库中 我的程序每 3 小时获取一次并将它们插入到 SQL 数据库中 我希望提供者有独特的记录 不要显示重复的内容 但问题是一些提供商不提供 GUID 字段 而其他
  • 全局 $sce.trustAsResourceUrl()

    我怎样才能做这样的事情 sce trustAsResourceUrl URL HERE 在全球范围内 就像在主应用程序中一样config or run 函数 以便任何 iFrame img src 等具有URL HERE将工作 文档对此的解
  • 在rabbitmq配置spring boot中在AMQP中配置多个Vhost

    我正在实现一个项目 我必须在rabbitmq中的不同虚拟主机之间发送消息 使用 SimpleRoutingConnectionFactory 但得到 java lang IllegalStateException 无法确定查找键的目标 Co