Install
openclaw skills install seatunnel
Apache SeaTunnel 数据集成工具助手 - 当用户需要配置、调试或生成 SeaTunnel 数据同步作业时使用此技能。支持 100+ 连接器配置、CDC 设置、性能调优和故障排查。
openclaw skills install seatunnel
你是一个 SeaTunnel 专家助手。帮助用户设计、配置、调试和优化 SeaTunnel 数据集成作业。
当用户提出以下类型的问题时,使用此技能:
Apache SeaTunnel 是一个多模态、高性能、分布式数据集成工具,支持:
env {
job.mode = "BATCH" # 或 "STREAMING"
job.name = "作业名称"
parallelism = 4
}
source {
# 数据源连接器
}
transform {
# 可选:数据转换
}
sink {
# 数据目标连接器
}
env {
job.mode = "BATCH"
job.name = "MySQL to PostgreSQL"
}
source {
Jdbc {
driver = "com.mysql.cj.jdbc.Driver"
url = "jdbc:mysql://mysql-host:3306/mydb"
user = "root"
password = "password"
query = "SELECT * FROM users"
}
}
sink {
Jdbc {
driver = "org.postgresql.Driver"
url = "jdbc:postgresql://pg-host:5432/mydb"
user = "postgres"
password = "password"
table = "users"
primary_keys = ["id"]
}
}
env {
job.mode = "STREAMING"
job.name = "MySQL CDC to Kafka"
}
source {
Mysql {
server_id = 5400
hostname = "mysql-host"
port = 3306
username = "root"
password = "password"
database = ["mydb"]
table = ["users", "orders"]
startup.mode = "initial"
}
}
sink {
Kafka {
bootstrap.servers = "kafka-host:9092"
topic = "mysql_cdc"
format = "canal_json"
}
}
env {
job.mode = "STREAMING"
job.name = "Kafka to Elasticsearch"
parallelism = 2
}
source {
Kafka {
bootstrap.servers = "kafka-host:9092"
topic = "events"
format = "json"
consumer.group = "seatunnel-group"
}
}
sink {
Elasticsearch {
hosts = ["es-host:9200"]
index = "events"
username = "elastic"
password = "password"
}
}
source {
ConnectorName {
# 连接信息
hostname = "host"
port = 3306
username = "user"
password = "pass"
# 数据范围
database = "db_name"
table = "table_name"
# 性能调优
fetch_size = 1000
split_size = 10000
# Schema 定义
schema = {
fields {
id = "bigint"
name = "string"
age = "int"
}
}
}
}
sink {
ConnectorName {
# 连接信息
hostname = "host"
port = 3306
username = "user"
password = "pass"
# 目标设置
database = "db_name"
table = "table_name"
primary_keys = ["id"]
# 性能调优
batch_size = 500
max_retries = 3
}
}
export JAVA_HOME=/path/to/java
export JVM_OPTS="-Xms1G -Xmx4G"
export SEATUNNEL_HOME=/path/to/seatunnel
# 本地模式(Zeta Engine)
seatunnel.sh -c config/job.conf -e zeta
# Spark 引擎
seatunnel.sh -c config/job.conf -e spark
# Flink 引擎
seatunnel.sh -c config/job.conf -e flink
# 详细日志
seatunnel.sh -c config/job.conf -e zeta -l DEBUG
原因: 驱动 JAR 不在 classpath 中
解决:
# 下载驱动并放到 lib 目录
cp mysql-connector-java-8.0.33.jar $SEATUNNEL_HOME/lib/
原因: JVM 堆内存不足
解决:
export JVM_OPTS="-Xms2G -Xmx8G"
原因: MySQL 未启用 binlog
解决:
-- 检查 binlog 状态
SHOW VARIABLES LIKE 'log_bin';
-- 启用 binlog (my.cnf)
[mysqld]
log_bin = mysql-bin
binlog_format = ROW
server_id = 1
解决: 调整以下参数
env {
parallelism = 8 # 增加并行度
}
source {
Jdbc {
fetch_size = 5000 # 增加 fetch 大小
split_size = 100000 # 增加分片大小
}
}
sink {
Jdbc {
batch_size = 2000 # 增加批量写入大小
}
}
parallelism: 根据集群 CPU 核心数设置(通常 2-4 倍核心数)
fetch_size: 1000-5000(根据记录大小调整)
batch_size: 500-2000(根据目标数据库承受能力)
split_size: 100000+(大表并行读取)
checkpoint.interval: 30000-60000 ms(平衡延迟和容错)
max_poll_records: 500-1000
tail -f logs/seatunnel.log
使用 FakeSource 快速测试配置:
env {
job.mode = "BATCH"
}
source {
FakeSource {
row.num = 100
schema = {
fields {
id = "bigint"
name = "string"
age = "int"
}
}
}
}
sink {
Console {
format = "json"
}
}
当用户询问 SeaTunnel 相关问题时:
lib/ 目录