Task Automation Workflows
Automate repetitive tasks with scripts, workflows, and schedules. Create efficient automation for file operations, data processing, API calls, and scheduled...
MIT-0 · Free to use, modify, and redistribute. No attribution required.
⭐ 0 · 10 · 0 current installs · 0 all-time installs
MIT-0
Security Scan
OpenClaw
Suspicious
medium confidencePurpose & Capability
The name/description match the content: the SKILL.md is a general-purpose automation cookbook (file ops, data processing, API calls, scheduling). However the instructions include code that imports third‑party Python packages (Pillow/PIL, pandas, requests, schedule) while the skill metadata declares no required binaries or packages. That omission is disproportionate: either the skill should declare these runtime dependencies or explicitly state it only provides examples.
Instruction Scope
The SKILL.md contains concrete code that performs filesystem modifications (os.rename, shutil.move, writing output files), long‑running schedulers (schedule loop, cron examples), and network calls (requests.post). Those are legitimate for automation, but they give the agent/runner the ability to modify local files and send data to external endpoints. The instructions do not constrain which directories or endpoints to use, nor do they warn about destructive operations or sensitive-data exfiltration risk.
Install Mechanism
No install spec is provided (instruction‑only), which is low risk in terms of supply chain. However because the examples rely on external Python packages, the lack of any declared dependency/install instructions means an operator might attempt to run snippets without providing required libraries — a usability and coherence gap.
Credentials
The skill declares no environment variables or credentials, which matches its examples that do not explicitly reference secret env vars. That said, many API examples implicitly require endpoints/keys (requests.post) and the skill does not document what secrets would be needed or how they should be provided, leaving room for accidental misconfiguration or insecure handling of credentials.
Persistence & Privilege
The skill does not request always:true, does not include install hooks, and does not claim to modify other skills or system agent configuration. Cron examples show how a user might persist a job, but persistence would be created manually by the user — the skill itself does not request elevated persistence or privileges.
What to consider before installing
This skill is essentially an examples cookbook for automations; it is not malicious but has some gaps you should be aware of before using it. Things to consider before installing or running these examples: 1) Dependencies: the examples use Pillow (PIL), pandas, requests, and schedule — ensure you install and verify these packages in a controlled environment. 2) Review code: the snippets perform destructive file operations (rename/move) and write files — review and test on a safe directory or with dry‑run logic first. 3) Network calls: batch API examples will send data to endpoints; never run them against production endpoints or with real credentials until you understand what will be transmitted. 4) Scheduling: cron examples create persistent jobs; add them only after validating scripts and paths. 5) Least privilege and sandboxing: run experiments in an isolated account/container and avoid giving this skill broad filesystem or credential access. 6) If you want the skill to be safer/coherent, ask the author to: declare required packages, list any environment variables/credentials needed, add warnings about destructive operations, and provide non‑destructive example modes (dry-run).Like a lobster shell, security has layers — review code before you run it.
Current versionv1.0.0
Download zipautomationbatchcronlatestschedulingscriptsworkflow
License
MIT-0
Free to use, modify, and redistribute. No attribution required.
SKILL.md
Task Automation
Turn repetitive work into automated workflows. Save time, reduce errors, scale operations.
Automation Types
1. File Operations
Batch Rename:
import os
import re
def batch_rename(directory, pattern, replacement):
"""Rename files matching pattern"""
for filename in os.listdir(directory):
if re.match(pattern, filename):
new_name = re.sub(pattern, replacement, filename)
os.rename(
os.path.join(directory, filename),
os.path.join(directory, new_name)
)
print(f"Renamed: {filename} -> {new_name}")
Batch Convert:
from PIL import Image
import os
def convert_images(input_dir, output_dir, format='webp'):
"""Convert all images to format"""
os.makedirs(output_dir, exist_ok=True)
for filename in os.listdir(input_dir):
if filename.lower().endswith(('.png', '.jpg', '.jpeg')):
img = Image.open(os.path.join(input_dir, filename))
name = os.path.splitext(filename)[0]
img.save(os.path.join(output_dir, f"{name}.{format}"), format.upper())
print(f"Converted: {filename}")
Organize Files:
import os
import shutil
def organize_by_type(directory):
"""Move files into type folders"""
extensions = {
'images': ['.jpg', '.jpeg', '.png', '.gif', '.webp'],
'documents': ['.pdf', '.doc', '.docx', '.txt', '.md'],
'videos': ['.mp4', '.mov', '.avi', '.mkv'],
'audio': ['.mp3', '.wav', '.flac'],
'code': ['.py', '.js', '.ts', '.go', '.rs'],
}
for filename in os.listdir(directory):
filepath = os.path.join(directory, filename)
if os.path.isfile(filepath):
ext = os.path.splitext(filename)[1].lower()
for folder, exts in extensions.items():
if ext in exts:
target = os.path.join(directory, folder)
os.makedirs(target, exist_ok=True)
shutil.move(filepath, os.path.join(target, filename))
print(f"Moved {filename} to {folder}/")
break
2. Data Processing
Batch Transform:
import pandas as pd
def process_csv_batch(input_dir, output_file, transform_func):
"""Process multiple CSVs and combine"""
dfs = []
for filename in os.listdir(input_dir):
if filename.endswith('.csv'):
df = pd.read_csv(os.path.join(input_dir, filename))
df = transform_func(df)
dfs.append(df)
combined = pd.concat(dfs, ignore_index=True)
combined.to_csv(output_file, index=False)
print(f"Processed {len(dfs)} files into {output_file}")
Data Pipeline:
def create_pipeline(steps):
"""Create reusable data pipeline"""
def pipeline(data):
result = data
for step in steps:
result = step(result)
return result
return pipeline
# Example usage:
pipeline = create_pipeline([
lambda x: x.dropna(),
lambda x: x.drop_duplicates(),
lambda x: x[x['value'] > 0],
lambda x: x.sort_values('date')
])
clean_data = pipeline(raw_data)
3. API Operations
Rate-Limited API Client:
import time
from functools import wraps
def rate_limit(calls_per_second=2):
"""Decorator to rate limit API calls"""
min_interval = 1.0 / calls_per_second
last_call = [0.0]
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
elapsed = time.time() - last_call[0]
if elapsed < min_interval:
time.sleep(min_interval - elapsed)
last_call[0] = time.time()
return func(*args, **kwargs)
return wrapper
return decorator
@rate_limit(2) # 2 calls per second
def api_call(endpoint, data):
return requests.post(endpoint, json=data)
Batch API Calls:
def batch_api_calls(items, endpoint, batch_size=100):
"""Process API calls in batches"""
results = []
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
# Process batch
response = requests.post(endpoint, json={'items': batch})
if response.status_code == 200:
results.extend(response.json())
else:
print(f"Batch {i//batch_size} failed: {response.status_code}")
time.sleep(1) # Rate limiting
return results
Retry with Backoff:
import time
import random
def retry_with_backoff(func, max_retries=3, base_delay=1):
"""Retry failed calls with exponential backoff"""
for attempt in range(max_retries):
try:
return func()
except Exception as e:
if attempt == max_retries - 1:
raise
delay = base_delay * (2 ** attempt) + random.random()
print(f"Retry {attempt + 1}/{max_retries} in {delay:.1f}s: {e}")
time.sleep(delay)
4. Scheduled Tasks
Cron Jobs:
# Every hour
0 * * * * /path/to/script.sh
# Every day at 9 AM
0 9 * * * /path/to/script.sh
# Every Monday at 9 AM
0 9 * * 1 /path/to/script.sh
# Every hour on weekdays
0 * * * 1-5 /path/to/script.sh
Python Scheduler:
import schedule
import time
def job():
print("Running scheduled task...")
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("09:00").do(job)
schedule.every().monday.do(job)
while True:
schedule.run_pending()
time.sleep(60)
OpenClaw Cron:
openclaw cron add \
--name "Daily Report" \
--schedule "0 9 * * *" \
--task "Generate daily report and send to slack"
Workflow Patterns
Sequential Pipeline
def sequential_workflow(steps):
"""Run steps in sequence"""
results = []
for i, step in enumerate(steps):
try:
result = step['action'](**step.get('params', {}))
results.append({'step': i, 'status': 'success', 'result': result})
except Exception as e:
results.append({'step': i, 'status': 'error', 'error': str(e)})
if step.get('stop_on_error', True):
break
return results
Parallel Execution
from concurrent.futures import ThreadPoolExecutor, as_completed
def parallel_workflow(tasks, max_workers=5):
"""Run tasks in parallel"""
results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(task['action'], **task.get('params', {})): task
for task in tasks}
for future in as_completed(futures):
task = futures[future]
try:
result = future.result()
results.append({'task': task['name'], 'status': 'success', 'result': result})
except Exception as e:
results.append({'task': task['name'], 'status': 'error', 'error': str(e)})
return results
Conditional Workflow
def conditional_workflow(steps):
"""Run steps based on conditions"""
context = {}
for step in steps:
# Check condition
if 'condition' in step:
if not step['condition'](context):
print(f"Skipping {step['name']}: condition not met")
continue
# Execute step
result = step['action'](**step.get('params', {}), context=context)
context[step['name']] = result
return context
Error Handling
Graceful Degradation
def robust_operation(data, fallback=None):
"""Try operation with fallback"""
try:
return primary_operation(data)
except SpecificError as e:
print(f"Primary failed: {e}, trying fallback")
return fallback_operation(data) if fallback else None
except Exception as e:
print(f"All options failed: {e}")
return fallback
Error Notification
def notify_on_error(func, notify_func):
"""Decorator to notify on errors"""
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
notify_func(f"Error in {func.__name__}: {e}")
raise
return wrapper
@notify_on_error(send_slack_message)
def important_operation():
# ...
Monitoring
Progress Tracking
from tqdm import tqdm
def process_with_progress(items):
"""Process items with progress bar"""
results = []
for item in tqdm(items, desc="Processing"):
results.append(process(item))
return results
Logging
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
filename='automation.log'
)
logger = logging.getLogger(__name__)
def logged_operation(data):
logger.info(f"Starting operation with {len(data)} items")
try:
result = process(data)
logger.info(f"Operation completed: {len(result)} results")
return result
except Exception as e:
logger.error(f"Operation failed: {e}")
raise
Best Practices
1. Start Simple
Manual → Script → Scheduled → Monitored
Don't over-engineer. Start with a manual process, then automate.
2. Make Idempotent
def safe_operation(data):
"""Can be run multiple times safely"""
# Check if already done
if already_processed(data):
return get_cached_result(data)
# Process
result = process(data)
# Mark as done
mark_processed(data)
return result
3. Add Checkpoints
def long_running_workflow(data):
"""Save progress at checkpoints"""
checkpoint_file = "workflow_checkpoint.json"
# Load checkpoint if exists
if os.path.exists(checkpoint_file):
with open(checkpoint_file) as f:
state = json.load(f)
start_from = state['step']
data = state['data']
else:
start_from = 0
# Process with checkpoints
for i, step in enumerate(steps[start_from:], start=start_from):
result = step(data)
# Save checkpoint
with open(checkpoint_file, 'w') as f:
json.dump({'step': i + 1, 'data': result}, f)
# Clean up
os.remove(checkpoint_file)
return result
4. Test Thoroughly
def test_automation():
"""Test automation with mock data"""
test_data = create_mock_data()
# Dry run
result = automation(test_data, dry_run=True)
# Validate
assert result['status'] == 'success'
assert len(result['output']) == expected_count
print("All tests passed!")
5. Document Everything
def automated_task(config):
"""
Process daily sales data and generate report.
Args:
config: Dict with keys:
- input_dir: Directory with CSV files
- output_file: Path for output report
- notify: Email to notify on completion
Returns:
Dict with keys:
- status: 'success' or 'error'
- records_processed: Number of records
- output_file: Path to generated report
Example:
result = automated_task({
'input_dir': '/data/sales',
'output_file': '/reports/daily.csv',
'notify': 'team@company.com'
})
"""
# Implementation...
Common Use Cases
Daily Report Automation
def daily_report():
# 1. Fetch data
data = fetch_from_sources()
# 2. Process
processed = process_data(data)
# 3. Generate report
report = generate_report(processed)
# 4. Distribute
send_email(report)
upload_to_slack(report)
# 5. Archive
archive_report(report)
Data Synchronization
def sync_data():
# 1. Get last sync state
last_sync = get_last_sync_time()
# 2. Fetch changes
changes = fetch_changes_since(last_sync)
# 3. Apply changes
for change in changes:
apply_change(change)
# 4. Update sync state
update_sync_time()
Cleanup Automation
def cleanup():
# 1. Remove old files
remove_old_files(days=30)
# 2. Clear temp directories
clear_temp_dirs()
# 3. Archive old logs
archive_logs()
# 4. Optimize database
optimize_database()
Files
1 totalSelect a file
Select a file to preview.
Comments
Loading comments…
