Seatunnel使用手册

官方文档: 传送门

本文档针对 SeaTunnel v2.1.2 编写


SeaTunnel是什么

概述

先来看一下官方文档上是怎么说的:

SeaTunnel is a very easy-to-use ultra-high-performance distributed data integration platform that supports real-time synchronization of massive data. It can synchronize tens of billions of data stably and efficiently every day, and has been used in the production of nearly 100 companies.

翻译一下:

SeaTunnel 是一个使用起来非常简单, 性能非常高效的分布式数据集成平台. 它支持海量数据的实时同步. 它可以每天稳定高效的同步数百亿的数据, 并且已经用于近百个公司的生产中.

可以从上面提炼出几个关键词:

  1. very easy-to-use: 使用非常简单. 其实 SeaTunnel 并不是对 Flink 或是 Spark 或是以后支持的其他技术的二次开发, 而是在其之上封装了一层, 使得这些技术使用起来会更简便.
  2. ultra-high-performance: 超高性能.
  3. real-time synchronization of massive data: 海量数据的实时同步.

以下是从官方文档摘抄:

为什么我们需要 SeaTunnel

SeaTunnel 尽所能为您解决海量数据同步中可能遇到的问题:

  • 数据丢失与重复
  • 任务堆积与延迟
  • 吞吐量低
  • 应用到生产环境周期长
  • 缺少应用运行状态监控

SeaTunnel 使用场景

  • 海量数据同步
  • 海量数据集成
  • 海量数据的 ETL
  • 海量数据聚合
  • 多源数据处理

环境依赖

组件 版本 备注
Java >=8
Flink 1.13 因为当前版本SeaTunnel还未适配Flink1.14, 所以使用Flink1.13版本进行文档编写
Spark 2.x 如果是集群方式的话, Yarn/Standalone 都是支持的

环境配置

config目录下seatunnel-env.sh中可以配置Spark和Flink的环境

# Home directory of spark distribution.
SPARK_HOME=${SPARK_HOME:-/opt/spark}
# Home directory of flink distribution.
FLINK_HOME=${FLINK_HOME:-/opt/flink}

:-代表: 若未找到之前的地址, 则用之后的地址

官方示例

首先来进行一个官方示例简单的运行一下.

config目录下创建文件example.conf:

# 配置 Spark 或 Flink 的参数
env {
    # You can set flink configuration here
    execution.parallelism = 1
    #execution.checkpoint.interval = 10000
    #execution.checkpoint.data-uri = "hdfs://hadoop102:9092/checkpoint"
}

# 在 source 所属的块中配置数据源
# 默认端口: 9999
source {
    SocketStream{
        host = hadoop102
        result_table_name = "fake"
        field_name = "info"
    }
}

# 在 transform 的块中声明转换插件
# 这里需要说明的是: Split是不会立即生效的, 只有当sql插件中的sql语句中调用了split函数才会真正的作用在数据上
transform {
    Split{
        separator = "#"
        fields = ["name","age"]
    }
    sql {
        sql = "select info, split(info) as info_row from fake"
    }
}
# 在 sink 块中声明要输出到哪
sink {
     ConsoleSink {}
}

然后cd到seatunnel目录在shell中执行:

./bin/start-seatunnel-flink.sh --config config/example.conf

nc -lk 9999模拟一下socket连接

sql执行顺序

  1. source块中, 利用SocketStream插件读取出数据, 命名为fake表, 字段名为info
  2. 拿到info字段, 利用Split插件进行切分

启动命令

./bin/start-seatunnel-flink.sh -h
Usage: start-seatunnel-flink.sh [options]
  Options:
    -t, --check    check config (default: false)
  * -c, --config   Config file
    -h, --help     Show the usage message
    -r, --run-mode job run mode, run or run-application (default: RUN) 
                   (values: [RUN, APPLICATION_RUN])
    -i, --variable variable substitution, such as -i city=beijing, or -i 
                   date=20190318 (default: [])

其中, --config是必填参数

  • -i

    当其中上面的sql改为:

    sql = "select * from (select info, split(info) as info_row from fake) where age > "${age}""

    启动命令改为:

    ./bin/start-seatunnel-flink.sh --config config/example02.conf -i age=18
  • -r

    执行 flink 自带的命令参数, 可以cd到 flink 下面 -> ./bin/flink run -h查看

应用配置的4个基本组件

一个完整的SeaTunnel配置文件应包含四个配置组件:

env{}`  `source{}` --> `transform{}` --> `sink{}

env块

env块中可以直接写 spark 或 flink 支持的配置项. 比如并行度, 检查点时间, 检查点 hdfs 路径等. 以 Flink 为例, 在 SeaTunnel 源码的ConfigKeyName类中声明了所有可用的key:

