TopN语句常用于计算实时数据中对某个指标的最大或者最小的前N个数据的筛选。Flink SQL可以基于 OVER窗口操作灵活地完成TopN的工作。
语法
SELECT*
FROM(
SELECT*,
ROW_NUMBER()OVER([PARTITION BY col1[,col2..]]
ORDER BY col1[asc|desc][,col2[asc|desc]...])AS rownum
FROM table_name)
WHERE rownum<=N[AND conditions]
参数说明:
ROW_NUMBER():行号计算函数OVER的窗口,行号计算从1开始。
PARTITION BY col1[, col2..] : 指定分区的列,可以不指定。
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]:指定排序的列和每列的排序方向。
如上语法所示,TopN需要两层query。
查询中使用 ROW_NUMBER() 窗口函数来对数据根据排序列进行排序并标上排名。
外层查询中对排名进行过滤,只取前N条。例如N=10即为取前10条的数据。
在执行过程中,Flink SQL会对输入的数据流根据排序键进行排序。如果某个分区的前N条记录发生了改变,则会将改变的那几条数据以更新流的形式发给下游。
注意:如果需要将TopN的数据输出到外部存储,后接的结果表必须是一个带主键的表。
WHERE条件的限制
为了使Flink SQL能识别出这是一个TopN的query,外层循环中必须要指定 rownum <= N的格式来指定前N条记录,不能使用rownum - 5 <= N 这种将rownum至于某个表达式中。当然,WHERE条件中,可以额外带上其他条件,但是必须是以AND连接。
示例一
如下示例,先统计查询流中每小时、每个城市和关键字被查询的次数。然后输出每小时、每个城市被查询最多的前100个关键字。在输出表中,小时、城市、排名三者可以唯一确定一条记录,所以需要将这三列声明成联合主键(需要在外部存储中也有同样的主键设置)。
CREATE TABLE rds_output(
rownumint,
start_time bigint,
city varchar,
keyword varchar,
pv bigint,
PRIMARY KEY(rownum,start_time,city)
)with(
type='rds',
...
)
INSERT INTO rds_output
SELECT rownum,start_time,city,keyword,pv
FROM(
SELECT*,
ROW_NUMBER()OVER(PARTITION BY start_time,city ORDER BY pv desc)AS rownum
FROM(
selectsubstr(time_str,1,12)asstart_time,
keyword,
count(1)aspv,
city
fromtmp_search
groupbysubstr(time_str,1,12),keyword,city
)a
)
WHERE rownum<=100
示例二
测试数据
ip(varchar)
time(varchar)
192.168.1.1
100000000
192.168.1.2
100000000
192.168.1.2
100000000
192.168.1.3
100030000
192.168.1.3
100000000
192.168.1.3
100000000
测试语句
create table source_table(
IP VARCHAR,
`TIME`VARCHAR
)with(
type='datahub',
endPoint='http://dh-cn-hangzhou.aliyuncs.com',
project='blink_test',
topic='ip_count01',
accessId='LTXXXx',
accessKey='gUqXXXxxx'
);
create table result_table(
rownumint,
start_time VARCHAR,
IP VARCHAR,
cc BIGINT,
PRIMARY KEY(start_time,IP)
)with(
type='rds',
url='jdbc:mysql://rm-bp1gz4k202t8XXXXXXs.com:3306/blink_test',
tableName='blink_rds_test',
userName='xxx',
password='xxx'
);
INSERT INTO result_table
SELECT rownum,start_time,IP,cc
FROM(
SELECT*,
ROW_NUMBER()OVER(PARTITION BY start_time ORDER BY cc desc)AS rownum
FROM(
SELECT SUBSTRING(`TIME`,1,2)AS start_time,--可以根据真实时间取相应的数值,这里取得是测试数据
COUNT(IP)AS cc,
IP
FROM source_table
GROUP BY SUBSTRING(`TIME`,1,2),IP
)a
)
WHERE rownum<=3--可以根据真实top值取相应的数值,这里取得是测试数据
测试结果
rownum(int)
start_time(varchar)
ip(varchar)
cc(bigint)
1
10
192.168.1.3
6
2
10
192.168.1.2
4
3
10
192.168.1.1
2
无排名优化
根据TopN的语法,rownum字段会作为结果表的主键字段之一写入结果表。但是这可能导致数据膨胀的问题。例如,收到一条原排名9的更新数据,更新后排名上升到1,那么从1到9的数据排名都发生变化了。需要将这些数据作为更新都写入结果表。这样就产生了数据膨胀,可能导致结果表因为收到了太多的数据而降低更新速度。
优化方法
结果表中不保存 rownum,最终的 rownum 由前端计算。因为TopN的数据量通常不会很大,前端排序100个数据很快。当收到一条原排名9,更新后排名上升到1的数据,也只需要发送这一条数据,而不用把排名1到9的数据全发送下去。这种优化能够提升结果表的更新速度。
无排名优化语法
SELECT col1,col2,col3
FROM(
SELECT col1,col2,col3
ROW_NUMBER()OVER([PARTITION BY col1[,col2..]]
ORDER BY col1[asc|desc][,col2[asc|desc]...])AS rownum
FROM table_name)
WHERE rownum<=N[AND conditions]
语法与上文类似,只是在外层查询中将 rownum 字段裁剪掉即可。
注意:在无 rownum 的场景,结果表主键的定义一定要特别注意。如果定义有误,会直接导致TopN结果的不正确。无 rownum 场景的主键应为 TopN 上游 group by 节点的 keys 列表。
无排名优化示例
本示例来自某视频行业用户的真实业务(精简后)。用户每个视频在分发时会产生大量流量,依据视频产生的流量可以分析出最热门的视频。如下示例用于统计出每分钟流量最大的 top5 的视频。
测试语句
--从SLS读取数据原始存储表
CREATE TABLE sls_cdnlog_stream(
vid VARCHAR,--video id
rowtimeTimestamp,--观看视频发生的时间
response_size BIGINT,--观看产生的流量
WATERMARK FOR rowtimeaswithOffset(rowtime,0)
)WITH(
type='sls',
...
);
--1分钟窗口统计vid带宽数
CREATE VIEW cdnvid_group_view AS
SELECT vid,
TUMBLE_START(rowtime,INTERVAL'1'MINUTE)AS start_time,
SUM(response_size)AS rss
FROM sls_cdnlog_stream
GROUP BY vid,TUMBLE(rowtime,INTERVAL'1'MINUTE);
--存储表
CREATE TABLE hbase_out_cdnvidtoplog(
vid VARCHAR,
rss BIGINT,
start_time VARCHAR,
--注意结果表中不存储rownum字段
--特别注意该主键的定义,为TopN上游groupbykeys
PRIMARY KEY(start_time,vid)
)WITH(
type='RDS',
...
);
--统计每分钟top5消耗流量的vid,并输出
INSERT INTO hbase_out_cdnvidtoplog
--注意次外层查询,不选出rownum字段
SELECT vid,rss,start_time FROM
(
SELECT
vid,start_time,rss,
ROW_NUMBER()OVER(PARTITION BY start_time ORDER BY rss DESC)asrownum,
FROM
cdnvid_group_view
)
WHERE rownum<=5;
测试数据
vid(VARCHAR)
rowtime(Timestamp)
response_size(BIGINT)
10000
2017-12-18 15:00:10
2000
10000
2017-12-18 15:00:15
4000
10000
2017-12-18 15:00:20
3000
10001
2017-12-18 15:00:20
3000
10002
2017-12-18 15:00:20
4000
10003
2017-12-18 15:00:20
1000
10004
2017-12-18 15:00:30
1000
10005
2017-12-18 15:00:30
5000
10006
2017-12-18 15:00:40
6000
10007
2017-12-18 15:00:50
8000
测试结果
start_time(VARCHAR)
vid(VARCHAR)
rss(BIGINT)
2017-12-18 15:00:00
10000
9000
2017-12-18 15:00:00
10007
8000
2017-12-18 15:00:00
10006
6000
2017-12-18 15:00:00
10005
5000
2017-12-18 15:00:00
10002
4000
本文转自实时计算——