网易首页 > 网易号 > 正文 申请入驻

大数据培训 | Flink SQL窗口表值函数聚合实现原理

0
分享至

引子

表值函数(table-valued function, TVF),顾名思义就是指返回值是一张表的函数,在Oracle、SQL Server等数据库中屡见不鲜。

而在Flink的上一个稳定版本1.13中,社区通过FLIP-145提出了窗口表值函数(window TVF)的实现,用于替代旧版的窗口分组(grouped window)语法。

举个栗子,在1.13之前,我们需要写如下的Flink SQL语句来做10秒的滚动窗口聚合:

SELECT TUMBLE_START(procTime, INTERVAL '10' SECONDS) AS window_start,merchandiseId,COUNT(1) AS sellCount

FROM rtdw_dwd.kafka_order_done_log

GROUP BY TUMBLE(procTime, INTERVAL '10' SECONDS),merchandiseId;

在1.13版本中,则可以改写成如下的形式:

SELECT window_start,window_end,merchandiseId,COUNT(1) AS sellCount

FROM TABLE( TUMBLE(TABLE rtdw_dwd.kafka_order_done_log, DESCRIPTOR(procTime), INTERVAL '10' SECONDS) )

GROUP BY window_start,window_end,merchandiseId;

根据设计文档的描述,窗口表值函数的思想来自2019年的SIGMOD论文<>,而表值函数属于SQL 2016标准的一部分。

Calcite从1.25版本起也开始提供对滚动窗口和滑动窗口TVF的支持。

除了标准化、易于实现之外,窗口TVF还支持旧版语法所不具备的一些特性,如Local-Global聚合优化、Distinct解热点优化、Top-N支持、GROUPING SETS语法等。

接下来本文简单探究一下基于窗口TVF的聚合逻辑,以及对累积窗口TVF做一点简单的改进。

更多Java –大数据 – 前端 – UI/UE - Android - 人工智能资料下载,可访问百度:尚硅谷官网(www.atguigu.com)

SQL定义

窗口TVF函数的类图如下所示。

Flink SQL在Calcite原生的SqlWindowTableFunction的基础上加了指示窗口时间的三列,即window_start、window_end和window_time。

SqlWindowTableFunction及其各个实现类的主要工作是校验TVF的操作数是否合法(通过内部抽象类AbstractOperandMetadata和对应的子类OperandMetadataImpl)。这一部分不再赘述,在下文改进累积窗口TVF的代码中会涉及到。

物理计划

目前窗口TVF不能单独使用,需要配合窗口聚合或Top-N一起使用。以上文中的聚合为例,观察其执行计划如下。

EXPLAIN

SELECT window_start,window_end,merchandiseId,COUNT(1) AS sellCount

FROM TABLE( TUMBLE(TABLE rtdw_dwd.kafka_order_done_log, DESCRIPTOR(procTime), INTERVAL '10' SECONDS) )

GROUP BY window_start,window_end,merchandiseId;

== Abstract Syntax Tree ==

LogicalAggregate(group=[{0, 1, 2}], sellCount=[COUNT()])

+- LogicalProject(window_start=[$48], window_end=[$49], merchandiseId=[$10])

