Install
openclaw skills install hub-client

Service marketplace: publish data as services, consume hub services

openclaw skills install hub-client

triggers:
  provider:
    - provide.*service
    - publish.*service
    - expose.*service
    - create.*service
    - make.*service
    - implement.*service
  consumer:
    - what services
    - list services
    - call.*service
    - use.*service
    - query.*data
    - fetch.*data
Enables subagents to:
import asyncio
import os
import sys
from pathlib import Path
# === 1. Setup path ===
WORKSPACE_DIR = os.getenv('WORKSPACE_DIR', '/home/t/.openclaw/workspace-subagentX')
sys.path.insert(0, WORKSPACE_DIR)
from client.client import LocalServiceRunner
# === 2. Define your service capability ===
async def your_method(**params):
    """Handle one call to this service.

    ``params`` holds the keyword arguments passed by the caller.
    The handler must return a dict.
    """
    # Replace this placeholder payload with your business logic.
    return {"status": "ok", "data": "..."}
# === 3. Start the service ===
async def main():
    """Build the service runner, register its handler(s), and run it."""
    hub_address = os.getenv("HUB_WS_URL", "ws://localhost:8765")
    service = LocalServiceRunner(
        name="your-service-name",           # Service name (English, no spaces)
        description="Service description",  # English description
        hub_url=hub_address,
    )

    # More than one handler may be registered on the same runner.
    service.register_handler("your_method", your_method)

    print(f"🚀 Starting service...")
    await service.run()
if __name__ == "__main__":
asyncio.run(main())
import asyncio
import os
import sys
WORKSPACE_DIR = os.getenv('WORKSPACE_DIR', '/home/t/.openclaw/workspace-subagentX')
sys.path.insert(0, WORKSPACE_DIR)
from client.skill_client import SkillQueryClient
async def main():
    """Discover hub services, pick one by name, and call a method on it.

    The connection is closed on every exit path, including the early return
    taken when the target service is not found and any exception raised by
    discovery or the call.
    """
    # 1. Connect to Hub
    client = SkillQueryClient(
        hub_url=os.getenv("HUB_WS_URL", "ws://localhost:8765")
    )
    await client.connect()
    try:
        # 2. Discover services
        services = await client.discover()
        print(f"Discovered {len(services)} services")

        # 3. Find target service (first whose name contains target_name)
        target_name = "weather-service"  # Replace with your target service name
        target = next(
            (s for s in services if target_name in s.get("name", "")),
            None,
        )
        if not target:
            print(f"Service not found: {target_name}")
            return

        skill_id = target.get("skill_id")
        print(f"Using service: {target.get('name')}, skill_id: {skill_id}")

        # 4. Call the service
        result = await client.call_service(
            service_id=skill_id,
            method="your_method",     # Method name
            params={"key": "value"},  # Parameters
        )
        print(f"Result: {result}")
    finally:
        # Runs on the "not found" return and on errors as well.
        await client.disconnect()
if __name__ == "__main__":
asyncio.run(main())
from pathlib import Path
DATA_DIR = Path("/path/to/data") # Change to actual directory
async def list_files(**params):
    """List regular files directly inside DATA_DIR.

    Params:
        extension: Optional suffix such as ".txt". When given, only names
            matching the glob ``*<extension>`` are listed.

    Returns:
        ``{"files": [...], "total": n}`` where ``files`` holds at most the
        first 50 names in sorted order and ``total`` counts every match.
    """
    ext = params.get("extension", "")
    pattern = f"*{ext}" if ext else "*"
    # Sort so the 50-name window is the same on every call; glob() does not
    # promise any particular order.
    files = sorted(f.name for f in DATA_DIR.glob(pattern) if f.is_file())
    return {"files": files[:50], "total": len(files)}
