Flink SQL 使用 calcite 框架作为 Flink SQL 的核心组建, calcite 虽然支持无限层的元数据结构,但 Flink SQL 中限制了元数据的组织结构为 3 级别 结构, 分别是 catalog, database, table 。

在 Flink SQL 中使用一张外部数据表, 大概有两种形式:

  1. 创建一个 catalog, 通过 catalog.database.table 来引用一张数据表.

-- 使用 CREATE CATALOG DDL
CREATE CATALOG my_catalog WITH(
    'type' = 'jdbc',
    'default-database' = '...',
    'username' = '...',
    'password' = '...',
    'base-url' = '...'
);

-- 使用 TableEnv 进行注册
HiveCatalog hiveCatalog = new HiveCatalog();

tbEnv.registerCatalog("hiveCatalog", hiveCatalog);

select * from my_catalog.database1.table1;
  1. 创建一个临时表, 通过 dafault_catalog.default_database.table 来引用这张数据表。

-- 在 Flink SQL 中注册一张 MySQL 表 'users'
CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'users'
);

-- 只需要引用表名即可进行表的读写操作 
-- (默认是 default_catalog.default_database,如果有切换到其他 catalog 和 database,则需要使用 default_catalog.default_database.MyUserTable)
select * from MyUserTable; 

作为一般的用户,使用 Flink SQL 进行开发, 使用上面的方式进行开发, 一切都比较容易, 作业中需要用到哪张表, 按照需要的方式进行引用即可。 但当随着业务表越来越多, 业务数据库越来越多,这时就会有专门的 meta 管理系统, 将这些系统的连接信息都保存在一个系统中。

但在 Flink SQL 要怎么引用任意一张表呢? 那就得把各个 catalog 都注册到 Flink SQL 的 catalog Manager 里面,但随着接入的系统或者 catalog 越来越多, 需要把当前已经存在的 catalog 都注册到 Flink SQL 中 ?

随着 Flink SQL 接入的系统越来越多, 虽然 catalog 的接入已经节省了各种数据表的的 CREATE TABLE 的编写, 但 create catalog 的成本依旧是比较多的, 同在在公司内部, 一般会提供统一的元数据管理, 这时 Flink SQL 需要对接的系统就更多了,我们需要提供一种更好的方式,帮助用户更加快速地创建 catalog.

如果可以在使用时,用到哪个 catalog 就会去注册哪个 catalog, 岂不是可以解决这个问题 ?

CREATE CATALOG my_catalog1 WITH(
    'type' = 'jdbc'
);

CREATE CATALOG my_catalog2 WITH(
    'type' = 'jdbc'
);

CREATE CATALOG my_catalog3 WITH(
    'type' = 'jdbc',
);

CREATE CATALOG my_catalog3 WITH(
    'type' = 'hive',
);

......

insert into my_catalog2.db1.tb1 select * from my_catalog3.db3.tb3;

当前 catalog 提供的接口

public class TableEnvironmentImpl implements TableEnvironmentInternal {
    // Flag that tells if the TableSource/TableSink used in this environment is stream table
    // source/sink,
    // and this should always be true. This avoids too many hard code.
    private static final boolean IS_STREAM_TABLE = true;
    private final CatalogManager catalogManager;
    ....

    // 注册一个 catalog
    @Override
    public void registerCatalog(String catalogName, Catalog catalog) {
        catalogManager.registerCatalog(catalogName, catalog);
    }

    // 获得一个 catalog
    @Override
    public Optional<Catalog> getCatalog(String catalogName) {
        return catalogManager.getCatalog(catalogName);
    }
}

通过查看 TalbeEnviromentImpl 提供的接口来看, 只提供了注册 catalog 和 getCatalog 的相关接口, 但是相关的注册逻辑和

classDiagram
TalbeEnviromentImpl *-- CatalogManager


