triggers:
provider:
- provide.*service
- publish.*service
- expose.*service
- create.*service
- make.*service
- implement.*service
consumer:
- what services
- list services
- call.*service
- use.*service
- query.*data
- fetch.*data
Tool Service Hub Skill
Overview
Enables subagents to:
- Provider Mode: Publish local data/capabilities as services for other subagents to call
- Consumer Mode: Discover and call services on the Hub
1. Publishing Services as a Provider
Complete Code Template
import asyncio
import os
import sys
from pathlib import Path
# === 1. Setup path ===
WORKSPACE_DIR = os.getenv('WORKSPACE_DIR', '/home/t/.openclaw/workspace-subagentX')
sys.path.insert(0, WORKSPACE_DIR)
from client.client import LocalServiceRunner
# === 2. Define your service capability ===
async def your_method(**params):
    """Example service handler; replace the body with your own logic.

    Args:
        **params: Parameters passed by the caller.

    Returns:
        dict: The handler result. Handlers must return a dict.
    """
    # Your business logic here
    return {"status": "ok", "data": "..."}
# === 3. Start the service ===
async def main():
    """Create the service runner, register the handler, and run the service."""
    hub_address = os.getenv("HUB_WS_URL", "ws://localhost:8765")
    service = LocalServiceRunner(
        name="your-service-name",  # Service name (English, no spaces)
        description="Service description",  # English description
        hub_url=hub_address,
    )
    # Register methods (can register multiple)
    service.register_handler("your_method", your_method)
    print("🚀 Starting service...")
    await service.run()


if __name__ == "__main__":
    asyncio.run(main())
2. Calling Services as a Consumer
Complete Code Template
import asyncio
import os
import sys
WORKSPACE_DIR = os.getenv('WORKSPACE_DIR', '/home/t/.openclaw/workspace-subagentX')
sys.path.insert(0, WORKSPACE_DIR)
from client.skill_client import SkillQueryClient
async def main():
    """Discover services on the Hub and call one method on a target service.

    Connects to the Hub at ``HUB_WS_URL`` (default ``ws://localhost:8765``),
    lists the available services, picks the first one whose name contains
    ``target_name``, calls ``your_method`` on it and prints the result.

    The connection is closed on every exit path, including the early return
    taken when the target service is not found and any exception raised while
    discovering or calling.
    """
    # 1. Connect to Hub
    client = SkillQueryClient(
        hub_url=os.getenv("HUB_WS_URL", "ws://localhost:8765")
    )
    await client.connect()
    try:
        # 2. Discover services
        services = await client.discover()
        print(f"Discovered {len(services)} services")

        # 3. Find target service (filter by name)
        target_name = "weather-service"  # Replace with your target service name
        target = next(
            (s for s in services if target_name in s.get("name", "")),
            None,
        )
        if not target:
            print(f"Service not found: {target_name}")
            return

        skill_id = target.get("skill_id")
        print(f"Using service: {target.get('name')}, skill_id: {skill_id}")

        # 4. Call the service
        result = await client.call_service(
            service_id=skill_id,
            method="your_method",  # Method name
            params={"key": "value"},  # Parameters
        )
        print(f"Result: {result}")
    finally:
        # Always release the Hub connection, even on early return or error.
        await client.disconnect()


if __name__ == "__main__":
    asyncio.run(main())
3. Common Data Source Examples
3.1 File Data Service
from pathlib import Path
DATA_DIR = Path("/path/to/data")  # Change to actual directory


async def list_files(**params):
    """List the regular files directly inside ``DATA_DIR``.

    Args:
        **params: ``extension`` (optional) is a plain filename suffix such as
            ``".txt"``; when omitted or empty, every file is listed.

    Returns:
        dict: ``{"files": [...], "total": n}`` where ``files`` holds at most
        the first 50 names in sorted order and ``total`` is the full match
        count, or ``{"error": ...}`` when ``extension`` is not a plain suffix.
    """
    ext = params.get("extension") or ""
    # The suffix comes from a remote caller and is spliced into a glob
    # pattern; path separators would let the pattern leave DATA_DIR.
    if not isinstance(ext, str) or "/" in ext or "\\" in ext:
        return {"error": "extension must be a plain suffix such as '.txt'"}
    pattern = f"*{ext}" if ext else "*"
    # Sorted so the 50-name cap always returns the same subset.
    files = sorted(f.name for f in DATA_DIR.glob(pattern) if f.is_file())
    return {"files": files[:50], "total": len(files)}


async def read_file(**params):
    """Return the content or the size of one file inside ``DATA_DIR``.

    Args:
        **params: ``filename`` (required) is a path relative to ``DATA_DIR``.

    Returns:
        dict: ``{"content": ...}`` with the first 1000 characters for ``.txt``
        files, ``{"filename": ..., "size": ...}`` for any other file, or
        ``{"error": ...}`` when the name is missing, resolves outside
        ``DATA_DIR``, or does not name a regular file.
    """
    filename = params.get("filename")
    if not filename:
        return {"error": "filename is required"}

    # The name is caller-supplied: resolve it and refuse anything that ends
    # up outside DATA_DIR ("../" segments, absolute paths, symlinks).
    root = DATA_DIR.resolve()
    filepath = (root / filename).resolve()
    if root not in filepath.parents:
        return {"error": f"Invalid filename: {filename}"}

    # is_file() rather than exists(): directories are not readable files,
    # which matches what list_files reports.
    if not filepath.is_file():
        return {"error": f"File not found: {filename}"}

    # Read text files directly
    if filepath.suffix == '.txt':
        # Read only the preview instead of loading the whole file; undecodable
        # bytes are replaced rather than raising.
        with filepath.open(encoding="utf-8", errors="replace") as handle:
            return {"content": handle.read(1000)}

    # Return info for other files
    return {"filename": filename, "size": filepath.stat().st_size}
3.2 API Data Service
import aiohttp
async def fetch_data(**params):
    """GET a URL and return its status code and decoded JSON body.

    Args:
        **params: ``url`` (required) is the address to request.

    Returns:
        dict: ``{"status": ..., "data": ...}`` on success, or
        ``{"error": ...}`` when ``url`` is missing or any exception is raised
        while requesting or decoding.
    """
    target_url = params.get("url")
    if not target_url:
        return {"error": "url is required"}

    try:
        async with aiohttp.ClientSession() as http:
            request = http.get(target_url, timeout=aiohttp.ClientTimeout(10))
            async with request as response:
                payload = await response.json()
                return {"status": response.status, "data": payload}
    except Exception as exc:
        # Handler boundary: report the failure to the caller as a dict.
        return {"error": str(exc)}
3.3 Weather Service (wttr.in)
import aiohttp
async def get_weather(**params):
    """Return the current weather for a city from wttr.in (``format=j1``).

    Args:
        **params: ``city`` (optional, default ``"Shanghai"``).

    Returns:
        dict: ``{"city", "temp", "condition", "humidity"}`` on success, where
        ``temp`` is ``int(temp_C)`` (0 when ``temp_C`` is absent) and
        ``condition`` falls back to ``"Unknown"``; or ``{"error": ...}`` when
        the request fails or the response carries no current conditions.
    """
    from urllib.parse import quote  # local import keeps the snippet self-contained

    city = params.get("city", "Shanghai")
    # The city is caller-supplied: percent-encode it so spaces, "/", "?" or
    # "#" cannot change the request path or query.
    url = f"https://wttr.in/{quote(str(city), safe='')}?format=j1"
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=aiohttp.ClientTimeout(10)) as resp:
                data = await resp.json()

        if not isinstance(data, dict):
            return {"error": f"Unexpected weather response for {city}"}

        # current_condition is looked up at the top level of the payload;
        # a payload nested under "data" is accepted as well, so neither
        # layout silently yields temp 0 / "Unknown".
        wrapped = data.get("data")
        if isinstance(wrapped, dict) and "current_condition" in wrapped:
            root = wrapped
        else:
            root = data

        conditions = root.get("current_condition") or []
        if not conditions:
            return {"error": f"No current conditions returned for {city}"}

        c = conditions[0]
        descriptions = c.get("weatherDesc") or [{}]
        return {
            "city": city,
            "temp": int(c.get("temp_C") or 0),
            "condition": descriptions[0].get("value", "Unknown"),
            "humidity": c.get("humidity"),
        }
    except Exception as e:
        # Handler boundary: report the failure to the caller as a dict.
        return {"error": str(e)}
4. Workflow Examples
Combining Multiple Services
async def workflow():
    """Complete workflow combining multiple services.

    Connects to the Hub at ``HUB_WS_URL`` (default ``ws://localhost:8765``),
    looks up the first service whose name contains ``"weather"`` and the
    first whose name contains ``"image"``, and calls each one that is found.

    Returns:
        dict: May contain ``"weather"`` and ``"images"``, each holding the
        ``"result"`` field of the corresponding call (``{}`` when absent).
        A service that was not discovered contributes no key.
    """
    import os  # local import: this snippet has no import header of its own

    client = SkillQueryClient(os.getenv("HUB_WS_URL", "ws://localhost:8765"))
    await client.connect()
    results = {}
    try:
        services = await client.discover()

        # Find required services
        weather = next((s for s in services if "weather" in s.get("name", "")), None)
        images = next((s for s in services if "image" in s.get("name", "")), None)

        # Call weather service
        if weather:
            w = await client.call_service(
                weather.get("skill_id"), "get_weather", {"city": "Shanghai"}
            )
            results["weather"] = w.get("result", {})

        # Call image service
        if images:
            i = await client.call_service(
                images.get("skill_id"), "list_images", {"limit": 10}
            )
            results["images"] = i.get("result", {})
    finally:
        # Close the connection even when discovery or a call raises.
        await client.disconnect()
    return results
5. Environment Configuration
Install Dependencies
pip install websockets aiohttp
Environment Variables
| Variable | Default | Description |
|---|---|---|
| HUB_WS_URL | ws://localhost:8765 | Hub WebSocket address |
| WORKSPACE_DIR | /home/t/.openclaw/workspace-subagentX | Working directory |
Start Hub Server (Optional)
cd Claw-Service-Hub
python -m server.main
# WebSocket: ws://0.0.0.0:8765
# REST API: http://0.0.0.0:3765
6. Troubleshooting
Issue 1: ImportError
Error: ModuleNotFoundError: No module named 'client'
Fix: Set sys.path correctly
import os
import sys
WORKSPACE_DIR = os.getenv('WORKSPACE_DIR', '/home/t/.openclaw/workspace-subagentX')
sys.path.insert(0, WORKSPACE_DIR)
from client.client import LocalServiceRunner
Issue 2: Service Registered but Cannot Be Called
Check:
- Is the Provider process still running?
- Is the method name correct (case-sensitive)?
- Is the parameter format correct?
Issue 3: API Returns None
Common Cause: Data structure parsing error
Fix: Print raw data first to confirm structure
async def get_data(**params):
    """Fetch ``url`` as JSON and print the raw payload before returning it.

    Args:
        **params: ``url`` is the address to request.

    Returns:
        dict: ``{"data": <decoded JSON>}``.
    """
    target_url = params.get("url")
    async with aiohttp.ClientSession() as http:
        request = http.get(target_url)
        async with request as response:
            raw = await response.json()
            print(f"Raw data: {raw}")  # Add this debug line
            # Then parse based on actual structure
            return {"data": raw}
Issue 4: Service Not Found
Fix: List all services first
services = await client.discover()
for s in services:
print(f"{s.get('name')}: {s.get('skill_id')}")
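Issue 5: Return Value Must Be Dict
Error: TypeError: ... got an unexpected keyword argument
Cause: The handler is called with the caller's parameters as keyword arguments, so a handler declared without **params raises this TypeError
Fix: Handlers must accept **params and return a dict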
async def wrong():  # Wrong
    """Counter-example: takes no ``**params`` and returns a string."""
    return "string"


async def right(**params):  # Correct
    """Correct handler shape: accepts ``**params`` and returns a dict."""
    response = {"result": "value"}
    return response
Issue 6: Return Value Wrapped in 'result'
Symptom: After calling a service, the return value is {'result': {'actual_data': '...'}}
Explanation: The Hub wraps the handler's returned dict in the 'result' field
Fix: Use the return value directly, or extract as needed
result = await client.call_service(service_id, "method", params)
# result = {'result': {'temp': 25, 'city': 'Beijing'}}
# Method 1: Use directly (recommended)
data = result # Already unpacked data
# Method 2: If explicit extraction needed
if 'result' in result:
data = result['result']
7. Minimal Examples
Provider (5 lines)
import asyncio, os, sys
sys.path.insert(0, os.getenv('WORKSPACE_DIR','.'))  # so the `client` package can be imported
from client.client import LocalServiceRunner
async def hello(**p): return {"msg":"Hello!"}  # handler: accepts keyword params, returns a dict
r = LocalServiceRunner("demo","Demo Service",os.getenv("HUB_WS_URL","ws://localhost:8765"))  # presumably (name, description, hub_url) — confirm positional order
r.register_handler("hello", hello)  # expose the handler under the method name "hello"
asyncio.run(r.run())  # run the service on a new event loop
Consumer (6 lines)
import asyncio, os, sys
sys.path.insert(0, os.getenv('WORKSPACE_DIR','.'))  # so the `client` package can be imported
from client.skill_client import SkillQueryClient
async def main():
    c = SkillQueryClient()  # no hub_url passed; presumably falls back to a client default — TODO confirm
    await c.connect()
    print([s.get("name") for s in await c.discover()])  # print the name of every discovered service
    await c.disconnect()
asyncio.run(main())
8. File Structure
Claw-Service-Hub/
├── client/
│ ├── client.py # LocalServiceRunner, ToolServiceClient
│ ├── skill_client.py # SkillQueryClient
│ └── management_client.py
├── skills/
│ └── hub-client/
│ └── SKILL.md # This file
└── server/
└── main.py # Hub Server
9. Publishing Flow
- Ensure Hub Server is running (`ws://localhost:8765`)
- Provider: Run `python your_service.py` to register service
- Consumer: Connect to Hub to discover services
- Consumer: Call service to get results
10. Key Authorization Mechanism (Optional)
10.1 Overview
Optional key authorization mechanism for controlling service access.
Features:
- Time dimension: Key validity period (customizable by Provider)
- Count dimension: Maximum call count (customizable by Provider)
- Dual verification: Provider self-management + Hub verification
10.2 Provider Side - Set Lifecycle Policy
import asyncio
import os

from client.client import LocalServiceRunner


# Register method
async def get_data(**params):
    """Protected handler; returns the data guarded by the Key policy.

    Args:
        **params: Parameters passed by the caller (unused here).

    Returns:
        dict: ``{"data": "secret data"}``.
    """
    return {"data": "secret data"}


async def main():
    """Configure the lifecycle policies, register the handler, and run.

    The setup lives in a coroutine because ``await runner.run()`` is only
    valid inside an async function.
    """
    # Create service
    runner = LocalServiceRunner(
        name="my-protected-service",
        description="Service requiring authorization",
        hub_url=os.getenv("HUB_WS_URL", "ws://localhost:8765"),
    )

    # Set default lifecycle policy
    runner.set_lifecycle_policy(
        duration_seconds=3600,  # Default 1 hour validity
        max_calls=100,  # Default 100 calls
    )

    # Set custom policy (optional)
    runner.set_custom_policy(
        condition="premium",  # Policy name
        duration_seconds=86400,  # 24 hours
        max_calls=1000,  # 1000 calls
    )

    runner.register_handler("get_data", get_data)

    # Start service
    print("🚀 Starting authorized service...")
    await runner.run()


if __name__ == "__main__":
    asyncio.run(main())
10.3 Consumer Side - Request Key and Call
import asyncio
import os

from client.skill_client import SkillQueryClient


async def main():
    """Request a Key for a protected service and call it with that Key.

    Connects to the Hub, picks the first service whose name contains
    ``"protected"``, requests a Key for it and, if the request succeeds,
    calls ``get_data`` carrying the Key. The connection is closed on every
    exit path, including the early return when no service is found.
    """
    client = SkillQueryClient(
        hub_url=os.getenv("HUB_WS_URL", "ws://localhost:8765")
    )
    await client.connect()
    try:
        # Discover services
        services = await client.discover()
        target = next((s for s in services if "protected" in s.get("name", "")), None)
        if not target:
            print("Service not found")
            return
        service_id = target.get("skill_id")

        # Method 1: Direct call (if service doesn't require Key)
        # result = await client.call_service(service_id, "get_data", {})

        # Method 2: Request Key first, then call (recommended)
        key_info = await client.request_key(
            service_id=service_id,
            purpose="Daily data query",
        )
        if key_info.get("success"):
            key = key_info["key"]
            # Tolerate a response without lifecycle details instead of raising.
            lifecycle = key_info.get("lifecycle") or {}
            print("✅ Key obtained successfully")
            print(f" Key: {key[:20]}...")
            print(f" Validity: {lifecycle.get('remaining_time')} seconds")
            print(f" Remaining calls: {lifecycle.get('remaining_calls')} times")

            # Call service with Key
            result = await client.call_service(
                service_id=service_id,
                method="get_data",
                params={},
                key=key,  # Carry the Key
            )
            print(f"📥 Result: {result}")
        else:
            print(f"❌ Key request failed: {key_info.get('reason')}")
    finally:
        # Always release the Hub connection, even on early return or error.
        await client.disconnect()


if __name__ == "__main__":
    asyncio.run(main())
10.4 Handling Key Verification Failure
async def call_with_fallback(client, service_id, method, params):
    """Call a service without a Key, retrying once with a Key if one is required.

    Args:
        client: Object providing the ``call_service`` and ``request_key``
            coroutines.
        service_id: Identifier of the service to call.
        method: Name of the method to invoke.
        params: Parameters forwarded to the method.

    Returns:
        The first call's result, unless its ``"error"`` mentions ``"Key"``
        and a Key request succeeds; then the result of the retried call.
    """
    # Try without Key first
    first_attempt = await client.call_service(service_id, method, params)

    # Check if Key is required
    error_text = first_attempt.get("error")
    if not (error_text and "Key" in error_text):
        return first_attempt

    print("Key required, requesting authorization...")
    grant = await client.request_key(service_id, "Auto request")
    if not grant.get("success"):
        # No Key granted: hand back the original error response.
        return first_attempt

    # Retry with Key
    return await client.call_service(
        service_id, method, params, key=grant["key"]
    )
10.5 Message Protocol
| Message Type | Direction | Description |
|---|---|---|
| lifecycle_policy | Provider→Hub | Register lifecycle policy |
| key_request | Consumer→Hub→Provider | Request Key |
| key_response | Provider→Hub→Consumer | Return Key (approve/reject) |
| key_revoke | Provider→Hub | Revoke Key |
| call_service (with key) | Consumer→Hub | Call service with Key |
10.6 Lifecycle Parameters
# Provider registers policy
{
"duration_seconds": 3600, # Validity duration (seconds)
"max_calls": 100, # Maximum call count
"custom_policies": { # Optional: custom policies
"premium": {
"duration_seconds": 86400,
"max_calls": 1000
}
}
}
# Key verification result
{
"valid": True,
"key": "key_abc123...",
"lifecycle": {
"expires_at": "2026-03-20T03:47:00Z",
"max_calls": 100,
"call_count": 5,
"remaining_calls": 95,
"remaining_time": 3200
}
}
10.7 Subagent Usage Tips
For Provider (Service Publisher):
- If service requires authorization, set reasonable limits in `set_lifecycle_policy`
- Recommended defaults: `max_calls=100, duration_seconds=3600`
- Can set different policies for different consumers
For Consumer (Service Caller):
- Try direct call first, request Key if it fails
- Save Key to avoid repeated requests
- Check `lifecycle.get('remaining_calls')` to avoid running out of calls
- Watch `lifecycle.get('remaining_time')` and renew as needed
License
MIT License
Copyright (c) 2026 OpenClaw
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.