SeaTunnel Integration

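Apache SeaTunnel is a high-performance distributed data integration platform that supports both real-time and offline synchronization of massive data volumes.

Since version 2.3.4, SeaTunnel ships a native INFINI Easysearch Connector that supports both Source (read) and Sink (write). It uses the dedicated easysearch-client, so it does not depend on the Elasticsearch compatibility layer.

Key Sink features: exactly-once semantics, CDC (INSERT / UPDATE / DELETE), HTTPS/TLS.
Key Source features: batch / stream modes, exactly-once, column projection, parallel reading, user-defined splits.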

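Typical scenarios #

| Scenario | Source → Sink |
| --- | --- |
| MySQL full / incremental sync | MySQL → Easysearch |
| Log ingestion | Kafka → Easysearch |
| Data warehouse export | Hive / ClickHouse → Easysearch |
| Cross-cluster migration | Elasticsearch → Easysearch |
| Easysearch data export | Easysearch → ClickHouse / Kafka / files |
| Real-time CDC sync | MySQL CDC → Easysearch |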

Installing SeaTunnel #

# Download (2.3.12 recommended)
wget https://archive.apache.org/dist/seatunnel/2.3.12/apache-seatunnel-2.3.12-bin.tar.gz
tar -xzf apache-seatunnel-2.3.12-bin.tar.gz
cd apache-seatunnel-2.3.12

Data type mapping #

| Easysearch type | SeaTunnel type |
| --- | --- |
| STRING / KEYWORD / TEXT | STRING |
| BOOLEAN | BOOLEAN |
| BYTE | BYTE |
| SHORT | SHORT |
| INTEGER | INT |
| LONG | LONG |
| FLOAT / HALF_FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| Date | LOCAL_DATE_TIME_TYPE |
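
To make the mapping concrete, the sketch below declares a `schema` block for an Easysearch source using SeaTunnel's generic schema syntax. The field names and their index types (`title` as TEXT, `views` as LONG, `created_at` as Date) are assumptions for illustration, not taken from a real index. `bigint` and `timestamp` are the schema-file spellings of SeaTunnel's LONG and LOCAL_DATE_TIME types.

```hocon
source {
  Easysearch {
    hosts = ["https://localhost:9200"]
    username = "admin"
    password = "your-password"
    index = "articles"
    tls_verify_certificate = false
    # Assumed index mapping: title TEXT, views LONG, created_at Date
    schema = {
      fields {
        title = string          # STRING / KEYWORD / TEXT -> STRING
        views = bigint          # LONG -> LONG
        created_at = timestamp  # Date -> LOCAL_DATE_TIME_TYPE
      }
    }
  }
}
```

Declaring the schema explicitly is one way to pin down the types the job sees, rather than listing field names in `source`.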

Configuration example: MySQL → Easysearch #

Create the job configuration file mysql-to-easysearch.conf:

env {
  parallelism = 2
  job.mode = "BATCH"
}

source {
  Jdbc {
    url = "jdbc:mysql://localhost:3306/mydb"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "password"
    query = "SELECT id, title, content, created_at FROM articles"
  }
}

transform {
}

sink {
  Easysearch {
    hosts = ["https://localhost:9200"]
    username = "admin"
    password = "your-password"
    index = "articles"
    tls_verify_certificate = false
    tls_verify_hostname = false
  }
}

Run the job:

bin/seatunnel.sh --config mysql-to-easysearch.conf

Configuration example: Kafka → Easysearch (real time) #

env {
  parallelism = 4
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  Kafka {
    bootstrap.servers = "kafka:9092"
    topic = "app-logs"
    format = "json"
    consumer.group = "seatunnel-es"
    start_mode = "latest"
  }
}

sink {
  Easysearch {
    hosts = ["https://localhost:9200"]
    username = "admin"
    password = "your-password"
    index = "app-logs-${now('yyyy-MM-dd')}"
    tls_verify_certificate = false
  }
}

Configuration example: ES → Easysearch migration #

env {
  parallelism = 4
  job.mode = "BATCH"
}

source {
  Elasticsearch {
    hosts = ["http://old-es:9200"]
    index = "old-index"
    query = {"match_all": {}}
    scroll_time = "5m"
    scroll_size = 1000
  }
}

sink {
  Easysearch {
    hosts = ["https://localhost:9200"]
    username = "admin"
    password = "your-password"
    index = "new-index"
    tls_verify_certificate = false
  }
}

Configuration example: reading from Easysearch as a Source #

The Easysearch Connector also works as a Source, so data can be read from Easysearch and synchronized to other systems:

env {
  parallelism = 4
  job.mode = "BATCH"
}

source {
  Easysearch {
    hosts = ["https://localhost:9200"]
    username = "admin"
    password = "your-password"
    index = "articles"
    source = ["_id", "title", "content", "views"]
    query = {"match_all": {}}
    scroll_time = "5m"
    scroll_size = 1000
    tls_verify_certificate = false
  }
}

sink {
  Console {}
}

The Source is also named Easysearch. It supports wildcard index matching (for example seatunnel-*) and filtering with a DSL query.
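
As a minimal sketch of those two options together, the source below reads from every index matching a wildcard and narrows the result with a range query. The `views` threshold and the field list are hypothetical values chosen for illustration.

```hocon
source {
  Easysearch {
    hosts = ["https://localhost:9200"]
    username = "admin"
    password = "your-password"
    # Wildcard: read every index whose name starts with "seatunnel-"
    index = "seatunnel-*"
    source = ["_id", "title", "views"]
    # DSL filter: only documents with views >= 100 (hypothetical threshold)
    query = {"range": {"views": {"gte": 100}}}
    tls_verify_certificate = false
  }
}
```

Filtering in the query keeps unneeded documents from being scrolled at all, which is usually cheaper than dropping them in a later transform.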

Configuration example: real-time CDC sync #

env {
  parallelism = 2
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MySQL-CDC {
    # SeaTunnel 2.3.x takes a JDBC URL and fully qualified table names
    base-url = "jdbc:mysql://localhost:3306/mydb"
    username = "root"
    password = "password"
    table-names = ["mydb.orders"]
  }
}

sink {
  Easysearch {
    hosts = ["https://localhost:9200"]
    username = "admin"
    password = "your-password"
    index = "orders"
    primary_keys = ["order_id"]
    tls_verify_certificate = false
  }
}

In CDC mode primary_keys must be specified. The Easysearch Connector then maps INSERT / UPDATE / DELETE events to the corresponding document operations automatically.
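
For a table whose primary key spans several columns, the sink options `primary_keys` and `key_delimiter` can be combined. The sketch below assumes a hypothetical second key column `item_id`; with the default delimiter the key values would be expected to be joined into an `_id` such as `1001_5`.

```hocon
sink {
  Easysearch {
    hosts = ["https://localhost:9200"]
    username = "admin"
    password = "your-password"
    index = "orders"
    # Composite key: order_id plus a hypothetical item_id column
    primary_keys = ["order_id", "item_id"]
    # Delimiter placed between key values when building _id ("_" is the default)
    key_delimiter = "_"
    tls_verify_certificate = false
  }
}
```

A stable `_id` is what lets a later UPDATE or DELETE event address the same document that the original INSERT created.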

Configuration example: TLS certificate verification #

sink {
  Easysearch {
    hosts = ["https://localhost:9200"]
    username = "admin"
    password = "your-password"
    index = "my-index"
    tls_keystore_path = "/path/to/easysearch/config/certs/http.p12"
    tls_keystore_password = "your-keystore-password"
  }
}
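
A truststore can be configured in the same way when the goal is to verify the server certificate rather than to disable verification. This is a sketch under assumptions: the truststore path and password are placeholders, and the file is assumed to contain the CA that signed the Easysearch HTTP certificate.

```hocon
sink {
  Easysearch {
    hosts = ["https://localhost:9200"]
    username = "admin"
    password = "your-password"
    index = "my-index"
    # Keep both checks enabled (these are the defaults)
    tls_verify_certificate = true
    tls_verify_hostname = true
    # Placeholder path: a truststore holding the CA of the Easysearch cluster
    tls_truststore_path = "/path/to/truststore.jks"
    tls_truststore_password = "your-truststore-password"
  }
}
```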

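Sink parameters #

| Parameter | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| hosts | list | Yes | - | Easysearch node addresses, in host:port format |
| index | string | Yes | - | Target index name; supports field variables such as seatunnel_${age} |
| primary_keys | list | No (required in CDC mode) | - | Primary key fields used to generate _id |
| key_delimiter | string | No | _ | Delimiter for composite primary keys |
| username | string | No | - | Authentication username |
| password | string | No | - | Authentication password |
| max_batch_size | int | No | 10 | Number of documents per bulk write |
| max_retry_count | int | No | 3 | Number of retries on failure |
| tls_verify_certificate | bool | No | true | Whether to verify the TLS certificate |
| tls_verify_hostname | bool | No | true | Whether to verify the hostname |
| tls_keystore_path | string | No | - | Path to the PEM or JKS keystore |
| tls_keystore_password | string | No | - | Keystore password |
| tls_truststore_path | string | No | - | Path to the PEM or JKS truststore |
| tls_truststore_password | string | No | - | Truststore password |
| schema_save_mode | enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | How the target index is handled: RECREATE_SCHEMA / CREATE_SCHEMA_WHEN_NOT_EXIST / ERROR_WHEN_SCHEMA_NOT_EXIST / IGNORE |
| data_save_mode | enum | No | APPEND_DATA | How existing target data is handled: DROP_DATA / APPEND_DATA / ERROR_WHEN_DATA_EXISTS |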
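
The sketch below puts a few of these sink options together: a field variable in the index name plus explicit save modes. It assumes the incoming rows carry an `age` field, as in the `seatunnel_${age}` example from the table; the save-mode values shown are simply the documented defaults written out.

```hocon
sink {
  Easysearch {
    hosts = ["https://localhost:9200"]
    username = "admin"
    password = "your-password"
    # Field variable: each row is routed to an index named after its "age" value
    index = "seatunnel_${age}"
    # Create the index only if it is missing, and keep any existing documents
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"
    tls_verify_certificate = false
  }
}
```

Spelling out the save modes makes the job's effect on an existing index visible in the config, which helps before trying a destructive value such as DROP_DATA.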

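Source parameters #

| Parameter | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| hosts | list | Yes | - | Easysearch node addresses, in host:port format |
| index | string | Yes | - | Source index name; supports * wildcard matching |
| username | string | No | - | Authentication username |
| password | string | No | - | Authentication password |
| source | list | No | - | Fields to read, may include _id; if omitted, schema must be configured |
| query | json | No | - | Easysearch DSL query that limits what is read |
| scroll_time | string | No | - | How long the scroll context is kept alive |
| scroll_size | int | No | - | Maximum number of documents returned per scroll request |
| schema | object | No | - | Data structure definition; if omitted, source must be configured |
| tls_verify_certificate | bool | No | true | Whether to verify the TLS certificate |
| tls_verify_hostname | bool | No | true | Whether to verify the hostname |
| tls_keystore_path | string | No | - | Path to the PEM or JKS keystore |
| tls_keystore_password | string | No | - | Keystore password |
| tls_truststore_path | string | No | - | Path to the PEM or JKS truststore |
| tls_truststore_password | string | No | - | Truststore password |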

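Notes #

| Item | Description |
| --- | --- |
| Connector name | Both the Source and the Sink are named Easysearch (not Elasticsearch); this is a native SeaTunnel connector |
| Version requirement | SeaTunnel ≥ 2.3.4 (2.3.12 recommended) |
| HTTPS | Easysearch enables HTTPS by default, so either set tls_verify_certificate = false or provide certificates |
| Mapping | Create the target index mapping in advance so that automatic mapping does not produce unexpected field types |
| Performance | Tune parallelism and max_batch_size to optimize throughput |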
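
As a rough illustration of the performance note, the fragment below raises both knobs relative to their defaults. The specific values are assumptions to start experimenting from, not recommendations; suitable numbers depend on document size and cluster capacity. The `source` block is omitted, so this is a fragment rather than a complete job.

```hocon
env {
  # More parallel tasks, and therefore more concurrent bulk writers
  parallelism = 4
  job.mode = "BATCH"
}

sink {
  Easysearch {
    hosts = ["https://localhost:9200"]
    username = "admin"
    password = "your-password"
    index = "articles"
    # Documents per bulk request (default is 10); illustrative value
    max_batch_size = 1000
    # Retries per failed bulk request (default is 3); illustrative value
    max_retry_count = 5
    tls_verify_certificate = false
  }
}
```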

Further reading #