每隔5分钟输出最近一小时内点击量最多的前N个商品(SQL实现版)

2023-11-08

代码

package com.zjc.flow_analysis.hotitems_analysis

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Slide}
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import java.sql.Timestamp
import java.util.Properties

object HotItemsSQL {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop103:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    val inputStream = env.addSource(new FlinkKafkaConsumer[String]("hotItems", new SimpleStringSchema(), properties))
    val dataStream = inputStream.map(data => {
      val arrayData = data.split(",")
      UserBehavior(arrayData(0).toLong, arrayData(1).toLong, arrayData(2).toLong, arrayData(3).toString,arrayData(4).toLong)
    }).assignAscendingTimestamps(_.timestamp * 1000L)

    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)
    // 将dataStream转为表
    tableEnv.createTemporaryView("dataTable",dataStream, 'itemId, 'behavior, 'timestamp.rowtime as 'ts)
    val resultTalbe = tableEnv.sqlQuery(
      """
        |select *
        |from (
        | select *,
        |  row_number() over(partition by windowEnd order by cnt desc) as row_num
        |  from (
        |  select itemId, count(itemId) as cnt,
        |         hop_end(ts, interval '5' minute, interval '1' hour) as windowEnd
        |  from dataTable
        |  where behavior='pv'
        |  group by itemId, hop(ts, interval '5' minute, interval '1' hour)
        |  )
        |)
        |where row_num <= 5
        |""".stripMargin
    )
    resultTalbe.toRetractStream[(Long, Long,Timestamp, Long)].print("result")
    env.execute("商品热门统计(sql版实现)")

  }
}

输出,部分截图:
在这里插入图片描述
注意:sql中用单引号,如behavior=‘pv’,如果用双引号sql解析会有问题。
HOP(time_attr, interval, interval)定义一个滑动窗口,第一个参数是时间字段,第二个参数是窗口滑动步长,第三个是窗口长度。
HOP_END(time_attr, interval, interval)定义一个滑动窗口,第一个参数是时间字段,第二个参数是窗口滑动步长,第三个是窗口长度,返回的窗口右边界时间戳。
官网解释下图:
在这里插入图片描述
在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