async def read_file(**params):
    """Return the content or size of a file inside DATA_DIR.

    Params:
        filename: Name of the file, relative to DATA_DIR (required).

    Returns:
        ``{"content": <first 1000 characters>}`` for ``.txt`` files,
        ``{"filename": ..., "size": ...}`` for other files, or
        ``{"error": ...}`` when the name is missing, points outside
        DATA_DIR, or does not refer to a regular file.
    """
    filename = params.get("filename")
    if not filename:
        return {"error": "filename is required"}

    base_dir = DATA_DIR.resolve()
    filepath = (base_dir / filename).resolve()

    # filename comes from the remote caller: refuse anything that resolves
    # outside DATA_DIR ("../x", absolute paths, symlinks pointing elsewhere).
    # The same message is used so callers cannot probe for outside files.
    if base_dir not in filepath.parents:
        return {"error": f"File not found: {filename}"}

    # is_file() rather than exists(): a directory is not a readable file.
    if not filepath.is_file():
        return {"error": f"File not found: {filename}"}

    # Read text files directly, but only the part that is returned.
    if filepath.suffix == '.txt':
        with filepath.open() as handle:
            return {"content": handle.read(1000)}

    # Return info for other files
    return {"filename": filename, "size": filepath.stat().st_size}
import aiohttp
async def fetch_data(**params):
    """GET the ``url`` parameter and return its status and decoded JSON body.

    Returns ``{"error": ...}`` when ``url`` is missing or when the request
    or the JSON decoding raises.
    """
    target = params.get("url")
    if not target:
        return {"error": "url is required"}

    try:
        async with aiohttp.ClientSession() as http:
            request_timeout = aiohttp.ClientTimeout(total=10)
            async with http.get(target, timeout=request_timeout) as response:
                payload = await response.json()
                return {"status": response.status, "data": payload}
    except Exception as exc:
        return {"error": str(exc)}
import aiohttp
async def get_weather(**params):
    """Fetch the current weather for a city from wttr.in (``format=j1``).

    Params:
        city: City name; defaults to "Shanghai".

    Returns:
        ``{"city", "temp", "condition", "humidity"}`` on success, or
        ``{"error": <message>}`` if the request or the parsing raises.
    """
    from urllib.parse import quote  # local import keeps this example self-contained

    city = params.get("city", "Shanghai")
    # Escape the city so spaces, slashes, "?" and "#" cannot change the URL.
    url = f"https://wttr.in/{quote(str(city), safe='')}?format=j1"
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                data = await resp.json()
        # NOTE(review): accept both a payload wrapped in a "data" object and
        # one with "current_condition" at the top level; confirm against
        # a raw response which layout the service actually sends.
        payload = data.get("data", data)
        conditions = payload.get("current_condition") or [{}]
        c = conditions[0]
        descriptions = c.get("weatherDesc") or [{}]
        return {
            "city": city,
            "temp": int(c.get("temp_C") or 0),
            "condition": descriptions[0].get("value", "Unknown"),
            "humidity": c.get("humidity"),
        }
    except Exception as e:
        return {"error": str(e)}
async def workflow():
    """Complete workflow combining multiple services.

    Looks up the first service whose name contains "weather" and the first
    whose name contains "image", calls each one that exists, and returns
    their ``result`` fields keyed by "weather" / "images".

    The hub address comes from ``HUB_WS_URL`` (default ws://localhost:8765),
    as in the other examples, and the connection is closed even when
    discovery or a call raises.
    """
    import os  # local import keeps this example self-contained

    client = SkillQueryClient(os.getenv("HUB_WS_URL", "ws://localhost:8765"))
    await client.connect()
    try:
        services = await client.discover()

        # Find required services
        weather = next((s for s in services if "weather" in s.get("name", "")), None)
        images = next((s for s in services if "image" in s.get("name", "")), None)

        results = {}

        # Call weather service
        if weather:
            w = await client.call_service(
                weather.get("skill_id"), "get_weather", {"city": "Shanghai"}
            )
            results["weather"] = w.get("result", {})

        # Call image service
        if images:
            i = await client.call_service(
                images.get("skill_id"), "list_images", {"limit": 10}
            )
            results["images"] = i.get("result", {})

        return results
    finally:
        await client.disconnect()
