SQL DDL
通过前面的一些介绍,我们已经对Flink SQL有了一些了解。Flink SQL底层使用Dynamic Table来进行流处理,它支持了时间窗口和Join操作。本节将围绕SQL DDL,主要介绍创建、获取、修改和删除元数据时所涉及的一些注意事项。
注册和获取表
Catalog简介
Catalog记录并管理了各类元数据信息,比如我们有哪些数据库(Database)、数据库是以文件还是消息队列的形式存储、某个数据库中有哪些表和视图、当前有哪些可用的函数等。Catalog提供了一个注册、管理和访问这些元数据的API。一般情况下,一个Catalog中包含一到多个Database,一个Database包含一到多个Table。
常见的Catalog包括:GenericInMemoryCatalog
、HiveCatalog
和用户自定义的Catalog。
GenericInMemoryCatalog
将元数据存储在内存中,只在一个Session会话内生效。默认情况下,都是使用这种Catalog。
Hive的元数据管理功能是SQL-on-Hadoop领域事实上的标准,很多企业的生产环境使用Hive管理元数据,Hive可以管理包括纯Hive表和非Hive表。为了与Hive生态兼容,Flink推出了HiveCatalog
。对于同时部署了Hive和Flink的环境,HiveCatalog
允许用户在Flink SQL上读取原来Hive中的各个表。对于只部署了Flink的环境,HiveCatalog
是目前将元数据持久化的唯一方式。持久化意味着某些元数据管理团队可以先将Kafka或HDFS中的数据注册到Catalog中,其他数据分析团队无需再次注册表,只需每次从Catalog中获取表,不用关心数据管理相关问题。如果没有进行元数据持久化,用户每次都需要注册表。
用户也可以自定义Catalog,需要实现Catalog
接口类。
获取表
我们编写好SQL查询语句(SELECT ...
语句)后,需要使用TableEnvironment.sqlQuery("SELECT ...")
来执行这个SQL语句,这个方法返回的结果是一个Table
。Table
可以用于后续的Table API & SQL查询,也可以将Table
转换为DataStream
或者DataSet
。总之,Table API和SQL可以完美融合。
我们使用FROM table_name
从某个表中查询数据,这个table_name
表必须被注册到TableEnvironment
中。注册有以下几种方式:
- 使用SQL DDL中的
CREATE TABLE ...
创建表 TableEnvironment.connect()
连接一个外部系统- 从Catalog中获取已注册的表
第一种SQL DDL的方式,我们会用CREATE TABLE table_name ...
来明确指定一个表的名字为table_name
。而第二种和第三种方式,我们是在Java/Scala代码中获取一个Table
对象,获取对象后,明确使用createTemporaryView()
方法声明一个Table
并指定一个名字。例如,下面的代码中,我们获取了一个Table
,并通过createTemporaryView()
注册该Table
名字为user_behavior
。
Table userBehaviorTable = tEnv.fromDataStream(userBehaviorStream, "user_id, item_id, behavior, ts.rowtime");
// createTemporaryView创建名为user_behavior的表
tEnv.createTemporaryView("user_behavior", userBehaviorTable);
对于TableEnvironment.connect()
所连接的外部系统,也可以使用createTemporaryTable
方法,以链式调用的方式注册名字:
tEnv
// 使用connect函数连接外部系统
.connect(...)
// 序列化方式 可以是JSON、Avro等
.withFormat(...)
// 数据的Schema
.withSchema(...)
// 临时表的表名,后续可以在SQL语句中使用这个表名
.createTemporaryTable("user_behavior");
注册好表名后,就可以在SQL语句中使用FROM user_behavior
来查询该表。
即使没有明确指定表名,也没有问题。Table.toString()
可以返回表的名字,如果没有给这个Table
指定名字,Flink会为其自动分配一个唯一的表名,不会与其他表名冲突。如下所示:
// 获取一个Table,并没有明确为其分配表名
Table table = ...;
String tableName = table.toString();
tEnv.sqlQuery("SELECT * FROM " + tableName);
甚至可以省略toString
方法,直接将Table
对象与SQL语句用加号+
连接,因为+
会自动调用Table.toString()
方法。
// 获取一个Table,并没有明确为其分配表名
Table table = ...;
// 加号+操作符会在编译期间自动调用Table.toString()方法
tEnv.sqlQuery("SELECT * FROM " + table);
HiveCatalog
如果想将元数据持久化到HiveCatalog
中:
TableEnvironment tEnv = ...
// 创建一个HiveCatalog
// 四个参数分别为:catalogName、databaseName、hiveConfDir、hiveVersion
Catalog catalog = new HiveCatalog("mycatalog", null, "<path_of_hive_conf>", "<hive_version>");
// 注册catalog,取名为mycatalog
tEnv.registerCatalog("mycatalog", catalog);
// 创建一个Database,取名为mydb
tEnv.sqlUpdate("CREATE DATABASE mydb WITH (...)");
// 创建一个Table,取名为mytable
tEnv.sqlUpdate("CREATE TABLE mytable (name STRING, age INT) WITH (...)");
// 返回所有Table
tEnv.listTables();
USE和SHOW语句
创建完Catalog、Database后,可以像其他SQL引擎一样,使用SHOW
和USE
语句:
-- 展示所有Catalog
SHOW CATALOGS;
-- 使用mycatalog
USE CATALOG mycatalog;
-- 展示当前Catalog里所有Database
SHOW DATABASES;
-- 使用mydatabase
USE mydb;
-- 展示当前Catalog里当前Database里所有Table
SHOW TABLES;
这些语句需要粘贴到sqlUpdate
方法中:
Catalog catalog = ...
tEnv.registerCatalog("mycatalog", catalog);
tEnv.sqlUpdate("USE catalog mycatalog");
tEnv.sqlUpdate("CREATE DATABASE mydb");
tEnv.sqlUpdate("USE mydb");
CREATE、DROP、ALTER
CREATE
、ALTER
、DROP
是SQL中最常见的三种DDL语句,可以创建、修改和删除数据库(Database)、表(Table)和函数(Function)。
CREATE
语句可以创建一个Database、Table和Function:
CREATE TABLE
CREATE DATABASE
CREATE FNCTION
ALTER
语句可以修改已有的Database、Table和Function:
ALTER TABLE
ALTER DATABASE
ALTER FNCTION
DROP
语句可以删除之前创建的Database、Table和Function:
DROP TABLE
DROP DATABASE
DROP FUNCTION
这些语句可以放到TableEnvironment.sqlUpdate()
的参数里,也可以在SQL Client里执行。TableEnvironment.sqlUpdate()
执行成功后没有返回结果,执行失败则会抛出异常。下面例子展示了如何在TableEnvironment
中使用sqlUpdate
:
// tEnv:TableEnvironment
// CREATE TABLE
tEnv.sqlUpdate("CREATE TABLE user_behavior (" +
" user_id BIGINT," +
" item_id BIGINT," +
" category_id BIGINT," +
" behavior STRING," +
" ts TIMESTAMP(3)," +
" WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- 在ts上定义watermark,ts成为事件时间列" +
") ...");
// ALTER DATABASE
tEnv.sqlUpdate("ALTER DATABASE db1 set ('k1' = 'a', 'k2' = 'b')")
// DROP TABLE
tEnv.sqlUpdate("DROP TABLE user_behavior");
CREATE/ALTER/DROP TABLE
CREATE TABLE
CREATE TABLE
需要按照下面的模板编写:
CREATE TABLE [catalog_name.][db_name.]table_name
(
{ <column_definition> | <computed_column_definition> }[ , ...n]
[ <watermark_definition> ]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (key1=val1, key2=val2, ...)
一个名为table_name
的Table
隶属于一个名为db_name
的Database,db_name
又隶属于名为catalog_name
的Catalog。如果不明确指定db_name
和catalog_name
,该表被注册到默认的Catalog和默认的Database中。如果table_name
与已有的表名重复,会抛出异常。
<column_definition>
、<computed_column_definition>
和<watermark_definition>
需要符合下面的模板:
<column_definition>:
column_name column_type [COMMENT column_comment]
<computed_column_definition>:
column_name AS computed_column_expression [COMMENT column_comment]
<watermark_definition>:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
COMMENT
用来对字段做注释,使用DESCRIBE table_name
命令时,可以查看到字段的一些注释和描述信息。
<column_definition>
在传统的SQL DDL中经常见到,Watermark策略在窗口部分已经介绍,不再赘述,这里主要介绍计算列(Computed Column)。
计算列
计算列是虚拟字段,不是一个实际存储在表中的字段。计算列可以通过表达式、内置函数、或是自定义函数等方式,使用其它列的数据,计算出其该列的数值。比如,一个订单表中有单价price
和数量quantity
,总价可以被定义为total AS price * quantity
。计算列表达式可以是已有的物理列、常量、函数的组合,但不能是SELECT ...
式的子查询。计算列虽然是个虚拟字段,但在Flink SQL中可以像普通字段一样被使用。
计算列常常被用于定义时间属性,在窗口部分我们曾介绍了如何定义Processing Time:使用proc AS PROCTIME()
来定义一个名为proc
的计算列,该列可以在后续计算中被用做时间属性。PROCTIME()
是Flink提供的内置函数,用来生成Processing Time。
CREATE TABLE mytable (
id BIGINT,
-- 在原有Schema基础上添加一列proc
proc as PROCTIME()
) WITH (
...
);
关于计算列,相关知识点可以总结为:
- 计算列是一个虚拟列,计算的过程可以由函数、已有物理列、常量等组成,可以用在
SELECT
语句中。 - 计算列不能作为
INSERT
语句的输出目的地,或者说我们不能使用INSERT
语句将数据插入到目标表的计算列上。
WITH
在Table API & SQL简介中我们曾介绍连接外部系统时必须配置相应参数,这些参数以key=value
等形式被放在WITH
语句中。
CREATE TABLE my_table (
-- Schema
...
) WITH (
-- 声明参数
'connector.type' = 'kafka',
'connector.version' = 'universal',
...
)
在这个例子中使用的外部系统是Kafka,我们需要配置一些Kafka连接相关的参数。Flink的官方文档中有不同Connector的详细参数配置示例,这里不再详细介绍每种Connector需要配置哪些参数,
使用CREATE TABLE
创建的表可以作为Source也可以作为Sink。
PARTITIONED BY
根据某个字段进行分区。如果这个表中的数据实际存储在一个文件系统上,Flink会为每个分区创建一个文件夹。例如,以日期为分区,那么每天的数据被放在一个文件夹中。PARTITIONED BY
经常被用在批处理中。
注
这里是PARTITIONED BY
,与OVER WINDOW
中的PARTITON BY
语法和含义均不同。
ALTER TABLE
ALTER TABLE
目前支持修改表名和一些参数。
-- 修改表名
ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name
-- 修改参数,如果某个Key之前被WITH语句设置过,再次设置会将老数据覆盖
ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)
DROP TABLE
DROP TABLE
用来删除一个表。IF EXISTS
表示只对已经存在的表进行删除。
DROP TABLE [IF EXISTS] [catalog_name.][db_name.]table_name
CREATE/ALTER/DROP DATABASE
CREATE DATABASE
CREATE DATABASE
一般使用下面的模板:
CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name
[COMMENT database_comment]
WITH (key1=val1, key2=val2, ...)
这里的语法与其他SQL引擎比较相似。其中IF NOT EXISTS
表示,如果这个Database不存在才创建,否则不会创建。
ALTER DATABASE
ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)
ALTER DATABASE
支持修改参数,新数据会覆盖老数据。
DROP DATABASE
DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]
RESTRICT
选项表示如果Database非空,那么会抛出异常,默认开启本选项;CASCADE
选项表示会将Database下所属的Table和Function等都删除。
CREATE/ALTER/DROP FUNCTION
除了传统SQL引擎都会提供的上述CREATE
功能,Table API & SQL还提供了函数(Function)功能。我们也可以用Java或Scala语言自定义一个函数,然后注册进来,在SQL语句中使用。CREATE FUNCTION
的模板如下所示,我们将在用户自定义函数部分的详细介绍如何使用Java/Scala自定义函数。
CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
[IF NOT EXISTS] [catalog_name.][db_name.]function_name
AS identifier [LANGUAGE JAVA|SCALA]
INSERT
INSERT
语句可以向表中插入数据,一般用于向外部系统输出数据。它的语法模板如下:
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name [PARTITION part_spec]
SELECT ...
part_spec:
(part_col_name1=val1 [, part_col_name2=val2, ...])
上面的SQL语句将SELECT
查询结果写入到目标表中。OVERWRITE
选项表示将原来的数据覆盖,否则新数据只是追加进去。PARTITION
表示数据将写入哪个分区。如果CREATE TABLE
是按照日期进行PARTITIONED BY
分区,那么每个日期会有一个文件夹,PARTITION
后要填入一个日期,数据将写入日期对应目录中。分区一般常用于批处理场景。
-- 创建一个表,用来做输出
CREATE TABLE behavior_cnt (
user_id BIGINT,
cnt BIGINT
) WITH (
'connector.type' = 'filesystem', -- 使用 filesystem connector
'connector.path' = 'file:///tmp/behavior_cnt', -- 输出地址为一个本地路径
'format.type' = 'csv' -- 数据源格式为 json
)
-- 向一个Append-only表中输出数据
INSERT INTO behavior_cnt
SELECT
user_id,
COUNT(behavior) AS cnt
FROM user_behavior
GROUP BY user_id, TUMBLE(ts, INTERVAL '10' SECOND)
或者是将特定的值写入目标表:
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name VALUES values_row [, values_row ...]
例如:
CREATE TABLE students (name STRING, age INT, score DECIMAL(3, 2)) WITH (...);
INSERT INTO students
VALUES ('Li Lei', 35, 1.28), ('Han Meimei', 32, 2.32);