Flink RocketMQ Connector实现

2023-11-11

Flink内置了很多Connector,可以满足大部分场景。但是还是有一些场景无法满足,比如RocketMQ。需要消费RocketMQ的消息,需要自定时Source。

一、自定义FlinkRocketMQConsumer

参考FlinkKafkaConsumer:

public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T>{}

public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction {}

public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction implements ParallelSourceFunction<OUT> {}

public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {}

public interface SourceFunction<T> extends Function, Serializable {
    void run(SourceFunction.SourceContext<T> var1) throws Exception;

    void cancel();

    @Public
    public interface SourceContext<T> {
        void collect(T var1);

        @PublicEvolving
        void collectWithTimestamp(T var1, long var2);

        @PublicEvolving
        void emitWatermark(Watermark var1);

        @PublicEvolving
        void markAsTemporarilyIdle();

        Object getCheckpointLock();

        void close();
    }
}

可以看到,自定义的Source,只需要实现SourceFunction。

创建FlinkRocketMQConsumer,实现SourceFunction,重写run()和cancel()方法

public class FlinkRocketMQConsumer implements SourceFunction<String> {
    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
         
    }

    @Override
    public void cancel() {
         
    }
}

需要准备一个RocketMQ的消费者客户端,在pom中添加如下依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.0</version>
    <scope>provided</scope>
</dependency>

对于FlinkRocketMQConsumer来说,需要初始化一个consumer,代码如下:

public class FlinkRocketMQConsumer implements SourceFunction<String> {

    private static final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("0320TopicTestConsumerGroup");

}

这样,在FlinkRocketMQConsumer类加载的时候,就会初始化一个consumer。

另外,还需要对consumer进行初始化,需要知道nameSrvAddr和topic,所以添加一个构造方法,对consumer进行初始化

public class FlinkRocketMQConsumer implements SourceFunction<String> {
  private String nameSrvAddr;
  private String topic;  
  public FlinkRocketMQConsumer(String nameSrvAddr, String topic) {
    this.nameSrvAddr = nameSrvAddr;
    this.topic = topic;
  }
  ...
}

重写run方法和cancel方法

@Override
public void run(SourceContext<String> ctx) throws Exception {
    consumer.setNamesrvAddr(nameSrvAddr);
    consumer.subscribe(topic, "*");

    consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context) -> {
        msgs.forEach(msg -> {
            ctx.collect(new String(msg.getBody(), Charset.forName("UTF-8")));
        });
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    consumer.start();

    // 需要先走到 consumer.start() 后,才会走 consumer.registerMessageListener 方法,但是这个时候,意味着 run 方法已经走完,ctx已经关闭
    // 这个时候在 consumer.registerMessageListener 方法中,调用 ctx 会显示已关闭
    // 所以,不能让程序走完
    while (true) {
        Thread.sleep(10);
    }
}

@Override
public void cancel() {
    consumer.shutdown();
}

完整代码如下:

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import java.nio.charset.Charset;

/**
 * @author Johnson
 * @version 1.0
 * @description
 * @create 2023-03-20 10:02
 */
public class FlinkRocketMQConsumer implements SourceFunction<String> {

    private static final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("0320TopicTestConsumerGroup");

    private String nameSrvAddr;
    private String topic;

    public FlinkRocketMQConsumer(String nameSrvAddr, String topic) {
        this.nameSrvAddr = nameSrvAddr;
        this.topic = topic;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        consumer.setNamesrvAddr(nameSrvAddr);
        consumer.subscribe(topic, "*");

        consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context) -> {
            msgs.forEach(msg -> {
                ctx.collect(new String(msg.getBody(), Charset.forName("UTF-8")));
            });
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();

        // 需要先走到 consumer.start() 后,才会走 consumer.registerMessageListener 方法,但是这个时候,意味着 run 方法已经走完,ctx已经关闭
        // 这个时候在 consumer.registerMessageListener 方法中,调用 ctx 会显示已关闭
        // 所以,不能让程序走完
        while (true) {
            Thread.sleep(10);
        }
    }