pip install websockets aiohttp
| Variable | Default | Description |
|---|---|---|
| HUB_WS_URL | ws://localhost:8765 | Hub WebSocket address |
| WORKSPACE_DIR | /home/t/.../workspace-subagentX | Working directory |
cd Claw-Service-Hub
python -m server.main
# WebSocket: ws://0.0.0.0:8765
# REST API: http://0.0.0.0:3765
Error: ModuleNotFoundError: No module named 'client'
Fix: Set sys.path correctly
import os
import sys
WORKSPACE_DIR = os.getenv('WORKSPACE_DIR', '/home/t/.openclaw/workspace-subagentX')
sys.path.insert(0, WORKSPACE_DIR)
from client.client import LocalServiceRunner
Check:
Common Cause: Data structure parsing error
Fix: Print raw data first to confirm structure
async def get_data(**params):
    """Fetch the ``url`` parameter as JSON and print the raw payload.

    Debugging aid: inspect the printed structure before writing any parsing.
    """
    target_url = params.get("url")
    async with aiohttp.ClientSession() as http:
        async with http.get(target_url) as response:
            payload = await response.json()
            print(f"Raw data: {payload}")  # Add this debug line
            # Then parse based on actual structure
            return {"data": payload}
Fix: List all services first
services = await client.discover()
for s in services:
print(f"{s.get('name')}: {s.get('skill_id')}")
Error: TypeError: ... got an unexpected keyword argument
Fix: Handlers must accept keyword arguments (**params) and must return a dict
async def wrong():  # Wrong: takes no keyword arguments and returns a str
    return "string"
async def right(**params):  # Correct: accepts **params and returns a dict
    return {"result": "value"}
Symptom: After calling a service, the call returns {'result': {'actual_data': '...'}}
Explanation: The Hub wraps the dict returned by the handler in the 'result' field
Fix: Use the return value directly, or extract the 'result' field as needed
result = await client.call_service(service_id, "method", params)
# result = {'result': {'temp': 25, 'city': 'Beijing'}}
# Method 1: Use directly (recommended)
data = result # Already unpacked data
# Method 2: If explicit extraction needed
if 'result' in result:
data = result['result']
import asyncio, os, sys
sys.path.insert(0, os.getenv('WORKSPACE_DIR','.'))
from client.client import LocalServiceRunner
async def hello(**p):
    """Demo handler: ignore the arguments and reply with a fixed greeting."""
    return {"msg": "Hello!"}
r = LocalServiceRunner("demo","Demo Service",os.getenv("HUB_WS_URL","ws://localhost:8765"))
r.register_handler("hello", hello)
asyncio.run(r.run())
import asyncio, os, sys
sys.path.insert(0, os.getenv('WORKSPACE_DIR','.'))
from client.skill_client import SkillQueryClient
async def main():
    """Connect to the hub, print the names of all discovered services, disconnect."""
    c = SkillQueryClient()
    await c.connect()
    try:
        print([s.get("name") for s in await c.discover()])
    finally:
        # Close the connection even if discovery raises.
        await c.disconnect()
asyncio.run(main())
Claw-Service-Hub/
├── client/
│ ├── client.py # LocalServiceRunner, ToolServiceClient
│ ├── skill_client.py # SkillQueryClient
│ └── management_client.py
├── skills/
│ └── hub-client/
│ └── SKILL.md # This file
└── server/
└── main.py # Hub Server
Hub address: `ws://localhost:8765`. Run `python your_service.py` to register the service.

Optional key authorization mechanism for controlling service access.
Features:
from client.client import LocalServiceRunner
import asyncio
import os


# Register method
async def get_data(**params):
    """Protected handler: return the service's data as a dict."""
    return {"data": "secret data"}


async def main():
    """Configure a key-protected service and run it.

    ``await`` is only valid inside a coroutine, so the setup lives in
    ``main()`` and is started with ``asyncio.run`` like the other examples.
    """
    # Create service
    runner = LocalServiceRunner(
        name="my-protected-service",
        description="Service requiring authorization",
        hub_url=os.getenv("HUB_WS_URL", "ws://localhost:8765"),
    )

    # Set default lifecycle policy
    runner.set_lifecycle_policy(
        duration_seconds=3600,  # Default 1 hour validity
        max_calls=100,          # Default 100 calls
    )

    # Set custom policy (optional)
    runner.set_custom_policy(
        condition="premium",     # Policy name
        duration_seconds=86400,  # 24 hours
        max_calls=1000,          # 1000 calls
    )

    runner.register_handler("get_data", get_data)

    # Start service
    print(f"🚀 Starting authorized service...")
    await runner.run()