每隔5分钟输出最近一小时内点击量最多的前N个商品(SQL实现版) 的相关文章

  • SQL - != 'NULL' 的解释

    我的SSMS代码如下 Select top 50 From FilteredContact Where statuscode 1 and emailaddress1 NULL and telephone1 NULL and address1
  • 如何获得顶部带有千位分隔符的数字?

    SELECT count FROM table A 假设结果是8689 我怎样才能将它转换为8 689在 SQL Server 上 尝试这样 select replace convert varchar convert Money coun
  • MS ACCESS 计数/求和行数,不重复

    我有下表 我需要计算总行数而不包括任何重复记录 CustomerID test1 test1 test2 test3 test4 test4 如您所见 总行数为 6 但有两个 test1 和两个 test4 我希望查询返回 4 IOW 我想
  • 索引数量越少意味着插入、更新和删除速度更快? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • POINT 列上的 MySQL INSERT/UPDATE

    我正在尝试用我国家的地理位置填充我的数据库 我的一张表有 4 个字段 ID PK 纬度 经度和地理点 EDIT SCDBs Punto Geografico SET lat 18 469692 SET lon 63 93212 SET g
  • MySQL - 从临时表插入

    这看起来非常简单 但我坚持使用简单的插入语句 见下文 begin work CREATE TEMPORARY TABLE IF NOT EXISTS insert table AS select r resource id fr file
  • 在 MySQL 中对整数字段运行带引号的数字(字符串)查询时会发生哪些复杂情况

    在 SQL 中 不应引用整数 因为如果引用 它将是一个字符串 但我很好奇如果我这样做会出现什么问题 并发症 例如 SELECT FROM table WHERE id 1 正确的 vs SELECT FROM table WHERE id
  • 有没有办法阻止 SQL Express 2008 空闲?

    我使用 SQL Express 2008 作为 Web 应用程序的后端 问题是 Web 应用程序是在工作时间使用的 因此有时在午餐或休息时间 如果 20 分钟内没有用户登录 SQL Express 将进入空闲状态模式并释放其缓存 我知道这一
  • 需要按天分割日期时间范围

    我有一个需要根据日期时间拆分的表 输入表 ID Start End A 2019 03 04 23 18 04 2019 03 04 23 21 25 A 2019 03 04 23 45 05 2019 03 05 00 15 14 所需
  • SQL 2008全文索引填充延迟

    我的经理说 在基础表数据更改后 可能需要一段时间才能更新全文搜索索引 例如 如果我有一张桌子Products有一个柱子Description我更新了该描述 然后我可能需要一些时间才能搜索该新描述 真的吗 这需要多长时间 SQL 2008 对
  • 根据日期顺序排名

    我的数据如下 Heading Date A 2009 02 01 B 2009 02 03 c 2009 02 05 d 2009 02 06 e 2009 02 08 我需要如下排名 Heading Date Rank A 2009 02
  • JDBC插入实数数组

    我试图将一个真实的数组插入到 postgresql 数组中 该表的定义是 String sqlTable CREATE TABLE IF NOT EXISTS ccmBlock sampleId INTEGER block REAL 插入内
  • 如何从 SQL Server 2008 查询结果中删除“NULL”

    我有一个包含 59 列和超过 17K 行的表 很多行都有NULL在某些列中 我想删除NULL以便查询返回空白 而不是NULL 我可以运行一些更新功能来替换所有NULL with 使用 SQL Server 2008R2 Management
  • 如何在 DB2 中创建返回序列值的函数?

    如何在 DB2 中创建一个从序列中获取值并返回该值的函数 应该可以在 select 或 insert 语句中使用该函数 例如 select my func from xxx insert into xxx values my func 基本
  • SELECT max(x) 返回 null;我怎样才能让它返回0?

    运行以下命令时如何返回 0 而不是 null SELECT MAX X AS MaxX FROM tbl WHERE XID 1 假设没有XID 1的行 or SELECT coalesce MAX X 0 AS MaxX FROM tbl
  • 我不断收到错误“关系 [TABLE] 不存在”

    我一直在尝试查询数据库中的两个表 在服务器资源管理器中 我可以看到两个表 甚至可以看到其中的列 我们将它们称为 Schema table1 和 Schema table2 其中 Schema 的第一个字母大写 我尝试运行以下查询 selec
  • 如何使用 BigQuery 有效地选择另一个表中匹配子字符串的记录?

    我有一个包含数百万个字符串的表 我想将其与包含大约两万个字符串的表进行匹配 如下所示 standardSQL SELECT record FROM record JOIN fragment ON record name LIKE CONCA
  • Spark SQL 中的 SQL LIKE

    我正在尝试使用 LIKE 条件在 Spark SQL 中实现联接 我正在执行连接的行看起来像这样 称为 修订 Table A 8NXDPVAE Table B 4 8 NXD V 在 SQL Server 上执行联接 A revision
  • 如何在 SQL Server 中连接

    我的数据库没有特定的列 因此我通过开关在查询中创建了一个列 我需要的是将此列与数据库中的另一列连接起来 select certificateDuration DurationType case when certificateDuratio
  • 我是否需要在外键上指定 ON DELETE NO ACTION?

    我有以下与 SQL Server 2012 一起使用的 DDL CREATE TABLE Subject SubjectId INT IDENTITY 1 1 NOT NULL Name NVARCHAR 50 Not NULL CONST

