Flink 从 kafka 中读取数据并输出到 kafka

2023-05-16

Kafka 是一个分布式的基于发布/订阅的消息系统,本身处理的也是流式数据。kafka和flink二者被称为当前处理流式数据的双子星。

下面我们将从以下几个步骤展开讲解:

目录

一 添加maven依赖

二 编写flink程序

从kafka读取数据

输出数据到kakfka

三  启动kafka集群

四 运行flink程序


一、添加maven依赖

<!--kafka connector-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.1</version>
</dependency>

二、编写flink程序

老规矩,先上代码 再做介绍

代码如下:

package com.flink.wc.myflink.source;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public class mysource_kafka_kafka {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 配置kafka集群信息  properties是java中的一个集合类, 多用于 配置参数, 它继承于 Hashtable,表示一个持久的属性集.属性列表中每个键及其对应值都是一个字符串。
        // 这里和在kafka javaAPI中配置kafka信息时一样
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop-001:9092");
            // 配置序列化 
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // 配置消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        
        // 从kafka中读取数据
        DataStreamSource<String> stream = env.addSource(new FlinkKafkaConsumer<String>("myflink_source", new SimpleStringSchema(), properties));

        stream.print("flink");

        // 将数据输出到kafka
        stream.addSink(new FlinkKafkaProducer<String>("myflink_sink", new SimpleStringSchema(), properties));
        
        env.execute();

    }
}

1、从kafka读取数据

通过addSource()方法传入一个SourceFunction的实现类

FlinkKafkaConsumer() 就是这个实现类 很好理解 就是实例化一个flink程序的kafka消费者

源代码中FlinkKafkaConsumer类 构造函数如下:三个参数分别是 (kafka主题, 反序列化对象, kafka集群配置信息)

public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
    this(Collections.singletonList(topic), valueDeserializer, props);
}

2、输出数据到kakfka

通过addSink()方法传入一个SinkFunction的实现类

FlinkKafkaProducer () 就是这个实现类 也很好理解 就是实例化一个flink程序的kafka生产者

构造函数如下:三个参数分别是 (kafka主题, 序列化对象, kafka集群配置信息)

public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
    this(topicId, serializationSchema, producerConfig, Optional.of(new FlinkFixedPartitioner()));
}

三、启动kafka集群

// 启动zookeeper
// 启动kafka
(base) [hadoop@hadoop-001 ~]$ jps
1410 QuorumPeerMain
6583 NameNode
8121 Jps
8058 Kafka
6798 DataNode

// hadoop-001 上启动生产者:
(base) [hadoop@hadoop-001 ~]$ kafka-console-producer.sh --bootstrap-server hadoop-001:9092 --topic myflink_source

// hadoop-002 上启动消费者 :
(base) [hadoop@hadoop-002 ~]$ kafka-console-consumer.sh --bootstrap-server hadoop-001:9092 --topic myflink_sink

