代码
package com.zjc.flow_analysis.hotitems_analysis
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Slide}
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import java.sql.Timestamp
import java.util.Properties
/**
 * Hot-items statistics implemented with Flink SQL.
 *
 * Reads comma-separated user-behavior records from the Kafka topic "hotItems",
 * counts "pv" events per item over a 1-hour sliding window (5-minute slide),
 * and prints the top 5 items per window end as a retract stream.
 */
object HotItemsSQL {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Windows are driven by record timestamps, not processing time.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop103:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    val inputStream = env.addSource(new FlinkKafkaConsumer[String]("hotItems", new SimpleStringSchema(), properties))

    // Parse CSV lines into UserBehavior. The source timestamp is in seconds,
    // so multiply by 1000 for the millisecond event time Flink expects.
    // assignAscendingTimestamps assumes the input is already ordered by time.
    val dataStream = inputStream.map(data => {
      val arrayData = data.split(",")
      // NOTE: arrayData(3) is already a String after split — no conversion needed.
      UserBehavior(arrayData(0).toLong, arrayData(1).toLong, arrayData(2).toLong, arrayData(3), arrayData(4).toLong)
    }).assignAscendingTimestamps(_.timestamp * 1000L)

    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)
    // Register the stream as a table; expose 'timestamp as the rowtime attribute 'ts.
    tableEnv.createTemporaryView("dataTable", dataStream, 'itemId, 'behavior, 'timestamp.rowtime as 'ts)

    // HOP(ts, slide, size): 1-hour windows sliding every 5 minutes.
    // ROW_NUMBER partitioned by window end ranks items by count; keep the top 5.
    val resultTable = tableEnv.sqlQuery(
      """
        |select *
        |from (
        | select *,
        | row_number() over(partition by windowEnd order by cnt desc) as row_num
        | from (
        | select itemId, count(itemId) as cnt,
        | hop_end(ts, interval '5' minute, interval '1' hour) as windowEnd
        | from dataTable
        | where behavior='pv'
        | group by itemId, hop(ts, interval '5' minute, interval '1' hour)
        | )
        |)
        |where row_num <= 5
        |""".stripMargin
    )

    // A retract stream is required: the TopN result is updated as new rows arrive.
    resultTable.toRetractStream[(Long, Long, Timestamp, Long)].print("result")
    env.execute("商品热门统计(sql版实现)")
  }
}
输出,部分截图:
![在这里插入图片描述](https://img-blog.csdnimg.cn/cd8aca0317f2421e904a09f4edf87c09.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAempjNGo=,size_20,color_FFFFFF,t_70,g_se,x_16)
注意:sql中字符串字面量必须用单引号(直引号),如 behavior='pv';如果用双引号,sql解析会有问题。
HOP(time_attr, interval, interval)定义一个滑动窗口,第一个参数是时间字段,第二个参数是窗口滑动步长,第三个是窗口长度。
HOP_END(time_attr, interval, interval)定义一个滑动窗口,第一个参数是时间字段,第二个参数是窗口滑动步长,第三个是窗口长度,返回的窗口右边界时间戳。
官网解释下图:
![在这里插入图片描述](https://img-blog.csdnimg.cn/68c1078bf8d64b289340e64e39c3dc18.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAempjNGo=,size_20,color_FFFFFF,t_70,g_se,x_16)
![在这里插入图片描述](https://img-blog.csdnimg.cn/9f44f3e8af2d46e78c0a7ce592e84483.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAempjNGo=,size_20,color_FFFFFF,t_70,g_se,x_16)