随机推荐

  • 物联网的应用场景

    随着物联网技术的不断发展和普及 它已经在各个领域展现出了巨大的潜力和前景 下面将会探讨物联网的应用前景 1 智能家居 智能家居是物联网技术最广泛应用的领域之一 通过智能家居设备 人们可以在任何时间 任何地点通过手机 平板电脑等设备远程控制家
  • 深度学习原理分析之数据不足与过拟合

    人们常常知道若干种解决过拟合的方法但不知其因 本文对其进行原理剖析 一个模型所能提供的信息一般来源于两个方面 一是训练数据中蕴含的信息 二是在模型的形成过程中 包括构造 学习 推理等 人们提供的先验信息 当训练数据不足时 说明模型从原始数据
  • Spring Boot官方例子《Developing Your First Spring Boot Application》无法运行

    官方的第一个例子就卡住了 https docs spring io spring boot docs current reference htmlsingle getting started first application 按照要求 一
  • 【消息队列】kafka consumer demo

    package consumer import org apache kafka clients consumer ConsumerConfig import org apache kafka clients consumer Consum
  • git常用命令及免密登录

    常用命令 git config global user name 用户名 设置用户签名 git config global user email 邮箱 设置用户签名 git init 初始化本地库 git status 查看本地库状态 gi
  • 多线程写图像文件的一点小测试(Boost + Gual)

    转载自 http blog csdn net liminlu0314 article details 7420484 在处理遥感图像中 发现往往比较耗时的是在数据的IO中 尤其是在O 写入 的时候更加耗时 GDAL可以支持图像的多线程写入
  • 蓝桥杯——修改数组

    问题描述 给定一个长度为N的数组A A1 A2 AN 数组中有可能有重复出现的整数 在小明要按以下方法将其修改为没有重复整数的数组 小明会依次修改A2 A3 AN 当修改Ai时 小明会检查Ai是否在A1 Ai 1中出现过 如果出现过 则小明
  • C#中的事件和委托_札记1

    C 中的事件和委托 札记1 委托 自定义委托 静态方法 被委托 委托是一种类型 所以任何定义类的地方都可以定义委托类型 自定义委托的基本格式示例如下
  • RobotFramework 安装教程

    动化测试框架 具盘点 安装步骤 页面介绍 标准库 不需要安装 直接 RF 带 扩展库 快捷键 实战 RobotFramework 安装教程 动化测试框架 具盘点 java junit和testng 具 postmen newman git
  • html动态爱心代码【二】(附源码)

    目录 前言 效果演示 内容修改 完整代码 总结 前言 七夕马上就要到了 为了帮助大家高效表白 下面再给大家带来了实用的HTML浪漫表白代码 附源码 背景音乐 可用于520 情人节 生日 表白等场景 可直接使用 效果演示 内容修改 文案 di
  • go - flag包(处理命令行参数小能手)

    前言 在golang中有很多方法来处理命令行参数 简单情况下可以不使用任何库 直接使用os Args 但是golang标准库提供了flag包来专门处理命令行参数 当然还有第三方提供的处理命令行参数的库cobra cli可以参考 flag包绑
  • qt没有mysql驱动的解决办法

    qt没有mysql驱动的解决办法 第一部分 qtcreator上没有mysql驱动的解决办法 第一步 找到你的qt的版本的源码src 第二步 点击mysql pro 电脑会自动打开qtcreater 然后就是进行编译器的选择 我选择的是 在
  • BootStrap的使用

    是别人帮我们已经写好的css样式 我们如果想要使用这个BootStrap 下载BootStrap 使用 在页面上引入BootStrap 自定置 先在网上下载好BootStrap 并导入到Pycharm 引入BootStrap 注意引入的是
  • 【react】文本内容超过一行,显示为单行省略,并且出现icon图标;点击此图标,可以进行展开或收起文本功能实现

    需求 多条数据展示 每条数据的文本内容不超过一行 文本内容为一行时 不显示 展开收起icon图标 文本超过一行时 内容单行省略 并且显示 点击图标 图标切换为收起按钮 后端返回数据 const data name 测试测试测试 time 2
  • BinaryViewer(二进制查看器)使用教程(附下载)

    1 BinaryViewer操作界面 2 面板功能 1 数据面板 此面板占据了屏幕的最中央部分 其目的是顺序显示打开的文件或物理驱动器中的所有数据 此面板通常以两列显示数据 每列都可以按用户选择的格式显示数据 请转到数据显示模式 查看如何更
  • 对indexedDB的一些使用方法

    indexedDB的使用 1 打开数据库和创建数据仓库 createDB function dbName version tableName key cursor callBack 参数为 dbName数据库名 version版本号 tab
  • Python运维开发工程师养成记(while循环语句)

    图示 案例 contine和break用法 无限循环 while else语句 今天分享到这里 喜欢的盆友可以关注一下博主 链接 https ke qq com course 4300856 tuin d8aedf68
  • android 环信集成,Android 环信集成使用总结

    最近因为项目需要 需要集成环信 对于一些账号的注册 配置的添加官方文档上写的都有 就不在记录 就记录一下集成过程中遇到的问题 环信demo中的代码太乱 而且一些功能用不到 我们就移值些自己有用的放到自己的项目中 1 消息监听 环信在收到消息
  • mysql如何查询成绩前5名_sql 语句查询 前5名后5名的成绩

    蝴蝶不菲 两种办法 分别求最大和最小 然后union allselect from select from table order by 成绩 where rownum lt 5union allselect from select fro
  • 每隔5分钟输出最近一小时内点击量最多的前N个商品(SQL实现版)

    代码 package com zjc flow analysis hotitems analysis import org apache flink api common serialization SimpleStringSchema i