四、运行flink程序

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink 从 kafka 中读取数据并输出到 kafka 的相关文章

  • Flink checkPoint和SavePoint

    savepoint和checkpoint都是flink为容错提供的强大功能特性 能够自动或手动保存job的运行状态 两者区别 checkpoint 应用定时触发 用户保存状态 会过期 内部应用失败重启的时候启用 但是手动cancel时 会删
  • Flink设置Source数据源使用kafka获取数据

    流处理说明 有边界的流bounded stream 批数据 无边界的流unbounded stream 真正的流数据 Source 基于集合 package com pzb source import org apache flink ap
  • windows python kafka 初级使用

    今天花了点时间在这个kafka上 因为我们工作中也用到了kafka 我这边对于kafka的理解是能用或者知道基本原理就行 实现在自己的windows环境搭建一次kafka 然后使用python进行数据的生产和消费 如果之后工作中对于kafk
  • 仿kafka实现java版时间轮

    系统定时 超时 在我们平时的项目开发中 会设置系统的超时时间 比如在http接口中设置超时时间 在定时调度中也会用到 在jdk的开发的实现Timer和ScheduledThreadPoolExecutor DelayQueue定时调度中使用
  • kafka消费者客户端线程安全以及多线程实现并发读取消息

    kafka的生产者客户端Producer是线程安全的 但是消费者客户端是非线程安全的 每次操作时都会调用accqure方法用来确定当前只有一个线程操作 如果有多个线程在操作 会抛出CME异常 针对这种情况 为了能够多线程更快速的读取消息 可
  • Kafka : KafkaProducer Closing the kafka producer with timeoutMillis

    1 美图 2 背景 一段kafka写入程序 不晓得为啥突然发现很多奇怪的日志 kafka 多线程发送数据 然后在本地是可以的 在服务器上是偶现的 我写了一个本地程序多线程生产数据 发现是没有问题的 Test public void mult
  • ELK配置记录(filebeat+kafka+Logstash+Elasticsearch+Kibana)

    一 简介 elk日志平台 日志收集 分析和展示的解决方案 满足用户对 志的查询 排序 统计需求 elk架构 filebeat 采集 kafka Logstash 管道 Elasticsearch 存储 搜索 Kibana 日志应用 各组件功
  • kafka问题解决:org.apache.kafka.common.errors.TimeoutException

    记录使用kafka遇到的问题 1 Caused by java nio channels UnresolvedAddressException null 2 org apache kafka common errors TimeoutExc
  • flink大数据处理流式计算详解

    flink大数据处理 文章目录 flink大数据处理 二 WebUI可视化界面 测试用 三 Flink部署 3 1 JobManager 3 2 TaskManager 3 3 并行度的调整配置 3 4 区分 TaskSolt和parall
  • [Docker]使用Docker部署Kafka

    Kafka 是一个分布式流处理平台 它依赖于 ZooKeeper 作为其协调服务 在 Kafka 集群中 ZooKeeper 负责管理和协调 Kafka 的各个节点 因此 要在 Docker 容器中启动 Kafka 通常需要同时启动一个 Z
  • WebSocket + kafka实时推送数据(springboot纯后台)

    逻辑 kafka订阅消费者主题 消费后通过webSocket推送到前端 kafka vue financial webSocket 学习引用 SpringBoot2 0集成WebSocket 实现后台向前端推送信息 World Of Mos
  • 【硬刚大数据之学习路线篇】2021年从零到大数据专家的学习指南(全面升级版)

    欢迎关注博客主页 https blog csdn net u013411339 本文由 王知无 原创 首发于 CSDN博客 本文首发CSDN论坛 未经过官方和本人允许 严禁转载 欢迎点赞 收藏 留言 欢迎留言交流 声明 本篇博客在我之前发表
  • 公司实战 ElasticSearch+Kafka+Redis+MySQL

    一 需求 前一段时间公司要进行数据转移 将我们ES数据库中的数据转移到客户的服务器上 并且使用定时将新增的数据同步 在这过程中学到了很多 在此记录一下 二 技术栈 Mysql Redis ElasticSearch Kafka 三 方案 为
  • Kafka性能保证和延时队列实现原理

    数据不丢不漏 不重不错 一 不丢 生产写入消息不丢失 数据组织形式 首先 从数据组织形式来说 kafka有三层形式 kafka有多个主题 topic 每个主题有多个分区 分区分为主分区和副本分区 每个分区又有多条消息 而每个分区可以分布到不
  • 【Docker安装部署Kafka+Zookeeper详细教程】

    Docker安装部署Kafka Zookeeper Docker拉取镜像 Docker拉取zookeeper的镜像 docker pull zookeeper Docker拉取kafka的镜像 docker pull wurstmeiste
  • Kafka——Mac搭建kafka环境

    1 下载Kafka安装包 下载地址 将压缩包移动到 usr local mv kafka 2 12 3 1 0 tgz usr local 解压 tar zxvf kafka 2 12 3 1 0 tgz 2 启动 启动zookeeper
  • Kafka 权威指南

    Kafka 权威指南 这本书于 2021 年看完 2022 年又看了一遍 感觉书读百遍 其义自现 这本书侧重于 Kafka 的理论知识 虽然书有点老 但是其中关于 Kafka 的基础知识的章节讲得确实不错 适合学习 Kafka 的新手以及
  • 消息队列选型:Kafka 如何实现高性能?

    在分布式消息模块中 我将对消息队列中应用最广泛的 Kafka 和 RocketMQ 进行梳理 以便于你在应用中可以更好地进行消息队列选型 另外 这两款消息队列也是面试的高频考点 所以 本文我们就一起来看一下 Kafka 是如何实现高性能的
  • 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(8) - 完整版

    Flink 系列文章 一 Flink 专栏 Flink 专栏 系统介绍某一知识点 并辅以具体的示例进行说明 1 Flink 部署系列 本部分介绍Flink的部署 配置相关基础内容 2 Flink基础系列 本部分介绍Flink 的基础部分 比
  • 消息队列选型:Kafka 如何实现高性能?

    在分布式消息模块中 我将对消息队列中应用最广泛的 Kafka 和 RocketMQ 进行梳理 以便于你在应用中可以更好地进行消息队列选型 另外 这两款消息队列也是面试的高频考点 所以 本文我们就一起来看一下 Kafka 是如何实现高性能的

