Kafka 创建 两个topic 一个用于发送信息 一个用于接收Flink处理之后的信息

2023-11-05

Kafka 创建 两个topic 一个用于发送信息 一个用于接收Flink处理之后的信息

Kafka生产者Java代码

package cn.oneseek;
import cn.oneseek.util.JsonData;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class Producer {
    public static void main(String[] args) {
        //配置信息
        Properties props = new Properties();
        //kafka服务器地址
        props.put("bootstrap.servers", "localhost:9092");
        //设置数据key和value的序列化处理类
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        //创建生产者实例
        KafkaProducer<String,String> producer = new KafkaProducer<>(props);
        Map<String,String> map = new HashMap<>();
        map.put("单体Json",JsonData.str1);
        map.put("嵌套单体Json",JsonData.str2);
        map.put("数组Json",JsonData.str3);
        map.put("嵌套Json数组",JsonData.str4);
        map.put("变体Json(子Json为单体Json或Json数组)",JsonData.str5);
        for (Map.Entry<String,String> entry:map.entrySet()){
            ProducerRecord record = new ProducerRecord<String, String>("test", entry.getValue());
            //发送记录
            producer.send(record);
        }
        producer.close();
    }
}

Kafka消费者java代码

package cn.oneseek;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class Consumer {
    public static void main(String[] args) {
        //配置信息
        Properties props = new Properties();
        //kafka服务器地址
        props.put("bootstrap.servers", "localhost:9092");
        //必须指定消费者组
        props.put("group.id", "test-consumer-group");
        //设置数据key和value的序列化处理类
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        //创建消息者实例
        KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
        //订阅topic1的消息
        consumer.subscribe(Arrays.asList("test1"));
        //到服务器中读取记录
        while (true){
            ConsumerRecords<String,String> records = consumer.poll(100);
            for(ConsumerRecord<String,String> record : records){
                System.out.println(record.value());
            }
        }
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka 创建 两个topic 一个用于发送信息 一个用于接收Flink处理之后的信息 的相关文章

  • 算法导论 学习笔记 第六章 堆排序

    实际中 待排序的数很少是单独的数值 它们通常是称为记录的数据集的一部分 每个记录包含一个关键字 即排序问题中要重排的值 记录的剩余部分由卫星数据组成 通常是要与关键字一同存取的 如果每个记录包含大量卫星数据 我们通常重排记录指针的数组 而非
  • JS深层次多级对象Key的遍历方法,将多层级对象扁平化

    文章目录 一 深层次多级对象介绍 问题提出 二 深层次多级对象Key的遍历方法实现 三 验证一下我们的遍历函数 四 将多级对象扁平化 总结与应用 最近整理之前写过的博客 发现深度拷贝对象的一段代码 想着深度拷贝后我怎么来验证两个对象的值 是

随机推荐

  • HierarchicalDataTemplate (一)

    能够帮助层级控件显示层级数据的模板是HierarchicalDataTemplate 一般常用于TreeView控件和MenuItem控件 显示层级数据 数据类 using System Collections Generic namesp
  • 敏捷项目管理之任务看板

    我们最近在多个项目中使用看板项目管理实施敏捷项目开发 有些经验心得 看板优势 看到瓶颈 把控进度 调整策略 让开发可视化 需求分类 必备需求 期望需求 超出预期需求 精益之道 干掉一切不增值业务活动 以客户为中心极速价值交付 又好又快完成领
  • osgEarth的Rex引擎原理分析(三十三)分页瓦片卸载器子节点的作用

    目标 十二 中的问题22 分页瓦片卸载器是在Rex引擎的setMap函数中创建的 创建之初就关联了活跃瓦片寄存器和资源释放器 作用见下面分析 osgEarthDrivers engine rex RexTerrainEngineNode c
  • ip变动导致roscore无法打开

    之前改过ip 导致roscore无法打开 错误代码 Unable to contact my own server at http localhost 60852 This usually means that the network is
  • 硬核虚拟化技术 SR-IOV的原理及探索

    2007年9月 PCI SIG官方发布了 Single Root I O Virtualization and Sharing Specification Revision 1 0 规范 定义了多个System Images如何共享PCI接
  • Android 判断设备是否模拟器

    用过一些网上提供的方法 我这边使用夜神模拟器测试 结果检测为真机 于是想了一个从cpu架构信息来判断的方法 同时支持x86和arm的应该就是模拟器 代码如下 获取 cpu 信息 public static String getCpuInfo
  • Java程序员从阿里面试回来,最后成功拿到阿里offer!

    最近有很多朋友去目前主流的大型互联网公司面试 阿里巴巴 京东 美团 滴滴 面试回来之后会发给我一些面试题 有些朋友轻松过关 拿到offer 但是有一些是来询问我答案的 其实本来真的没打算写这篇文章 主要是自己得记忆力不是很好 不像一些记忆力
  • 百度首页模仿制作(html)详解

    相信大姐学习html这一门语言的第一步一般都是做一个百度首页或者菜鸟教程首页 菜鸟教程首页模仿戳这里 什么的吧 下面就来分享一下我所写的百度首页的模仿过程 1 分析布局 这里我将他的布局分为三大块 上中下各一个div块 2 往div块中添加
  • 【Linux】Ubuntu下C语言访问MySQL数据库入门

    使用的系统是Ubuntu 11 10 数据库是MySQL MySQL数据库环境配置 首先需要安装MySQL客户端和服务器 命令行安装方式为 sudo apt get install mysql server mysql client 然后
  • 一个老鸟发的公司内部整理的 Android 学习路线图 Markdown 版本

    转自 https www diycode cc topics 122 jixiaohua发了一篇一个老鸟也发了一份他给公司内部小伙伴整理的路线图 另一份 Android 开发学习路线图 可惜不是MarkDown格式的 所以jixiaohua
  • html盒子毛玻璃效果,css毛玻璃效果(外加background属性)

    前因 后果 二话不说 上效果 注意 此方法只适合body设置背景图时的模糊 页面布局方面 主要父元素为body 子元素为想要的效果 涉及到的知识点 background filter 定位 伪元素 flex布局 主要为子元素水平居中使用 z
  • idea 快捷键

    注 有些操作的快捷键做了更改 和IntelliJ Idea默认的快捷键不一样 动作 快捷键 说明 Move Caret to Code Block End Ctrl 诸如 围起来的代码块 使用该快捷键可以快速跳转至代码块的结尾处 Move
  • Qt线程与界面

    看了个开源库Stacer 里面使用到了QConcrrent 这个使用很方便 这里简单记录一下总结下Qt的线程创建方法 Qt线程创建方法 QThread继承 QObject moveToThread QConcurrent run Qt中提到
  • pthread_mutex_t

    2019独角兽企业重金招聘Python工程师标准 gt gt gt 1 互斥锁创建 有两种方法创建互斥锁 静态方式和动态方式 POSIX定义了一个宏PTHREAD MUTEX INITIALIZER来静态初始化互斥锁 方法如下 pthrea
  • Mybatis使用注解方式配置

    目录 1 介绍 2 初始化工程 2 1 导包 2 2 导配置 3 使用注解增删改查 3 1 环境准备 3 1 1 建库建表 3 1 2 创建Student实体类 3 1 3 创建StudentDao接口 3 1 4 配置映射关系 3 2 查
  • Python Matplotlib 实用小技巧!

    转自 网络 今天给大家介绍Matplotlib绘图实用的小技巧 1 添加标题 title matplotlib pyplot 对象中有个 title 可以设置表格的标题 import numpy as np import matplotli
  • django前端模板循环多个list

    这一部分主要涉及到后台View的设计 前端HTML页面的设计 后台设计View from django views generic import View class ForTrView View def get self request
  • PCL 区域生长分割(C++详细过程版)

    区域生长 一 概述 二 代码实现 三 结果展示 1 原始点云 2 聚类结果 四 相关链接 一 概述 区域生长分割是PCL里经典的点云聚类分割算法 具体算法原理和实现代码见 PCL 区域生长分割 为充分了解算法实现的每一个细节和有待改进的地方
  • display:none元素不可见,可通过JS方法把它修改为可见

    页面元素无论用什么xpath都无法定位 by id class name都试过了 可以去页面看看是否这个元素的display是none display none方法是设置元素不可见 所以解决办法就是首先通过JS方法把它修改为可见 js do
  • Kafka 创建 两个topic 一个用于发送信息 一个用于接收Flink处理之后的信息

    Kafka 创建 两个topic 一个用于发送信息 一个用于接收Flink处理之后的信息 Kafka生产者Java代码 package cn oneseek import cn oneseek util JsonData import or