如何使用嵌入到Spring Cloud Stream中的kafka创建单元测试

2023-11-22

抱歉,这个问题太笼统了,但是有人有一些关于如何使用嵌入的 kafka 执行生产者和消费者测试的教程或指南。我已经尝试了几个,但是有多个版本的依赖项,但没有一个真正起作用=/

我正在使用春云流卡夫卡。


我们通常建议使用测试粘合剂在测试中,但如果你想使用嵌入式 kafka 服务器,可以这样做......

将其添加到您的 POM 中...

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>

测试应用程序...

@SpringBootApplication
@EnableBinding(Processor.class)
public class So43330544Application {

    public static void main(String[] args) {
        SpringApplication.run(So43330544Application.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public byte[] handle(byte[] in){
        return new String(in).toUpperCase().getBytes();
    }

}

应用程序.属性...

spring.cloud.stream.bindings.output.destination=so0544out
spring.cloud.stream.bindings.input.destination=so0544in
spring.cloud.stream.bindings.output.producer.headerMode=raw
spring.cloud.stream.bindings.input.consumer.headerMode=raw
spring.cloud.stream.bindings.input.group=so0544

测试用例...

@RunWith(SpringRunner.class)
@SpringBootTest
public class So43330544ApplicationTests {

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1);

    @Autowired
    private KafkaTemplate<byte[], byte[]> template;

    @Autowired
    private KafkaProperties properties;

    @BeforeClass
    public static void setup() {
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
    }

    @Test
    public void testSendReceive() {
        template.send("so0544in", "foo".getBytes());
        Map<String, Object> configs = properties.buildConsumerProperties();
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test0544");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConsumerFactory<byte[], byte[]> cf = new DefaultKafkaConsumerFactory<>(configs);
        Consumer<byte[], byte[]> consumer = cf.createConsumer();
        consumer.subscribe(Collections.singleton("so0544out"));
        ConsumerRecords<byte[], byte[]> records = consumer.poll(10_000);
        consumer.commitSync();
        assertThat(records.count()).isEqualTo(1);
        assertThat(new String(records.iterator().next().value())).isEqualTo("FOO");
    }

}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用嵌入到Spring Cloud Stream中的kafka创建单元测试 的相关文章

  • Kafka不启动空白输出

    我正在努力安装 Kafka 和 Zookeeper 我已经运行了 Zookeeper 并且它当前正在运行 我将所有内容设置为 https dzone com articles running apache kafka on windows
  • 如何在测试套件中定义 JUnit 方法规则?

    我有一个类 它是 JUnit 测试类的 JUnit 套件 我想定义一个规则on the suite 这是可以做到的 但需要做一些工作 您还需要定义自己的 Suite 运行程序和测试运行程序 然后在测试运行程序中重写 runChild 使用以
  • 单元测试组合服务方法

    我正在为一个类编写 junit 单元测试 该类使用以下方法实现公开的接口 public Set
  • 如何在 PySpark 中使用 foreach 或 foreachBatch 写入数据库?

    我想使用 Python PySpark 从 Kafka 源到 MariaDB 进行 Spark 结构化流处理 Spark 2 4 x 我想使用流式 Spark 数据帧 而不是静态数据帧或 Pandas 数据帧 看来必须要用foreach o
  • 如何使用 Spring Boot 测试 Maven 模块项目

    我已经将一个基于 Spring Boot 的项目拆分为几个 Maven 模块 现在只有war project包含一个starter类 有一个main方法 启动Spring 其他模块都是jar类型 如果 jar 项目不包含启动器 我该如何测试
  • 如何测试 Jersey REST Web 服务?

    我已经编写了一个 Restful Web 服务 并且必须使用 JUnit4 对其进行测试 我已经使用 Jersey Client 编写了一个客户端 但想知道我是否只能使用 junit4 测试我的服务 至少有人可以帮我提供样品吗 我的休息服务
  • 有条件地跳过 Junit 5 测试

    在我的 Junit Jupiter API 5 5 测试中 我正在调用我的方法 该方法在内部对远程服务进行 HTTP 调用 现在 远程服务可能会关闭或行为不正确 我想跳过测试 以防远程服务未按预期运行 Test void testMe do
  • 如何检测java中的消费者是否无法使用kafka代理?

    我有一个简单的 Java Kafka 消费者 如果 Kafka 代理不可用 我试图捕获异常 我需要它来中断线程 我有这样的代码 KafkaConsumer
  • 为什么 junit 测试不使用“sbt test”执行?

