Install
openclaw skills install @printsky/flink-kafka-dual-write1为 bethune 项目生成新的 Flink Kafka 到 Hive 和 StarRocks 双写监控任务,参考 Bus_Search_ReplacePrice_KafkaToStarRock_34 及相邻的 33/35/36 模式,自动产出 Job 类、MessageModel、PO、4 个 config.properties 更新,并在可行时执行编译校验。用于“参考任务34写一个新任务”“按任务33/34/35/36模式新增 Kafka 任务”“生成类似 ReplacePrice 的埋点监控任务”等请求。
openclaw skills install @printsky/flink-kafka-dual-write1按下面流程执行,默认服务对象是 bethune 仓库中的 Kafka 日志监控任务。
先确认用户给了哪些输入。若信息不全,只问最小必需项:
若用户已经给出“按任务34类似模式”,默认理解为:
parseAndFormatLogTime()、safe()、Hive 分区补齐、4 份 config 同步更新的处理方式若任务更接近 35 或 36 这类列表展开模式,按列表展开规则处理。详细模式见 references/bethune-patterns.md。
MessageModel、Po、config.properties 键位,确认命名和字段顺序。MessageModel、Po、Job。src/main/resources/config.propertiessrc/main/resources/dev/config.propertiessrc/main/resources/product/config.propertiessrc/main/resources/stage/config.propertiesmvn -DskipTests compile 验证新增任务。TableSchema、StarRocksSinkRowBuilder、toHiveRow() 三处字段顺序完全一致。st 永远放在输出首列;Hive 行末尾永远追加 year、month、day。module 过滤值写死在 Job 类常量里,不写入配置。logTime 统一走 A 方案:为空或解析失败都记录错误日志并丢弃。message 为空直接丢弃。id 优先取 skyNetVo.getId(),为空时生成 UUID。cnt 通常固定为 1。safe() 兜底,数值字段保留原始数值类型。null 或空集合时,整条消息直接丢弃。MessageModel:src/main/java/com/ly/tms/po/carSupply/SkynetLog{BizName}MessageModel.javaPo:src/main/java/com/ly/tms/po/carSupply/SkynetLog{BizName}Po.javaJob:src/main/java/com/ly/tms/job/Bus_{BizName}_KafkaToStarRock_{任务编号}.java配置键遵循 bethune 现有分组:
kafka.bus.{biz}.topictravel.car.{biz}.groupstarrocks.fe.travel.common.{tableKey}hive.hive_train_ops.{tableKey}datas、fullPriceList 这类列表:按 35/36 模式在 flatMap() 中逐项展开。MessageModel 上补 @JSONField(name = "...")。完成后至少说明:
读取 references/bethune-patterns.md 获取以下内容:
parseAndFormatLogTime() 与 toHiveRow() 的固定模板
### StarRocks
```sql
CREATE TABLE TCTravelStreamData_db.{表名} (
st DATETIME,
apmtraceid VARCHAR(256),
id VARCHAR(256),
cnt INT,
traceid VARCHAR(256),
{其余字段:STRING→VARCHAR(512), INT→INT, DOUBLE→DOUBLE}
)
DUPLICATE KEY(st, apmtraceid)
DISTRIBUTED BY HASH(id) BUCKETS 8
PROPERTIES ("replication_num" = "3");
| 任务 | 类名 | Topic | Module | List展开字段 | SR表名 |
|---|---|---|---|---|---|
| 33 | Bus_Search_Abtest_KafkaToStarRock_33 | skynet_log_Public_SFC_ABTest_Monitor | BUS_Public_SFC_ABTest_Monitor | 无 | bus_sfc_abtest_monitor |
| 34 | Bus_Search_ReplacePrice_KafkaToStarRock_34 | skynet_log_Public_SFC_Replace_Price_Monitor | BUS_Public_SFC_Replace_Price_Monitor | 无(ReferPriceBean嵌套) | bus_sfc_replace_price_monitor |
| 35 | Bus_Carpool_CalEnter_KafkaToStarRock_35 | skynet_log_3304590_CallEnter | BUS_PUBLIC_CARPOOL_PRICING_CallEnter | fullPriceList | bus_carpool_calenter_monitor |
| 36 | Bus_Metric_Collection_KafkaToStarRock_36 | skynet_log_3309435_bus_travelmetrics | BUS_METRIC_COLLECTION | datas | bus_metric_collection_monitor |