FLINK SQL实战案例之商品销量实时统计

2023-10-26

问题导读

1.本文的业务包含哪些流程?
2.本文难点在什么地方?
3.如何通过flink sql实现商品销量实时统计?
 

1、案例背景介绍
互联网电商往往需要对订单商品销量实时统计,用于实时大屏展示,库存销量监控等等。本文主要介绍如何通过flink sql的方式进行商品实时销量的统计。

业务流程介绍:

1.使用otter采集业务库binlog数据输出到kafka

2.flink读取kafka数据进行商品销量统计

3.统计结果输出到mysql

4.下游业务系统直接读取mysql数据

业务需求介绍:

根据订单创建时间统计商品每天的实时销量,不包含取消订单的商品

2、准备工作
将mysql订单相关的binlog日志实时同步到kafka对应的Topic,然后创建对应的flink table source表。

为了简化需求,下面的订单表和订单明细表只列出主要的字段。

订单主表:orders
 

字段名

数据类型

注释

order_no

VARCHAR

订单编号

order_status

int

订单状态(0:已取消,1:待支付,2:已支付,3:已出库,... )

pay_time

timestamp

支付时间

create_time

timestamp

订单创建时间

update_time

timestamp

订单更新时间


订单明细表:order_detail
 

字段名

数据类型

注释

order_no

VARCHAR

订单编号

product_code

VARCHAR

商品编码

quantity

int

商品数量

create_time

timestamp

订单创建时间

update_time

timestamp

订单更新时间


3、难点解析
同一个订单会有多次业务操作(例如下单、付款、发货,取消等等),每一次业务操作都会导致订单状态发生变化,并且每次变化订单表对应的Binlog日志会产生一条订单号相同的数据。如果我们不做处理直接关联聚合查询的话会导致数据重复统计结果不正确。因此我们需要了解业务系统都有哪些操作会对订单主表和订单明细进行更新操作。

假设业务系统数据变更是这样的:

用户下单后新增订单主表和订单明细表数据
后续的业务操作只会更新订单主表数据,订单明细表数据不会更新变化
数据每次更新update_time字段都会同时变化
再来看一下我们的需求如何处理:

需求的要求是当订单创建后我们就要统计该订单对应的商品数量,而当订单状态变为取消时我们要减掉该订单对应的商品数量。根据需求和订单数据更新的特点,这里需要用到flink回撤流的特性来处理该需求。flinksql可以使用row_number() over(partition by order_no order by update_time desc) 通过限制 where rn=1来获取同一订单的最新状态数据,然后和订单明细表进行关联求和。flinksql会自动更新统计结果。

4、编写业务逻辑

--订单主表source table
CREATE TABLE orders 
            (
               order_no     string,
               order_state  int,
               pay_time     string,
               create_time  string,
               update_time  string
             ) 
       WITH (
               'connector.type' = 'kafka',       
               'connector.version' = 'universal', --kafka版本    
               'connector.topic' = '_tporders',--kafkatopic
               'connector.properties.zookeeper.connect' = 'localhost:2181', 
               'connector.properties.bootstrap.servers' = 'localhost:9092',
               'connector.properties.group.id' = 'testGroup',
               'connector.startup-mode' = 'latest-offset',
               'format.type' = 'json' --数据为json格式             
             )
             
--订单明细表source table
CREATE TABLE order_detail 
            (
               order_no     string,
               product_code string,
               quantity     int,
               create_time  string,
               update_time  string
             ) 
       WITH (
               'connector.type' = 'kafka',       
               'connector.version' = 'universal', --kafka版本    
               'connector.topic' = 'tp_order_detail',--kafkatopic
               'connector.properties.zookeeper.connect' = 'localhost:2181', 
               'connector.properties.bootstrap.servers' = 'localhost:9092',
               'connector.properties.group.id' = 'testGroup',
               'connector.startup-mode' = 'latest-offset',
               'format.type' = 'json' --数据为json格式             
             )
