Data Lake (17): Integrating Flink with Iceberg Using the DataStream API
2022-07-15 14:01:00 【51CTO】
Integrating Flink with Iceberg Using the DataStream API
Flink currently supports reading from and writing to Iceberg tables in real time with both the DataStream API and the SQL API; the SQL API is the recommended approach.
Iceberg requires Flink 1.11.x or later. The tested compatibility between Iceberg and Flink versions is as follows:
- Flink 1.11.x pairs with Iceberg 0.11.1.
- Flink 1.12.x to 1.13.x pairs with Iceberg 0.12.1; the SQL API has some bugs.
- Flink 1.14.x can be integrated with Iceberg 0.12.1, but there are minor bugs, for example when reading Iceberg data in streaming mode.
The Flink and Iceberg integration below uses Flink 1.13.5 with Iceberg 0.12.1. The later SQL API examples use Flink 1.11.6 with Iceberg 0.11.1.
I. Writing to an Iceberg Table in Real Time with the DataStream API
Operating on Iceberg through the DataStream API currently supports only the Java API. The steps for writing to an Iceberg table in real time with the DataStream API are as follows:
1. First, add the following dependencies to the Maven pom.xml:
```xml
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <!-- Flink 1.12.x - 1.13.x is compatible with Iceberg 0.12.1; Flink 1.14 is not -->
    <flink.version>1.13.5</flink.version>
    <!--<flink.version>1.12.1</flink.version>-->
    <!--<flink.version>1.14.2</flink.version>-->
    <!-- Flink 1.11.x pairs with Iceberg 0.11.1 -->
    <!--<flink.version>1.11.6</flink.version>-->
    <hadoop.version>3.2.2</hadoop.version>
</properties>

<dependencies>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-iceberg</artifactId>
        <version>1.13-vvr-4.0.7</version>
    </dependency>
    <!-- Iceberg dependency required for Flink to operate on Iceberg -->
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-flink-runtime</artifactId>
        <version>0.12.1</version>
        <!--<version>0.11.1</version>-->
    </dependency>
    <!-- Dependencies for developing Flink jobs in Java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink Kafka connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Required for reading files from HDFS -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <!-- Flink SQL & Table -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
    <!-- log4j and slf4j; comment these out if you do not want log output in the console -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-nop</artifactId>
        <version>1.7.25</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.5</version>
    </dependency>
</dependencies>
```

2. Write code that uses the DataStream API to write Kafka data into the Iceberg table
```java
import com.google.common.collect.ImmutableMap;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;

import java.util.Map;

/**
 * Write data to an Iceberg table with the DataStream API.
 */
public class StreamAPIWriteIceberg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. Checkpointing must be enabled: Flink only commits data to Iceberg after a checkpoint completes.
        env.enableCheckpointing(5000);

        // 2. Read data from the Kafka topic.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("node1:9092,node2:9092,node3:9092")
                .setTopics("flink-iceberg-topic")
                .setGroupId("my-group-id")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // 3. Process the records and wrap them as RowData objects so they can be written to the Iceberg table.
        SingleOutputStreamOperator<RowData> dataStream = kafkaSource.map(new MapFunction<String, RowData>() {
            @Override
            public RowData map(String s) throws Exception {
                System.out.println("s = " + s);
                String[] split = s.split(",");
                GenericRowData row = new GenericRowData(4);
                row.setField(0, Integer.valueOf(split[0]));
                row.setField(1, StringData.fromString(split[1]));
                row.setField(2, Integer.valueOf(split[2]));
                row.setField(3, StringData.fromString(split[3]));
                return row;
            }
        });

        // 4. Create the Hadoop configuration, the catalog and the table schema,
        //    so the table can be resolved under the warehouse path when writing.
        Configuration hadoopConf = new Configuration();
        Catalog catalog = new HadoopCatalog(hadoopConf, "hdfs://mycluster/flink_iceberg/");

        // Iceberg database name and table name.
        TableIdentifier name = TableIdentifier.of("icebergdb", "flink_iceberg_tbl");

        // Create the Iceberg table schema.
        Schema schema = new Schema(
                Types.NestedField.required(1, "id", Types.IntegerType.get()),
                Types.NestedField.required(2, "name", Types.StringType.get()),
                Types.NestedField.required(3, "age", Types.IntegerType.get()),
                Types.NestedField.required(4, "loc", Types.StringType.get()));

        // If the table is partitioned, declare the partition column here ("loc" in this example);
        // use PartitionSpec.unpartitioned() for an unpartitioned table.
        // PartitionSpec spec = PartitionSpec.unpartitioned();
        PartitionSpec spec = PartitionSpec.builderFor(schema).identity("loc").build();

        // Store the Iceberg table data in Parquet format.
        Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());

        // Create the table through the catalog if it does not exist, otherwise load it.
        Table table = null;
        if (!catalog.tableExists(name)) {
            table = catalog.createTable(name, schema, spec, props);
        } else {
            table = catalog.loadTable(name);
        }

        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl", hadoopConf);

        // 5. Write the stream to Iceberg with the DataStream API.
        FlinkSink.forRowData(dataStream)
                // .table(...) can be omitted; the path given to tableLoader is enough to locate the table.
                .table(table)
                .tableLoader(tableLoader)
                // Defaults to false (append); setting it to true overwrites the existing data.
                .overwrite(false)
                .build();

        env.execute("DataStream Api Write Data To Iceberg");
    }
}
```

A few points to note about the code above:
- Checkpointing must be enabled. Flink only commits data to Iceberg after a checkpoint succeeds; otherwise the data will not be visible later when querying through Hive.
- Records read from Kafka must be wrapped as RowData or Row objects before they can be written to the Iceberg table (see the Row-based sketch after this list). Writes append by default; specifying overwrite replaces the existing data.
- The corresponding catalog and table schema must be created before writing to the Iceberg table; otherwise, specifying only a path at write time fails with an error that the Iceberg table cannot be found.
- Writing to Iceberg with the DataStream API is not recommended; prefer the SQL API.
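The Row-based alternative mentioned above is not shown in the original code. The following is a minimal sketch, not a definitive implementation: the class and helper method are hypothetical, the DataStream<Row> and tableLoader are assumed to be built the same way as the Kafka stream and tableLoader in the example above, and it relies on FlinkSink.forRow, which takes a Flink TableSchema describing the columns.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class RowSinkSketch {

    // Hypothetical helper: rowStream and tableLoader are assumed to come from
    // the same setup as the StreamAPIWriteIceberg example above.
    public static void sinkRows(DataStream<Row> rowStream, TableLoader tableLoader) {
        // Flink TableSchema matching the Iceberg table: id int, name string, age int, loc string.
        TableSchema flinkSchema = TableSchema.builder()
                .field("id", DataTypes.INT())
                .field("name", DataTypes.STRING())
                .field("age", DataTypes.INT())
                .field("loc", DataTypes.STRING())
                .build();

        // forRow converts each Row to RowData internally before writing to the table.
        FlinkSink.forRow(rowStream, flinkSchema)
                .tableLoader(tableLoader)
                .overwrite(false) // append (the default); true would overwrite existing data
                .build();
    }
}
```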
3. Create the "flink-iceberg-topic" topic used in the code, start the job, and produce data

```shell
# Create the flink-iceberg-topic topic in Kafka
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic flink-iceberg-topic --partitions 3 --replication-factor 3
```

After creating the topic, start the job and then produce the following data into the topic:

```shell
./kafka-console-producer.sh --topic flink-iceberg-topic --broker-list node1:9092,node2:9092,node3:9092
1,zs,18,beijing
2,ls,19,shanghai
3,ww,20,beijing
4,ml,21,shanghai
```

The corresponding data can then be seen under the matching HDFS path:


4. Query the data written to Iceberg through Hive
Start Hive and the Hive Metastore, then create an external table in Hive that maps to the Iceberg table:
```sql
CREATE TABLE flink_iceberg_tbl (
  id int,
  name string,
  age int,
  loc string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl'
TBLPROPERTIES ('iceberg.catalog'='location_based_table');
```

Note: although loc is the partition column, the partition specification can simply be omitted when creating the mapping table; in addition, the table's LOCATION must match the path where the Iceberg data is stored.
Querying the Iceberg table through Hive returns the following result:

II. Batch / Real-Time Reads of an Iceberg Table with the DataStream API
Reading an Iceberg table with the DataStream API can be done either in batch mode or in real time, controlled by the builder method "streaming(true/false)".
1. Batch / full read
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

/**
 * Read Iceberg data in batch or streaming mode with the DataStream API.
 */
public class StreamAPIReadIceberg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. Configure the TableLoader.
        Configuration hadoopConf = new Configuration();
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl", hadoopConf);

        // 2. Read data from Iceberg (full batch read or incremental streaming read).
        DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
                .tableLoader(tableLoader)
                // Defaults to false (one-off batch read); set to true for streaming reads.
                .streaming(false)
                .build();

        batchData.map(new MapFunction<RowData, String>() {
            @Override
            public String map(RowData rowData) throws Exception {
                int id = rowData.getInt(0);
                String name = rowData.getString(1).toString();
                int age = rowData.getInt(2);
                String loc = rowData.getString(3).toString();
                return id + "," + name + "," + age + "," + loc;
            }
        }).print();

        env.execute("DataStream Api Read Data From Iceberg");
    }
}
```

The result is as follows:


2. Real-time (streaming) read

Modify the code above to read in streaming mode (see the sketch below), restart it, and then insert 2 rows into the corresponding Iceberg table "flink_iceberg_tbl" through Hive:
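A minimal sketch of the change, assuming the same env and tableLoader as in StreamAPIReadIceberg above; only the streaming flag differs from the batch example:

```java
// Inside StreamAPIReadIceberg.main, replacing the batch source above:
DataStream<RowData> streamData = FlinkSource.forRowData().env(env)
        .tableLoader(tableLoader)
        // true switches the source to continuous reading of newly committed snapshots
        .streaming(true)
        .build();
```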
Before inserting data into the Iceberg table through Hive, the following two jars need to be added:

Insert two rows into the Iceberg table through Hive:

After the insert completes, the Flink console shows the corresponding rows being read in real time.


III. Real-Time Incremental Reads Starting from a Specified Snapshot
In the example above, Flink reads all of the data in the table. We can also specify a snapshot-id to decide from which point the data is read incrementally, as sketched below.
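The original code for this step is not reproduced here. The following is a minimal sketch under stated assumptions: the snapshot id shown is a hypothetical placeholder, and it relies on the FlinkSource builder's startSnapshotId option to skip everything up to and including that snapshot:

```java
// Inside StreamAPIReadIceberg.main, replacing the source above.
// The snapshot id below is a placeholder; real ids can be listed, e.g., via table.snapshots()
// or found in the table's metadata files.
long startSnapshotId = 5633620101965268146L; // hypothetical snapshot id

DataStream<RowData> incrementalData = FlinkSource.forRowData().env(env)
        .tableLoader(tableLoader)
        .streaming(true)
        // only data committed after this snapshot is read
        .startSnapshotId(startSnapshotId)
        .build();
```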

The result then contains only the data committed after the specified snapshot, as shown below:


IV. Compacting Data Files
Iceberg provides an API for compacting small data files into larger files, and it can be run as a Flink batch job. Small-file compaction in Flink works exactly the same way as in Spark.
The code is as follows:
```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFilesActionResult;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.actions.Actions;
import org.apache.iceberg.hadoop.HadoopCatalog;

/**
 * Compact data files by submitting a Flink batch job.
 */
public class RewrietDataFiles {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 1. Create the Hadoop configuration.
        Configuration hadoopConf = new Configuration();

        // 2. Create the Hadoop catalog so the table can be located under the warehouse path.
        Catalog catalog = new HadoopCatalog(hadoopConf, "hdfs://mycluster/flink_iceberg/");

        // 3. Specify the Iceberg database and table name, then load the table.
        TableIdentifier name = TableIdentifier.of("icebergdb", "flink_iceberg_tbl");
        Table table = catalog.loadTable(name);

        // 4. Compact the small data files.
        RewriteDataFilesActionResult result = Actions.forTable(table)
                .rewriteDataFiles()
                // The target file size defaults to 512 MB; it can be set explicitly, just as in Spark.
                .targetSizeInBytes(536870912L)
                .execute();
    }
}
```
