Kafka Streams如何获取kafka headers

2023-11-22

我有下面的卡夫卡流代码

    public class KafkaStreamHandler implements  Processor<String, String>{

    private ProcessorContext context;


        @Override
    public void init(ProcessorContext context) {
        // TODO Auto-generated method stub
        this.context = context;
    }

    public KeyValue<String, KafkaStatusRecordWrapper> process(String key, String value) {

        Headers contexts = context.headers();

        contexts.forEach(header -> System.out.println(header));
     }

public void StartFailstreamHandler() {
       StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> userStream = builder.stream("usertopic",Consumed.with(Serdes.String(), Serdes.String()));
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "failed-streams-userstream");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ALL my bootstrap servers);
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "500");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        //consumer_timeout_ms
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 2000);

        props.put("state.dir","/tmp/kafka/stat));

     userStream.peek((key,value)->System.out.println("key :"+key+" value :"+value));

     /* take few descsion based on Header */
     /* How to get the Header */ 

       userStream.map(this::process);
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);


kafkaStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {

                logger.error("Thread Name :" + t.getName() + " Error while processing:", e);
            }
        });


        kafkaStreams.cleanUp();
        kafkaStreams.start();
    }

    }

现在我们的一个客户端正在发送有关 kafka 标头的版本信息,如下所示。

ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic", 1, "message");
record.headers().add(new RecordHeader("version", "v1".getBytes()));
producer.send(record);

基于此标头,我需要为我的消息选择解析器,如何使用 KStream 运算符读取此标头? 我已经看过流的所有 API,但没有方法给出 header

我无法更改为普通的 kafka 消费者,因为我的应用程序已经依赖于几个 KStream API ..


处理器不允许您在下游 DSL 中链接新运算符,您应该使用 TransformValues,以便可以继续使用 Stream DSL:

  1. 首先从 ValueTransformerWithKey 中提取 headers
public class ExtractHeaderThenDoSomethingTransformer implements ValueTransformerWithKey<String, String, String> {

    ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public String transform(String readOnlyKey, String value) {
        Headers headers = context.headers();
        /* take few descsion based on Header: if you want to filter base on then just return null then chaining another filter operator after transformValues*/
        /* How to get the Header */
        return value;
    }

    @Override
    public void close() {

    }
}
  1. 将 ExtractHeaderThenDoSomethingTransformer 添加到您的拓扑中,如下所示:
userStream
        .transformValues(ExtractHeaderThenDoSomethingTransformer::new)
        .map(this::processs);
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka Streams如何获取kafka headers 的相关文章