    我正在将 sbt 版本 0 13 2 与纯 java 项目一起使用 My build sbt其中有以下行 libraryDependencies Seq com novocode junit interface 0 10 test 我的测试
  • Kafka Streams 反序列化处理程序

    我正在尝试在反序列化中使用 LogAndContinueExceptionHandler 当发生错误时 通过成功记录错误并继续 它可以正常工作 但是 假设我的传入消息有连续的错误流 我停止并重新启动 kafka 流应用程序 然后我看到失败并
  • 如何让多个 MockFor 在 Groovy 中工作?

    我正在尝试让多个模拟在 groovy 中工作 我成功实现此功能的唯一方法是创建我自己的模拟 添加元方法 我尝试过使用嵌套 use 语句 还尝试了一种使用和一种带验证的代理 但都不起作用 这两个都返回失败 junit framework As
  • 如何删除 Apache Kafka 中的主题? [复制]

    这个问题在这里已经有答案了 我需要删除 Kafka 0 8 2 2 3 中的一个主题 我使用以下命令删除主题 bin kafka topics sh zookeeper localhost 2181 delete topic DummyTo
  • Kafka JDBC Sink Connector,批量插入值

    我每秒收到很多消息 通过 http 协议 50000 100000 并希望将它们保存到 PostgreSql 我决定使用 Kafka JDBC Sink 来实现此目的 消息以一条记录保存到数据库 而不是批量保存 我想在 PostgreSQL
  • FeignClient无法解析Eureka服务名称

    我正在编写一个 Eureka 客户端应用程序spring cloud starter openfeign客户 这是我的POM
  • spring boot测试如何获取服务器端口

    我有 Spring Boot 1 5 3 应用程序 有一条线server port 8081在 application properties 文件中 现在我想测试一下 ping 方法 private final Environment en
  • 来自复杂对象的 spring RestTemplate POST 参数

    我正在尝试使用 postForObject 方法使用restTemplate 来测试我们的REST 服务 单元测试 Test public void testPostOrder String url BASE URL orders Orde
  • 如何使用 Spring + DBUnit + JUnit 配置多个事务管理器

    简而言之 我的命令行 Java 应用程序将数据从一个数据源复制到另一个数据源 而不使用 XA 我已经配置了两个单独的数据源 并且想要一个可以回滚两个数据源上的数据的 JUnit 测试 我使用 DBUnit 将数据加载到 源 数据库中 但无法
  • 是否有用于事件驱动的 Kafka 消费者的 Python API?

    我一直在尝试构建一个以 Kafka 作为唯一界面的 Flask 应用程序 因此 我希望有一个 Kafka 消费者 当相关主题的流中存在新消息时 该消费者会被触发 并通过将消息推回到 Kafka 流来进行响应 我一直在寻找类似 Spring
  • 提供了 kafka schema.registry.url 但不是已知的配置

    尝试使用架构注册表发布有关主题的 json 消息 但出现以下错误 以下Spring Boot方法 已提供配置 schema registry url 但不是已知配置 应用程序 yml 文件 server port 9080 spring k
  • 如何在kafka中定义多个序列化器?

    比如说 我发布和使用不同类型的 java 对象 对于每个对象 我必须定义自己的序列化器实现 我们如何在 serializer class 属性下提供kafka消费者 生产者属性文件中的所有实现 我们有一个类似的设置 不同主题中的不同对象 但

随机推荐

  • 如何以编程方式找出哪些频道属于给定 YouTube 网络?

    似乎没有官方的 YouTube API 来查找 YouTube 网络列表或哪些频道属于给定网络 有什么想法如何找到该信息吗 如果没有直接的方法 socialblade com 使用什么算法获得近似列表 我不知道这是否是像socialblad
  • 对 CUDA 内核中不同部分进行计时

    我有一个 CUDA 内核 可以调用一系列设备函数 获取每个设备功能的执行时间的最佳方法是什么 获取设备函数之一中一段代码的执行时间的最佳方法是什么 在我自己的代码中 我使用clock 函数以获得精确的计时 为了方便起见 我有宏 enum t
  • 从推送通知启动时,launchOptions 始终为零

    我正在从 Django 应用程序发送推送通知 使用django 推送通知 到 iOS 应用程序 该应用程序面向 iOS 13 我在运行 iOS 13 3 1 的 iPhone 7 上运行它 我正在 Xcode 11 3 1 中调试 我正在尝
  • Numpy 的特征值/向量不正确

    我试图找到以下矩阵的特征值 向量 A np array 1 0 0 0 1 0 1 1 0 使用代码 from numpy import linalg as LA e vals e vecs LA eig A 我得到这个作为答案 print
  • 正确使用SQL Server中的事务

