Install
openclaw skills install rocketmq-kubectlQuery and manage RocketMQ messages via kubectl exec. Use when user needs to: (1) Query cluster information, (2) Create, delete, or manage topics, (3) Check consumer progress and message accumulation, (4) Query messages by ID, key, or offset, (5) Reset consumer offset to clear message backlog, (6) Check consumer subscription status, (7) View topic statistics and TPS, (8) Send test messages. Accepts parameters like topic, namespace, group, message_id, command_type, broker_name, offset, queue_id.
openclaw skills install rocketmq-kubectlQuery and manage RocketMQ via kubectl exec in Kubernetes cluster.
Use kubectl exec to access RocketMQ master pod:
# execute commands directly without login
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin <command> -n master1.teambition.local:9876
Default Nameserver: master1.teambition.local:9876
View cluster list, BrokerName, BrokerId, TPS and other information:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin clusterList -n master1.teambition.local:9876
View detailed cluster information (including yesterday and today's read/write totals):
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin clusterList -n master1.teambition.local:9876 -m
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin topiclist -n master1.teambition.local:9876
Create or update a topic with specified queue numbers:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin updateTopic -n master1.teambition.local:9876 -t <topic-name> -c <cluster-name> -r 8 -w 8
Parameters:
-r: Number of read queues (default: 8)-w: Number of write queues (default: 8)-c: Cluster name (can be queried via clusterList)Example:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin updateTopic -n master1.teambition.local:9876 -t new-topic -c DefaultCluster -r 16 -w 16
Delete a topic from a specific cluster:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin deleteTopic -n master1.teambition.local:9876 -t <topic-name> -c <cluster-name>
Example:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin deleteTopic -n master1.teambition.local:9876 -t old-topic -c DefaultCluster
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin topicStatus -n master1.teambition.local:9876 -t <topic-name>
Example:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin topicStatus -n master1.teambition.local:9876 -t tb-app-eventbus
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin topicRoute -n master1.teambition.local:9876 -t <topic-name>
Example:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin topicRoute -n master1.teambition.local:9876 -t tb-app-eventbus
Check consumer progress and message accumulation. Diff total indicates the number of accumulated messages.
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin consumerProgress -n localhost:9876
Or specify namespace:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin consumerProgress -n master1.teambition.local:9876
Check if consumer subscription relationships are consistent:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin consumerStatus -n master1.teambition.local:9876 -g <consumer-group>
Example:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin consumerStatus -n master1.teambition.local:9876 -g GID-TB-VERSION
Query message consumption status by message ID:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin queryMsgById -i <message-id> -n localhost:9876 -t <topic>
Example:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin queryMsgById -i C0A8822F00002A9F00000008AE78670F -n localhost:9876 -t topic
Query messages by message key:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin queryMsgByKey -n master1.teambition.local:9876 -t <topic-name> -k <message-key>
Example:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin queryMsgByKey -n master1.teambition.local:9876 -t tb-app-eventbus -k order-12345
Query messages by offset:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin queryMsgByOffset -n master1.teambition.local:9876 -t <topic-name> -b <broker-name> -i <queue-id> -o <offset>
Example:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin queryMsgByOffset -n master1.teambition.local:9876 -t tb-app-eventbus -b broker-a -i 0 -o 1000
Note: Broker name can be queried via clusterList command.
Read message body directly from file system:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- cat /tmp/rocketmq/msgbodys/<message-id>
Example:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- cat /tmp/rocketmq/msgbodys/C0A8822F00002A9F00000008AE78670F
Clear all accumulated messages for a consumer group:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin resetOffsetByTime -n localhost:9876 -g <consumer-group> -t <topic> -s -1
Note: The -s -1 parameter means reset to the latest offset (clear all backlog).
Example:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin resetOffsetByTime -n localhost:9876 -g GID-TB-VERSION -t teambition-core-eventbus -s -1
View topic subscription relationships, TPS, accumulation, 24h read/write totals:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin statsAll -n master1.teambition.local:9876
Only show active topics:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin statsAll -n master1.teambition.local:9876 -a
Send a test message to a topic:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin sendMessage -n master1.teambition.local:9876 -t <topic-name> -p "test message body"
Example:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin sendMessage -n master1.teambition.local:9876 -t tb-app-eventbus -p "test message"
Consume messages from a topic for testing:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin consumeMessage -n master1.teambition.local:9876 -t <topic-name> -g <consumer-group>
Example:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin consumeMessage -n master1.teambition.local:9876 -t tb-app-eventbus -g GID-TB-VERSION
-n: Nameserver address (default: master1.teambition.local:9876 or localhost:9876)-t: Topic name-g: Consumer group name-i: Message ID or queue ID-k: Message key-b: Broker name (not broker address)-o: Offset value-s: Timestamp or offset (-1 means latest)-r: Number of read queues-w: Number of write queues-c: Cluster name-p: Message body or print flag-a: Amount or active flag-m: More information flagkubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin consumerProgress -n localhost:9876
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin consumerStatus -n master1.teambition.local:9876 -g <consumer-group>
When consumer has accumulated too many messages:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin resetOffsetByTime -n localhost:9876 -g <consumer-group> -t <topic> -s -1
When you need to create a new topic:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin updateTopic -n master1.teambition.local:9876 -t <topic-name> -c <cluster-name> -r 8 -w 8
When you know the message key but not the message ID:
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin queryMsgByKey -n master1.teambition.local:9876 -t <topic-name> -k <message-key>
For more commands, see: https://rocketmq.apache.org/zh/docs/deploymentOperations/02admintool/
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin clusterList -n master1.teambition.local:9876
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin topiclist -n master1.teambition.local:9876
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin updateTopic -n master1.teambition.local:9876 -t new-topic -c DefaultCluster -r 16 -w 16
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin topicStatus -n master1.teambition.local:9876 -t tb-app-eventbus
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin consumerProgress -n localhost:9876
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin queryMsgById -i C0A8822F00002A9F00000008AE78670F -n localhost:9876 -t topic
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin queryMsgByKey -n master1.teambition.local:9876 -t tb-app-eventbus -k order-12345
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin statsAll -n master1.teambition.local:9876
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin resetOffsetByTime -n localhost:9876 -g GID-TB-VERSION -t teambition-core-eventbus -s -1
kubectl -n kube-system exec -it rocketmq-master1.teambition.local -- ./mqadmin sendMessage -n master1.teambition.local:9876 -t tb-app-eventbus -p "test message"