您现在的位置是:首页 >学无止境 >Hudi(六)集成Hive网站首页学无止境
Hudi(六)集成Hive
Hudi源表对应一份HDFS数据,通过Spark,Flink组件或者HudiCLI,可以将Hudi表的数据映射为Hive外部表,基于该外部表,Hive可以方便的进行实时视图,读优化视图以及增量视图的查询。
1、集成步骤
1、拷贝编译好的jar包
cp /opt/software/hudi-0.12.0/packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.12.0.jar /opt/module/hive/lib/
cp /opt/software/hudi-0.12.0/packaging/hudi-hive-sync-bundle/target/hudi-hive-sync-bundle-0.12.0.jar /opt/module/hive/lib/
2、配置完后重启hive
// 按照需求选择合适的方式重启
nohup hive --service metastore &
nohup hive --service hiveserver2 &
2、Hive同步
2.1、Flink同步Hive
1、使用方式
Flink hive sync现在支持两种hive sync mode, 分别是hms和jdbc模式。 其中hms只需要配置metastore uris;而jdbc模式需要同时配置jdbc属性和metastore uris,具体配置模版如下:
## hms mode 配置
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
with(
'connector'='hudi',
'path' = 'hdfs://xxx.xxx.xxx.xxx:9000/t1',
'table.type'='COPY_ON_WRITE', -- MERGE_ON_READ方式在没生成parquet文件前,hive不会有输出
'hive_sync.enable'='true', -- required,开启hive同步功能
'hive_sync.table'='${hive_table}', --required, hive新建的表名
'hive_sync.db'='${hive_db}', -- required, hive 新建的数据库名
'hive_sync.mode' = 'hms', -- required, 将hive sync mode设置为hms, 默认jdbc
'hive_sync.metastore.uris' = 'thrift://ip:9083' -- required, metastore的端口
);
3、Flink使用HiveCatalog
flink使用hivetalog不仅可以看到hudi的元数据,还可以看到hive的元数据,还可以调用hive的函数。
CREATE CATALOG hive_catalog
WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/opt/module/hive/conf',
'hadoop-conf-dir'='/opt/module/hadoop/etc/hadoop'
);
use catalog hive_catalog;
-- hive-connector内置了hive module,提供了hive自带的系统函数
load module hive with ('hive-version'='2.7.6');
show modules;
show functions;
-- 可以调用hive的split函数
select split('a,b', ',');
4、创建Hive外表
一般来说Hudi表在用Spark或者Flink写入数据时会自动同步到Hive外部表,此时可以直接通过beeline查询同步的外部表,若写入引擎没有开启自动同步,则需要手动利用hudi客户端工具run_hive_sync_tool.sh进行同步,具体后面介绍。
5、查询Hive外表
5.1、设置参数
使用Hive查询Hudi表前,需要通过set命令设置hive.input.format,否则会出现数据重复,查询异常等错误,如下面这个报错就是典型的没有设置hive.input.format导致的:
java.lang.IllegalArgumentException: HoodieRealtimeReader can oly work on RealTimeSplit and not with xxxxxxxxxx
除此之外对于增量查询,还需要set命令额外设置3个参数。
set hoodie.mytableName.consume.mode=INCREMENTAL;
set hoodie.mytableName.consume.max.commits=3;
set hoodie.mytableName.consume.start.timestamp=commitTime;
注意这3个参数是表级别参数。
参数名 | 描述 |
hoodie.mytableName.consume.mode | Hudi表的查询模式。 增量查询 :INCREMENTAL。 非增量查询:不设置或者设为SNAPSHOT |
hoodie.mytableName.consume.start.timestamp | Hudi表增量查询起始时间。 |
hoodie. mytableName.consume.max.commits | Hudi表基于 hoodie.mytableName.consume.start.timestamp之后要查询的增量commit次数。 例如: 设置为3时,增量查询从指定的起始时间之后commit 3次的数据 设为-1时,增量查询从指定的起始时间之后提交的所有数据 |
5.2、 COW表查询
这里假设同步的 Hive 外表名为hudi_cow。
5.2.1、实时视图
设置hive.input.format为以下两个之一:
- org.apache.hadoop.hive.ql.io.HiveInputFormat
- org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat
像普通的hive表一样查询即可:
set hive.input.format= org.apache.hadoop.hive.ql.io.HiveInputFormat;
select count(*) from hudi_cow;
5.2.2、增量视图
除了要设置hive.input.format,还需要设置上述的3个增量查询参数,且增量查询语句中的必须添加where关键字并将`_hoodie_commit_time>'startCommitTime'作为过滤条件(这地方主要是hudi的小文件合并会把新旧commit的数据合并成新数据,hive是没法直接从parquet文件知道哪些是新数据哪些是老数据)
set hive.input.format= org.apache.hadoop.hive.ql.io.HiveInputFormat;
set hoodie.hudicow.consume.mode= INCREMENTAL;
set hoodie.hudicow.consume.max.commits=3;
set hoodie.hudicow.consume.start.timestamp= xxxx;
select count(*) from hudicow where `_hoodie_commit_time`>'xxxx'
-- (这里注意`_hoodie_commit_time` 的引号是反引号(tab键上面那个)不是单引号, 'xxxx'是单引号)
5.3、MOR表查询
这里假设MOR类型Hudi源表的表名为hudi_mor,映射为两张Hive外部表hudi_mor_ro(ro表)和hudi_mor_rt(rt表)。
5.3.1、实时视图
针对的是rt表,最新的全量数据。设置了hive.input.format之后,即可查询到Hudi源表的最新数据
set hive.input.format= org.apache.hadoop.hive.ql.io.HiveInputFormat;
select * from hudicow_rt;
5.3.2、读优化视图
针对的是ro表,只会查询到列式数据 。ro表全称read optimized table,对于MOR表同步的xxx_ro表,只暴露压缩后的parquet。其查询方式和COW表类似。设置完hiveInputFormat之后和普通的Hive表一样查询即可。
5.3.3、增量视图
这个增量查询针对的rt表,不是ro表。同COW表的增量查询类似:
set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat; // 这地方指定为HoodieCombineHiveInputFormat
set hoodie.hudimor.consume.mode=INCREMENTAL;
set hoodie.hudimor.consume.max.commits=-1;
set hoodie.hudimor.consume.start.timestamp=xxxx;
select * from hudimor_rt where `_hoodie_commit_time`>'xxxx';// 这个表名要是rt表
说明:
- set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;最好只用于rt表的增量查询当然其他种类的查询也可以设置为这个,这个参数会影响到普通的hive表查询,因此在rt表增量查询完成后,应该设置set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;或者改为默认值set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;用于其他表的查询。
- set hoodie.mytableName.consume.mode=INCREMENTAL;仅用于该表的增量查询模式,若要对该表切换为其他查询模式,应设置set hoodie.hudisourcetablename.consume.mode=SNAPSHOT;
6、hive sync tool
若写入引擎没有开启自动同步,则需要手动利用Hudi客户端工具进行同步,Hudi提供Hive sync tool用于同步Hudi最新的元数据(包含自动建表、增加字段、同步分区信息)到hive metastore。
Hive sync tool提供三种同步模式,JDBC,HMS,HIVESQL。这些模式只是针对Hive执行DDL的三种不同方式。在这些模式中,JDBC或HMS优于HIVESQL, HIVESQL主要用于运行DML而不是DDL。
6.1、使用语法及参数
脚本位置在hudi源码路径下的hudi-sync/hudi-hive-sync/run_sync_tool.sh。注意jdbc方式的端口是10000,hms方式的端口是9083
6.1.1、语法
#查看语法帮助
./run_sync_tool.sh --help
#语法:
./run_sync_tool.sh
--jdbc-url jdbc:hive2://hiveserver:10000
--user hive
--pass hive
--partitioned-by partition
--base-path <basePath>
--database default
--table <tableName>
从Hudi0.5.1版本开始,读时合并优化版本的表默认带有'_ro'后缀。为了向后兼容旧的Hudi版本,提供了一个可选的配置--skip-ro-suffix,如果需要,可以关闭'_ro'后缀。
6.1.2、参数说明
HiveSyncConfig | DataSourceWriteOption | 描述 |
--database | hoodie.datasource.hive_sync.database | 同步到hive的目标库名 |
--table | hoodie.datasource.hive_sync.table | 同步到hive的目标表名 |
--user | hoodie.datasource.hive_sync.username | hive metastore 用户名 |
--pass | hoodie.datasource.hive_sync.password | hive metastore 密码 |
--use-jdbc | hoodie.datasource.hive_sync.use_jdbc | 使用JDBC连接到hive metastore |
--jdbc-url | hoodie.datasource.hive_sync.jdbcurl | Hive metastore url |
--sync-mode | hoodie.datasource.hive_sync.mode | 同步hive元数据的方式. 有效值为 hms, jdbc 和hiveql. |
--partitioned-by | hoodie.datasource.hive_sync.partition_fields | hive分区字段名,多个字段使用逗号连接. |
--partition-value-extractor | hoodie.datasource.hive_sync.partition_extractor_class | 解析分区值的类名,默认SlashEncodedDayPartitionValueExtractor |