Skill flagged — suspicious patterns detected

ClawHub Security flagged this skill as suspicious. Review the scan results below before using it.

Agent Ops Kit

v1.0.0

Production-grade health monitoring, alerting, and service management for OpenClaw agents. Monitor URLs, auto-restart services, get Telegram alerts, track uptime.


Install

OpenClaw Prompt Flow

Install with OpenClaw

Best for remote or guided setup. Copy the exact prompt, then paste it into OpenClaw for mrsmithai/agent-ops-kit.

Prompt preview (Install & Setup):
Install the skill "Agent Ops Kit" (mrsmithai/agent-ops-kit) from ClawHub.
Skill page: https://clawhub.ai/mrsmithai/agent-ops-kit
Keep the work scoped to this skill only.
After install, inspect the skill metadata and help me finish setup.
Use only the metadata you can verify from ClawHub; do not invent missing requirements.
Ask before making any broader environment changes.

Command Line

CLI Commands

Use the direct CLI path if you want to install manually and keep every step visible.

OpenClaw CLI

Install using the bare skill slug:

openclaw skills install agent-ops-kit

ClawHub CLI


npx clawhub@latest install agent-ops-kit
Security Scan

VirusTotal: Suspicious (full report available on VirusTotal)
OpenClaw: Benign (high confidence)
Purpose & Capability
Name and description match the included assets: a shell health-check script, a Python health-check class, and a sample JSON config. Required resources (none declared) align with a lightweight, instruction-only monitoring tool.
Instruction Scope
SKILL.md instructs creating ~/.agent-ops, copying/running the provided scripts, scheduling via cron, and placing a Telegram bot token in the config. The runtime instructions and scripts only contact user-configured targets and the Telegram API. However, the scripts execute restart commands taken directly from the config (via eval), which means the tool will run arbitrary shell commands the user places in services.json; that is expected for an auto-recovery tool but is a privileged action and should be constrained and reviewed by the user.
Install Mechanism
Instruction-only skill with bundled scripts — no network downloads or package installs. Files are written only to user home paths (~/.agent-ops). No high-risk install operations detected.
Credentials
The skill does not declare any required environment variables or secrets. It expects Telegram credentials in the user config file; storing tokens in ~/.agent-ops/config/services.json is functional but places secrets on disk in plaintext. Sample restart commands include 'sudo' and systemctl/docker commands — these are proportional to service restart needs but imply the script may require elevated privileges when run, depending on configuration.
Persistence & Privilege
always:false and no system-wide modifications are requested. The skill writes state, logs, and metrics under ~/.agent-ops (normal). Note: platform-default autonomous invocation is allowed; if the agent is permitted to invoke the skill autonomously, it could trigger configured restart commands, so review the config and permissions before enabling autonomous runs.
Assessment
This toolkit appears to do what it says: health checks, metrics, Telegram alerts, and optional auto-restarts via commands you put in services.json. Before installing:

  1. Inspect and lock down ~/.agent-ops/config/services.json (it will contain your Telegram bot token and restart commands) and set file permissions so others cannot read or modify it.
  2. Review every restart_command you add; the script will eval and run them (including sudo/systemctl/docker commands). Avoid putting arbitrary shell commands there.
  3. If you must allow service restarts, prefer restricted sudoers entries so the monitoring user can only run the specific restart commands required.
  4. Run and test in a non-production environment first to ensure restart behavior and cron scheduling act as you expect.
  5. If you enable autonomous agent invocation, be aware the agent could trigger restarts and alerts automatically; only enable that if you trust the agent and the config.
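
A minimal way to apply recommendations 1 and 3 above, sketched with a hypothetical monitoring user named "monitor" and the nginx example from the docs:

# Restrict the config so only the owner can read the token and restart commands
chmod 700 ~/.agent-ops
chmod 600 ~/.agent-ops/config/services.json

# /etc/sudoers.d/agent-ops (edit with visudo): permit only the exact restart
# command instead of blanket sudo
monitor ALL=(root) NOPASSWD: /usr/bin/systemctl restart nginx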

Like a lobster shell, security has layers — review code before you run it.

Latest version ID: vk978bwyxwa5h1qq6a96akcjk0x84rvbp
65 downloads · 0 stars · 1 version · updated 2 weeks ago
v1.0.0 · MIT-0

Agent Ops Kit

A complete operational toolkit for running production services with autonomous health monitoring, intelligent alerting, service auto-recovery, task board management, and uptime tracking. Built from real production infrastructure running 24/7 across multiple sites.


Table of Contents

  1. Quick Start
  2. Health Monitoring System
  3. Telegram Alerting
  4. Service Auto-Recovery
  5. Task Board System
  6. Uptime Metrics Tracking
  7. Orchestrator Loop
  8. Configuration Reference
  9. Recipes
  10. Troubleshooting

Quick Start

Prerequisites

The scripts use bash, curl, and Python 3.7+ (standard library only), plus cron (or launchd on macOS) for scheduling; no third-party packages are required.

1. Create the directory structure

mkdir -p ~/.agent-ops/{config,logs,metrics,scripts,state}

2. Create your configuration file

cat > ~/.agent-ops/config/services.json << 'CONF'
{
  "services": [
    {
      "name": "My Web App",
      "type": "url",
      "target": "https://example.com",
      "expected_status": 200,
      "timeout_seconds": 10
    },
    {
      "name": "API Server",
      "type": "url",
      "target": "https://api.example.com/health",
      "expected_status": 200,
      "timeout_seconds": 5
    },
    {
      "name": "Local Dev Server",
      "type": "port",
      "host": "127.0.0.1",
      "port": 3000
    },
    {
      "name": "Background Worker",
      "type": "process",
      "process_name": "celery",
      "restart_command": "systemctl restart celery"
    }
  ],
  "alerting": {
    "telegram_bot_token": "YOUR_BOT_TOKEN",
    "telegram_chat_id": "YOUR_CHAT_ID",
    "rate_limit_seconds": 300,
    "alert_on_recovery": true
  },
  "check_interval_seconds": 300,
  "disk_warning_gb": 5,
  "stalled_task_hours": 24,
  "metrics_retention_days": 90
}
CONF

3. Deploy the health check script

Copy health-check.sh from this skill's scripts/ directory to ~/.agent-ops/scripts/ and make it executable.
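
For example, assuming the skill package was unpacked at <skill-dir> (the real path depends on how you installed it):

cp <skill-dir>/scripts/health-check.sh ~/.agent-ops/scripts/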

chmod +x ~/.agent-ops/scripts/health-check.sh

4. Run your first check

~/.agent-ops/scripts/health-check.sh

5. Schedule it

# crontab: run every 5 minutes
(crontab -l 2>/dev/null; echo "*/5 * * * * ~/.agent-ops/scripts/health-check.sh >> ~/.agent-ops/logs/cron.log 2>&1") | crontab -

# Or on macOS, create a launchd plist (see Orchestrator Loop section)

Health Monitoring System

The health monitoring system supports four types of checks, all configurable via services.json.

Check Types

1. URL Checks

Monitor any HTTP/HTTPS endpoint. Verifies the status code and records response latency.

# Simple URL check — returns 0 on success, 1 on failure
check_url() {
  local name="$1" url="$2" expected="${3:-200}" timeout="${4:-10}"
  local start_ms=$(python3 -c "import time; print(int(time.time()*1000))")

  status=$(curl -s -o /dev/null -w "%{http_code}" --max-time "$timeout" "$url" 2>/dev/null || echo "000")
  local end_ms=$(python3 -c "import time; print(int(time.time()*1000))")
  local latency_ms=$(( end_ms - start_ms ))

  if [ "$status" = "$expected" ]; then
    record_metric "$name" "up" "$latency_ms"
    return 0
  else
    record_metric "$name" "down" "$latency_ms"
    return 1
  fi
}

2. Port Checks

Verify a TCP port is accepting connections (useful for databases, local services).

check_port() {
  local name="$1" host="$2" port="$3" timeout="${4:-3}"
  if python3 -c "
import socket, sys
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout($timeout)
try:
    s.connect(('$host', $port))
    s.close()
    sys.exit(0)
except:
    sys.exit(1)
" 2>/dev/null; then
    record_metric "$name" "up" "0"
    return 0
  else
    record_metric "$name" "down" "0"
    return 1
  fi
}

3. Process Checks

Verify a named process is running. Optionally auto-restart it.

check_process() {
  local name="$1" process_name="$2" restart_cmd="$3"
  if pgrep -f "$process_name" > /dev/null 2>&1; then
    record_metric "$name" "up" "0"
    return 0
  else
    record_metric "$name" "down" "0"
    if [ -n "$restart_cmd" ]; then
      log "RESTART: Attempting to restart $name via: $restart_cmd"
      eval "$restart_cmd" 2>/dev/null || true
      sleep 3
      if pgrep -f "$process_name" > /dev/null 2>&1; then
        log "RESTART: $name recovered successfully"
        record_metric "$name" "recovered" "0"
      else
        log "RESTART: $name failed to recover"
      fi
    fi
    return 1
  fi
}

4. Disk Space Check

check_disk() {
  local warning_gb="${1:-5}"
  local free_gb

  if [[ "$(uname)" == "Darwin" ]]; then
    free_gb=$(df -g / | awk 'NR==2 {print $4}')
  else
    free_gb=$(df -BG / | awk 'NR==2 {print $4}' | tr -d 'G')
  fi

  record_metric "disk_free_gb" "$free_gb" "0"

  if [ "$free_gb" -lt "$warning_gb" ]; then
    return 1
  fi
  return 0
}
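
These shell functions take their arguments from services.json. A rough sketch of the dispatch glue, using jq (an assumption, not a bundled dependency; the included health-check.sh may parse the config differently):

CONFIG="$HOME/.agent-ops/config/services.json"

# Iterate over services and route each one to the matching check function
jq -c '.services[]' "$CONFIG" | while read -r svc; do
  name=$(jq -r '.name' <<< "$svc")
  case "$(jq -r '.type // "url"' <<< "$svc")" in
    url)
      check_url "$name" "$(jq -r '.target' <<< "$svc")" \
                "$(jq -r '.expected_status // 200' <<< "$svc")" \
                "$(jq -r '.timeout_seconds // 10' <<< "$svc")" ;;
    port)
      check_port "$name" "$(jq -r '.host // "127.0.0.1"' <<< "$svc")" \
                 "$(jq -r '.port' <<< "$svc")" ;;
    process)
      check_process "$name" "$(jq -r '.process_name' <<< "$svc")" \
                    "$(jq -r '.restart_command // empty' <<< "$svc")" ;;
  esac
done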

Python Health Check Class

For more sophisticated checks, use this Python class that reads your config and runs all checks:

#!/usr/bin/env python3
"""
agent_ops_health.py — Configurable health checker with metrics and alerting.
Drop this into ~/.agent-ops/scripts/ and run it directly or import it.
"""

import json
import os
import socket
import subprocess
import sys
import time
import urllib.request
import urllib.error
from datetime import datetime

CONFIG_DIR = os.path.expanduser("~/.agent-ops/config")
# AGENT_OPS_METRICS_DIR / AGENT_OPS_LOG_DIR are the optional overrides listed
# in the Configuration Reference; defaults match the standard ~/.agent-ops layout.
METRICS_DIR = os.environ.get("AGENT_OPS_METRICS_DIR", os.path.expanduser("~/.agent-ops/metrics"))
STATE_DIR = os.path.expanduser("~/.agent-ops/state")
LOG_FILE = os.path.join(os.environ.get("AGENT_OPS_LOG_DIR", os.path.expanduser("~/.agent-ops/logs")), "health.log")

class HealthResult:
    """Result of a single health check."""
    def __init__(self, name, ok, detail="", latency_ms=0):
        self.name = name
        self.ok = ok
        self.detail = detail
        self.latency_ms = latency_ms
        self.timestamp = datetime.utcnow().isoformat() + "Z"

    def to_dict(self):
        return {
            "name": self.name,
            "status": "up" if self.ok else "down",
            "detail": self.detail,
            "latency_ms": self.latency_ms,
            "timestamp": self.timestamp,
        }

    def __str__(self):
        icon = "OK" if self.ok else "FAIL"
        d = f" - {self.detail}" if self.detail else ""
        lat = f" ({self.latency_ms}ms)" if self.latency_ms else ""
        return f"[{icon}] {self.name}{d}{lat}"


class HealthChecker:
    """Run health checks against configured services."""

    def __init__(self, config_path=None):
        self.config_path = (config_path
                            or os.environ.get("AGENT_OPS_CONFIG")  # documented override
                            or os.path.join(CONFIG_DIR, "services.json"))
        self.config = self._load_config()
        self.results = []
        os.makedirs(METRICS_DIR, exist_ok=True)
        os.makedirs(STATE_DIR, exist_ok=True)
        os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)

    def _load_config(self):
        with open(self.config_path, "r") as f:
            return json.load(f)

    def check_url(self, service):
        """Check an HTTP(S) endpoint."""
        name = service["name"]
        url = service["target"]
        expected = service.get("expected_status", 200)
        timeout = service.get("timeout_seconds", 10)

        start = time.time()
        try:
            req = urllib.request.Request(url, headers={"User-Agent": "AgentOpsKit/1.0"})
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                status = resp.status
                latency = int((time.time() - start) * 1000)
                ok = (status == expected)
                detail = f"HTTP {status}" + ("" if ok else f" (expected {expected})")
                return HealthResult(name, ok, detail, latency)
        except urllib.error.HTTPError as e:
            latency = int((time.time() - start) * 1000)
            return HealthResult(name, False, f"HTTP {e.code}", latency)
        except Exception as e:
            latency = int((time.time() - start) * 1000)
            return HealthResult(name, False, str(e)[:100], latency)

    def check_port(self, service):
        """Check a TCP port is open."""
        name = service["name"]
        host = service.get("host", "127.0.0.1")
        port = service["port"]
        timeout = service.get("timeout_seconds", 3)

        start = time.time()
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.settimeout(timeout)
            s.connect((host, port))
            s.close()
            latency = int((time.time() - start) * 1000)
            return HealthResult(name, True, f"port {port} open", latency)
        except Exception as e:
            latency = int((time.time() - start) * 1000)
            return HealthResult(name, False, f"port {port}: {e}", latency)

    def check_process(self, service):
        """Check a process is running."""
        name = service["name"]
        proc_name = service["process_name"]

        try:
            r = subprocess.run(
                ["pgrep", "-f", proc_name],
                capture_output=True, text=True, timeout=5
            )
            if r.returncode == 0:
                pids = r.stdout.strip().split("\n")
                return HealthResult(name, True, f"{len(pids)} process(es) running")
            else:
                return HealthResult(name, False, "not running")
        except Exception as e:
            return HealthResult(name, False, str(e)[:100])

    def check_disk(self):
        """Check disk space."""
        warning_gb = self.config.get("disk_warning_gb", 5)
        try:
            st = os.statvfs("/")
            free_gb = (st.f_bavail * st.f_frsize) / (1024 ** 3)
            ok = free_gb > warning_gb
            return HealthResult("Disk Space", ok, f"{free_gb:.1f} GB free")
        except Exception as e:
            return HealthResult("Disk Space", False, str(e))

    def run_all(self):
        """Run all configured checks and return results."""
        self.results = []

        for svc in self.config.get("services", []):
            svc_type = svc.get("type", "url")
            if svc_type == "url":
                self.results.append(self.check_url(svc))
            elif svc_type == "port":
                self.results.append(self.check_port(svc))
            elif svc_type == "process":
                self.results.append(self.check_process(svc))

        # Always check disk
        self.results.append(self.check_disk())

        # Save metrics
        self._save_metrics()

        return self.results

    def _save_metrics(self):
        """Append results to daily metrics file (JSONL)."""
        date_str = datetime.utcnow().strftime("%Y-%m-%d")
        metrics_file = os.path.join(METRICS_DIR, f"{date_str}.jsonl")
        with open(metrics_file, "a") as f:
            for r in self.results:
                f.write(json.dumps(r.to_dict()) + "\n")

    def get_failures(self):
        return [r for r in self.results if not r.ok]

    def summary(self):
        total = len(self.results)
        passed = sum(1 for r in self.results if r.ok)
        failed = total - passed
        return f"{passed}/{total} checks passed, {failed} failed"


def main():
    checker = HealthChecker()
    results = checker.run_all()

    for r in results:
        print(r)

    failures = checker.get_failures()
    if failures:
        print(f"\n{len(failures)} failure(s) detected.")
        sys.exit(1)
    else:
        print(f"\nAll {len(results)} checks passed.")
        sys.exit(0)


if __name__ == "__main__":
    main()

Telegram Alerting

Setting Up a Telegram Bot

  1. Open Telegram and message @BotFather
  2. Send /newbot and follow the prompts
  3. Copy the bot token (looks like 110201543:AAHdqTcvCH1vGWJxfSeofSAs0K5PALDsaw)
  4. Start a conversation with your new bot (send it any message)
  5. Get your chat ID:
# Replace YOUR_BOT_TOKEN with your actual token
curl -s "https://api.telegram.org/botYOUR_BOT_TOKEN/getUpdates" | python3 -c "
import json, sys
data = json.load(sys.stdin)
for r in data.get('result', []):
    chat = r.get('message', {}).get('chat', {})
    if chat:
        print(f\"Chat ID: {chat['id']}  ({chat.get('first_name', '')} {chat.get('last_name', '')})\")
        break
"
  6. Add your token and chat ID to ~/.agent-ops/config/services.json in the alerting section.
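
Before wiring the credentials into the config, a direct call to the Bot API's sendMessage method is a quick smoke test:

curl -s "https://api.telegram.org/botYOUR_BOT_TOKEN/sendMessage" \
  -d chat_id="YOUR_CHAT_ID" \
  -d text="Agent Ops Kit test message"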

Alert System with Rate Limiting

The alert system includes built-in rate limiting to prevent alert storms. Each unique topic is limited to one alert per 5 minutes (configurable).

#!/usr/bin/env python3
"""
agent_ops_alert.py — Telegram alerting with rate limiting and topic deduplication.

Usage:
  python3 agent_ops_alert.py send "Something broke"
  python3 agent_ops_alert.py send "Disk low" --level warning
  python3 agent_ops_alert.py send "API down" --topic api-health --level error
  python3 agent_ops_alert.py send "Recovered" --level info
"""

import json
import os
import sys
import time
import urllib.request
import urllib.error

CONFIG_FILE = os.environ.get("AGENT_OPS_CONFIG", os.path.expanduser("~/.agent-ops/config/services.json"))
RATE_STATE_FILE = os.path.expanduser("~/.agent-ops/state/alert-rate.json")

LEVEL_PREFIX = {
    "info": "[INFO]",
    "warning": "[WARNING]",
    "error": "[ERROR]",
    "recovery": "[RECOVERED]",
}

def load_config():
    with open(CONFIG_FILE, "r") as f:
        return json.load(f)

def load_rate_state():
    if os.path.exists(RATE_STATE_FILE):
        try:
            with open(RATE_STATE_FILE, "r") as f:
                return json.load(f)
        except (json.JSONDecodeError, IOError):
            return {}
    return {}

def save_rate_state(state):
    os.makedirs(os.path.dirname(RATE_STATE_FILE), exist_ok=True)
    now = time.time()
    pruned = {k: v for k, v in state.items() if now - v < 3600}
    with open(RATE_STATE_FILE, "w") as f:
        json.dump(pruned, f, indent=2)

def is_rate_limited(topic, rate_limit_seconds):
    if not topic:
        return False
    state = load_rate_state()
    last_sent = state.get(topic, 0)
    return (time.time() - last_sent) < rate_limit_seconds

def record_sent(topic):
    if not topic:
        return
    state = load_rate_state()
    state[topic] = time.time()
    save_rate_state(state)

def send_telegram(bot_token, chat_id, text):
    """Send a message via Telegram Bot API."""
    url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    payload = json.dumps({
        "chat_id": chat_id,
        "text": text,
        "parse_mode": "Markdown",
        "disable_web_page_preview": True,
    }).encode("utf-8")

    req = urllib.request.Request(url, data=payload, headers={
        "Content-Type": "application/json",
    })

    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            result = json.loads(resp.read().decode("utf-8"))
            if result.get("ok"):
                return True
            else:
                print(f"Telegram API error: {result}", file=sys.stderr)
                return False
    except urllib.error.HTTPError as e:
        body = e.read().decode("utf-8", errors="replace")
        print(f"Telegram HTTP {e.code}: {body}", file=sys.stderr)
        return False
    except Exception as e:
        print(f"Network error: {e}", file=sys.stderr)
        return False

def send_alert(message, level="info", topic=None):
    """High-level: send an alert with rate limiting."""
    config = load_config()
    alerting = config.get("alerting", {})

    bot_token = os.environ.get("TELEGRAM_BOT_TOKEN") or alerting.get("telegram_bot_token")
    chat_id = os.environ.get("TELEGRAM_CHAT_ID") or alerting.get("telegram_chat_id")
    rate_limit = alerting.get("rate_limit_seconds", 300)

    if not bot_token or not chat_id:
        print("ERROR: telegram_bot_token and telegram_chat_id required in config", file=sys.stderr)
        return False

    if is_rate_limited(topic, rate_limit):
        print(f"Rate limited: topic '{topic}' sent within last {rate_limit}s. Skipping.")
        return True  # Not an error, just throttled

    prefix = LEVEL_PREFIX.get(level, "")
    text = f"{prefix} *Agent Ops Alert*\n\n{message}"

    success = send_telegram(bot_token, chat_id, text)
    if success:
        record_sent(topic)
    return success

def main():
    if len(sys.argv) < 3 or sys.argv[1] != "send":
        print(__doc__.strip())
        sys.exit(0)

    message = sys.argv[2]
    level = "info"
    topic = None

    i = 3
    while i < len(sys.argv):
        if sys.argv[i] == "--level" and i + 1 < len(sys.argv):
            level = sys.argv[i + 1]
            i += 2
        elif sys.argv[i] == "--topic" and i + 1 < len(sys.argv):
            topic = sys.argv[i + 1]
            i += 2
        else:
            i += 1

    success = send_alert(message, level=level, topic=topic)
    sys.exit(0 if success else 1)

if __name__ == "__main__":
    main()

Integration with Health Checks

Wire alerting into your health checker by adding this to your check loop:

import json
import os

from agent_ops_alert import send_alert

def check_and_alert(checker):
    """Run checks, alert on failures, and optionally alert on recovery."""
    # Load previous state for recovery detection
    state_file = os.path.expanduser("~/.agent-ops/state/last-status.json")
    prev_status = {}
    if os.path.exists(state_file):
        with open(state_file, "r") as f:
            prev_status = json.load(f)

    results = checker.run_all()
    failures = checker.get_failures()
    current_status = {r.name: r.ok for r in results}

    # Alert on new failures
    if failures:
        lines = ["Health check failures:"]
        for f in failures:
            lines.append(f"  - {f.name}: {f.detail}")
        send_alert("\n".join(lines), level="error", topic="health-check")

    # Alert on recoveries (was down, now up)
    if checker.config.get("alerting", {}).get("alert_on_recovery", False):
        for name, ok in current_status.items():
            if ok and prev_status.get(name) is False:
                send_alert(f"{name} has recovered.", level="recovery", topic=f"recovery-{name}")

    # Save current state
    os.makedirs(os.path.dirname(state_file), exist_ok=True)
    with open(state_file, "w") as f:
        json.dump(current_status, f, indent=2)

    return results
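
A minimal driver for this loop, assuming the snippet above is saved as agent_ops_watch.py (a hypothetical filename) alongside the other modules:

cd ~/.agent-ops/scripts && python3 -c "
from agent_ops_health import HealthChecker
from agent_ops_watch import check_and_alert
check_and_alert(HealthChecker())
"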

Service Auto-Recovery

The auto-recovery system attempts to restart failed services using configurable restart commands.

Configuration

Add restart_command to any service in your config:

{
  "name": "Web Server",
  "type": "process",
  "process_name": "nginx",
  "restart_command": "sudo systemctl restart nginx",
  "restart_cooldown_seconds": 300,
  "max_restart_attempts": 3
}

For URL-based services, you can also define a restart:

{
  "name": "My App",
  "type": "url",
  "target": "https://myapp.example.com/health",
  "expected_status": 200,
  "restart_command": "cd /opt/myapp && docker compose restart web",
  "restart_cooldown_seconds": 600
}

Auto-Recovery Engine

#!/usr/bin/env python3
"""
agent_ops_recovery.py — Automatic service recovery with cooldowns and attempt tracking.
"""

import json
import os
import subprocess
import time
from datetime import datetime

STATE_FILE = os.path.expanduser("~/.agent-ops/state/recovery.json")
LOG_FILE = os.path.expanduser("~/.agent-ops/logs/recovery.log")

def log(msg):
    os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
    ts = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
    with open(LOG_FILE, "a") as f:
        f.write(f"[{ts}] {msg}\n")

def load_state():
    if os.path.exists(STATE_FILE):
        try:
            with open(STATE_FILE, "r") as f:
                return json.load(f)
        except (json.JSONDecodeError, IOError):
            return {}
    return {}

def save_state(state):
    os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True)
    with open(STATE_FILE, "w") as f:
        json.dump(state, f, indent=2)

def attempt_recovery(service_name, restart_command, cooldown=300, max_attempts=3):
    """
    Attempt to recover a failed service.

    Returns: "recovered", "cooldown", "max_attempts", or "failed"
    """
    state = load_state()
    svc_state = state.get(service_name, {"attempts": 0, "last_attempt": 0})

    now = time.time()

    # Check cooldown
    if now - svc_state.get("last_attempt", 0) < cooldown:
        remaining = int(cooldown - (now - svc_state["last_attempt"]))
        log(f"COOLDOWN: {service_name} — {remaining}s remaining")
        return "cooldown"

    # Check max attempts (reset after 1 hour of no attempts)
    if now - svc_state.get("last_attempt", 0) > 3600:
        svc_state["attempts"] = 0

    if svc_state["attempts"] >= max_attempts:
        log(f"MAX_ATTEMPTS: {service_name} — {svc_state['attempts']} attempts exhausted")
        return "max_attempts"

    # Attempt restart
    log(f"RESTART: {service_name} — attempt {svc_state['attempts'] + 1}/{max_attempts}")
    try:
        result = subprocess.run(
            restart_command, shell=True,
            capture_output=True, text=True, timeout=30,
            env={**os.environ, "PATH": "/opt/homebrew/bin:/usr/local/bin:/usr/bin:/bin:/sbin:/usr/sbin"}
        )
        svc_state["attempts"] += 1
        svc_state["last_attempt"] = now

        if result.returncode == 0:
            log(f"RESTART OK: {service_name}")
            svc_state["last_success"] = now
            state[service_name] = svc_state
            save_state(state)
            return "recovered"
        else:
            log(f"RESTART FAIL: {service_name} — {result.stderr[:200]}")
            state[service_name] = svc_state
            save_state(state)
            return "failed"

    except subprocess.TimeoutExpired:
        log(f"RESTART TIMEOUT: {service_name}")
        svc_state["attempts"] += 1
        svc_state["last_attempt"] = now
        state[service_name] = svc_state
        save_state(state)
        return "failed"
    except Exception as e:
        log(f"RESTART ERROR: {service_name} — {e}")
        return "failed"


def recover_failed_services(health_results, services_config):
    """
    Given health check results and service configs, attempt recovery for
    any failed services that have a restart_command.
    """
    from agent_ops_alert import send_alert

    services_by_name = {s["name"]: s for s in services_config}
    actions = []

    for result in health_results:
        if result.ok:
            continue

        svc = services_by_name.get(result.name, {})
        restart_cmd = svc.get("restart_command")
        if not restart_cmd:
            continue

        cooldown = svc.get("restart_cooldown_seconds", 300)
        max_attempts = svc.get("max_restart_attempts", 3)

        outcome = attempt_recovery(result.name, restart_cmd, cooldown, max_attempts)
        actions.append((result.name, outcome))

        if outcome == "recovered":
            send_alert(
                f"Auto-recovered: {result.name}\nRestart command: `{restart_cmd}`",
                level="info", topic=f"recovery-{result.name}"
            )
        elif outcome == "max_attempts":
            send_alert(
                f"CRITICAL: {result.name} failed after max restart attempts.\nManual intervention required.",
                level="error", topic=f"critical-{result.name}"
            )

    return actions

macOS launchd Auto-Recovery

For macOS services managed by launchd:

# Restart a launchd service
restart_launchd_service() {
  local service_label="$1"
  local uid=$(id -u)
  launchctl kickstart -k "gui/${uid}/${service_label}" 2>/dev/null
  return $?
}

# Example: restart a tunnel
restart_launchd_service "com.myapp.cloudflared"

systemd Auto-Recovery (Linux)

restart_systemd_service() {
  local service_name="$1"
  sudo systemctl restart "$service_name"
  sleep 2
  if systemctl is-active --quiet "$service_name"; then
    return 0
  else
    return 1
  fi
}

Task Board System

A lightweight JSON-based task board for tracking operational work. Uses atomic writes and file locking to prevent data corruption.

Board Structure

{
  "columns": ["open", "in-progress", "in-review", "done"],
  "tasks": [
    {
      "id": "task-abc123",
      "title": "Investigate high latency on API",
      "column": "open",
      "owner": "agent-1",
      "priority": "high",
      "description": "API response times spiked to 2s+",
      "createdAt": "2025-01-15T10:30:00Z",
      "updatedAt": "2025-01-15T10:30:00Z",
      "tags": ["ops", "performance"]
    }
  ]
}

Board I/O Module (Atomic, Locked)

This is critical for production use. Never read/write the board file directly; always go through this module to prevent data corruption.

#!/usr/bin/env python3
"""
agent_ops_board.py — Atomic, locked task board I/O.

Guarantees:
- Atomic writes (write to .tmp, then os.replace)
- Exclusive file locking (prevents concurrent corruption)
- Read-modify-write under one lock
- Auto-backup before every write
- JSON validation with fallback to backup
"""

import json
import os
import fcntl
import shutil
import time
import hashlib
from contextlib import contextmanager
from datetime import datetime

BOARD_DIR = os.path.expanduser("~/.agent-ops/state")
BOARD_FILE = os.path.join(BOARD_DIR, "board.json")
BOARD_LOCK = os.path.join(BOARD_DIR, "board.lock")
BOARD_BACKUP = os.path.join(BOARD_DIR, "board.json.bak")

DEFAULT_COLUMNS = ["open", "in-progress", "in-review", "done"]
MAX_LOCK_WAIT = 30
LOCK_RETRY = 0.1

def _ensure_dir():
    os.makedirs(BOARD_DIR, exist_ok=True)

@contextmanager
def _file_lock():
    """Exclusive file lock with timeout."""
    _ensure_dir()
    deadline = time.time() + MAX_LOCK_WAIT
    lock_fd = open(BOARD_LOCK, "w")
    while True:
        try:
            fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            break
        except BlockingIOError:
            if time.time() > deadline:
                lock_fd.close()
                raise TimeoutError(f"Could not acquire board lock within {MAX_LOCK_WAIT}s")
            time.sleep(LOCK_RETRY)
    try:
        yield lock_fd
    finally:
        fcntl.flock(lock_fd, fcntl.LOCK_UN)
        lock_fd.close()

def board_read():
    """Read board.json under lock. Returns dict."""
    _ensure_dir()
    with _file_lock():
        if not os.path.exists(BOARD_FILE):
            return {"columns": list(DEFAULT_COLUMNS), "tasks": []}
        try:
            with open(BOARD_FILE, "r") as f:
                data = json.load(f)
            if "columns" not in data:
                data["columns"] = list(DEFAULT_COLUMNS)
            if "tasks" not in data:
                data["tasks"] = []
            return data
        except json.JSONDecodeError:
            # Try backup
            if os.path.exists(BOARD_BACKUP):
                with open(BOARD_BACKUP, "r") as f:
                    return json.load(f)
            return {"columns": list(DEFAULT_COLUMNS), "tasks": []}

def board_write(data):
    """Write board.json atomically under lock."""
    _ensure_dir()
    with _file_lock():
        # Backup current
        if os.path.exists(BOARD_FILE):
            shutil.copy2(BOARD_FILE, BOARD_BACKUP)
        # Atomic write
        tmp = BOARD_FILE + ".tmp"
        with open(tmp, "w") as f:
            json.dump(data, f, indent=2)
        os.replace(tmp, BOARD_FILE)

def board_update(fn):
    """Read-modify-write under one lock. fn receives the board dict and must return it."""
    _ensure_dir()
    with _file_lock():
        if os.path.exists(BOARD_FILE):
            with open(BOARD_FILE, "r") as f:
                data = json.load(f)
        else:
            data = {"columns": list(DEFAULT_COLUMNS), "tasks": []}
        data = fn(data)
        if os.path.exists(BOARD_FILE):
            shutil.copy2(BOARD_FILE, BOARD_BACKUP)
        tmp = BOARD_FILE + ".tmp"
        with open(tmp, "w") as f:
            json.dump(data, f, indent=2)
        os.replace(tmp, BOARD_FILE)
        return data

def add_task(title, owner="", priority="medium", description="", tags=None):
    """Add a new task to the board."""
    now = datetime.utcnow().isoformat() + "Z"
    task_id = "task-" + hashlib.md5(f"{title}{now}".encode()).hexdigest()[:8]
    task = {
        "id": task_id,
        "title": title,
        "column": "open",
        "owner": owner,
        "priority": priority,
        "description": description,
        "tags": tags or [],
        "createdAt": now,
        "updatedAt": now,
    }

    def _add(board):
        board["tasks"].append(task)
        return board

    board_update(_add)
    return task

def move_task(task_id, new_column):
    """Move a task to a new column."""
    now = datetime.utcnow().isoformat() + "Z"

    def _move(board):
        for t in board["tasks"]:
            if t["id"] == task_id:
                t["column"] = new_column
                t["updatedAt"] = now
                break
        return board

    board_update(_move)

def get_stalled_tasks(hours=24):
    """Find tasks stuck in 'in-progress' longer than the given hours."""
    board = board_read()
    stalled = []
    now = datetime.utcnow()
    for t in board["tasks"]:
        if t["column"] == "in-progress":
            updated = t.get("updatedAt", t.get("createdAt", ""))
            if updated:
                try:
                    ts = datetime.fromisoformat(updated.replace("Z", ""))
                    if (now - ts).total_seconds() > hours * 3600:
                        stalled.append(t)
                except ValueError:
                    pass
    return stalled

CLI Usage

Run these from ~/.agent-ops/scripts (or put that directory on PYTHONPATH) so the agent_ops_board import resolves.

# Add a task
python3 -c "
from agent_ops_board import add_task
t = add_task('Fix API timeout issue', owner='ops-agent', priority='high', tags=['ops', 'urgent'])
print(f'Created: {t[\"id\"]} — {t[\"title\"]}')
"

# Move a task
python3 -c "
from agent_ops_board import move_task
move_task('task-abc123', 'in-progress')
print('Moved to in-progress')
"

# Find stalled tasks
python3 -c "
from agent_ops_board import get_stalled_tasks
for t in get_stalled_tasks(hours=4):
    print(f'STALLED: {t[\"owner\"]}: {t[\"title\"]} (since {t[\"updatedAt\"]})')
"

Uptime Metrics Tracking

Metrics Storage

Metrics are stored as daily JSONL files in ~/.agent-ops/metrics/:

~/.agent-ops/metrics/
  2025-01-15.jsonl
  2025-01-16.jsonl
  ...

Each line is a JSON object:

{"name": "My App", "status": "up", "latency_ms": 145, "timestamp": "2025-01-15T10:30:00Z"}

Uptime Calculator

#!/usr/bin/env python3
"""
agent_ops_metrics.py — Calculate uptime percentages and generate reports.
"""

import json
import os
import glob
from datetime import datetime, timedelta
from collections import defaultdict

METRICS_DIR = os.environ.get("AGENT_OPS_METRICS_DIR", os.path.expanduser("~/.agent-ops/metrics"))

def load_metrics(days=7):
    """Load metrics for the last N days."""
    entries = []
    now = datetime.utcnow()
    for i in range(days):
        date_str = (now - timedelta(days=i)).strftime("%Y-%m-%d")
        path = os.path.join(METRICS_DIR, f"{date_str}.jsonl")
        if os.path.exists(path):
            with open(path, "r") as f:
                for line in f:
                    line = line.strip()
                    if line:
                        try:
                            entries.append(json.loads(line))
                        except json.JSONDecodeError:
                            pass
    return entries

def calculate_uptime(days=7):
    """Calculate uptime percentage per service over the last N days."""
    entries = load_metrics(days)

    # Group by service
    checks = defaultdict(lambda: {"up": 0, "down": 0, "total_latency": 0, "count": 0})

    for e in entries:
        name = e.get("name", "unknown")
        status = e.get("status", "unknown")
        latency = e.get("latency_ms", 0)

        if status == "up":
            checks[name]["up"] += 1
        elif status == "down":
            checks[name]["down"] += 1

        checks[name]["total_latency"] += latency
        checks[name]["count"] += 1

    results = {}
    for name, data in checks.items():
        total = data["up"] + data["down"]
        if total > 0:
            uptime_pct = (data["up"] / total) * 100
            avg_latency = data["total_latency"] / data["count"] if data["count"] else 0
        else:
            uptime_pct = 0
            avg_latency = 0

        results[name] = {
            "uptime_percent": round(uptime_pct, 2),
            "total_checks": total,
            "failures": data["down"],
            "avg_latency_ms": round(avg_latency, 1),
        }

    return results

def uptime_report(days=7):
    """Generate a human-readable uptime report."""
    results = calculate_uptime(days)
    lines = [f"=== Uptime Report (last {days} days) ===\n"]

    for name, data in sorted(results.items()):
        pct = data["uptime_percent"]
        indicator = "OK" if pct >= 99.5 else ("WARN" if pct >= 95 else "CRITICAL")
        lines.append(
            f"[{indicator}] {name}: {pct}% uptime "
            f"({data['failures']} failures in {data['total_checks']} checks, "
            f"avg {data['avg_latency_ms']}ms)"
        )

    return "\n".join(lines)

def cleanup_old_metrics(retention_days=90):
    """Delete metrics files older than retention period."""
    cutoff = datetime.utcnow() - timedelta(days=retention_days)
    removed = 0
    for path in glob.glob(os.path.join(METRICS_DIR, "*.jsonl")):
        basename = os.path.basename(path).replace(".jsonl", "")
        try:
            file_date = datetime.strptime(basename, "%Y-%m-%d")
            if file_date < cutoff:
                os.remove(path)
                removed += 1
        except ValueError:
            pass
    return removed

if __name__ == "__main__":
    print(uptime_report())

Telegram Uptime Reports

Send a daily uptime summary to Telegram:

#!/bin/bash
# daily-uptime-report.sh — Send uptime summary to Telegram
# Schedule: 0 9 * * * (daily at 9 AM)

cd ~/.agent-ops

report=$(python3 -c "
from scripts.agent_ops_metrics import uptime_report
print(uptime_report(days=1))
")

python3 scripts/agent_ops_alert.py send "$report" --level info --topic daily-uptime

Orchestrator Loop

The orchestrator ties everything together into a single periodic loop.

Full Orchestrator Script

#!/bin/bash
# agent-ops-orchestrator.sh — Main loop: check, recover, alert, track.
# Run every 5-10 minutes via cron or launchd.

set -euo pipefail

OPS_DIR="$HOME/.agent-ops"
LOG="$OPS_DIR/logs/orchestrator.log"
SCRIPTS="$OPS_DIR/scripts"

mkdir -p "$OPS_DIR/logs"

log() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" >> "$LOG"; }

log "=== Orchestrator run starting ==="

# Phase 1: Health checks
log "Phase 1: Health checks"
python3 "$SCRIPTS/agent_ops_health.py" > "$OPS_DIR/logs/last-health.txt" 2>&1
health_exit=$?

if [ $health_exit -ne 0 ]; then
  log "Health check found failures"

  # Phase 2: Auto-recovery
  log "Phase 2: Auto-recovery"
  python3 -c "
from agent_ops_health import HealthChecker
from agent_ops_recovery import recover_failed_services

checker = HealthChecker()
results = checker.run_all()
failures = checker.get_failures()

if failures:
    actions = recover_failed_services(failures, checker.config.get('services', []))
    for name, outcome in actions:
        print(f'  {name}: {outcome}')
" 2>&1 | tee -a "$LOG"

  # Phase 3: Alert
  log "Phase 3: Alerting"
  python3 -c "
from agent_ops_health import HealthChecker
from agent_ops_alert import send_alert

checker = HealthChecker()
results = checker.run_all()
failures = checker.get_failures()

if failures:
    lines = ['Health check failures after recovery attempt:']
    for f in failures:
        lines.append(f'  - {f.name}: {f.detail}')
    send_alert(chr(10).join(lines), level='error', topic='health-check')
" 2>&1 | tee -a "$LOG"

else
  log "All checks passed"
fi

# Phase 4: Stalled task detection
log "Phase 4: Stalled task check"
python3 -c "
from agent_ops_board import get_stalled_tasks
from agent_ops_alert import send_alert

stalled = get_stalled_tasks(hours=24)
if stalled:
    lines = ['Stalled tasks (>24h in progress):']
    for t in stalled:
        lines.append(f\"  - {t.get('owner', '?')}: {t['title'][:60]}\")
    send_alert(chr(10).join(lines), level='warning', topic='stalled-tasks')
    print(f'{len(stalled)} stalled task(s) found')
else:
    print('No stalled tasks')
" 2>&1 | tee -a "$LOG"

# Phase 5: Metrics cleanup
log "Phase 5: Metrics maintenance"
python3 -c "
from agent_ops_metrics import cleanup_old_metrics
removed = cleanup_old_metrics(retention_days=90)
if removed:
    print(f'Cleaned up {removed} old metrics file(s)')
" 2>&1 | tee -a "$LOG"

# Phase 6: Log rotation (rotate if > 50MB)
for logfile in "$OPS_DIR"/logs/*.log; do
  if [ -f "$logfile" ]; then
    size=$(stat -f%z "$logfile" 2>/dev/null || stat --format=%s "$logfile" 2>/dev/null || echo 0)
    if [ "$size" -gt 52428800 ]; then
      mv "$logfile" "${logfile}.$(date +%Y%m%d)"
      gzip "${logfile}.$(date +%Y%m%d)" 2>/dev/null || true
      log "Rotated $(basename "$logfile")"
    fi
  fi
done

log "=== Orchestrator run complete ==="

macOS launchd Setup

Create ~/Library/LaunchAgents/com.agent-ops.orchestrator.plist:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
    <key>Label</key>
    <string>com.agent-ops.orchestrator</string>
    <key>ProgramArguments</key>
    <array>
        <string>/bin/bash</string>
        <string>-c</string>
        <string>~/.agent-ops/scripts/agent-ops-orchestrator.sh</string>
    </array>
    <key>StartInterval</key>
    <integer>300</integer>
    <key>StandardOutPath</key>
    <string>/Users/your-username/.agent-ops/logs/launchd-stdout.log</string>
    <key>StandardErrorPath</key>
    <string>/Users/your-username/.agent-ops/logs/launchd-stderr.log</string>
    <key>RunAtLoad</key>
    <true/>
    <key>EnvironmentVariables</key>
    <dict>
        <key>PATH</key>
        <string>/opt/homebrew/bin:/usr/local/bin:/usr/bin:/bin</string>
    </dict>
</dict>
</plist>

Replace /Users/your-username with your home directory; launchd does not expand ~ in StandardOutPath or StandardErrorPath. Then load it:

launchctl load ~/Library/LaunchAgents/com.agent-ops.orchestrator.plist
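
launchctl load still works but is considered legacy; on current macOS the equivalent is:

launchctl bootstrap gui/$(id -u) ~/Library/LaunchAgents/com.agent-ops.orchestrator.plist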

Linux systemd Setup

Create /etc/systemd/system/agent-ops.service:

[Unit]
Description=Agent Ops Kit Orchestrator
After=network.target

[Service]
Type=oneshot
User=your-username
ExecStart=/bin/bash /home/your-username/.agent-ops/scripts/agent-ops-orchestrator.sh
Environment=PATH=/usr/local/bin:/usr/bin:/bin

[Install]
WantedBy=multi-user.target

And a timer /etc/systemd/system/agent-ops.timer:

[Unit]
Description=Run Agent Ops every 5 minutes

[Timer]
OnBootSec=60
OnUnitActiveSec=300

[Install]
WantedBy=timers.target

Enable and start the timer:

sudo systemctl enable agent-ops.timer
sudo systemctl start agent-ops.timer

Configuration Reference

Full services.json Schema

{
  "services": [
    {
      "name": "string (required) — Human-readable service name",
      "type": "url | port | process (required)",
      "target": "string — URL for type=url",
      "host": "string — Host for type=port (default: 127.0.0.1)",
      "port": "number — Port for type=port",
      "process_name": "string — Process name for type=process (used with pgrep -f)",
      "expected_status": "number — Expected HTTP status (default: 200)",
      "timeout_seconds": "number — Request timeout (default: 10)",
      "restart_command": "string — Shell command to restart the service",
      "restart_cooldown_seconds": "number — Min seconds between restart attempts (default: 300)",
      "max_restart_attempts": "number — Max restarts before giving up (default: 3)"
    }
  ],
  "alerting": {
    "telegram_bot_token": "string — Telegram bot API token",
    "telegram_chat_id": "string — Telegram chat ID for alerts",
    "rate_limit_seconds": "number — Min seconds between same-topic alerts (default: 300)",
    "alert_on_recovery": "boolean — Send alert when a service recovers (default: true)"
  },
  "check_interval_seconds": "number — How often to run checks (for reference, actual scheduling is external)",
  "disk_warning_gb": "number — Alert when free disk space drops below this (default: 5)",
  "stalled_task_hours": "number — Hours before a task is considered stalled (default: 24)",
  "metrics_retention_days": "number — Days to keep metrics files (default: 90)"
}

Environment Variables (Optional Overrides)

Variable                 Description                      Default
AGENT_OPS_CONFIG         Path to services.json            ~/.agent-ops/config/services.json
AGENT_OPS_LOG_DIR        Log directory                    ~/.agent-ops/logs
AGENT_OPS_METRICS_DIR    Metrics directory                ~/.agent-ops/metrics
TELEGRAM_BOT_TOKEN       Overrides bot token from config  (from config)
TELEGRAM_CHAT_ID         Overrides chat ID from config    (from config)
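
For example, to run the checker once against an alternate config:

AGENT_OPS_CONFIG=/tmp/staging-services.json python3 ~/.agent-ops/scripts/agent_ops_health.py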

Recipes

Recipe 1: Monitor a Cloudflare Pages Site

{
  "name": "My Blog",
  "type": "url",
  "target": "https://myblog.pages.dev",
  "expected_status": 200,
  "timeout_seconds": 15
}

Recipe 2: Monitor a Docker Compose Stack

[
  {
    "name": "Web Frontend",
    "type": "url",
    "target": "http://localhost:3000/health",
    "restart_command": "cd /opt/myapp && docker compose restart web"
  },
  {
    "name": "API Backend",
    "type": "url",
    "target": "http://localhost:8080/api/health",
    "restart_command": "cd /opt/myapp && docker compose restart api"
  },
  {
    "name": "Redis",
    "type": "port",
    "host": "127.0.0.1",
    "port": 6379,
    "restart_command": "cd /opt/myapp && docker compose restart redis"
  },
  {
    "name": "PostgreSQL",
    "type": "port",
    "host": "127.0.0.1",
    "port": 5432,
    "restart_command": "cd /opt/myapp && docker compose restart db"
  }
]

Recipe 3: Monitor Multiple Domains with SSL Check

# Add this to your health check loop for SSL expiry monitoring
check_ssl_expiry() {
  local domain="$1" warning_days="${2:-14}"
  local expiry_date
  expiry_date=$(echo | openssl s_client -servername "$domain" -connect "$domain:443" 2>/dev/null | \
    openssl x509 -noout -enddate 2>/dev/null | cut -d= -f2)

  if [ -z "$expiry_date" ]; then
    echo "FAIL: Could not check SSL for $domain"
    return 1
  fi

  local expiry_epoch
  if [[ "$(uname)" == "Darwin" ]]; then
    expiry_epoch=$(date -j -f "%b %d %H:%M:%S %Y %Z" "$expiry_date" +%s 2>/dev/null || echo 0)
  else
    expiry_epoch=$(date -d "$expiry_date" +%s 2>/dev/null || echo 0)
  fi

  local now_epoch=$(date +%s)
  local days_left=$(( (expiry_epoch - now_epoch) / 86400 ))

  if [ "$days_left" -lt "$warning_days" ]; then
    echo "WARNING: $domain SSL expires in $days_left days"
    return 1
  fi
  echo "OK: $domain SSL valid for $days_left days"
  return 0
}
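
To monitor several domains in one pass, loop over them and feed failures into the bundled alerter:

for domain in example.com api.example.com myblog.pages.dev; do
  check_ssl_expiry "$domain" 14 || \
    python3 ~/.agent-ops/scripts/agent_ops_alert.py send \
      "SSL check warning for $domain" --level warning --topic "ssl-$domain"
done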

Recipe 4: Weekly Uptime Digest

#!/bin/bash
# weekly-digest.sh — Send weekly uptime digest every Monday at 9 AM
# Cron: 0 9 * * 1 ~/.agent-ops/scripts/weekly-digest.sh

report=$(python3 -c "
from agent_ops_metrics import uptime_report, calculate_uptime

# 7-day report
print(uptime_report(days=7))
print()

# Highlight any service below 99.9%
results = calculate_uptime(days=7)
concerns = [(n, d) for n, d in results.items() if d['uptime_percent'] < 99.9]
if concerns:
    print('Services below 99.9% SLA:')
    for name, data in concerns:
        print(f'  {name}: {data[\"uptime_percent\"]}% ({data[\"failures\"]} failures)')
else:
    print('All services above 99.9% SLA target.')
")

python3 ~/.agent-ops/scripts/agent_ops_alert.py send "$report" --level info --topic weekly-digest

Recipe 5: Create Ops Tasks from Health Failures

# Auto-create task board entries when services fail repeatedly

from agent_ops_board import add_task, board_read
from agent_ops_metrics import calculate_uptime

def create_ops_tasks_from_failures():
    """Create investigation tasks for services with poor uptime."""
    results = calculate_uptime(days=1)
    board = board_read()
    existing_titles = {t["title"] for t in board["tasks"] if t["column"] != "done"}

    for name, data in results.items():
        if data["uptime_percent"] < 95:
            title = f"Investigate: {name} at {data['uptime_percent']}% uptime"
            if title not in existing_titles:
                add_task(
                    title=title,
                    owner="ops-agent",
                    priority="high",
                    description=f"{data['failures']} failures in {data['total_checks']} checks. "
                                f"Avg latency: {data['avg_latency_ms']}ms",
                    tags=["auto-generated", "ops", "uptime"],
                )
                print(f"Created task: {title}")

Troubleshooting

Alert not sending

  1. Verify bot token: curl https://api.telegram.org/botYOUR_TOKEN/getMe
  2. Verify chat ID: Send a message to the bot, then check /getUpdates
  3. Check rate limiting: Look at ~/.agent-ops/state/alert-rate.json

Health checks timing out

  1. Increase timeout_seconds in the service config
  2. Check if the service is behind a firewall or VPN
  3. Test manually: curl -v --max-time 10 <URL>

Board file locked

If the board file appears locked (stale lock):

# Check if any process holds the lock
lsof ~/.agent-ops/state/board.lock

# If no process holds it, remove the stale lock
rm ~/.agent-ops/state/board.lock

Metrics disk usage

# Check metrics directory size
du -sh ~/.agent-ops/metrics/

# Manual cleanup (keep last 30 days)
python3 -c "
from agent_ops_metrics import cleanup_old_metrics
removed = cleanup_old_metrics(retention_days=30)
print(f'Removed {removed} files')
"

License

MIT License. Built by OpenClaw Systems.