--mysql统计结果表sink table
--mysql建表时指定主键为order_date,product_code,flink写入数据时相同主键会进行更新
CREATE TABLE product_sale
             (
              order_date string,
              product_code string,
              cnt int
              ) 
         WITH (
           'connector.type' = 'jdbc', 
           'connector.url' = 'jdbc:mysql://localhost:3306/flink?serverTimezone=UTC&useSSL=true', 
           'connector.table' = 'order_state_cnt', 
           'connector.driver' = 'com.mysql.cj.jdbc.Driver', 
           'connector.username' = 'root',
           'connector.password' = 'root',
           'connector.write.flush.max-rows' = '1',--默认每5000条数据写入一次,测试调小一点
           'connector.write.flush.interval' = '2s',--写入时间间隔
           'connector.write.max-retries' = '3'
         )
         
 --统计商品销量并写入mysql        
insert into product_sale 
select create_date,product_code,sum(quantity)
from (select t1.order_no,
             t1.create_date,
             t2.product_code,
             t2.quantity
       from (select order_id,
                    order_status,
                    substring(create_time,1,10) create_date,
                    update_time ,
                    row_number() over(partition by order_no order by update_time desc) as rn
              from orders
              )t1
       left join order_detail t2
            on t1.order_no=t2.order_no
      where t1.rn=1--取最新的订单状态数据
      and t1.order_status<>0--不包含取消订单
   )t3
 group by create_date,product_code

5数据测试

假设在13点创建了两个订单,数据如下:

订单主表数据:

order_no

order_state

pay_time

create_time

update_time

order1

1

 

2020-04-01 13:00:00

2020-04-01 13:00:00

order2

1

 

2020-04-01 13:00:00

2020-04-01 13:00:00

订单明细数据:

order_no

product_code

quantity

create_time

update_time

order1

product1

3

2020-04-01 13:00:00

2020-04-01 13:00:00

order1

product2

5

2020-04-01 13:00:00

2020-04-01 13:00:00

order2

product1

2

2020-04-01 13:00:00

2020-04-01 13:00:00

order2

product2

4

2020-04-01 13:00:00

2020-04-01 13:00:00

 

统计结果:

order_date

product_code

cnt

2020-04-01

product1

5

2020-04-01

product2

9

 

然后订单号为order1的订单取消了:

order_no

order_state

pay_time

create_time

update_time

order1

0

 

2020-04-01 13:00:00

2020-04-01 13:15:00

order2

1

 

2020-04-01 13:00:00

2020-04-01 13:00:00

 

统计结果:

order_date

product_code

cnt

2020-04-01

product1

2

2020-04-01

product2

4

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