class TalbeEnviromentImpl{
        -CatalogManager catalogManager
        +registerCatalog(catalogName, catalog)
        +getCatalog(catalogName)
}
class CatalogManager{
        -Map catalogs
        +registerCatalog(catalogName, catalog)
        +getCatalog(catalogName)
}

CatalogManager 的实现非常简单, 注册 catalog 的核心操作就是将 Catalog 放到一个 hashMap catalogs 中, 当需要获取某个 catalog 时,则从 hashMap 中获取即可, 当需要通过 objectIdentifier 获取某个 table 时, 先根据 catalog 名找到对应的 catalog,然后调用 catalog 的接口来获取 table 的元数据信息。

当前的 CatalogManager 是一个 final 类型, 无法通过继承来修改其 catalog 的相关行为, 只能将 catalog 以 hashMap 的形式保存在内存中。 因此,如果我们可以修改或者拓展 CatalogManager 的行为, 则可以做到使用时创建, 需要获取某个 catalog 时, 直接创建对应的 catalog 即可。

拓展 CatalogManager 的逻辑

  1. 增加一个新的接口, CatalogProvider
public interface CatalogProvider {
    Optional<Catalog> getCatalog(String catalogName);
}
  1. 给 CatalogManager 增加 CatalogProvider 的成员变量, 拓展 getCatalog 的逻辑
// 如果存在 CatalogProvider,如果 hashMap 中未获得 catalog,就通过 CatalogProvider 来获取

public CatalogManager {
    private @Nullable CatalogProvider catalogProvider;
    
    private Map<String, Catalog> catalogs;

    public void setCatalogProvider(CatalogProvider catalogProvider) {
        this.catalogProvider = catalogProvider;
    }
    
    public Optional<Catalog> getCatalog(String catalogName) {
        // 优先从注册的 catalogs map 中获取 catalog
        
        // 如果 catalogs 中没有对应的 catalog,则通过 catalogProvider 取 
        if (catalogProvider != null) {
            Optional<Catalog> catalog = catalogProvider.getCatalog(catalogName);
        }
    }
   
}
  1. 修改 TableEnv 的接口, 增加注册 catalogProvider 的逻辑

// 在 TableEnv 中,增加接口, 用户可以传入自己的 provider,能够构建

public TableEnviromentImpl {
    private CatalogManager catalogManager;


    public void setCatalogProvider(CatalogProvider catalogProvider) {
        this.catalogManager.setCatalogProvider(catalogProvider);
    }

}
  1. 增加 configuration,支持在配置中指定 catalogProvider, 支持在初始化 TableEnv 时自动通过配置创建 catalogProvider
flink.catalog.provider: org.apache.flink.xxxCatalogProvider

CatalogProvider 的使用

  1. 实现 CatalogProvider 接口

public static class MyCatalogProvider implements CatalogProvider {  
            private Map<String, Catalog> catalogs;  
  
            public MyCatalogProvider() throws DatabaseAlreadyExistException {  
                catalogs = new HashMap<>();  
  
                GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("catalog1");  
  
                catalog.createDatabase("database1", new CatalogDatabaseImpl(new HashMap<>(), ""), true);   
  
                catalogs.put("catalog1", catalog);  
            }  
  
  
            @Override  
            public Optional<Catalog> getCatalog(String catalogName) {  
                return Optional.ofNullable(catalogs.get(catalogName));  
            }  
  
            @Override  
            public Set<String> listCatalogs() {  
                return catalogs.keySet();  
            }  
    }
  1. 使用任意 catalogProvider 能够提供的 catalog
public static void main(String [] args) throws DatabaseAlreadyExistException {  
    TableEnvironmentImpl tbEnv = TableEnvironmentImpl.create(EnvironmentSettings  
            .newInstance()  
            .build());  
    tbEnv.registerCatalogProvider(new MyCatalogProvider());  
    // 注册 catalogProvider 之后,无需手动创建 catalog,即可引用任意一个 catalog
    tbEnv.executeSql("use catalog catalog1");
}