Flink java模拟生成自定义流式数据

2023-10-27

思路如下:

  1. 定义一个POJO类,注意flink里使用的类必须有一个无参的构造方法
  2. 自定义DataSource实现SourceFunction接口
  3. 使用ctx.collect()传入想要发送的数据就可以了

首先定义一个POJO类:

class MyData {
    public int keyId;
    public long timestamp;
    public double value;

    public MyData() {
    }

    public MyData(int accountId, long timestamp, double value) {
        this.keyId = accountId;
        this.timestamp = timestamp;
        this.value = value;
    }

    public long getKeyId() {
        return keyId;
    }

    public void setKeyId(int keyId) {
        this.keyId = keyId;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public double getValue() {
        return value;
    }

    public void setValue(double value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "MyData{" +
                "keyId=" + keyId +
                ", timestamp=" + timestamp +
                ", value=" + value +
                '}';
    }
}

生成自己的数据:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

public class CreateMyData {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<MyData> sourceStream = env.addSource(new MyDataSource());
        env.setParallelism(3);
        sourceStream.print();
        env.execute();
    }


    private static class MyDataSource implements SourceFunction<MyData> {
        // 定义标志位,用来控制数据的产生
        private boolean isRunning = true;
        private final Random random = new Random(0);

        @Override
        public void run(SourceContext ctx) throws Exception {
            while (isRunning) {
                ctx.collect(new MyData(random.nextInt(5), System.currentTimeMillis(), random.nextFloat()));
                Thread.sleep(1000L); // 1s生成1个数据
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink java模拟生成自定义流式数据 的相关文章

  • Java中有没有一种方法可以通过名称实例化一个类?

    我正在寻找问题 从字符串名称实例化一个类 https stackoverflow com questions 9854900 instantiate an class from its string name它描述了如何在有名称的情况下实例
  • Spring应用中Eureka健康检查的问题

    我正在开发一个基于 Spring 的应用程序 其中包含多个微服务 我的一个微服务充当尤里卡服务器 到目前为止一切正常 在我所有其他微服务中 用 EnableEurekaClient 我想启用这样的健康检查 应用程序 yml eureka c
  • .properties 中的通配符

    是否存在任何方法 我可以将通配符添加到属性文件中 并且具有所有含义 例如a b c d lalalala 或为所有以结尾的内容设置一个正则表达式a b c anything 普通的 Java 属性文件无法处理这个问题 不 请记住 它实际上是
  • 如何在 Spring 中禁用使用 @Component 注释创建 bean?

    我的项目中有一些用于重构逻辑的通用接口 它看起来大约是这样的 public interface RefactorAwareEntryPoint default boolean doRefactor if EventLogService wa
  • 如何更改javaFX中按钮的图像?

    我正在使用javaFX 我制作了一个按钮并为此设置了图像 代码是 Image playI new Image file c Users Farhad Desktop icons play2 jpg ImageView iv1 new Ima
  • 没有 Spring 的自定义 Prometheus 指标

    我需要为 Web 应用程序提供自定义指标 问题是我不能使用 Spring 但我必须使用 jax rs 端点 要求非常简单 想象一下 您有一个包含键值对的映射 其中键是指标名称 值是一个简单的整数 它是一个计数器 代码会是这样的 public
  • 在 junit 测试中获取 javax.lang.model.element.Element 类

    我想测试我的实用程序类 ElementUtils 但我不知道如何将类作为元素获取 在 AnnotationProcessors 中 我使用以下代码获取元素 Set
  • jdbc mysql loginTimeout 不起作用

    有人可以解释一下为什么下面的程序在 3 秒后超时 因为我将其设置为在 3 秒后超时 12秒 我特意关闭了mysql服务器来测试mysql服务器无法访问的这种场景 import java sql Connection import java
  • Spring Boot Data JPA 从存储过程接收多个输出参数

    我尝试通过 Spring Boot Data JPA v2 2 6 调用具有多个输出参数的存储过程 但收到错误 DEBUG http nio 8080 exec 1 org hibernate engine jdbc spi SqlStat
  • Java ResultSet 如何检查是否有结果

    结果集 http java sun com j2se 1 4 2 docs api java sql ResultSet html没有 hasNext 方法 我想检查 resultSet 是否有任何值 这是正确的方法吗 if resultS
  • tomcat 中受密码保护的应用程序

    我正在使用 JSP Servlet 开发一个Web应用程序 并且我使用了Tomcat 7 0 33 as a web container 所以我的要求是tomcat中的每个应用程序都会password像受保护的manager applica
  • 如何对不同的参数类型使用相同的java方法?

    我的问题 我有 2 个已定义的记录 创建对象请求 更新对象请求 必须通过实用方法进行验证 由于这两个对象具有相同的字段 因此可以对这两种类型应用相同的验证方法 现在我只是使用两种方法进行重载 但它很冗长 public record Crea
  • 如何访问JAR文件中的Maven资源? [复制]

    这个问题在这里已经有答案了 我有一个使用 Maven 构建的 Java 应用程序 我有一个资源文件夹com pkg resources 我需要从中访问文件 例如directory txt 我一直在查看各种教程和其他答案 但似乎没有一个对我有
  • 为什么 Java 8 不允许非公共默认方法?

    让我们举个例子 public interface Testerface default public String example return Hello public class Tester implements Testerface
  • 关键字“table”附近的语法不正确,无法提取结果集

    我使用 SQL Server 创建了一个项目 其中包含以下文件 UserDAO java public class UserDAO private static SessionFactory sessionFactory static se
  • 如何使用 jUnit 将测试用例添加到套件中?

    我有 2 个测试类 都扩展了TestCase 每个类都包含一堆针对我的程序运行的单独测试 如何将这两个类 以及它们拥有的所有测试 作为同一套件的一部分执行 我正在使用 jUnit 4 8 在 jUnit4 中你有这样的东西 RunWith
  • Cucumber 0.4.3 (cuke4duke) 与 java + maven gem 问题

    我最近开始为 Cucumber 安装一个示例项目 并尝试使用 maven java 运行它 我遵循了这个指南 http www goodercode com wp using cucumber tests with maven and ja
  • 我如何在java中读取二进制数据文件

    因此 我正在为学校做一个项目 我需要读取二进制数据文件并使用它来生成角色的统计数据 例如力量和智慧 它的设置是让前 8 位组成一个统计数据 我想知道执行此操作的实际语法是什么 是不是就像读文本文件一样 这样 File file new Fi
  • Opencv Java 灰度

    我编写了以下程序 尝试从彩色转换为灰度 Mat newImage Imgcodecs imread q1 jpg Mat image new Mat new Size newImage cols newImage rows CvType C
  • 使用反射覆盖最终静态字段是否有限制?

    在我的一些单元测试中 我在最终静态字段上的反射中遇到了奇怪的行为 下面是说明我的问题的示例 我有一个基本的 Singleton 类 其中包含一个 Integer public class BasicHolder private static

随机推荐

  • 蓝牙学习笔记

    Bluetoth学习笔记 前言 蓝牙技术其实包括BR EDR LE以及AMP三种 其中BR是传统的蓝牙技术 也是我们最常用的一种 LE是低功耗模式 是目前流行的模式 三种模式RF都使用2 4GHz ISM Industrial Scient
  • FPGA开发软件详细清单

    前言 主流的FPGA公司有ISE ALTERA LATTICE三家 各自的开发软件也不一样 1 Xilinx公司软件链接 1 ISE 14 7 百度云链接 https pan baidu com s 1O E y7RsDvbOWjvDKQf
  • A卡和N卡

    NVIDIA 全称为NVIDIA Corporation NASDAQ NVDA 官方中文名称英伟达 A卡 AMD的卡 N卡 英伟达的卡 DirectXDirectCompute对手是OpenGL opencl 对手是cuda AMD的卡特
  • QT中的QMQTT通信

    1 将qmqtt的包下载完之后加载到工程中 qt中qmqtt通讯包 桌面系统文档类资源 CSDN下载 2 h文件添加下列语句 QMQTT Client client 3 在cpp文件中初始化 client new QMQTT Client
  • 21张让你Python代码能力突飞猛进的速查表

    随着深度学习的蓬勃发展 越来越多的小伙伴们开始使用python作为主打代码 python有着种类繁多的第三方库 这里为大家从网络上收集了一些代码速查表 21张让你Python代码能力突飞猛进的速查表随着深度学习的蓬勃发展 越来越多的小伙伴们
  • php 服务器端主动发数据到客户端解决办法

    1 在客户端使用隐藏 iframe 其src指向服务器端的控制代码 比如 server php 2 在server php 通过 while 实现永不结束的请求 在循环内 实现断点 比如 sleep 2 表示每2秒钟循环一次 3 在每次循环
  • vue-admin-template的基本使用

    目录 NodeJs基础 NPM使用 yarn使用 模块化开发 使用vue admin template 修改请求路径 修改路由 先使用vsCode创建一个工作区 创建一个空的文件夹 使用vsCode打开这个文件夹 将文件夹另存为工作区 最终
  • react的map循环嵌套

    var btnType Object keys obj map key i gt var item obj key map s index gt return
  • cublasSgetriBatched的input matrix A 的值,在計算之後是否被改變或叫做污染,答案是No

    基於Nvidia的sample源文件改寫 可以發現 Sgetri的輸入矩陣A的元素值 并沒有改變 編譯的話 在cudaSample對應的blas文件夾中置入如下cu文件 并且修改對應的makefile裏的變量名字來編譯運行 Copyrigh
  • Equal Sums CodeForces - 988C(map+pair应用)

    题意 小A有 n 个整数数列 a1 a2 an 每个数列的长度为li 请你找出两个编号不同的数列 并从这两个数列中各恰好删除一个数 使得这两个数列的和相等 AC代码 参考了大佬的博客 include
  • 【Matlab】基于SVM支持向量机的数据回归预测(Excel可直接替换数据)

    Matlab 基于SVM支持向量机的数据回归预测 Excel可直接替换数据 1 模型原理 2 文件结构 3 Excel数据 4 分块代码 5 完整代码 6 运行结果 1 模型原理 支持向量机 Support Vector Machine S
  • hadoop之MapReduce

    MapReduce的处理过程分为两个步骤 map和reduce 每个阶段的输入输出都是key value的形式 key和value的类型可以自行指定 map阶段对切分好的数据进行并行处理 处理结果传输给reduce 由reduce函数完成最
  • 利用ApiPost实现Mock Server服务

    APIPOST可以让你在没有后端程序的情况下能真实地返回接口数据 你可以用APIPOST实现项目初期纯前端的效果演示 也可以用APIPOST实现开发中的数据模拟从而实现前后端分离 在使用APIPOST之前 你的团队实现数据模拟可能是下面的方
  • gitHub OpenSSL SSL_read: Connection was reset, errno 10054 解决方法

    gitHub OpenSSL SSL read Connection was reset errno 10054 解决方法 将原先使用密码提交的方法 更换为使用基于令牌 token 的身份验证提交 获取方法令牌 token 的步骤如下 在这
  • 最近涉猎的东西总结

    1 Erlang 2 RabbitMQ Java Client 3 Java AES 加密类库 4 Jackson类库 操作Json格式字符串
  • SpringMVC之JSON数据返回与异常处理机制

    目录 一 SpringMVC的JSON数据返回 1 导入Maven依赖 2 配置spring mvc xml 3 ResponseBody注解的使用 3 1案例演示 1 List集合转JSON 2 Map集合转JSON 3 返回指定格式St
  • buck电路上下管_关于Buck变换器上管MOSFET开关速度的优劣势是什么?

    答 Buck变换器上管MOSFET开关速度的分析及提高 目前 Buck变换器在电脑主板 通信电源 手机等电子产品中的应用越来越广泛 随着全世界节能减排的发展 Buck变换器的效率成为评价电源系统性能优劣及可靠性的最重要指标 Buck电路的效
  • string数组转int数组

    string数组类型转换为int数组 方法一 ConvertAll的用法 1 public static int StrToInt string str 2 3 return int Parse str 4 5 6 string arrs
  • 华为CE12808/S9700交换机istack/CSS堆叠主备倒换操作命令步骤

    一 华为CE12808交换机 istack堆叠状态 1 设备型号 交换机一 HUAWEI CE12808 交换机二 HUAWEI CE12808 2 istack堆叠主备倒换操作步骤 2 1 设备当前配置保存并进行备份 2 2 切换所用命令
  • Flink java模拟生成自定义流式数据

    思路如下 定义一个POJO类 注意flink里使用的类必须有一个无参的构造方法 自定义DataSource实现SourceFunction接口 使用ctx collect 传入想要发送的数据就可以了 首先定义一个POJO类 class My