Skill flagged — suspicious patterns detected

ClawHub Security flagged this skill as suspicious. Review the scan results before using.

Task Automation Workflows

Automate repetitive tasks with scripts, workflows, and schedules. Create efficient automation for file operations, data processing, API calls, and scheduled...

MIT-0 · Free to use, modify, and redistribute. No attribution required.
Security Scan
VirusTotal: Benign
OpenClaw: Suspicious (medium confidence)
Purpose & Capability
The name and description match the content: the SKILL.md is a general-purpose automation cookbook (file ops, data processing, API calls, scheduling). However, the instructions include code that imports third‑party Python packages (Pillow/PIL, pandas, requests, schedule) while the skill metadata declares no required binaries or packages. That omission is a gap: the skill should either declare these runtime dependencies or state explicitly that it provides examples only.
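Because the metadata declares no dependencies, an operator may want to check what is importable before running any snippet. A minimal sketch, assuming the four import names listed above are the ones that matter; the helper name `missing_modules` is hypothetical and not part of the skill:

```python
import importlib.util

# Import names used by the SKILL.md snippets (Pillow is imported as "PIL").
REQUIRED_MODULES = ["PIL", "pandas", "requests", "schedule"]

def missing_modules(names):
    """Return the top-level import names that cannot be found in this environment."""
    return [name for name in names if importlib.util.find_spec(name) is None]

if __name__ == "__main__":
    missing = missing_modules(REQUIRED_MODULES)
    if missing:
        print("Missing packages (import names):", ", ".join(missing))
    else:
        print("All example dependencies are importable.")
```

This only checks that a module can be located; it does not verify versions or where the package came from.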
Instruction Scope
The SKILL.md contains concrete code that performs filesystem modifications (os.rename, shutil.move, writing output files), long‑running schedulers (schedule loop, cron examples), and network calls (requests.post). Those are legitimate for automation, but they give the agent/runner the ability to modify local files and send data to external endpoints. The instructions do not constrain which directories or endpoints to use, nor do they warn about destructive operations or sensitive-data exfiltration risk.
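One way to constrain which directories a snippet may touch is to resolve every path and refuse anything outside an approved root. A sketch under that assumption; `is_within` and `require_within` are hypothetical helpers, not something the skill provides:

```python
from pathlib import Path

def is_within(root, candidate):
    """True if candidate resolves to root itself or to a location inside root."""
    root_path = Path(root).resolve()
    candidate_path = Path(candidate).resolve()
    return candidate_path == root_path or root_path in candidate_path.parents

def require_within(root, candidate):
    """Return the resolved candidate path, or raise if it escapes root."""
    if not is_within(root, candidate):
        raise ValueError(f"{candidate} is outside the allowed root {root}")
    return Path(candidate).resolve()
```

Comparing resolved path components rather than string prefixes means `..` segments are collapsed first and a sibling such as `/tmp/workspace` is not mistaken for a child of `/tmp/work`.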
Install Mechanism
No install spec is provided (instruction‑only), which is low risk in terms of supply chain. However because the examples rely on external Python packages, the lack of any declared dependency/install instructions means an operator might attempt to run snippets without providing required libraries — a usability and coherence gap.
Credentials
The skill declares no environment variables or credentials, which matches its examples that do not explicitly reference secret env vars. That said, many API examples implicitly require endpoints/keys (requests.post) and the skill does not document what secrets would be needed or how they should be provided, leaving room for accidental misconfiguration or insecure handling of credentials.
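If the API examples are adapted for real use, one common approach is to read the endpoint and key from environment variables rather than hard-coding them. A sketch only: the variable names `AUTOMATION_API_ENDPOINT` and `AUTOMATION_API_TOKEN` and the bearer-token header are assumptions, since the skill documents no credentials:

```python
import os

def load_api_settings(environ=os.environ):
    """Read endpoint and token from the environment; fail loudly if either is missing."""
    # Both variable names are hypothetical; the skill does not define any.
    endpoint = environ.get("AUTOMATION_API_ENDPOINT")
    token = environ.get("AUTOMATION_API_TOKEN")
    missing = [
        name
        for name, value in (
            ("AUTOMATION_API_ENDPOINT", endpoint),
            ("AUTOMATION_API_TOKEN", token),
        )
        if not value
    ]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    # Bearer authentication is an assumption; use whatever scheme the target API requires.
    return {"endpoint": endpoint, "headers": {"Authorization": f"Bearer {token}"}}
```

The returned `headers` dict could then be passed to `requests.post(..., headers=...)`, which keeps the secret out of the script itself.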
Persistence & Privilege
The skill does not request always:true, does not include install hooks, and does not claim to modify other skills or system agent configuration. Cron examples show how a user might persist a job, but persistence would be created manually by the user — the skill itself does not request elevated persistence or privileges.
What to consider before installing
This skill is essentially an examples cookbook for automations; it is not malicious but has some gaps you should be aware of before using it. Things to consider before installing or running these examples:
1) Dependencies: the examples use Pillow (PIL), pandas, requests, and schedule; ensure you install and verify these packages in a controlled environment.
2) Review code: the snippets perform destructive file operations (rename/move) and write files; review and test on a safe directory or with dry‑run logic first.
3) Network calls: batch API examples will send data to endpoints; never run them against production endpoints or with real credentials until you understand what will be transmitted.
4) Scheduling: cron examples create persistent jobs; add them only after validating scripts and paths.
5) Least privilege and sandboxing: run experiments in an isolated account/container and avoid giving this skill broad filesystem or credential access.
6) If you want the skill to be safer/coherent, ask the author to: declare required packages, list any environment variables/credentials needed, add warnings about destructive operations, and provide non‑destructive example modes (dry-run).
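The dry-run suggestion can be illustrated for a rename task by separating planning from execution. This is a sketch of one possible shape, not the skill's own code; `plan_renames` and `apply_renames` are hypothetical names:

```python
import os
import re

def plan_renames(directory, pattern, replacement):
    """Return (old_name, new_name) pairs without touching the filesystem."""
    plan = []
    for filename in sorted(os.listdir(directory)):
        new_name = re.sub(pattern, replacement, filename)
        if new_name != filename:
            plan.append((filename, new_name))
    return plan

def apply_renames(directory, plan, dry_run=True):
    """Print what would happen; rename only when dry_run is False."""
    for old, new in plan:
        target = os.path.join(directory, new)
        if os.path.exists(target):
            # Never overwrite an existing file.
            print(f"Skipped (target exists): {old} -> {new}")
            continue
        if dry_run:
            print(f"Would rename: {old} -> {new}")
        else:
            os.rename(os.path.join(directory, old), target)
            print(f"Renamed: {old} -> {new}")
```

Defaulting `dry_run` to `True` means the destructive path has to be requested explicitly.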

Like a lobster shell, security has layers — review code before you run it.

Current version: v1.0.0
Tags: automation, batch, cron, latest, scheduling, scripts, workflow


SKILL.md

Task Automation

Turn repetitive work into automated workflows. Save time, reduce errors, scale operations.

Automation Types

1. File Operations

Batch Rename:

import os
import re

def batch_rename(directory, pattern, replacement):
    """Rename files matching pattern"""
    for filename in os.listdir(directory):
        if re.match(pattern, filename):
            new_name = re.sub(pattern, replacement, filename)
            target = os.path.join(directory, new_name)
            # Skip no-op renames and never overwrite an existing file
            if new_name == filename or os.path.exists(target):
                print(f"Skipped: {filename}")
                continue
            os.rename(os.path.join(directory, filename), target)
            print(f"Renamed: {filename} -> {new_name}")

Batch Convert:

from PIL import Image
import os

def convert_images(input_dir, output_dir, format='webp'):
    """Convert all PNG/JPEG images in input_dir to format"""
    os.makedirs(output_dir, exist_ok=True)

    for filename in os.listdir(input_dir):
        if filename.lower().endswith(('.png', '.jpg', '.jpeg')):
            name = os.path.splitext(filename)[0]
            # Pillow infers the output format from the file extension,
            # so 'jpg' works as well as 'jpeg'
            with Image.open(os.path.join(input_dir, filename)) as img:
                img.save(os.path.join(output_dir, f"{name}.{format}"))
            print(f"Converted: {filename}")

Organize Files:

import os
import shutil

def organize_by_type(directory):
    """Move files into type folders"""
    extensions = {
        'images': ['.jpg', '.jpeg', '.png', '.gif', '.webp'],
        'documents': ['.pdf', '.doc', '.docx', '.txt', '.md'],
        'videos': ['.mp4', '.mov', '.avi', '.mkv'],
        'audio': ['.mp3', '.wav', '.flac'],
        'code': ['.py', '.js', '.ts', '.go', '.rs'],
    }
    
    for filename in os.listdir(directory):
        filepath = os.path.join(directory, filename)
        if os.path.isfile(filepath):
            ext = os.path.splitext(filename)[1].lower()
            for folder, exts in extensions.items():
                if ext in exts:
                    target = os.path.join(directory, folder)
                    os.makedirs(target, exist_ok=True)
                    shutil.move(filepath, os.path.join(target, filename))
                    print(f"Moved {filename} to {folder}/")
                    break

2. Data Processing

Batch Transform:

import os

import pandas as pd

def process_csv_batch(input_dir, output_file, transform_func):
    """Process multiple CSVs and combine"""
    dfs = []
    
    for filename in os.listdir(input_dir):
        if filename.endswith('.csv'):
            df = pd.read_csv(os.path.join(input_dir, filename))
            df = transform_func(df)
            dfs.append(df)
    
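    # pd.concat raises ValueError on an empty list, so stop early
    if not dfs:
        print(f"No CSV files found in {input_dir}")
        return

    combined = pd.concat(dfs, ignore_index=True)
    combined.to_csv(output_file, index=False)
    print(f"Processed {len(dfs)} files into {output_file}")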

Data Pipeline:

def create_pipeline(steps):
    """Create reusable data pipeline"""
    def pipeline(data):
        result = data
        for step in steps:
            result = step(result)
        return result
    return pipeline

# Example usage:
pipeline = create_pipeline([
    lambda x: x.dropna(),
    lambda x: x.drop_duplicates(),
    lambda x: x[x['value'] > 0],
    lambda x: x.sort_values('date')
])

clean_data = pipeline(raw_data)

3. API Operations

Rate-Limited API Client:

import time
from functools import wraps

import requests

def rate_limit(calls_per_second=2):
    """Decorator to rate limit API calls"""
    min_interval = 1.0 / calls_per_second
    last_call = [0.0]
    
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            elapsed = time.time() - last_call[0]
            if elapsed < min_interval:
                time.sleep(min_interval - elapsed)
            last_call[0] = time.time()
            return func(*args, **kwargs)
        return wrapper
    return decorator

@rate_limit(2)  # 2 calls per second
def api_call(endpoint, data):
    return requests.post(endpoint, json=data)

Batch API Calls:

import time

import requests

def batch_api_calls(items, endpoint, batch_size=100):
    """Process API calls in batches"""
    results = []
    
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        
        # Process batch
        response = requests.post(endpoint, json={'items': batch})
        
        if response.status_code == 200:
            results.extend(response.json())
        else:
            print(f"Batch {i//batch_size} failed: {response.status_code}")
        
        time.sleep(1)  # Rate limiting
    
    return results

Retry with Backoff:

import time
import random

def retry_with_backoff(func, max_retries=3, base_delay=1):
    """Retry failed calls with exponential backoff"""
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            
            delay = base_delay * (2 ** attempt) + random.random()
            print(f"Retry {attempt + 1}/{max_retries} in {delay:.1f}s: {e}")
            time.sleep(delay)
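
To see the backoff schedule without waiting, the helper can be exercised against a function that fails a fixed number of times. In this sketch the `sleep` parameter and the `make_flaky` helper are assumptions added for the demonstration; the retry logic itself follows the helper above:

```python
import random
import time

def retry_with_backoff(func, max_retries=3, base_delay=1, sleep=time.sleep):
    """Same retry logic as above; `sleep` is injectable so demos and tests run instantly."""
    for attempt in range(max_retries):
        try:
            return func()
        except Exception:
            if attempt == max_retries - 1:
                raise
            # Exponential delay plus up to one second of random jitter
            sleep(base_delay * (2 ** attempt) + random.random())

def make_flaky(failures):
    """Hypothetical stand-in for an unreliable call: fails `failures` times, then succeeds."""
    state = {"calls": 0}

    def flaky():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ConnectionError(f"simulated failure {state['calls']}")
        return state["calls"]

    return flaky

# Record the delays instead of sleeping
delays = []
result = retry_with_backoff(make_flaky(2), max_retries=3, sleep=delays.append)
print(result, [round(d, 1) for d in delays])
```

With two simulated failures and `max_retries=3`, the third attempt succeeds; the recorded delays fall in the ranges 1 to 2 seconds and 2 to 3 seconds because of the jitter.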

4. Scheduled Tasks

Cron Jobs:

# Every hour
0 * * * * /path/to/script.sh

# Every day at 9 AM
0 9 * * * /path/to/script.sh

# Every Monday at 9 AM
0 9 * * 1 /path/to/script.sh

# Every hour on weekdays
0 * * * 1-5 /path/to/script.sh

Python Scheduler:

import schedule
import time

def job():
    print("Running scheduled task...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("09:00").do(job)
schedule.every().monday.do(job)

while True:
    schedule.run_pending()
    time.sleep(60)

OpenClaw Cron:

openclaw cron add \
  --name "Daily Report" \
  --schedule "0 9 * * *" \
  --task "Generate daily report and send to slack"

Workflow Patterns

Sequential Pipeline

def sequential_workflow(steps):
    """Run steps in sequence"""
    results = []
    
    for i, step in enumerate(steps):
        try:
            result = step['action'](**step.get('params', {}))
            results.append({'step': i, 'status': 'success', 'result': result})
        except Exception as e:
            results.append({'step': i, 'status': 'error', 'error': str(e)})
            if step.get('stop_on_error', True):
                break
    
    return results

Parallel Execution

from concurrent.futures import ThreadPoolExecutor, as_completed

def parallel_workflow(tasks, max_workers=5):
    """Run tasks in parallel"""
    results = []
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(task['action'], **task.get('params', {})): task 
                   for task in tasks}
        
        for future in as_completed(futures):
            task = futures[future]
            try:
                result = future.result()
                results.append({'task': task['name'], 'status': 'success', 'result': result})
            except Exception as e:
                results.append({'task': task['name'], 'status': 'error', 'error': str(e)})
    
    return results

Conditional Workflow

def conditional_workflow(steps):
    """Run steps based on conditions"""
    context = {}
    
    for step in steps:
        # Check condition
        if 'condition' in step:
            if not step['condition'](context):
                print(f"Skipping {step['name']}: condition not met")
                continue
        
        # Execute step
        result = step['action'](**step.get('params', {}), context=context)
        context[step['name']] = result
    
    return context

Error Handling

Graceful Degradation

def robust_operation(data, primary, fallback=None, default=None):
    """Try primary(data); on failure try fallback(data); else return default"""
    try:
        return primary(data)
    except Exception as e:
        print(f"Primary failed: {e}, trying fallback")
    if fallback is None:
        return default
    try:
        return fallback(data)
    except Exception as e:
        print(f"All options failed: {e}")
        return default

Error Notification

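from functools import wraps

def notify_on_error(notify_func):
    """Decorator factory: call notify_func with a message when the wrapped function raises"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                notify_func(f"Error in {func.__name__}: {e}")
                raise
        return wrapper
    return decorator

@notify_on_error(send_slack_message)  # send_slack_message is a placeholder for your own notifier
def important_operation():
    ...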

Monitoring

Progress Tracking

from tqdm import tqdm

def process_with_progress(items):
    """Process items with progress bar"""
    results = []
    for item in tqdm(items, desc="Processing"):
        results.append(process(item))
    return results

Logging

import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='automation.log'
)

logger = logging.getLogger(__name__)

def logged_operation(data):
    logger.info(f"Starting operation with {len(data)} items")
    try:
        result = process(data)
        logger.info(f"Operation completed: {len(result)} results")
        return result
    except Exception as e:
        logger.error(f"Operation failed: {e}")
        raise

Best Practices

1. Start Simple

Manual → Script → Scheduled → Monitored

Don't over-engineer. Start with a manual process, then automate.

2. Make Idempotent

def safe_operation(data):
    """Can be run multiple times safely"""
    # Check if already done
    if already_processed(data):
        return get_cached_result(data)
    
    # Process
    result = process(data)
    
    # Mark as done
    mark_processed(data)
    
    return result
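
The helpers `already_processed`, `get_cached_result`, and `mark_processed` are left undefined above. One way to make them concrete is a file-based cache keyed by a hash of the input. A sketch, assuming the input and the result are both JSON-serializable; `run_once` and the cache layout are hypothetical:

```python
import hashlib
import json
from pathlib import Path

def run_once(payload, work, cache_dir):
    """Run work(payload) at most once per distinct payload, caching the result on disk."""
    cache = Path(cache_dir)
    cache.mkdir(parents=True, exist_ok=True)

    # Stable key: the same payload always maps to the same cache file
    key = hashlib.sha256(json.dumps(payload, sort_keys=True).encode("utf-8")).hexdigest()
    marker = cache / f"{key}.json"

    # Already done: return the cached result instead of repeating the work
    if marker.exists():
        return json.loads(marker.read_text(encoding="utf-8"))

    result = work(payload)

    # Write to a temp file, then rename, so a crash never leaves a partial marker
    tmp = marker.with_suffix(".tmp")
    tmp.write_text(json.dumps(result), encoding="utf-8")
    tmp.replace(marker)
    return result
```

Writing the marker only after the work succeeds means a failed run is retried on the next call rather than being recorded as done.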

3. Add Checkpoints

import json
import os

def long_running_workflow(data, steps):
    """Save progress at checkpoints"""
    checkpoint_file = "workflow_checkpoint.json"

    # Load checkpoint if exists
    if os.path.exists(checkpoint_file):
        with open(checkpoint_file) as f:
            state = json.load(f)
        start_from = state['step']
        data = state['data']
    else:
        start_from = 0

    # Process with checkpoints, feeding each step's output into the next
    for i, step in enumerate(steps[start_from:], start=start_from):
        data = step(data)

        # Save checkpoint (data must be JSON-serializable)
        with open(checkpoint_file, 'w') as f:
            json.dump({'step': i + 1, 'data': data}, f)

    # Clean up
    if os.path.exists(checkpoint_file):
        os.remove(checkpoint_file)
    return data

4. Test Thoroughly

def test_automation():
    """Test automation with mock data"""
    test_data = create_mock_data()
    
    # Dry run
    result = automation(test_data, dry_run=True)
    
    # Validate
    assert result['status'] == 'success'
    assert len(result['output']) == expected_count
    
    print("All tests passed!")

5. Document Everything

def automated_task(config):
    """
    Process daily sales data and generate report.
    
    Args:
        config: Dict with keys:
            - input_dir: Directory with CSV files
            - output_file: Path for output report
            - notify: Email to notify on completion
    
    Returns:
        Dict with keys:
            - status: 'success' or 'error'
            - records_processed: Number of records
            - output_file: Path to generated report
    
    Example:
        result = automated_task({
            'input_dir': '/data/sales',
            'output_file': '/reports/daily.csv',
            'notify': 'team@company.com'
        })
    """
    # Implementation...

Common Use Cases

Daily Report Automation

def daily_report():
    # 1. Fetch data
    data = fetch_from_sources()
    
    # 2. Process
    processed = process_data(data)
    
    # 3. Generate report
    report = generate_report(processed)
    
    # 4. Distribute
    send_email(report)
    upload_to_slack(report)
    
    # 5. Archive
    archive_report(report)

Data Synchronization

def sync_data():
    # 1. Get last sync state
    last_sync = get_last_sync_time()
    
    # 2. Fetch changes
    changes = fetch_changes_since(last_sync)
    
    # 3. Apply changes
    for change in changes:
        apply_change(change)
    
    # 4. Update sync state
    update_sync_time()

Cleanup Automation

def cleanup():
    # 1. Remove old files
    remove_old_files(days=30)
    
    # 2. Clear temp directories
    clear_temp_dirs()
    
    # 3. Archive old logs
    archive_logs()
    
    # 4. Optimize database
    optimize_database()
