SQL DDL

通过前面的一些介绍,我们已经对Flink SQL有了一些了解。Flink SQL底层使用Dynamic Table来进行流处理,它支持了时间窗口和Join操作。本节将围绕SQL DDL,主要介绍创建、获取、修改和删除元数据时所涉及的一些注意事项。

注册和获取表

Catalog简介

Catalog记录并管理了各类元数据信息,比如我们有哪些数据库(Database)、数据库是以文件还是消息队列的形式存储、某个数据库中有哪些表和视图、当前有哪些可用的函数等。Catalog提供了一个注册、管理和访问这些元数据的API。一般情况下,一个Catalog中包含一到多个Database,一个Database包含一到多个Table。

常见的Catalog包括:GenericInMemoryCatalogHiveCatalog和用户自定义的Catalog。

GenericInMemoryCatalog将元数据存储在内存中,只在一个Session会话内生效。默认情况下,都是使用这种Catalog。

Hive的元数据管理功能是SQL-on-Hadoop领域事实上的标准,很多企业的生产环境使用Hive管理元数据,Hive可以管理包括纯Hive表和非Hive表。为了与Hive生态兼容,Flink推出了HiveCatalog。对于同时部署了Hive和Flink的环境,HiveCatalog允许用户在Flink SQL上读取原来Hive中的各个表。对于只部署了Flink的环境,HiveCatalog是目前将元数据持久化的唯一方式。持久化意味着某些元数据管理团队可以先将Kafka或HDFS中的数据注册到Catalog中,其他数据分析团队无需再次注册表,只需每次从Catalog中获取表,不用关心数据管理相关问题。如果没有进行元数据持久化,用户每次都需要注册表。

用户也可以自定义Catalog,需要实现Catalog接口类。

获取表

我们编写好SQL查询语句(SELECT ...语句)后,需要使用TableEnvironment.sqlQuery("SELECT ...")来执行这个SQL语句,这个方法返回的结果是一个TableTable可以用于后续的Table API & SQL查询,也可以将Table转换为DataStream或者DataSet。总之,Table API和SQL可以完美融合。

我们使用FROM table_name从某个表中查询数据,这个table_name表必须被注册到TableEnvironment中。注册有以下几种方式:

  • 使用SQL DDL中的CREATE TABLE ...创建表
  • TableEnvironment.connect()连接一个外部系统
  • 从Catalog中获取已注册的表

第一种SQL DDL的方式,我们会用CREATE TABLE table_name ...来明确指定一个表的名字为table_name。而第二种和第三种方式,我们是在Java/Scala代码中获取一个Table对象,获取对象后,明确使用createTemporaryView()方法声明一个Table并指定一个名字。例如,下面的代码中,我们获取了一个Table,并通过createTemporaryView()注册该Table名字为user_behavior

Table userBehaviorTable = tEnv.fromDataStream(userBehaviorStream, "user_id, item_id, behavior, ts.rowtime");

// createTemporaryView创建名为user_behavior的表
tEnv.createTemporaryView("user_behavior", userBehaviorTable);

对于TableEnvironment.connect()所连接的外部系统,也可以使用createTemporaryTable方法,以链式调用的方式注册名字:

tEnv
  // 使用connect函数连接外部系统
  .connect(...)
  // 序列化方式 可以是JSON、Avro等
  .withFormat(...)
  // 数据的Schema
  .withSchema(...)
  // 临时表的表名,后续可以在SQL语句中使用这个表名
  .createTemporaryTable("user_behavior");

注册好表名后,就可以在SQL语句中使用FROM user_behavior来查询该表。

即使没有明确指定表名,也没有问题。Table.toString()可以返回表的名字,如果没有给这个Table指定名字,Flink会为其自动分配一个唯一的表名,不会与其他表名冲突。如下所示:

// 获取一个Table,并没有明确为其分配表名
Table table = ...;
String tableName = table.toString();
tEnv.sqlQuery("SELECT * FROM " + tableName);

甚至可以省略toString方法,直接将Table对象与SQL语句用加号+连接,因为+会自动调用Table.toString()方法。

// 获取一个Table,并没有明确为其分配表名
Table table = ...;
// 加号+操作符会在编译期间自动调用Table.toString()方法
tEnv.sqlQuery("SELECT * FROM " + table);

HiveCatalog

如果想将元数据持久化到HiveCatalog中:

TableEnvironment tEnv = ...

// 创建一个HiveCatalog
// 四个参数分别为:catalogName、databaseName、hiveConfDir、hiveVersion
Catalog catalog = new HiveCatalog("mycatalog", null, "<path_of_hive_conf>", "<hive_version>");