    @Override
    public void cancel() {
        consumer.shutdown();
    }
}

二、方法调用

package rocketmq;

import com.source.FlinkRocketMQConsumer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author Johnson
 * @version 1.0
 * @description
 * @create 2023-03-21 10:30
 */
public class FlinkRocketMQConsumerDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> rmqDS = env.addSource(new FlinkRocketMQConsumer("***:9876", "test_rmq"));
        rmqDS .print("**********");
        env.execute("FlinkRocketMQConsumerDemo");
    }
}

到这来,就可以正常消费RocketMQ中的数据,控制台输出如下。

三、隐患

在FlinkRocketMQConsumer中,为了正常调用SourceContext(ctx),使用可一个线程一直占用,不让run方法结束,目前是可以正常运行,但是能不能经受得起时间检验,会不会给以后埋下隐患,还有待观察。

关于这一点,是否有更好的实现方法,欢迎各位技术大佬留言发表见解。。。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink RocketMQ Connector实现 的相关文章

随机推荐

  • SQL书写规范/SQL编码规范,这一篇就够了

    SQL代码规范 SQL Structured Query Language 结构化查询语言 在数据分析 数据开发 数据库 大数据等的领域 具有不可或缺的地位 运用十分广泛 规范化的SQL代码 能够提高代码的可读性 有利于快速定位问题 更有利
  • JavaWeb学习之Ajax、JSON08

    目录 1 概念 2 JQeury实现方式Ajax方式 2 JSON 1 概念 2 语法 1 基本规则 2 获取数据 1 JS0N转为 ava对象 2 Java对象转换JS0N 1 概念 异步的 avaScript和XML 1 异步和同步 客
  • Linux: ubuntu Appium连接手机

    1 Android或者HarmonyOS 开启开发者模式 1 打开手机设置 找到 关于手机 点击进去 2 点击 版本号 多点击几次 确保手机处于 开发者模式 2 Android或者HarmonyOS 开启USB调试 1 打开手机设置 找到
  • JavaScript-判断一个字符串是否为另一个字符串的前缀或后缀(startsWith方法与endsWith方法)

    1 startWith方法 用来判断当前字符串是否以另外一个给定的子字符串开头 并根据判断结果返回 true 或 false 运用举例 var str world console log str startsWith wor 输出true
  • 阿里云Centos7下安装Mongodb

    这个安装竟然只花了十分钟不到 首先打开xshell 连接阿里云服务器 我们进去后在根目录下面用命令下载安装包 用如下xshell命令 wget https fastdl mongodb org linux mongodb linux x86
  • DNS配置异常无法上网(明明有网,但是网页打不开了?)

    原因DNS无法解析网址 可以设置一个稳定好用的 1 打开网络设置 2 选择适配器选项 3 打开正在使用的网络 选择属性 4 双击点开协议版本Internet4 5 把DNS改为自己设置 6 将DNS服务器改为下表中的任意一个 可以自己试 那
  • Python爬虫最强项目案例之——JS逆向。这波学到就是赚到。

    前言 前段时间看到有人js逆向了某手机的反馈专区 我也第一时间学习了一下 学完后一直想着凭借自己的能力 看能不能单独完成一次 拿下js逆向真正第一血 所以就有了今天的受害者 某蓝厂手机圈子的逆向 网站分析 既然选定了目标 那就开始抓包 分析
  • 【设计模式】责任链模式

    顾名思义 责任链模式 Chain of Responsibility Pattern 为请求创建了一个接收者对象的链 这种模式给予请求的类型 对请求的发送者和接收者进行解耦 这种类型的设计模式属于行为型模式 在这种模式中 通常每个接收者都包
  • ctf—web合集

    0x00 前言 本篇主要是针对不同的做题方法和思路将wp进行分类 从而更方便大家进行索引 CTF 加解密合集 CTF Web合集 网络安全知识库 文中工具皆可关注 皓月当空w 公众号 发送关键字 工具 获取 PHP 1 变量空间绕过 ctf
  • 【Linux】进程信号“疑问?坤叫算信号吗?“

    鸡叫当然也算信号啦 文章目录 前言 一 认识信号量 二 信号的产生 1 调用系统函数向进程发信号 2 由软件条件产生信号 3 硬件异常产生信号 总结 前言 信号在我们生活中很常见 下面我们举一举生活中信号的例子 你在网上买了很多件商品 再等
  • 第14课:生活中的策略模式——怎么来不重要,人到就行

    用程序来模拟生活 从剧情中思考策略模式 策略模式 策略模式的模型抽象 类图 模型说明 设计要点 优缺点 实战应用 应用场景 故事剧情 Tony 在北京漂泊了三年 在这期间有很多的美好 也有很多心酸 有很多期待 也有很多失落 可终究还是要离开
  • 使用Docker-compose部署SpringBoot项目

    目录 一 概述 二 安装 三 构建目录结构 四 控制服务的启动顺序 五 编写配置文件 六 启动Docker compose 一 概述 Docker Compose是 docker 提供的一个命令行工具 用来定义和运行由多个容器组成的应用 使
  • Kalman滤波器从原理到实现

    Kalman滤波器的历史渊源 We are like dwarfs on the shoulders of giants by whose grace we see farther than they Our study of the wo
  • 用741运算放大器搭建RC正弦振荡器:文氏电桥振荡电路

    实验目的 了解正弦振荡器的工作原理 加强仿真multisim软件的运用水平 加强对电路的理解 搭建电路的动手能力 了解个元器件之间的配合 实验电路原理图 左侧为仿真电路 右侧为振荡波形 电路原理及其分析 I RC正弦波振荡电路又称文氏电桥振
  • linux性能分析工具专题-perf(事件采样,全面性能分析)

    文章目录 概述 perf概念 perf的工具集合介绍 perf的事件介绍 perf list参看 常用perf性能查看工具使用 perf stat 运行一个命令并且统计过程事件 perf top 输出系统某个事件热度函数或者指令排序 per
  • ArcGIS Pro python 获取一百多幅栅格的平均值

    目标 我需要计算流域内的平均等效水柱高 等效水柱高的格网已经创建好 如下 每个月一个这样的栅格格网 现在有近两百个格网需要求平均等效水柱高 要求 按照时间顺序求取每一个格网的平均等效水柱高 并生成时间序列表单 如下 看到arcpro上显示栅
  • 有空就看看的leetcode1——两数之和(c++版)

    有空就看看的leetcode1 两数之和 c 版 学习前言 两数之和题目 几个需要用到的函数 解法 1 遍历法 2 哈希表法 学习前言 有点紧张 决定看看leetcode 两数之和题目 给定一个整数数组 nums 和一个目标值 target
  • 多线程进阶(上)

    目录 一 常见的锁策略 二 CAS 三 synchronized的优化 一 常见的锁策略 1 乐观锁和悲观锁 乐观锁 从名字上来看就可以看出来 这个很乐观 这个会预期锁冲突的概率很低 就会认为锁即将要被解除了 不需要等待很久 因此上 乐观锁
  • postgresql数据库linux centos7 安装

    简介 百度百科 PostgreSQL是一种特性非常齐全的自由软件的对象 关系型数据库管理系统 ORDBMS 是以加州大学计算机系开发的POSTGRES 4 2版本为基础的对象关系型数据库管理系统 POSTGRES的许多领先概念只是在比较迟的
  • Flink RocketMQ Connector实现

    Flink内置了很多Connector 可以满足大部分场景 但是还是有一些场景无法满足 比如RocketMQ 需要消费RocketMQ的消息 需要自定时Source 一 自定义FlinkRocketMQConsumer 参考FlinkKaf