应用系统定制开发Flink SQL增量查询Hudi表

前言

应用系统定制开发前面总结了和。应用系统定制开发最近项目上也有Flink SQL应用系统定制开发增量查询表的需求,应用系统定制开发正好学习总结一下。

官网文档

地址:

参数

  • read.start-commit 应用系统定制开发增量查询开始时间 对于流读,应用系统定制开发如果不指定该值,应用系统定制开发默认取最新的instantTime,应用系统定制开发也就是流读默认从最新的instantTime开始读(包含最新的)。对于批读,如果不指定该参数,只指定read.end-commit,则实现时间旅行的功能,可查询历史记录
  • read.end-commit 增量查询结束时间 不指定该参数则默认读取到最新的记录,该参数一般只适用于批读,因为流读一般的需求是查询所有的增量数据
  • read.streaming.enabled 是否流读 默认false
  • read.streaming.check-interval 流读的检查时间间隔,单位秒(s),默认值60,也就是一分钟
    查询范围 [BEGIN_INSTANTTIME,END_INSTANTTIME],既包含开始时间又包含结束时间,对于默认值可参考上面的参数说明

版本

建表造数:

  • Hudi 0.9.0
  • Spark 2.4.5

我这里建表造数使用Hudi Spark SQL 0.9.0,目的是为了模拟项目上用Java Client和Spark SQL创建的Hudi表,以验证Hudi Flink SQL增量查询时是否兼容旧版本的Hudi表(大家没有这种需求的,可以使用任何方式正常造数)

查询

  • Hudi 0.13.0-SNAPSHOT
  • Flink 1.14.3 (增量查询)
  • Spark 3.1.2 (主要是为了使用Call Procedures命令查看commit信息)

建表造数

-- Spark SQL Hudi 0.9.0create table hudi.test_flink_incremental (  id int,  name string,  price double,  ts long,  dt string) using hudi partitioned by (dt) options (  primaryKey = 'id',  preCombineField = 'ts',  type = 'cow');insert into hudi.test_flink_incremental values (1,'a1', 10, 1000, '2022-11-25');insert into hudi.test_flink_incremental values (2,'a2', 20, 2000, '2022-11-25');update hudi.test_flink_incremental set name='hudi2_update' where id = 2;insert into hudi.test_flink_incremental values (3,'a3', 30, 3000, '2022-11-26');insert into hudi.test_flink_incremental values (4,'a4', 40, 4000, '2022-12-26');
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

用show_commits看一下有哪些commits(这里查询用的是Hudi的master,因为show_commits是在0.11.0版本开始支持的,也可以通过使用hadoop命令查看.hoodie文件夹下的.commit文件)

call show_commits(table => 'hudi.test_flink_incremental');
  • 1
2022120515273620221205152723202212051527122022120515270220221205152650
  • 1
  • 2
  • 3
  • 4
  • 5

Flink SQL创建Hudi内存表

CREATE TABLE test_flink_incremental (  id int PRIMARY KEY NOT ENFORCED,  name VARCHAR(10),  price double,  ts bigint,  dt VARCHAR(10))PARTITIONED BY (dt)WITH (  'connector' = 'hudi',  'path' = 'hdfs://cluster1/warehouse/tablespace/managed/hive/hudi.db/test_flink_incremental'); 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

建表时不指定增量查询相关的参数,我们在查询时动态指定,这样比较灵活。
动态指定参数方法,在查询语句后面加上如下形式的语句

/*+ options(  'read.start-commit' = '20221205152723',  'read.end-commit'='20221205152736') */
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

批读

Flink SQL读Hudi有两种模式:批读和流读。默认批读,先看一下批读的增量查询

验证是否包含起始时间和默认结束时间

select * from test_flink_incremental /*+ options(	'read.start-commit' = '20221205152723' --起始时间对应id=3的记录) */
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

结果包含起始时间,不指定结束时间默认读到最新的数据

id   name     price        ts                 dt 4     a4      40.0      4000      dt=2022-12-26 3     a3      30.0      3000      dt=2022-11-26
  • 1
  • 2
  • 3

验证是否包含结束时间

select * from test_flink_incremental /*+ options(	'read.start-commit' = '20221205152712',  --起始时间对应id=2的记录    'read.end-commit'='20221205152723'       --结束时间对应id=3的记录) */
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

结果包含结束时间

id           name        price       ts                 dt 3             a3        30.0      3000      dt=2022-11-26 2   hudi2_update        20.0      2000      dt=2022-11-25
  • 1
  • 2
  • 3

验证默认开始时间

这种情况是指定结束时间,但不指定开始时间,如果都不指定,则读表所有的最新版本的记录。

select * from test_flink_incremental /*+ options(    'read.end-commit'='20221205152712'       --结束时间对应id=2的更新记录) */
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

结果:只查询end-commit对应的记录

id           name        price       ts                 dt 2   hudi2_update        20.0      2000      dt=2022-11-25
  • 1
  • 2

时间旅行(查询历史记录)

验证是否可以查询历史记录,我们更新id为2的name,更新前name为a2,更新后为hudi2_update,我们验证一下,是否可以通过Flink SQL查询Hudi历史记录,逾期结果查出id=2,name=a2