    我有 2 个命令 需要两个命令都正确执行 否则都不执行 所以我认为我需要一个交易 但我不知道如何正确使用它 下面的脚本有什么问题 BEGIN TRANSACTION Tran1 INSERT INTO Test dbo T1 Title A
  • 如何在 GitHub 上搜索提交消息?

    Not 在 Git 存储库中 而是在GitHub具体来说 如何仅搜索特定存储库 分支的提交消息 您过去可以执行此操作 但 GitHub 在 2013 年中期的某个时候删除了此功能 要在本地实现此目的 您可以执行以下操作 git log g
  • 实现多个通用接口 - 类型错误

    我正在尝试做这样的事情 public interface IRepository
  • Jquery过滤列表不区分大小写

    我想过滤列表而不区分大小写 我只想匹配不匹配大写或小写的字符 XXXXXXX yyyyyyy XXxxx 如果我在搜索框中输入 X 它会同时显示 1 和 3 我添加了下面的代码 但它也区分大小写
  • bash 计算文件中单词的出现次数

    我很抱歉问了这个非常菜鸟的问题 但我还是个新手bash编程 几天前开始 基本上我想要做的是将一个文件与另一个文件中出现的所有单词一起保存 我知道我可以这样做 sort uniq c sort 问题是 之后我想获取第二个文件 再次计算出现次数
  • 使用 std::launder 从指向非活动对象的指针获取指向活动对象成员的指针?

    This question followes this one 让我们考虑一下这个示例代码 struct sso union struct char ptr char size r 8 large str char short str 16
  • R 中按最后一个空格分割字符串

    我有一个向量 其中有多个空格的字符串 我想将其分成两个向量 并按最后的空格分开 例如 vec lt c This is one And another And one more again 应该成为 vec1 c This is And A
  • 类型带反射的文字注入

    上下文 java使用guice 最后版本 大家好 是否可以通过这种方式用 Guice 注入一些 TypeLiteral public MyClass a Class
  • 在 WebApi 和 MVC 项目之间共享 SignalR 中心

    是否有推荐的方法在两个应用程序之间共享 SignalR 集线器 实际情况是一个面向公众的WebAPI项目和一个内部MVC WebApp 我想要做的是从 WebAPI 项目调用 SignalR 集线器上的方法 并将这些方法的结果推送到通过 M
  • 删除数据框中的所有左侧 NA 并向左移动已清理的行

    我有以下数据框dat 它在某些行的开头呈现特定于行的 NA 数量 dat lt as data frame rbind c NA NA 1 3 5 NA NA NA c NA 1 3 6 8 NA c 1 7 NA dat V1 V2 V3
  • Google 在抓取我们的网站时是否会忽略哈希片段 (#) 后面的内容?

    我们使用哈希片段后面的信息通过 JavaScript 显示不同的页面 以免强制浏览器再次加载整个页面 例如 页面的直接链接可能如下所示 book id page id www example com book 1234 5678 由于我们没
  • 常量对象的常量数组

    如何在 C 而不是 C 中定义常量对象的常量数组 我可以定义 int const Array init data here 但这是常量对象的非常量数组 我可以用 int const const Array init data here 这可
  • 使用 Razor 进行 POST 时 Model.List 为 null

    My view foreach var item in Model List Html HiddenFor model gt item UserId Html HiddenFor model gt item Name Html Hidden
  • 如何在 JavaScript 中将 Object {} 转换为键值对的 Array []

    我想像这样转换一个对象 1 5 2 7 3 0 4 0 5 0 6 0 7 0 8 0 9 0 10 0 11 0 12 0 成一个键值对数组 如下所示 1 5 2 7 3 0 4 0 如何在 JavaScript 中将对象转换为键值对数组
  • 函数矩阵、SymPy 和 SciPy 上的数值积分

    从我的 SymPy 输出中 我得到了如下所示的矩阵 我必须将其积分为 2D 目前我正在按元素进行操作 如下所示 此方法有效 但速度太慢 对于sympy mpmath quad and scipy integrate dblquad 对于我的
  • 如何使用嵌入到Spring Cloud Stream中的kafka创建单元测试

    抱歉 这个问题太笼统了 但是有人有一些关于如何使用嵌入的 kafka 执行生产者和消费者测试的教程或指南 我已经尝试了几个 但是有多个版本的依赖项 但没有一个真正起作用 我正在使用春云流卡夫卡 我们通常建议使用测试粘合剂在测试中 但如果你想