如何从 Http 集成流程创建 Spring Reactor Flux?


我有一个与此非常相似的问题如何从 ActiveMQ 队列创建 Spring Reactor Flux?

唯一的区别是消息来自 Http 端点而不是 JMS 队列。问题是消息通道由于某种原因没有被填充,或者它没有被 Flux.from() 拾取。日志条目显示 GenericMessage 是从 Http 集成流创建的,并以有效负载作为路径变量,但未排队/发布到通道?我试过.channel(MessageChannels.queue()) and .channel(MessageChannels.publishSubscribe())没有任何区别,事件流是空的。这是代码:

public Publisher<Message<String>> httpReactiveSource() {
        return IntegrationFlows.
                        .requestMapping(r -> r

@GetMapping(value="eventmessagechannel/{id}", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> eventMessages(@PathVariable String id){     
    return Flux.from(httpReactiveSource())              




buildscript {
    ext {
        springBootVersion = '2.0.0.M2'
    repositories {
        maven { url "https://repo.spring.io/snapshot" }
        maven { url "https://repo.spring.io/milestone" }
    dependencies {

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    maven { url "https://repo.spring.io/snapshot" }
    maven { url "https://repo.spring.io/milestone" }

dependencies {



它起作用时@SpringBootApplication and @RestController在一个文件中定义,但在以下情况下停止工作@SpringBootApplication and @RestController位于单独的文件中。


package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

public class TestApp {
     public static void main(String[] args) {
            SpringApplication.run(TestApp.class, args);


package com.example.controller;

import org.springframework.context.annotation.Bean;
import org.reactivestreams.Publisher;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.http.dsl.Http;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.RestController;

import org.springframework.web.bind.annotation.GetMapping;
import reactor.core.publisher.Flux;

public class TestController {
        public Publisher<Message<String>> httpReactiveSource() {
            return IntegrationFlows.
                            .requestMapping(r -> r

        @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public Flux<String> eventMessages() {
            return Flux.from(httpReactiveSource())



public class SpringIntegrationSseDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringIntegrationSseDemoApplication.class, args);

    public Publisher<Message<String>> httpReactiveSource() {
        return IntegrationFlows.
                        .requestMapping(r -> r

    @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> eventMessages() {
        return Flux.from(httpReactiveSource())


我在 POM 中有这个依赖项:

    <relativePath/> <!-- lookup parent from repository -->





curl http://localhost:8080/events



curl -X POST http://localhost:8080/message/foo

curl -X POST http://localhost:8080/message/bar

curl -X POST http://localhost:8080/message/666





请注意,我们不需要spring-boot-starter-webflux依赖性。这FluxSSE 与 Servlet 容器上的常规 MVC 配合良好。

Spring Integration 也将很快支持 WebFlux:https://jira.spring.io/browse/INT-4300。因此,您将能够在那里进行如下配置:

                        .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))

并且完全仅依赖 WebFlux,无需任何 Servlet 容器依赖项。