// 注册catalog,取名为mycatalog
tEnv.registerCatalog("mycatalog", catalog);

// 创建一个Database,取名为mydb
tEnv.sqlUpdate("CREATE DATABASE mydb WITH (...)");

// 创建一个Table,取名为mytable
tEnv.sqlUpdate("CREATE TABLE mytable (name STRING, age INT) WITH (...)");

// 返回所有Table
tEnv.listTables(); 

USE和SHOW语句

创建完Catalog、Database后,可以像其他SQL引擎一样,使用SHOWUSE语句:

-- 展示所有Catalog
SHOW CATALOGS;

-- 使用mycatalog
USE CATALOG mycatalog;

-- 展示当前Catalog里所有Database
SHOW DATABASES;

-- 使用mydatabase
USE mydb;

-- 展示当前Catalog里当前Database里所有Table
SHOW TABLES;

这些语句需要粘贴到sqlUpdate方法中:

Catalog catalog = ...

tEnv.registerCatalog("mycatalog", catalog);
tEnv.sqlUpdate("USE catalog mycatalog");
tEnv.sqlUpdate("CREATE DATABASE mydb");
tEnv.sqlUpdate("USE mydb");

CREATE、DROP、ALTER

CREATEALTERDROP是SQL中最常见的三种DDL语句,可以创建、修改和删除数据库(Database)、表(Table)和函数(Function)。

CREATE语句可以创建一个Database、Table和Function:

  • CREATE TABLE
  • CREATE DATABASE
  • CREATE FNCTION

ALTER语句可以修改已有的Database、Table和Function:

  • ALTER TABLE
  • ALTER DATABASE
  • ALTER FNCTION

DROP语句可以删除之前创建的Database、Table和Function:

  • DROP TABLE
  • DROP DATABASE
  • DROP FUNCTION

这些语句可以放到TableEnvironment.sqlUpdate()的参数里,也可以在SQL Client里执行。TableEnvironment.sqlUpdate()执行成功后没有返回结果,执行失败则会抛出异常。下面例子展示了如何在TableEnvironment中使用sqlUpdate

// tEnv:TableEnvironment
// CREATE TABLE
tEnv.sqlUpdate("CREATE TABLE user_behavior (" +
                "    user_id BIGINT," +
                "    item_id BIGINT," +
                "    category_id BIGINT," +
                "    behavior STRING," +
                "    ts TIMESTAMP(3)," +
                "    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- 在ts上定义watermark,ts成为事件时间列" +
                ") ...");

// ALTER DATABASE
tEnv.sqlUpdate("ALTER DATABASE db1 set ('k1' = 'a', 'k2' = 'b')")

// DROP TABLE
tEnv.sqlUpdate("DROP TABLE user_behavior");

CREATE/ALTER/DROP TABLE

CREATE TABLE

CREATE TABLE需要按照下面的模板编写:

CREATE TABLE [catalog_name.][db_name.]table_name
  (
    { <column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)

一个名为table_nameTable隶属于一个名为db_name的Database,db_name又隶属于名为catalog_name的Catalog。如果不明确指定db_namecatalog_name,该表被注册到默认的Catalog和默认的Database中。如果table_name与已有的表名重复,会抛出异常。

<column_definition><computed_column_definition><watermark_definition>需要符合下面的模板:

<column_definition>:
  column_name column_type [COMMENT column_comment]

<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]

<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

COMMENT用来对字段做注释,使用DESCRIBE table_name命令时,可以查看到字段的一些注释和描述信息。

<column_definition>在传统的SQL DDL中经常见到,Watermark策略在窗口部分已经介绍,不再赘述,这里主要介绍计算列(Computed Column)。

计算列

计算列是虚拟字段,不是一个实际存储在表中的字段。计算列可以通过表达式、内置函数、或是自定义函数等方式,使用其它列的数据,计算出其该列的数值。比如,一个订单表中有单价price和数量quantity,总价可以被定义为total AS price * quantity。计算列表达式可以是已有的物理列、常量、函数的组合,但不能是SELECT ...式的子查询。计算列虽然是个虚拟字段,但在Flink SQL中可以像普通字段一样被使用。

计算列常常被用于定义时间属性,在窗口部分我们曾介绍了如何定义Processing Time:使用proc AS PROCTIME()来定义一个名为proc的计算列,该列可以在后续计算中被用做时间属性。PROCTIME()是Flink提供的内置函数,用来生成Processing Time。

CREATE TABLE mytable (
    id BIGINT,
    -- 在原有Schema基础上添加一列proc
    proc as PROCTIME()
) WITH (
    ...
);