select * from test_flink_incremental /*+ options(    'read.end-commit'='20221205152702'       --结束时间对应id=2的历史记录) */
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

结果:可以正确查询历史记录

id           name        price       ts                 dt 2             a2        20.0      2000      dt=2022-11-25
  • 1
  • 2

流读

开启流读的参数:

read.streaming.enabled = true
  • 1

流读不需要设置结束时间,因为一般的需求是读所有的增量数据,我们只需要验证开始时间就好了

验证默认开始时间

select * from test_flink_incremental /*+ options(    'read.streaming.enabled'='true',    'read.streaming.check-interval' = '4') */
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

结果:从最新的instantTime开始增量读取,也就是默认的read.start-commit为最新的instantTime

id   name     price        ts                 dt 4     a4      40.0      4000      dt=2022-12-26
  • 1
  • 2

验证指定开始时间

select * from test_flink_incremental /*+ options(    'read.streaming.enabled'='true',    'read.streaming.check-interval' = '4',    'read.start-commit' = '20221205152712') */
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

结果:

id           name        price       ts                 dt 2   hudi2_update        20.0      2000      dt=2022-11-25 3             a3        30.0      3000      dt=2022-11-26 4             a4        40.0      4000      dt=2022-11-26
  • 1
  • 2
  • 3
  • 4

如果想第一次查询全部的历史数据,可以将start-commit设置的早一点,比如设置到去年:‘read.start-commit’ = ‘20211205152712’

select * from test_flink_incremental /*+ options(    'read.streaming.enabled'='true',    'read.streaming.check-interval' = '4',    'read.start-commit' = '20211205152712') */
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
id           name        price       ts                 dt 1             a1        10.0      1000      dt=2022-11-25 2   hudi2_update        20.0      2000      dt=2022-11-25 3             a3        30.0      3000      dt=2022-11-26 4             a4        40.0      4000      dt=2022-11-26
  • 1
  • 2
  • 3
  • 4
  • 5

验证流读的连续性

验证新的增量数据进来,是否可以持续消费Hudi增量数据,验证数据的准确一致性,为了方便验证,我可以使用Flink SQL增量流读Hudi表然后Sink到MySQL表中,最后通过读取MySQL表中的数据验证数据的准确性

Flink SQL读写MySQL需要配置jar包,将flink-connector-jdbc_2.12-1.14.3.jar放到lib下即可,下载地址:https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.14.3/flink-connector-jdbc_2.12-1.14.3.jar

先在MySQL中创建一张Sink表

-- MySQLCREATE TABLE `test_sink` (  `id` int(11),  `name` text DEFAULT NULL,  `price` int(11),  `ts` int(11),  `dt`  text DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

Flink中创建对应的sink表

create table test_sink (  id int,  name string,  price double,  ts bigint,  dt string) with ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.468.44.128:3306/hudi?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8', 'username' = 'root', 'password' = 'root-123', 'table-name' = 'test_sink', 'sink.buffer-flush.max-rows' = '1');
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

然后流式增量读取Hudi表Sink Mysql

insert into test_sinkselect * from test_flink_incremental /*+ options(    'read.streaming.enabled'='true',    'read.streaming.check-interval' = '4',    'read.start-commit' = '20221205152712') */
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

这样会起一个长任务,一直处于running状态,我们可以在yarn-session界面上验证这一点

然后先在MySQL中验证一下历史数据的准确性

再利用Spark SQL往source表插入两条数据

-- Spark SQLinsert into hudi.test_flink_incremental values (5,'a5', 50, 5000, '2022-12-07');insert into hudi.test_flink_incremental values (6,'a6', 60, 6000, '2022-12-07');
  • 1
  • 2
  • 3

我们增量读取的间隔设置的4s,成功插入数据等待4s后,再在MySQL表中验证一下数据

发现新增的数据已经成功Sink到MySQL中了,并且数据没有重复

最后验证一下更新的增量数据,Spark SQL更新Hudi source表

-- Spark SQLupdate hudi.test_flink_incremental set name='hudi5_update' where id = 5;
  • 1
  • 2

继续验证结果

结果是更新的增量数据也会insert到MySQL中的sink表,但是不会更新原来的数据

那如果想实现更新的效果呢?我们需要在MySQL和Flink的sink表中加上主键字段,两者缺一不可,如下:

-- MySQLCREATE TABLE `test_sink` (  `id` int(11),  `name` text DEFAULT NULL,  `price` int(11),  `ts` int(11),  `dt`  text DEFAULT NULL,   PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
-- Flink SQLcreate table test_sink (  id int PRIMARY KEY NOT ENFORCED,  name string,  price double,  ts bigint,  dt string) with ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.468.44.128:3306/hudi?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8', 'username' = 'root', 'password' = 'root-123', 'table-name' = 'test_sink', 'sink.buffer-flush.max-rows' = '1');
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

将刚才起的长任务关掉,重新执行刚才的insert语句,先跑一下历史数据,最后再验证一下增量效果

-- Spark SQLupdate hudi.test_flink_incremental set name='hudi6_update' where id = 6;insert into hudi.test_flink_incremental values (7,'a7', 70, 7000, '2022-12-07');
  • 1
  • 2
  • 3

可以看到,达到了预期效果,对于id=6的执行更新操作,对于id=7的执行插入操作。

相关阅读

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发