package org.apache.seatunnel.flink.util;

public class ConfigKeyName {

    private ConfigKeyName() {
        throw new IllegalStateException("Utility class");
    }

    public static final String TIME_CHARACTERISTIC = "execution.time-characteristic";
    public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout";
    public static final String PARALLELISM = "execution.parallelism";
    public static final String MAX_PARALLELISM = "execution.max-parallelism";
    public static final String CHECKPOINT_INTERVAL = "execution.checkpoint.interval";
    public static final String CHECKPOINT_MODE = "execution.checkpoint.mode";
    public static final String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout";
    public static final String CHECKPOINT_DATA_URI = "execution.checkpoint.data-uri";
    public static final String MAX_CONCURRENT_CHECKPOINTS = "execution.max-concurrent-checkpoints";
    public static final String CHECKPOINT_CLEANUP_MODE = "execution.checkpoint.cleanup-mode";
    public static final String MIN_PAUSE_BETWEEN_CHECKPOINTS = "execution.checkpoint.min-pause";
    public static final String FAIL_ON_CHECKPOINTING_ERRORS = "execution.checkpoint.fail-on-error";
    public static final String RESTART_STRATEGY = "execution.restart.strategy";
    public static final String RESTART_ATTEMPTS = "execution.restart.attempts";
    public static final String RESTART_DELAY_BETWEEN_ATTEMPTS = "execution.restart.delayBetweenAttempts";
    public static final String RESTART_FAILURE_INTERVAL = "execution.restart.failureInterval";
    public static final String RESTART_FAILURE_RATE = "execution.restart.failureRate";
    public static final String RESTART_DELAY_INTERVAL = "execution.restart.delayInterval";
    public static final String MAX_STATE_RETENTION_TIME = "execution.query.state.max-retention";
    public static final String MIN_STATE_RETENTION_TIME = "execution.query.state.min-retention";
    public static final String STATE_BACKEND = "execution.state.backend";
    public static final String PLANNER = "execution.planner";
}

Row

在说明source块, transform块sink块之前, 需要先了解一下 SeaTunnel 中的核心数据结构: Row

Row 是 SeaTunnel 中数据传递的核心数据结构. 对来 Flink 说, source 插件需要给下游的转换插件返回一个 DataStream<Row>, 转换插件接到上游的 DataStream<Row>进行处理后需要再给下游返回一个 DataStream<Row>. 最后 Sink 插件将转换插件处理好的DataStream<Row>输出到外部的数据系统.

因为 DataStream可以很方便地和 Table 进行互转, 所以将 Row 当作核心数据结构可以让转换插件同时具有使用代码 (命令式) 和 sql (声明式) 处理数据的能力.

可以看一下上面示例中, 读取数据的源码:

package org.apache.seatunnel.flink.socket.source;

import ...

@AutoService(BaseFlinkSource.class)
public class SocketStream implements FlinkStreamSource {

    ...

    @Override
    public DataStream<Row> getData(FlinkEnvironment env) {
        final StreamExecutionEnvironment environment = env.getStreamExecutionEnvironment();
        return environment.socketTextStream(host, port)
                .map((MapFunction<String, Row>) value -> {
                    Row row = new Row(1);
                    row.setField(0, value);
                    return row;
                }).returns(new RowTypeInfo(Types.STRING()));
    }
}

感兴趣的话, 也可以看到源码中的 Split, sql, sink 都是用DataStream<Row>进行数据传递的

source块

source{}是可以配置多个 source 插件的

# 伪代码
env {
    ...
}

source {
    hdfs { ... }
    jdbc { ... }
    elasticsearch { ... }
}

transform {
    sql {
        sql = """
            select ... from hdfs_table
            join es_table on
            hdfs_table.uid = es_table.uid
            where ..."""
    }
}

sink {
    elasticsearch { ... }
}

需要注意的是: 所有的 source 插件中都可以声明result_table_name. 如果声明了result_table_name. SeaTunnel 会将 source 插件输出的DataStream<Row>转换为 Table 并注册在 Table 环境中. 当指定了result_table_name那么还可以指定field_name, 在注册时, 给 Table重设字段名.

因为每个 source 所需要的配置是不一致的 (result_table_namefield_name为共有非必填参数), 所以配置的时候查找官方文档会好一点

当前支持的source