FLINK SQL实战案例之商品销量实时统计 的相关文章

  • 数据库优化

    前言 毫不夸张的说咱们后端工程师 无论在哪家公司 呆在哪个团队 做哪个系统 遇到的第一个让人头疼的问题绝对是数据库性能问题 如果我们有一套成熟的方法论 能让大家快速 准确的去选择出合适的优化方案 我相信能够快速准备解决咱么日常遇到的80 甚
  • Flink CDC 详述实时数据湖

    在构建实时数仓的过程中 如何快速 正确的同步业务数据是最先面临的问题 本文主要讨论一下如何使用实时处理引擎Flink和数据湖Apache Iceberg两种技术 来解决业务数据实时入湖相关的问题 01 Flink CDC介绍 CDC全称是C
  • 【实战】“TED”演讲——可视化分析

    TED technology entertainment design 旨在将技术 娱乐 设计领域的专家聚集在一起的非盈利性组织 口号 Ideas worth spreading 值得传播的思想 每年2 3月 会召集杰出人物 将工作和研究提
  • 体验ChatGPT在具体应用场景下的能力与表现——vuedraggable的move多次触发问题

    当下人工智能模型在满天飞 今天拿一个具体的应用场景 来体验下ChatGPT的能力与表现 看看是否能解决实际问题 顺便填一下之前遇到的一个具体的坑 vuedraggable的move多次触发问题 背景 背景是这样的 实现低代码开发平台过程中
  • 为什么SELECT * 会导致查询效率低?

    无论在工作还是面试中 关于SQL中不要用 SELECT 都是大家听烂了的问题 虽说听烂了 但普遍理解还是在很浅的层面 并没有多少人去追根究底 探究其原理 废话不多说 本文带你深入了解一下 SELECT 效率低的原因及场景 一 效率低的原因
  • Apache Flink SQL 详解与实践

    问题导读1 为何会有Flink SQL 2 本文哪些地方涉及Flink 1 7 4 如何定义源 sources 和接收器 sinks 5 Flink SQL本文介绍了哪些sql 6 将数据格式化为正确的格式以便进一步处理 7 如何监控Fli
  • 数据仓库指标体系实践

    指标体系 1 痛点分析 主要从业务 技术 产品三个视角来看 业务视角 业务分析场景指标 维度不明确 频繁的需求变更和反复迭代 数据报表臃肿 数据参差不齐 用户分析具体业务问题找数据 核对确认数据成本较高 技术视角 指标定义 指标命名混乱 指
  • APP移动应用测试策略与工具思维导图

    2张图构建移动应用测试知识体系 1 APP移动测试策略 2 移动测试常用工具 目前觉得好用的 因还有其它事 故这里不再啰嗦 想要听我啰嗦的 改天书里见
  • 容器化部署zabbix

    一 创建docker compose yml文件 首先创建一份docker compose yml文件 使用docker compose进行容器的编排 mkdri zabbix 在根目录创建zabbix文件 cd zabbix touch
  • Scrapy+bs4爬取京东商品对应的评论信息

    Scrapy bs4爬取京东商品对应的评论信息 spiders comm py coding utf 8 import json import jsonpath import scrapy from bs4 import Beautiful
  • Solidity编程开发实例

    Solidity 编程开发实例 Voting 投票 接下来的智能合约教程非常复杂 但展示了很多Solidity的特性 它实现了一个入门的投票合约 当然 电子选举的主要问题是如何赋予投票权给准确的人 并防止操纵 我们不能解决所有的问题 但至少
  • Random.Range()的范围问题

    Random Range 方法的是进行差生随机数的一个方法 int i Random Range min max 这里进行产生的随机数 当min max相等 产生的随机数返回的是min 因为min max 产生的最大的也就是max 当min
  • 大数据数据倾斜问题

    数据倾斜 数据倾斜是我们在处理大数据量问题时绕不过去的问题 也是在面试中几乎必问的考点 正常的数据分布理论上都是倾斜的 就是我们所说的 二八原理 80 的财富集中在20 的人手中 80 的用户只使用20 的功能 20 的用户贡献了80 的访
  • 数据仓库模型设计V2.0

    一 数仓建模的意义 数据模型就是数据组织和存储方法 它强调从业务 数据存取和使用角度合理存储数据 只有将数据有序的组织和存储起来之后 数据才能得到高性能 低成本 高效率 高质量的使用 高性能 良好的数据模型能够帮助我们快速查询所需要的数据
  • 学习阿里如何进行数据指标体系的治理

    想必做数据的同学对One Data都有所耳闻 但One Data 体系具体包含了内容 有怎样的应用 不知道大家是否了解 今天我们详细分享一下One Data体系中关于数据治理相关的内容 One Data整体概述 首先 我们看看One Dat
  • SuperSocket实战手把手教程:一个完整的SocketServer项目

    SuperSocket系列教程 1 SuperSocket基础 一 基本概念 2 SuperSocket实战手把手教程 一个完整的SocketServer项目 目录 一 项目场景 1 Visual Studio新建项目 2 自定义自己服务器
  • Apache Flink(十五):Flink任务提交模式

    个人主页 IT贫道 大数据OLAP体系技术栈 Apache Doris Clickhouse 技术 CSDN博客 私聊博主 加入大数据技术讨论群聊 获取更多大数据资料 博主个人B栈地址 豹哥教你大数据的个人空间 豹哥教你大数据个人主页 哔哩
  • 【flink番外篇】5、flink的window(介绍、分类、函数及Tumbling、Sliding、session窗口应用)介绍及示例(1)- 窗口介绍、分类、函数

    Flink 系列文章 一 Flink 专栏 Flink 专栏 系统介绍某一知识点 并辅以具体的示例进行说明 1 Flink 部署系列 本部分介绍Flink的部署 配置相关基础内容 2 Flink基础系列 本部分介绍Flink 的基础部分 比
  • 【flink番外篇】5、flink的window(介绍、分类、函数及Tumbling、Sliding、session窗口应用)介绍及示例(1)- 窗口介绍、分类、函数

    Flink 系列文章 一 Flink 专栏 Flink 专栏 系统介绍某一知识点 并辅以具体的示例进行说明 1 Flink 部署系列 本部分介绍Flink的部署 配置相关基础内容 2 Flink基础系列 本部分介绍Flink 的基础部分 比
  • 【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版

    Flink 系列文章 一 Flink 专栏 Flink 专栏 系统介绍某一知识点 并辅以具体的示例进行说明 1 Flink 部署系列 本部分介绍Flink的部署 配置相关基础内容 2 Flink基础系列 本部分介绍Flink 的基础部分 比