+- LogicalTableFunctionScan(invocation=[TUMBLE($47, DESCRIPTOR($47), 10000:INTERVAL SECOND)], rowType=[RecordType(BIGINT ts, /* ...... */, TIMESTAMP_LTZ(3) *PROCTIME* procTime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])

+- LogicalProject(ts=[$0], /* ...... */, procTime=[PROCTIME()])

+- LogicalTableScan(table=[[hive, rtdw_dwd, kafka_order_done_log]])

== Optimized Physical Plan ==

Calc(select=[window_start, window_end, merchandiseId, sellCount])

+- WindowAggregate(groupBy=[merchandiseId], window=[TUMBLE(time_col=[procTime], size=[10 s])], select=[merchandiseId, COUNT(*) AS sellCount, start('w$) AS window_start, end('w$) AS window_end])

+- Exchange(distribution=[hash[merchandiseId]])

+- Calc(select=[merchandiseId, PROCTIME() AS procTime])

+- TableSourceScan(table=[[hive, rtdw_dwd, kafka_order_done_log]], fields=[ts, /* ...... */])

== Optimized Execution Plan ==

Calc(select=[window_start, window_end, merchandiseId, sellCount])

+- WindowAggregate(groupBy=[merchandiseId], window=[TUMBLE(time_col=[procTime], size=[10 s])], select=[merchandiseId, COUNT(*) AS sellCount, start('w$) AS window_start, end('w$) AS window_end])

+- Exchange(distribution=[hash[merchandiseId]])

+- Calc(select=[merchandiseId, PROCTIME() AS procTime])

+- TableSourceScan(table=[[hive, rtdw_dwd, kafka_order_done_log]], fields=[ts, /* ...... */])

在Flink SQL规则集中,与如上查询相关的规则按顺序依次是:

  • ConverterRule:StreamPhysicalWindowTableFunctionRule该规则将调用窗口TVF的逻辑节点(即调用SqlWindowTableFunction的LogicalTableFunctionScan节点)转化为物理节点(StreamPhysicalWindowTableFunction)。
  • ConverterRule:StreamPhysicalWindowAggregateRule该规则将含有window_start、window_end字段的逻辑聚合节点FlinkLogicalAggregate转化为物理的窗口聚合节点StreamPhysicalWindowAggregate以及其上的投影StreamPhysicalCalc。在有其他分组字段的情况下,还会根据FlinkRelDistribution#hash生成StreamPhysicalExchange节点。
  • RelOptRule:PullUpWindowTableFunctionIntoWindowAggregateRule顾名思义,该规则将上面两个规则产生的RelNode进行整理,消除代表窗口TVF的物理节点,并将它的语义上拉至聚合节点中,形成最终的物理计划。

然后,StreamPhysicalWindowAggregate节点翻译成StreamExecWindowAggregate节点,进入执行阶段。

切片化窗口与执行

以前我们提过粒度太碎的滑动窗口会使得状态和Timer膨胀,比较危险,应该用滚动窗口+在线存储+读时聚合的方法代替。

社区在设计窗口TVF聚合时显然考虑到了这点,提出了切片化窗口(sliced window)的概念,并以此为基础设计了一套与DataStream API Windowing不同的窗口机制。

如下图的累积窗口所示,每两条纵向虚线之间的部分就是一个切片(slice)。

切片的本质就是将滑动/累积窗口化为滚动窗口,并尽可能地复用中间计算结果,降低状态压力。

自然地,前文所述的Local-Global聚合优化、Distinct解热点优化就都可以无缝应用了。

那么,切片是如何分配的呢?答案是通过SliceAssigner体系,其类图如下。

注意CumulativeSliceAssigner多了一个isIncremental()方法,这是下文所做优化的一步可见,对于滚动窗口而言,一个窗口就是一个切片;而对滑动/累积窗口而言,一个窗口可能包含多个切片,一个切片也可能位于多个窗口中。

更多Java –大数据 – 前端 – UI/UE - Android - 人工智能资料下载,可访问百度:尚硅谷官网(www.atguigu.com)

所以共享切片的窗口要特别注意切片的过期与合并。

以负责累积窗口的CumulativeSliceAssigner为例,对应的逻辑如下。

@Override

public Iterable

expiredSlices(long windowEnd) {

long windowStart = getWindowStart(windowEnd);

long firstSliceEnd = windowStart + step;

long lastSliceEnd = windowStart + maxSize;

if (windowEnd == firstSliceEnd) {

// we share state in the first slice, skip cleanup for the first slice

reuseExpiredList.clear();

} else if (windowEnd == lastSliceEnd) {

// when this is the last slice,

// we need to cleanup the shared state (i.e. first slice) and the current slice

reuseExpiredList.reset(windowEnd, firstSliceEnd);

} else {

// clean up current slice

reuseExpiredList.reset(windowEnd);

return reuseExpiredList;

@Override

public void mergeSlices(long sliceEnd, MergeCallback callback) throws Exception {

long windowStart = getWindowStart(sliceEnd);

long firstSliceEnd = windowStart + step;

if (sliceEnd == firstSliceEnd) {

// if this is the first slice, there is nothing to merge

reuseToBeMergedList.clear();

} else {

// otherwise, merge the current slice state into the first slice state

reuseToBeMergedList.reset(sliceEnd);

callback.merge(firstSliceEnd, reuseToBeMergedList);

可见,累积窗口的中间结果会被合并到第一个切片中。窗口未结束时,除了第一个切片之外的其他切片触发后都会过期。

实际处理切片化窗口的算子名为SlicingWindowOperator,它实际上是SlicingWindowProcessor的简单封装。SlicingWindowProcessor的体系如下。

SlicingWindowProcessor的三个重要组成部分分别是:

  • WindowBuffer:在托管内存区域分配的窗口数据缓存,避免在窗口未实际触发时高频访问状态;
  • WindowValueState:窗口的状态,其schema为[key, window_end, accumulator]。窗口结束时间作为窗口状态的命名空间(namespace);
  • NamespaceAggsHandleFunction:通过代码生成器AggsHandlerCodeGenerator生成的聚合函数体。注意它并不是一个AggregateFunction,但是大致遵循其规范。

每当一条数据到来时,调用AbstractWindowAggProcessor#processElement()方法,比较容易理解了。

@Override

public boolean processElement(RowData key, RowData element) throws Exception {

long sliceEnd = sliceAssigner.assignSliceEnd(element, clockService);

if (!isEventTime) {

// always register processing time for every element when processing time mode

windowTimerService.registerProcessingTimeWindowTimer(sliceEnd);

if (isEventTime && isWindowFired(sliceEnd, currentProgress, shiftTimeZone)) {

// the assigned slice has been triggered, which means current element is late,

// but maybe not need to drop

long lastWindowEnd = sliceAssigner.getLastWindowEnd(sliceEnd);

if (isWindowFired(lastWindowEnd, currentProgress, shiftTimeZone)) {

// the last window has been triggered, so the element can be dropped now

return true;

} else {

windowBuffer.addElement(key, sliceStateMergeTarget(sliceEnd), element);

// we need to register a timer for the next unfired window,

// because this may the first time we see elements under the key

long unfiredFirstWindow = sliceEnd;

while (isWindowFired(unfiredFirstWindow, currentProgress, shiftTimeZone)) {

unfiredFirstWindow += windowInterval;

windowTimerService.registerEventTimeWindowTimer(unfiredFirstWindow);

return false;

} else {

// the assigned slice hasn't been triggered, accumulate into the assigned slice

windowBuffer.addElement(key, sliceEnd, element);

return false;

而当切片需要被合并时,先从WindowValueState中取出已有的状态,再遍历切片,并调用NamespaceAggsHandleFunction#merge()方法进行合并,最后更新状态。

@Override

public void merge(@Nullable Long mergeResult, Iterable

toBeMerged) throws Exception {

// get base accumulator

final RowData acc;

if (mergeResult == null) {

// null means the merged is not on state, create a new acc

acc = aggregator.createAccumulators();

} else {

RowData stateAcc = windowState.value(mergeResult);

if (stateAcc == null) {

acc = aggregator.createAccumulators();

} else {

acc = stateAcc;

// set base accumulator

aggregator.setAccumulators(mergeResult, acc);

// merge slice accumulators

for (Long slice : toBeMerged) {

RowData sliceAcc = windowState.value(slice);

if (sliceAcc != null) {

aggregator.merge(slice, sliceAcc);

// set merged acc into state if the merged acc is on state

if (mergeResult != null) {

windowState.update(mergeResult, aggregator.getAccumulators());

看官若要观察codegen出来的聚合函数的代码,可在log4j.properties文件中加上:

logger.codegen.name = org.apache.flink.table.runtime.generated

logger.codegen.level = DEBUG

一点改进

有很多天级聚合+秒级触发的Flink作业,在DataStream API时代多由ContinuousProcessingTimeTrigger实现,1.13版本之前的SQL则需要添加table.exec.emit.early-fire系列参数。

正式采用1.13版本后,累积窗口(cumulate window)完美契合此类需求。

但是,有些作业的key规模比较大,在一天的晚些时候会频繁向下游Redis刷入大量数据,造成不必要的压力。

更多Java –大数据 – 前端 – UI/UE - Android - 人工智能资料下载,可访问百度:尚硅谷官网(www.atguigu.com)

因此,笔者对累积窗口TVF做了略有侵入的小改动,通过一个布尔参数INCREMENTAL可控制只输出切片之间发生变化的聚合结果。

操作很简单:

  • 修改SqlCumulateTableFunction函数的签名,以及配套的窗口参数类CumulativeWindowSpec等;
  • 修改SliceSharedWindowAggProcess#fireWindow()方法,如下。

@Override

public void fireWindow(Long windowEnd) throws Exception {

sliceSharedAssigner.mergeSlices(windowEnd, this);

// we have set accumulator in the merge() method

RowData aggResult = aggregator.getValue(windowEnd);

if (!isWindowEmpty()) {

if (sliceSharedAssigner instanceof CumulativeSliceAssigner

&& ((CumulativeSliceAssigner) sliceSharedAssigner).isIncremental()) {

RowData stateValue = windowState.value(windowEnd);

if (stateValue == null || !stateValue.equals(aggResult)) {

collect(aggResult);

} else {

collect(aggResult);

// we should register next window timer here,

// because slices are shared, maybe no elements arrived for the next slices

当然,此方案会带来访问状态的overhead,后续会做极限压测以观察性能,并做适当修改。

文章来源于大数据技术与架构

特别声明:以上内容(如有图片或视频亦包括在内)为自媒体平台“网易号”用户上传并发布,本平台仅提供信息存储服务。

Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.

相关推荐
热点推荐
孙杨拒绝退役!而他却放弃中国国籍,转身替澳大利亚参赛!

孙杨拒绝退役!而他却放弃中国国籍,转身替澳大利亚参赛!

番茄说史聊
2024-05-14 23:16:11
大反转?总冠军悬了,韧带撕裂+左腿变形,独行侠机会来了

大反转?总冠军悬了,韧带撕裂+左腿变形,独行侠机会来了

体育新角度
2024-06-13 09:00:06
女人会不会忘掉和自己发生过关系的男人?43 岁女性真实坦白

女人会不会忘掉和自己发生过关系的男人?43 岁女性真实坦白

娱乐洞察点点
2024-06-14 06:02:37
韩国国门金承奎宣布结婚,妻子是27岁韩国女演员&模特金珍京

韩国国门金承奎宣布结婚,妻子是27岁韩国女演员&模特金珍京

直播吧
2024-06-13 17:26:35
湖记:布朗尼-詹姆斯在当地时间周四为湖人进行了试训!

湖记:布朗尼-詹姆斯在当地时间周四为湖人进行了试训!

直播吧
2024-06-14 07:29:08
大胜获0.01分!惨败扣18分!中国女排再陷绝境,蔡斌连做2大决定

大胜获0.01分!惨败扣18分!中国女排再陷绝境,蔡斌连做2大决定

邮轮摄影师阿嗵
2024-06-14 02:38:04
彻底倒向美囯?拒绝中方移民,驱离中方工人,中方大怒:永不合作

彻底倒向美囯?拒绝中方移民,驱离中方工人,中方大怒:永不合作

星辰故事屋
2024-04-27 19:04:44
不要低看18强赛,对于国足,这是我们的世界杯

不要低看18强赛,对于国足,这是我们的世界杯

柳十四
2024-06-12 13:39:04
4名美国人在吉林遇刺之后,看仇恨教育如何开花结果

4名美国人在吉林遇刺之后,看仇恨教育如何开花结果

顾礼先生
2024-06-12 16:36:11
媒体预测欧洲杯夺冠概率:英格兰19.9%最热 法国19.1%第二

媒体预测欧洲杯夺冠概率:英格兰19.9%最热 法国19.1%第二

直播吧
2024-06-13 07:29:10
2024“新首富”诞生:身价超4500亿,王健林重回前十,许家印落榜

2024“新首富”诞生:身价超4500亿,王健林重回前十,许家印落榜

财话连篇
2024-06-13 15:47:35
曝特斯拉中国总裁亲自彻查假订单,已有人因此离开

曝特斯拉中国总裁亲自彻查假订单,已有人因此离开

映射生活的身影
2024-06-13 19:45:00
一患者花120万打一针抗癌药,1个月后癌细胞消失五分之四:这药真贵,但能救命!社保不报

一患者花120万打一针抗癌药,1个月后癌细胞消失五分之四:这药真贵,但能救命!社保不报

保险课堂
2024-06-12 23:18:13
上海女子结婚,新郎全程搂着一脸深情,网友:鼻子高科技含量十足

上海女子结婚,新郎全程搂着一脸深情,网友:鼻子高科技含量十足

雅清故事汇
2024-06-13 18:08:43
“车外30℃车内75℃”,特斯拉的玻璃车顶快把打工人烤熟了?

“车外30℃车内75℃”,特斯拉的玻璃车顶快把打工人烤熟了?

Vista氢商业
2024-06-13 19:12:52
菲律宾前总统发言人:马科斯不是疯了,就是战争贩子

菲律宾前总统发言人:马科斯不是疯了,就是战争贩子

历史有些冷
2024-06-13 08:05:02
0+0+0+0!3千万合同成累赘,独行侠0-3后,没人注意这2人的表现

0+0+0+0!3千万合同成累赘,独行侠0-3后,没人注意这2人的表现

麦迪的篮球
2024-06-13 11:42:56
梅德韦杰夫称俄罗斯 必须每天以最大程度的伤害 来回应英美对俄制裁

梅德韦杰夫称俄罗斯 必须每天以最大程度的伤害 来回应英美对俄制裁

战域笔墨
2024-06-14 06:26:26
今日!CCTV5直播中国女排+欧洲杯开幕式揭幕战,5+转中超上海海港

今日!CCTV5直播中国女排+欧洲杯开幕式揭幕战,5+转中超上海海港

晚池
2024-06-14 00:43:12
06月14日00时金价暴跌!今日金价!各大金店价格!

06月14日00时金价暴跌!今日金价!各大金店价格!

小蜜情感说
2024-06-14 06:29:38
2024-06-14 07:42:44
IT爱好者小尚
IT爱好者小尚
分享IT教育类信息
630文章数 55关注度
往期回顾 全部

科技要闻

小红书员工仅1/5工龄满2年 32岁就不让进了

头条要闻

中专女生爆冷闯进全球数学竞赛12强 超越一众清北学生

头条要闻

中专女生爆冷闯进全球数学竞赛12强 超越一众清北学生

体育要闻

乔丹最想单挑的男人走了

娱乐要闻

森林北报案,称和汪峰的感情遭受压力

财经要闻

私募大佬孙强:中国为什么缺少耐心资本

汽车要闻

升级8155芯片 新款卡罗拉锐放售12.98-18.48万

态度原创

本地
数码
艺术
公开课
军事航空

本地新闻

粽情一夏|海河龙舟赛,竟然成了外国人的大party!

数码要闻

三星 Galaxy Watch 首款 FE 产品正式发布,199 美元起

艺术要闻

穿越时空的艺术:《马可·波罗》AI沉浸影片探索人类文明

公开课

近视只是视力差?小心并发症

军事要闻

美军演习将罕见以4万吨级准航母作为靶舰

无障碍浏览 进入关怀版