关于计算列,相关知识点可以总结为:

  • 计算列是一个虚拟列,计算的过程可以由函数、已有物理列、常量等组成,可以用在SELECT语句中。
  • 计算列不能作为INSERT语句的输出目的地,或者说我们不能使用INSERT语句将数据插入到目标表的计算列上。

WITH

Table API & SQL简介中我们曾介绍连接外部系统时必须配置相应参数,这些参数以key=value等形式被放在WITH语句中。

CREATE TABLE my_table (
  -- Schema
  ...
) WITH (
  -- 声明参数
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  ...
)

在这个例子中使用的外部系统是Kafka,我们需要配置一些Kafka连接相关的参数。Flink的官方文档中有不同Connector的详细参数配置示例,这里不再详细介绍每种Connector需要配置哪些参数,

使用CREATE TABLE创建的表可以作为Source也可以作为Sink。

PARTITIONED BY

根据某个字段进行分区。如果这个表中的数据实际存储在一个文件系统上,Flink会为每个分区创建一个文件夹。例如,以日期为分区,那么每天的数据被放在一个文件夹中。PARTITIONED BY经常被用在批处理中。

这里是PARTITIONED BY,与OVER WINDOW中的PARTITON BY语法和含义均不同。

ALTER TABLE

ALTER TABLE目前支持修改表名和一些参数。

-- 修改表名
ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name

-- 修改参数,如果某个Key之前被WITH语句设置过,再次设置会将老数据覆盖
ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)

DROP TABLE

DROP TABLE用来删除一个表。IF EXISTS表示只对已经存在的表进行删除。

DROP TABLE [IF EXISTS] [catalog_name.][db_name.]table_name

CREATE/ALTER/DROP DATABASE

CREATE DATABASE

CREATE DATABASE一般使用下面的模板:

CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name
  [COMMENT database_comment]
  WITH (key1=val1, key2=val2, ...)

这里的语法与其他SQL引擎比较相似。其中IF NOT EXISTS表示,如果这个Database不存在才创建,否则不会创建。

ALTER DATABASE

ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)

ALTER DATABASE支持修改参数,新数据会覆盖老数据。

DROP DATABASE

DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]

RESTRICT选项表示如果Database非空,那么会抛出异常,默认开启本选项;CASCADE选项表示会将Database下所属的Table和Function等都删除。

CREATE/ALTER/DROP FUNCTION

除了传统SQL引擎都会提供的上述CREATE功能,Table API & SQL还提供了函数(Function)功能。我们也可以用Java或Scala语言自定义一个函数,然后注册进来,在SQL语句中使用。CREATE FUNCTION的模板如下所示,我们将在用户自定义函数部分的详细介绍如何使用Java/Scala自定义函数。

CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION 
  [IF NOT EXISTS] [catalog_name.][db_name.]function_name 
  AS identifier [LANGUAGE JAVA|SCALA]

INSERT

INSERT语句可以向表中插入数据,一般用于向外部系统输出数据。它的语法模板如下:

INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name [PARTITION part_spec] 
SELECT ...

part_spec:
  (part_col_name1=val1 [, part_col_name2=val2, ...])

上面的SQL语句将SELECT查询结果写入到目标表中。OVERWRITE选项表示将原来的数据覆盖,否则新数据只是追加进去。PARTITION表示数据将写入哪个分区。如果CREATE TABLE是按照日期进行PARTITIONED BY分区,那么每个日期会有一个文件夹,PARTITION后要填入一个日期,数据将写入日期对应目录中。分区一般常用于批处理场景。

-- 创建一个表,用来做输出
CREATE TABLE behavior_cnt (
  user_id BIGINT,
  cnt BIGINT
) WITH (
  'connector.type' = 'filesystem',  -- 使用 filesystem connector
  'connector.path' = 'file:///tmp/behavior_cnt',  -- 输出地址为一个本地路径
  'format.type' = 'csv'  -- 数据源格式为 json
)

-- 向一个Append-only表中输出数据
INSERT INTO behavior_cnt 
SELECT 
	user_id, 
	COUNT(behavior) AS cnt 
FROM user_behavior 
GROUP BY user_id, TUMBLE(ts, INTERVAL '10' SECOND)

或者是将特定的值写入目标表:

INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name VALUES values_row [, values_row ...]

例如:

CREATE TABLE students (name STRING, age INT, score DECIMAL(3, 2)) WITH (...);

INSERT INTO students
  VALUES ('Li Lei', 35, 1.28), ('Han Meimei', 32, 2.32);