随机推荐

  • 「数字信号处理」MATLAB设计 双音多频拨号系统

    前言 实验目的 xff1a 用Matlab模拟实现双音多频拨号系统 输入 xff1a 一串数字模拟电话号码 输出 xff1a 检测出的电话号码 Matlab版本 xff1a 2021b 系统 xff1a MacOS 实验方法 xff1a 查
  • 「STM32入门」TIM定时中断

    定时器的简介 定时器可以对输入的时钟进行计数 xff0c 并在计数值达到设定值时触发中断 xff0c 在中断内可以执行中断事件不仅具备基本的定时中断功能 xff0c 而且还包含内外时钟源选择 xff0c 主从触发模式 xff0c 输入捕获
  • 二极管反向恢复过程详细解析

    二极管反向恢复过程 xff0c 现代脉冲电路中大量使用晶体管或二极管作为开关 或者使用主要是由它们构成的逻辑集成电路 而作为开关应用的二极管主要是利用了它的通 电阻很小 断 电阻很大 特性 即二极管对正向及反向电流表现出的开关作用 二极管和
  • 性能优化总结

    性能优化关注点 从图中可以看出 xff0c 性能优化的主要关注 xff1a CPU 内存 磁盘IO 网络IO等四个方面 性能指标 每个关注点都有对应的指标 xff0c 吞吐率 响应时间 QPS IOPS TP99 资源使用率是我们经常关注的
  • Python:优先队列的使用及类的自定义比较函数

    Priority queue模块 该模块定义的优先级队列 xff0c 其内部使用了 heapq 模块 xff0c 所以它的时间复杂度和heapq是相同的 当一个对象的所有元素都是可比较的时 xff0c 默认情况下是根据队列中的对象的第一个元
  • 「STM32入门」USART串口通信

    通信 通信的目的 xff1a 将一个设备的数据传送到另一个设备 xff0c 扩展硬件系统 通信协议 xff1a 制定通信的规则 xff0c 通信双方按照协议规则进行数据收发 STM32常见的通信协议 本文将介绍USART 概念解释 TX R
  • 「数字信号处理」采样过程与内插恢复完整图解

    内插与采样的关系 来源 xff1a 数字信号处理 采样与内插 DSP期末知识点题型4 哔哩哔哩 bilibili
  • 配置vscode作为STM32代码的编辑器(替代keil5)。实现:代码自动补全, 编译,下载。nRF52也可以编译。

    STM32CubeMX新建好工程在工程根目录新建文件夹 vscode在 vscode 文件夹内新建文件c cpp properties json 34 configurations 34 34 name 34 34 STM32 34 任意的
  • Python中的字典

    1 字典概念 Python内置的数据结构之一 xff0c 与列表一样是一个可变序列 以键值对的方式存储数据 xff0c 字典是一个无序的序列 xff08 列表是有序的 xff09 字典通过计算key的hash值确定存储位置 xff0c 所以
  • docker中使用cuda

    需要注意的事项 1 注意应用NVIDIA在docker hub上提供的镜像 本次使用的是 nvidia cuda 10 1 cudnn7 devel ubuntu18 04 2 在宿主机安装 nvidia docker2 3 运行时 要加
  • Mysql8.0 忘记密码怎么办

    Mysql8 0 忘记密码怎么办 今天晨雨帮身边小伙伴解决了mysql8 0无法连接上的问题 中间碰到的一些问题和大家分享一下 跳过密码登录时不成功修改密码时不成功navicat连接时报 2000的问题 1 首先先停止mysql服务 可通过
  • Kurento实战之二:快速部署和体验,Java笔试题编程题

    GitCommit 269548fa27e0089a8b8278fc4fc781d7f65a939b runc Version 1 0 0 rc92 GitCommit ff819c7e9184c13b7c2607fe6c30ae19403
  • QT使用render时pixmap背景不为透明的解决办法

    当我们需要将一个界面绘制成图片时 xff0c 就需要使用到render方法 QPixmap pixmap pwidget gt size pwidget gt render amp pixmap 如果pwidget背景为透明时 xff0c
  • 【iOS】—— 高德地图SDK基础使用

    最近稍微学了学iOS调用高德的SDK xff0c 就随便做做笔记 注意 xff1a 本篇博客基于高德地图SDK的3D地图来写的 xff0c 若使用的是2D地图可能有的方法可能有所不同 xff0c 比如自定义定位蓝点之类的 一 准备工作 xf
  • 如何做代码Code Review

    预防胜于治疗 xff0c 研究表明高效的 Code Review 可以发现70 90 的 bug xff0c Review 作用如下 xff1a 提高团队代码标准 xff0c 所有人共享同一套标准 xff0c 阻止破窗效应 推动团队合作 r
  • 【iOS】—— 浅谈UISearchController

    UISearchController是iOS的一个系统的搜索控件 xff0c 在平时我们输入信息的时候会出现相应的联想搜索的内容 xff0c 然后通过UITableView展示到搜索框的下面 xff0c 供我们选择 原本还想用UITextF
  • Linux Shell中的正则表达式

    Linux Shell中的正则表达式 正则表达式是什么正则表达式通配符 cut命令awk命令sedsort排序命令wc统计命令 正则表达式是什么 正则表达式是用于描述字符排列和匹配模式的一种语法规则 它主要用于字符串的模式分割 匹配 查找及
  • 【Linux】刚烧录完(相当于是第1次连接),VNC树莓派无法连接

    文章目录 解决方法如下 xff1a 1 在Terminal中输入 96 vncserver 96 2 在Terminal中再输入 96 sudo raspi config 96 3 输入连接即可 刚烧录完 xff0c 然后用 ifconfi
  • QT 配置Opencv+gdal心得

    本人研究僧一枚 xff0c 老师给了使用QT开发遥感图像相关程序的课题 xff0c 完全从零开始学习 xff0c 查阅了许多的资料 xff0c 过程里东拼西凑 xff0c 碰壁无数 所以我就想写一些学习的心得体会 xff0c 给自己复习使用
  • Flink 从 kafka 中读取数据并输出到 kafka

    Kafka 是一个分布式的基于发布 订阅的消息系统 xff0c 本身处理的也是流式数据 kafka和flink二者被称为当前处理流式数据的双子星 下面我们将从以下几个步骤展开讲解 xff1a 目录 一 添加maven依赖 二 编写flink