随机推荐

  • Node.js MySQL 模块 - 抛出错误; // 重新抛出非 MySQL 错误;

    今天我尝试了来自 w3schools 的 node js mysql 片段 var mysql require mysql var con mysql createConnection host localhost user roots W
  • 如何在 Coq 中使用归纳类型来处理案例

    我想使用destruct通过案例来证明陈述的策略 我在网上读了几个例子 但我很困惑 有人可以更好地解释一下吗 这是一个小例子 还有其他方法可以解决它 但尝试使用destruct Inductive three zero one two Le
  • Visual Studio C++ 是否可以在不链接的情况下编译对象

    我正在运行 VS 2010 SP1 并且有一个每周运行一次的特殊分析配置 因为构建服务器需要很长时间来分析所有内容 我希望此配置无需链接即可运行 如果分析通过了项目中的所有代码 那么我希望构建继续进行下一个项目而不链接 我看不出有什么方法可
  • Python套接字接受块-防止应用程序退出

    我编写了一个非常简单的 python 类 它等待套接字上的连接 目的是将此类粘贴到现有应用程序中 并将数据异步发送到连接的客户端 问题是 当等待 socket accept 时 我无法通过按 ctrl c 来结束我的应用程序 我也无法检测到
  • JDBC 到 Spark Dataframe - 如何确保均匀分区?

    我是 Spark 新手 正在致力于通过 JDBC 从 Postgres 数据库表创建 DataFrame 使用spark read jdbc 我对分区选项有点困惑 特别是分区列 下界 上限 and 分区数 文档似乎表明这些字段是可选的 如果
  • JSON - Spring MVC:如何将 json 数据发布到 spring MVC 控制器

    我在发布 JSON 数据时遇到问题jsp to controller 每次我尝试都会收到 ajax 错误Bad Request 我对 JSON 很陌生 我真的不知道我做错了什么 我搜索并尝试了一些可以在该网站中找到的示例 但仍然遇到问题 在
  • 使用 JAX-WS:如何设置用户代理属性

    我对此进行了搜索并发现了一些未遂事件 我创建了一个 java 客户端来使用 JAX WS 来使用 Web 服务 使用 JAX 时有没有办法设置 HTTP USER AGENT 值 我希望在特定客户端 我的 访问它时获得我的网络服务日志 因此
  • 检测何时连接新显示器

    我正在编写一个需要两个显示器的应用程序 一个用于控制面板 另一个用于输出 我所拥有的是这样的 如果只有一个显示器 应用程序会在其上显示两种表单 但如果有两个显示器 则输出表单将转到另一个 问题是这只在应用程序启动时才会发生 换句话说 如果应
  • 在jsf中使用json将数据从bean发送到javascript

    我想将我的数组列表从 ManagedBean 发送到 JavaScript 代码 我的豆子在这里 public void getDataAsJson String dizi Tokyo Jakarta New York Seoul Mani
  • 如何计算列的平均值,然后将其包含在oracle中的选择查询中?

    我的桌子是 create table mobile id integer m name varchar 20 cost integer 其值为 insert into mobile values 10 NOkia 100 insert in
  • jqplot - 单个值,而不是堆积图中的总计

    In a stacked bar chart we can show total of each series in every stack like this However I want value of each series to
  • Identity.EntityFramework OnModelCreating 是如何调用的

    我正在从事两个类似的项目 但我没有创建其中任何一个 它们都具有相同的本地上下文 如下所示 using Microsoft AspNet Identity EntityFramework public class LocalContext I
  • 如何将uuid存储为数字?

    根据问题的回答 MySQL 中的 UUID 性能 回答者建议将 UUID 存储为数字而不是字符串 我不太确定如何做到这一点 有人可以建议我一些东西吗 我的 ruby 代码如何处理这个问题 如果我理解正确的话 您在主列中使用 UUID 吗 人
  • 如何将 QBASIC PLAY 命令转换为更现代的命令?

    我的 QB 应用程序中有这样的播放命令 PLAY MSe8f 4f 8f 8g8a8b4 a4 g4 f 4 o0b8o1e8e8e4d8e2 我想以某种方式将它们转换为现代应用程序可以使用的东西 有什么想法吗 我目前正在 FreeBasi
  • min_member/2 的反直觉行为

    最小成员 分钟 列表 当 Min 是标准项顺序中最小的成员时为真 如果列表为空 则失败 min member 3 1 2 X X 3 当然 解释是变量在术语的标准顺序中位于所有其他术语之前 并且使用统一 然而 所报告的解决方案感觉有些错误
  • 如何将查询结果映射到 sqlalchemy 中的自定义对象?

    我正在寻找一种方法来告诉 sqlalchemy 将某些 tabes 上的复杂查询映射到自定义类MyResult而不是默认的RowProxy班级 这是一个简单的工作示例 create table foo id integer title te
  • itunesconnect apploader 无效段对齐问题

    伙计们 我想更新我的应用程序最新版本 但应用程序加载器一直给我同样的错误 那就是 错误 ITMS 9000 段对齐无效 此应用程序没有正确的段对齐 应使用最新版本的 Xcode 重新构建 如果您需要进一步帮助 请联系开发者技术支持 我快要疯
  • 防止 ProgressDialog 被 onClick 关闭

    我使用 ProgressDialog 向用户表明他必须等待 并在用户必须等待时使我的应用程序的表面 不可触摸 我向 ProgressDialog 添加了一个按钮 如果某些条件成立 它应该启动一些操作 问题是每次用户按下按钮时 progres
  • Java滑动JPanels

    我有一个显示各种按钮的菜单 我可以让按钮在单击时调用它们各自的 JPanel 问题是我想让 Jpanel 在调用时滑入 而不是立即弹出 我尝试使用补间引擎 作为 Java 初学者 我发现它真的让人不知所措 所以我决定使用定时动画 我能够使顶
  • Kafka Streams如何获取kafka headers

    我有下面的卡夫卡流代码 public class KafkaStreamHandler implements Processor