Seatunnel使用手册
官方文档: 传送门
本文档针对 SeaTunnel v2.1.2 编写
SeaTunnel是什么
概述
先来看一下官方文档上是怎么说的:
SeaTunnel is a very easy-to-use ultra-high-performance distributed data integration platform that supports real-time synchronization of massive data. It can synchronize tens of billions of data stably and efficiently every day, and has been used in the production of nearly 100 companies.
翻译一下:
SeaTunnel 是一个使用起来非常简单, 性能非常高效的分布式数据集成平台. 它支持海量数据的实时同步. 它可以每天稳定高效的同步数百亿的数据, 并且已经用于近百个公司的生产中.
可以从上面提炼出几个关键词:
- very easy-to-use: 使用非常简单. 其实 SeaTunnel 并不是对 Flink 或是 Spark 或是以后支持的其他技术的二次开发, 而是在其之上封装了一层, 使得这些技术使用起来会更简便.
- ultra-high-performance: 超高性能.
- real-time synchronization of massive data: 海量数据的实时同步.
以下是从官方文档摘抄:
为什么我们需要 SeaTunnel
SeaTunnel 尽所能为您解决海量数据同步中可能遇到的问题:
- 数据丢失与重复
- 任务堆积与延迟
- 吞吐量低
- 应用到生产环境周期长
- 缺少应用运行状态监控
SeaTunnel 使用场景
- 海量数据同步
- 海量数据集成
- 海量数据的 ETL
- 海量数据聚合
- 多源数据处理
环境依赖
| 组件 | 版本 | 备注 |
|---|---|---|
| Java | >=8 | |
| Flink | 1.13 | 因为当前版本SeaTunnel还未适配Flink1.14, 所以使用Flink1.13版本进行文档编写 |
| Spark | 2.x | 如果是集群方式的话, Yarn/Standalone 都是支持的 |
环境配置
config目录下seatunnel-env.sh中可以配置Spark和Flink的环境
|
|
:-代表: 若未找到之前的地址, 则用之后的地址
官方示例
首先来进行一个官方示例简单的运行一下.
在config目录下创建文件example.conf:
|
|
然后cd到seatunnel目录在shell中执行:
|
|
用
nc -lk 9999模拟一下socket连接
sql执行顺序
- 在
source块中, 利用SocketStream插件读取出数据, 命名为fake表, 字段名为info - 拿到
info字段, 利用Split插件进行切分
启动命令
|
|
其中, --config是必填参数
-
-i当其中上面的sql改为:
1sql = "select * from (select info, split(info) as info_row from fake) where age > "${age}""启动命令改为:
1./bin/start-seatunnel-flink.sh --config config/example02.conf -i age=18 -
-r执行 flink 自带的命令参数, 可以
cd到 flink 下面 ->./bin/flink run -h查看
应用配置的4个基本组件
一个完整的SeaTunnel配置文件应包含四个配置组件:
|
|
env块
env块中可以直接写 spark 或 flink 支持的配置项. 比如并行度, 检查点时间, 检查点 hdfs 路径等. 以 Flink 为例, 在 SeaTunnel 源码的ConfigKeyName类中声明了所有可用的key:
|
|
Row
在说明source块, transform块和sink块之前, 需要先了解一下 SeaTunnel 中的核心数据结构: Row
Row 是 SeaTunnel 中数据传递的核心数据结构. 对来 Flink 说, source 插件需要给下游的转换插件返回一个 DataStream<Row>, 转换插件接到上游的 DataStream<Row>进行处理后需要再给下游返回一个 DataStream<Row>. 最后 Sink 插件将转换插件处理好的DataStream<Row>输出到外部的数据系统.
因为
DataStream可以很方便地和 Table 进行互转, 所以将 Row 当作核心数据结构可以让转换插件同时具有使用代码 (命令式) 和 sql (声明式) 处理数据的能力.
可以看一下上面示例中, 读取数据的源码:
|
|
感兴趣的话, 也可以看到源码中的 Split, sql, sink 都是用DataStream<Row>进行数据传递的
source块
source{}是可以配置多个 source 插件的
|
|
需要注意的是: 所有的 source 插件中都可以声明
result_table_name. 如果声明了result_table_name. SeaTunnel 会将 source 插件输出的DataStream<Row>转换为 Table 并注册在 Table 环境中. 当指定了result_table_name那么还可以指定field_name, 在注册时, 给 Table重设字段名.
因为每个 source 所需要的配置是不一致的 (
result_table_name和field_name为共有非必填参数), 所以配置的时候查找官方文档会好一点
当前支持的source
| source | 支持Spark | 支持Flink | 备注 | 文档地址 |
|---|---|---|---|---|
| Druid | ❌ | ✔️ | 传送门 | |
| Elasticsearch | ✔️ | ❌ | 传送门 | |
| Fake | ✔️ | ✔️ | 改类型主要是用于方便生成指定的数据, 用作 SeaTunnel 的功能验证, 测试和性能测试. | 传送门 |
| Feishu Sheet | ✔️ | ❌ | 传送门 | |
| File | ✔️ | ✔️ | 从本地或者 hdfs 中读取. | 传送门 |
| HBase | ✔️ | ❌ | 传送门 | |
| Hive | ✔️ | ❌ | 传送门 | |
| Http | ✔️ | ✔️ | 传送门 | |
| Hudi | ✔️ | ❌ | 传送门 | |
| Iceberg | ✔️ | ❌ | 传送门 | |
| InfluxDb | ❌ | ✔️ | 传送门 | |
| Jdbc | ✔️ | ✔️ | 传送门 | |
| Kafka | ✔️ | ✔️ | Kafka 版本 >= 0.10.0, |
传送门 |
| Kudu | ✔️ | ❌ | 兼容 Kerberos 认证 | 传送门 |
| MongoDb | ✔️ | ❌ | 传送门 | |
| Neo4j | ✔️ | ❌ | 传送门 | |
| Phoenix | ✔️ | ❌ | 传送门 | |
| Redis | ✔️ | ❌ | 传送门 | |
| Socket | ✔️ | ✔️ | 传送门 | |
| Tidb | ✔️ | ❌ | 传送门 | |
| WebhookStream | ✔️ | ❌ | 提供 http 接口推送数据, 仅支持 POST 请求 | 传送门 |
transform块
目前社区对 transform 插件做了很多规划, 但截至 v2.1.2 版本, 可用的插件有3个: Split, Sql和Json. 其中Json只适配 Spark 可用.
transform{}中可以声明多个转换插件. 所有的转换插件都可以使用source_table_name, 和result_table_name. 同样, 如果声明了result_table_name, 那么就能声明field_name.
Split插件
这里着重说一下 Split 插件:
|
|
从源码中可以发现 Split 插件并没有对数据流进行任何的处理, 而是将它直接return了. 反之, 它向表环境中注册了一个名为 split 的 UDF(用户自定义函数). 而且, 函数名是写死的. 这意味着, 如果声明了多个 Split 后面的 UDF 还会把前面的覆盖.
从 Split 插件中就能看出了, 这个插件其实是通过注册方法的方式来调用的. 但是, transform 接口其实是预留了直接操作数据的能力的 (比如 Sql 插件中的处理方式), 也就是
processStream方法. 那么, 一个 transform 插件其实同时履行了 process 和 UDF 的职责, 这是违背单一职责原则的. 所以要判断一个 transform 插件在做什么就只能从源码和文档的方面来加以区分了.
Sql 插件
sql插件中需要特别说明的是, 指定source_table_name对于 sql 插件的意义不大, 因为可以通过from子句来决定从哪个表里抽取数据.
Sink块
sink块里可以声明多个 sink 插件, 每个 sink 插件都可以指定source_table_name.
当前支持的sink
| source | 支持Spark | 支持Flink | 备注 | 文档地址 |
|---|---|---|---|---|
| Clickhouse | ✔️ | ✔️ | 使用 Clickhouse-jdbc 根据字段名称对应数据源, 并将其写入. 使用前需创建对应的数据表. | 传送门 |
| ClickhouseFile | ✔️ | ✔️ | 通过 clickhouse-local 程序生成 Clickhouse 数据文件, 然后将其发送到 Clickhouse 服务器, 也称为是 bulk load (批量加载). | 传送门 |
| Console | ✔️ | ✔️ | 将数据输出到标准终端或 Flink 的 TaskManager. 通常用于调试和易于观察的数据. | 传送门 |
| Doris | ✔️ | ✔️ | 传送门 | |
| Druid | ❌ | ✔️ | 传送门 | |
| Elasticsearch | ✔️ | ✔️ | Spark 支持的 Elasticsearch >= 2.0 并且 < 7.0.0; Flink 支持的 Elasticsearch = 7.x, 如果要用 Elasticsearch 6.x, 需用源码执行命令 mvn clean package -Delasticsearch=6重新打包. |
传送门 |
| ✔️ | ❌ | 支持通过 email 附件输出数据. | 传送门 | |
| File | ✔️ | ✔️ | 传送门 | |
| Hbase | ✔️ | ✔️ | 使用 hbase-connectors 将数据输出到 Hbase(>=2.1.0) 和 Spark(>=2.0.0) 版本兼容性取决于 hbase-connectors. | 传送门 |
| Hive | ✔️ | ❌ | 传送门 | |
| Hudi | ✔️ | ❌ | 传送门 | |
| Iceberg | ✔️ | ❌ | 传送门 | |
| InfluxDB | ❌️ | ✔️ | 传送门 | |
| Jdbc | ✔️ | ✔️ | 传送门 | |
| Kafka | ✔️ | ✔️ | 传送门 | |
| Kudu | ✔️ | ❌ | 传送门 | |
| MongoDB | ✔️ | ❌ | 传送门 | |
| Phoenix | ✔️ | ❌ | 传送门 | |
| Redis | ✔️ | ❌ | 传送门 | |
| TiDb | ✔️ | ❌️ | 传送门 |