随机推荐

  • vue给标签动态添加元素_动态添加dom元素,并绑定vue事件

    背景 后管系统配置一个产品后 前端 vue js框架 取到这些产品信息并展示出来 产品经理要求在文本内容中添加链接 例如 本保险不承保前往处于战争状态或已被宣布为紧急状态的国家或地区 最新信息以登陆http baoxian pingan c
  • 随手记3:C#Unity中随机数的使用

    问题 在同时引用UnityEngine和System命名空间时 using UnityEngine 继承MonoBehaviour类 using System 用到了浮点数转字符串的Convert函数 如果直接Random Range fl
  • 安装mysql

    wget http repo mysql com mysgl community release el7 5 noarch rpm rpm ivh mysgl community release el7 5 noarch rpm yum y
  • JDBC简介(2)

    前篇 JDBC简介 1 1 Connection角色 Connection表示与特定数据库的连接 可以获取到数据库的一些信息 这些信息包括 其表信息 应该支持的SQL语法 数据库内有什么存储过程 此链接功能的信息等等 在一般实际使用情况下
  • Srpingboot项目application.yml文件没有生效

    1 首先看文件头是不是树叶 如果是不是 文件名称可能存在问题 我的问题 因为有父工程 去掉子工程里面的
  • 第11章 分布式事务解决方案

    mini商城第11章 分布式事务解决方案 一 课题 分布式事务解决方案 二 回顾 1 MongoDB部署及应用 2 购物车功能实现 3 订单功能实现 三 目标 1 分布式事务 事务简介 本地事务讲解 不同场景下的分布式事务 2 分布式事务理
  • C++开发过程笔记~~持续更新~~

    文章目录 1 为什么只有 析构函数不论基类和派生类都用到了virtual关键字 2 c inline使函数实现可以在头文件中 避免多重定义错误 3 this gt 4 调用另一个cpp文件中函数 多个 cpp文件编译 5 有空看看开源项目g
  • 老猿学5G:融合计费场景的离线计费会话的Nchf_OfflineOnlyCharging_Update 更新操作过程

    前往老猿Python博文目录 一 Nchf OfflineOnlyCharging Update消息交互过程 Nchf OfflineOnlyCharging Update消息是是5G融合计费的离线计费中CHF为SMF中的NF功能体CTF提
  • 用css 添加手状样式,鼠标移上去变小手,变小手

    用css 添加手状样式 鼠标移上去变小手 变小手 cursor pointer 用JS使鼠标变小手onmouseover 鼠标越过的时候 nm use ver this style cursor hand cursor其他取值 auto 标
  • OpenGL学习笔记(六)-模型加载

    参考网址 LearnOpenGL 中文版 哔哩哔哩教程 第三章 模型加载 3 1 Assimp 1 Assimp能够导入多种模型文件格式 将所有的模型数据加载至Assimp的通用数据结构中 我们就能够从Assimp的数据结构中提取我们所需的
  • linux操作系统下根目录下各目录的作用

    bin 二进制文件 普通用户和超级用户使用的命令 sbin 二进制文件 root用户也就是管理员使用的命令 普通用户没有权限 boot 系统启动的关键文件 dev 管理各个设备的文件 etc 所有程序的配置文件 home 用户家目录文件 l
  • 消息循环中的TranslateMessage函数和DispatchMessage函数

    TranslateMessage函数 函数功能描述 将虚拟键消息转换为字符消息 字符消息被送到调用线程的消息队列中 在下一次线程调用函数GetMessage或PeekMessage时被读出 函数原型 BOOL TranslateMessag
  • javanio应用场景,从理论到实践!

    直击面试 反正我是带着这些问题往下读的 说一下 JVM 运行时数据区吧 都有哪些区 分别是干什么的 Java 8 的内存分代改进 举例栈溢出的情况 调整栈大小 就能保存不出现溢出吗 分配的栈内存越大越好吗 垃圾回收是否会涉及到虚拟机栈 方法
  • moviepy音视频剪辑:颜色相关变换函数blackwhite、colorx、fadein/out、gamma_corr、invert_colors、lum_contrast、mask_color详解

    前往老猿Python博文目录 注意 本文为收费专栏文章 对应免费专栏文章为 moviepy音视频剪辑 颜色相关变换函数blackwhite colorx fadein out gamma corr invert colors lum con
  • Java 匿名对象

    一 简介 1 1 含义 没有名字的对象 以常规的创建对象的方法 AtomicInteger atomicInteger new AtomicInteger 100000 格式 类名 变量名 new 类名 这样就完成了对象的创建 注意 内可以
  • windows系统启动服务一直不成功,查看windows日志方法

    今天遇到一个问题 windows系统部署了spring cloud的服务 手动执行start bat文件可以启动服务 用服务的方式启动就一直启动不了 通过 控制面板 gt 管理工具 在 事件查看器 gt windows日志 gt 应用程序
  • 遮罩和蒙版有什么区别,视频遮罩怎么用

    在制作短视频时 好多小伙伴分不清遮罩与蒙版的区别 甚至有的人认为它们就是一个东西 要说起来 这两个看似一样的概念 其实还是有很大的区别 今天就来带各位了解一下遮罩和蒙版有什么区别 视频遮罩怎么用 希望对各位认识并理解蒙版和遮罩有一定的帮助
  • 根据java实体类生成创建表sql步骤

    根据java实体类生成创建表sql步骤 根据java实体类生成创建表sql语句时 方法是利用java反射 AOP注解 主要步骤如下 1 注解类 一般在生成表的时候 需要表名 主键名 字段名 对应到注解上至少要体现出这三部分 1 1表名 主键
  • 【Flutter 组件】004-基础组件:图片及 ICON

    Flutter 组件 004 基础组件 图片及 ICON 一 图片 1 Image 概述 Flutter 中 我们可以通过 Image 组件来加载并显示图片 Image 的数据源可以是 asset 文件 内存以及网络 Image 是一个用于
  • FLINK SQL实战案例之商品销量实时统计

    问题导读1 本文的业务包含哪些流程 2 本文难点在什么地方 3 如何通过flink sql实现商品销量实时统计 1 案例背景介绍互联网电商往往需要对订单商品销量实时统计 用于实时大屏展示 库存销量监控等等 本文主要介绍如何通过flink s