if __name__ == "__main__":
    asyncio.run(main())
from client.skill_client import SkillQueryClient
async def main():
    """Find the protected service, request a Key for it, and call it with the Key.

    The connection is closed on every exit path, including the early return
    taken when no matching service is found and any exception raised while
    requesting the Key or calling the service.
    """
    client = SkillQueryClient(
        hub_url=os.getenv("HUB_WS_URL", "ws://localhost:8765")
    )
    await client.connect()
    try:
        # Discover services
        services = await client.discover()
        target = next((s for s in services if "protected" in s.get("name", "")), None)
        if not target:
            print("Service not found")
            return

        service_id = target.get("skill_id")

        # Method 1: Direct call (if service doesn't require Key)
        # result = await client.call_service(service_id, "get_data", {})

        # Method 2: Request Key first, then call (recommended)
        key_info = await client.request_key(
            service_id=service_id,
            purpose="Daily data query"
        )
        if key_info.get("success"):
            key = key_info["key"]
            lifecycle = key_info["lifecycle"]
            print(f"✅ Key obtained successfully")
            print(f" Key: {key[:20]}...")
            print(f" Validity: {lifecycle.get('remaining_time')} seconds")
            print(f" Remaining calls: {lifecycle.get('remaining_calls')} times")

            # Call service with Key
            result = await client.call_service(
                service_id=service_id,
                method="get_data",
                params={},
                key=key  # Carry the Key
            )
            print(f"📥 Result: {result}")
        else:
            print(f"❌ Key request failed: {key_info.get('reason')}")
    finally:
        # Runs on the "Service not found" return and on errors as well.
        await client.disconnect()
asyncio.run(main())
async def call_with_fallback(client, service_id, method, params):
    """Call a service without a Key, retrying once with a Key if one is demanded.

    The first response is returned unchanged unless its "error" text contains
    "Key". In that case a Key is requested and, if granted, the call is
    repeated with it; if the request is refused the first response is returned.
    """
    # Try without Key first
    first_response = await client.call_service(service_id, method, params)

    # Anything other than a Key-related error is handed straight back.
    error_text = first_response.get("error")
    if not error_text or "Key" not in error_text:
        return first_response

    print("Key required, requesting authorization...")
    grant = await client.request_key(service_id, "Auto request")
    if not grant.get("success"):
        return first_response

    # Retry with Key
    return await client.call_service(
        service_id, method, params, key=grant["key"]
    )
| Message Type | Direction | Description |
|---|---|---|
| lifecycle_policy | Provider→Hub | Register lifecycle policy |
| key_request | Consumer→Hub→Provider | Request Key |
| key_response | Provider→Hub→Consumer | Return Key (approve/reject) |
| key_revoke | Provider→Hub | Revoke Key |
| call_service (with key) | Consumer→Hub | Call service with Key |
# Provider registers policy
{
"duration_seconds": 3600, # Validity duration (seconds)
"max_calls": 100, # Maximum call count
"custom_policies": { # Optional: custom policies
"premium": {
"duration_seconds": 86400,
"max_calls": 1000
}
}
}
# Key verification result
{
"valid": True,
"key": "key_abc123...",
"lifecycle": {
"expires_at": "2026-03-20T03:47:00Z",
"max_calls": 100,
"call_count": 5,
"remaining_calls": 95,
"remaining_time": 3200
}
}
For Provider (Service Publisher):
- Use `set_lifecycle_policy` to set limits, e.g. `max_calls=100, duration_seconds=3600`

For Consumer (Service Caller):

- Check `lifecycle.get('remaining_calls')` to avoid running out of calls
- Check `lifecycle.get('remaining_time')` and renew as needed

MIT License
Copyright (c) 2026 OpenClaw
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.