source 支持Spark 支持Flink 备注 文档地址
Druid ✔️ 传送门
Elasticsearch ✔️ 传送门
Fake ✔️ ✔️ 改类型主要是用于方便生成指定的数据, 用作 SeaTunnel 的功能验证, 测试和性能测试. 传送门
Feishu Sheet ✔️ 传送门
File ✔️ ✔️ 从本地或者 hdfs 中读取. 传送门
HBase ✔️ 传送门
Hive ✔️ 传送门
Http ✔️ ✔️ 传送门
Hudi ✔️ 传送门
Iceberg ✔️ 传送门
InfluxDb ✔️ 传送门
Jdbc ✔️ ✔️ 传送门
Kafka ✔️ ✔️ Kafka 版本 >= 0.10.0, 目前源码中发现 Schema 的解析有问题(原因为社区把 fastjson 换成 Jackon 引起的). 传送门
Kudu ✔️ 兼容 Kerberos 认证 传送门
MongoDb ✔️ 传送门
Neo4j ✔️ 传送门
Phoenix ✔️ 传送门
Redis ✔️ 传送门
Socket ✔️ ✔️ 传送门
Tidb ✔️ 传送门
WebhookStream ✔️ 提供 http 接口推送数据, 仅支持 POST 请求 传送门

transform块

目前社区对 transform 插件做了很多规划, 但截至 v2.1.2 版本, 可用的插件有3个: Split, SqlJson. 其中Json只适配 Spark 可用.

transform{}中可以声明多个转换插件. 所有的转换插件都可以使用source_table_name, 和result_table_name. 同样, 如果声明了result_table_name, 那么就能声明field_name.

Split插件

这里着重说一下 Split 插件:

    @Override
    public DataSet<Row> processBatch(FlinkEnvironment env, DataSet<Row> data) {
        return data;
    }

    @Override
    public DataStream<Row> processStream(FlinkEnvironment env, DataStream<Row> dataStream) {
        return dataStream;
    }

    @Override
    public void registerFunction(FlinkEnvironment flinkEnvironment) {
        if (flinkEnvironment.isStreaming()) {
            flinkEnvironment
                    .getStreamTableEnvironment()
                    .registerFunction("split", new ScalarSplit(rowTypeInfo, num, separator));
        } else {
            flinkEnvironment
                    .getBatchTableEnvironment()
                    .registerFunction("split", new ScalarSplit(rowTypeInfo, num, separator));
        }
    }

从源码中可以发现 Split 插件并没有对数据流进行任何的处理, 而是将它直接return了. 反之, 它向表环境中注册了一个名为 split 的 UDF(用户自定义函数). 而且, 函数名是写死的. 这意味着, 如果声明了多个 Split 后面的 UDF 还会把前面的覆盖.

从 Split 插件中就能看出了, 这个插件其实是通过注册方法的方式来调用的. 但是, transform 接口其实是预留了直接操作数据的能力的 (比如 Sql 插件中的处理方式), 也就是processStream方法. 那么, 一个 transform 插件其实同时履行了 process 和 UDF 的职责, 这是违背单一职责原则的. 所以要判断一个 transform 插件在做什么就只能从源码和文档的方面来加以区分了.

Sql 插件

sql插件中需要特别说明的是, 指定source_table_name对于 sql 插件的意义不大, 因为可以通过from子句来决定从哪个表里抽取数据.

Sink块

sink块里可以声明多个 sink 插件, 每个 sink 插件都可以指定source_table_name.

当前支持的sink

source 支持Spark 支持Flink 备注 文档地址
Clickhouse ✔️ ✔️ 使用 Clickhouse-jdbc 根据字段名称对应数据源, 并将其写入. 使用前需创建对应的数据表. 传送门
ClickhouseFile ✔️ ✔️ 通过 clickhouse-local 程序生成 Clickhouse 数据文件, 然后将其发送到 Clickhouse 服务器, 也称为是 bulk load (批量加载). 传送门
Console ✔️ ✔️ 将数据输出到标准终端或 Flink 的 TaskManager. 通常用于调试和易于观察的数据. 传送门
Doris ✔️ ✔️ 传送门
Druid ✔️ 传送门
Elasticsearch ✔️ ✔️ Spark 支持的 Elasticsearch >= 2.0 并且 < 7.0.0; Flink 支持的 Elasticsearch = 7.x, 如果要用 Elasticsearch 6.x, 需用源码执行命令 mvn clean package -Delasticsearch=6重新打包. 传送门
Email ✔️ 支持通过 email 附件输出数据. 传送门
File ✔️ ✔️ 传送门
Hbase ✔️ ✔️ 使用 hbase-connectors 将数据输出到 Hbase(>=2.1.0) 和 Spark(>=2.0.0) 版本兼容性取决于 hbase-connectors. 传送门
Hive ✔️ 传送门
Hudi ✔️ 传送门
Iceberg ✔️ 传送门
InfluxDB ❌️ ✔️ 传送门
Jdbc ✔️ ✔️ 传送门
Kafka ✔️ ✔️ 传送门
Kudu ✔️ 传送门
MongoDB ✔️ 传送门
Phoenix ✔️ 传送门
Redis ✔️ 传送门
TiDb ✔️ ❌️ 传送门
0%