{"skill":{"slug":"task-automation-workflows","displayName":"Task Automation Workflows","summary":"Automate repetitive tasks with scripts, workflows, and schedules. Create efficient automation for file operations, data processing, API calls, and scheduled...","description":"---\nname: task-automation\ndescription: Automate repetitive tasks with scripts, workflows, and schedules. Create efficient automation for file operations, data processing, API calls, and scheduled jobs. Use when automating repetitive work, creating workflows, or scheduling tasks. Triggers on \"automate\", \"workflow\", \"schedule\", \"repetitive\", \"batch\", \"cron\".\n---\n\n# Task Automation\n\nTurn repetitive work into automated workflows. Save time, reduce errors, scale operations.\n\n## Automation Types\n\n### 1. File Operations\n\n**Batch Rename:**\n```python\nimport os\nimport re\n\ndef batch_rename(directory, pattern, replacement):\n    \"\"\"Rename files matching pattern\"\"\"\n    for filename in os.listdir(directory):\n        if re.match(pattern, filename):\n            new_name = re.sub(pattern, replacement, filename)\n            os.rename(\n                os.path.join(directory, filename),\n                os.path.join(directory, new_name)\n            )\n            print(f\"Renamed: {filename} -> {new_name}\")\n```\n\n**Batch Convert:**\n```python\nfrom PIL import Image\nimport os\n\ndef convert_images(input_dir, output_dir, format='webp'):\n    \"\"\"Convert all images to format\"\"\"\n    os.makedirs(output_dir, exist_ok=True)\n    \n    for filename in os.listdir(input_dir):\n        if filename.lower().endswith(('.png', '.jpg', '.jpeg')):\n            img = Image.open(os.path.join(input_dir, filename))\n            name = os.path.splitext(filename)[0]\n            img.save(os.path.join(output_dir, f\"{name}.{format}\"), format.upper())\n            print(f\"Converted: {filename}\")\n```\n\n**Organize Files:**\n```python\nimport os\nimport shutil\n\ndef 
organize_by_type(directory):\n    \"\"\"Move files into type folders\"\"\"\n    extensions = {\n        'images': ['.jpg', '.jpeg', '.png', '.gif', '.webp'],\n        'documents': ['.pdf', '.doc', '.docx', '.txt', '.md'],\n        'videos': ['.mp4', '.mov', '.avi', '.mkv'],\n        'audio': ['.mp3', '.wav', '.flac'],\n        'code': ['.py', '.js', '.ts', '.go', '.rs'],\n    }\n    \n    for filename in os.listdir(directory):\n        filepath = os.path.join(directory, filename)\n        if os.path.isfile(filepath):\n            ext = os.path.splitext(filename)[1].lower()\n            for folder, exts in extensions.items():\n                if ext in exts:\n                    target = os.path.join(directory, folder)\n                    os.makedirs(target, exist_ok=True)\n                    shutil.move(filepath, os.path.join(target, filename))\n                    print(f\"Moved {filename} to {folder}/\")\n                    break\n```\n\n### 2. Data Processing\n\n**Batch Transform:**\n```python\nimport os\nimport pandas as pd\n\ndef process_csv_batch(input_dir, output_file, transform_func):\n    \"\"\"Process multiple CSVs and combine\"\"\"\n    dfs = []\n    \n    for filename in os.listdir(input_dir):\n        if filename.endswith('.csv'):\n            df = pd.read_csv(os.path.join(input_dir, filename))\n            df = transform_func(df)\n            dfs.append(df)\n    \n    combined = pd.concat(dfs, ignore_index=True)\n    combined.to_csv(output_file, index=False)\n    print(f\"Processed {len(dfs)} files into {output_file}\")\n```\n\n**Data Pipeline:**\n```python\ndef create_pipeline(steps):\n    \"\"\"Create reusable data pipeline\"\"\"\n    def pipeline(data):\n        result = data\n        for step in steps:\n            result = step(result)\n        return result\n    return pipeline\n\n# Example usage:\npipeline = create_pipeline([\n    lambda x: x.dropna(),\n    lambda x: x.drop_duplicates(),\n    lambda x: x[x['value'] > 0],\n    lambda x: 
x.sort_values('date')\n])\n\nclean_data = pipeline(raw_data)\n```\n\n### 3. API Operations\n\n**Rate-Limited API Client:**\n```python\nimport time\nfrom functools import wraps\n\nimport requests\n\ndef rate_limit(calls_per_second=2):\n    \"\"\"Decorator to rate limit API calls\"\"\"\n    min_interval = 1.0 / calls_per_second\n    last_call = [0.0]\n    \n    def decorator(func):\n        @wraps(func)\n        def wrapper(*args, **kwargs):\n            elapsed = time.time() - last_call[0]\n            if elapsed < min_interval:\n                time.sleep(min_interval - elapsed)\n            last_call[0] = time.time()\n            return func(*args, **kwargs)\n        return wrapper\n    return decorator\n\n@rate_limit(2)  # 2 calls per second\ndef api_call(endpoint, data):\n    return requests.post(endpoint, json=data)\n```\n\n**Batch API Calls:**\n```python\nimport time\n\nimport requests\n\ndef batch_api_calls(items, endpoint, batch_size=100):\n    \"\"\"Process API calls in batches\"\"\"\n    results = []\n    \n    for i in range(0, len(items), batch_size):\n        batch = items[i:i + batch_size]\n        \n        # Process batch\n        response = requests.post(endpoint, json={'items': batch})\n        \n        if response.status_code == 200:\n            results.extend(response.json())\n        else:\n            print(f\"Batch {i//batch_size} failed: {response.status_code}\")\n        \n        time.sleep(1)  # Rate limiting\n    \n    return results\n```\n\n**Retry with Backoff:**\n```python\nimport time\nimport random\n\ndef retry_with_backoff(func, max_retries=3, base_delay=1):\n    \"\"\"Retry failed calls with exponential backoff\"\"\"\n    for attempt in range(max_retries):\n        try:\n            return func()\n        except Exception as e:\n            if attempt == max_retries - 1:\n                raise\n            \n            delay = base_delay * (2 ** attempt) + random.random()\n            print(f\"Retry {attempt + 1}/{max_retries} in {delay:.1f}s: {e}\")\n            time.sleep(delay)\n```\n\n### 4. Scheduled Tasks\n\n**Cron Jobs:**\n```bash\n# Every hour\n0 * * * * /path/to/script.sh\n\n# Every day at 9 AM\n0 9 * * * /path/to/script.sh\n\n# Every Monday at 9 AM\n0 9 * * 1 /path/to/script.sh\n\n# Every hour on weekdays\n0 * * * 1-5 /path/to/script.sh\n```\n\n**Python Scheduler:**\n```python\nimport schedule\nimport time\n\ndef job():\n    print(\"Running scheduled task...\")\n\nschedule.every(10).minutes.do(job)\nschedule.every().hour.do(job)\nschedule.every().day.at(\"09:00\").do(job)\nschedule.every().monday.do(job)\n\nwhile True:\n    schedule.run_pending()\n    time.sleep(60)\n```\n\n**OpenClaw Cron:**\n```bash\nopenclaw cron add \\\n  --name \"Daily Report\" \\\n  --schedule \"0 9 * * *\" \\\n  --task \"Generate daily report and send to slack\"\n```\n\n## Workflow Patterns\n\n### Sequential Pipeline\n\n```python\ndef sequential_workflow(steps):\n    \"\"\"Run steps in sequence\"\"\"\n    results = []\n    \n    for i, step in enumerate(steps):\n        try:\n            result = step['action'](**step.get('params', {}))\n            results.append({'step': i, 'status': 'success', 'result': result})\n        except Exception as e:\n            results.append({'step': i, 'status': 'error', 'error': str(e)})\n            if step.get('stop_on_error', True):\n                break\n    \n    return results\n```\n\n### Parallel Execution\n\n```python\nfrom concurrent.futures import ThreadPoolExecutor, as_completed\n\ndef parallel_workflow(tasks, max_workers=5):\n    \"\"\"Run tasks in parallel\"\"\"\n    results = []\n    \n    with ThreadPoolExecutor(max_workers=max_workers) as executor:\n        futures = {executor.submit(task['action'], **task.get('params', {})): task \n                   for task in tasks}\n        \n        for future in as_completed(futures):\n            task = futures[future]\n            try:\n                result = future.result()\n                results.append({'task': task['name'], 'status': 
'success', 'result': result})\n            except Exception as e:\n                results.append({'task': task['name'], 'status': 'error', 'error': str(e)})\n    \n    return results\n```\n\n### Conditional Workflow\n\n```python\ndef conditional_workflow(steps):\n    \"\"\"Run steps based on conditions\"\"\"\n    context = {}\n    \n    for step in steps:\n        # Check condition\n        if 'condition' in step:\n            if not step['condition'](context):\n                print(f\"Skipping {step['name']}: condition not met\")\n                continue\n        \n        # Execute step\n        result = step['action'](**step.get('params', {}), context=context)\n        context[step['name']] = result\n    \n    return context\n```\n\n## Error Handling\n\n### Graceful Degradation\n\n```python\ndef robust_operation(data, fallback=None):\n    \"\"\"Try operation with fallback\"\"\"\n    try:\n        return primary_operation(data)\n    except SpecificError as e:\n        print(f\"Primary failed: {e}, trying fallback\")\n        return fallback_operation(data) if fallback else None\n    except Exception as e:\n        print(f\"All options failed: {e}\")\n        return fallback\n```\n\n### Error Notification\n\n```python\nfrom functools import wraps\n\ndef notify_on_error(notify_func):\n    \"\"\"Decorator to notify on errors\"\"\"\n    def decorator(func):\n        @wraps(func)\n        def wrapper(*args, **kwargs):\n            try:\n                return func(*args, **kwargs)\n            except Exception as e:\n                # Report the failure, then re-raise so callers still see it\n                notify_func(f\"Error in {func.__name__}: {e}\")\n                raise\n        return wrapper\n    return decorator\n\n@notify_on_error(send_slack_message)\ndef important_operation():\n    ...  # implementation goes here\n```\n\n## Monitoring\n\n### Progress Tracking\n\n```python\nfrom tqdm import tqdm\n\ndef process_with_progress(items):\n    \"\"\"Process items with progress bar\"\"\"\n    results = []\n    for item in tqdm(items, desc=\"Processing\"):\n        results.append(process(item))\n    return results\n```\n\n### Logging\n\n```python\nimport logging\n\nlogging.basicConfig(\n    level=logging.INFO,\n    format='%(asctime)s - %(levelname)s - %(message)s',\n    filename='automation.log'\n)\n\nlogger = logging.getLogger(__name__)\n\ndef logged_operation(data):\n    logger.info(f\"Starting operation with {len(data)} items\")\n    try:\n        result = process(data)\n        logger.info(f\"Operation completed: {len(result)} results\")\n        return result\n    except Exception as e:\n        logger.error(f\"Operation failed: {e}\")\n        raise\n```\n\n## Best Practices\n\n### 1. Start Simple\n\n```\nManual → Script → Scheduled → Monitored\n```\n\nDon't over-engineer. Start with a manual process, then automate.\n\n### 2. Make Idempotent\n\n```python\ndef safe_operation(data):\n    \"\"\"Can be run multiple times safely\"\"\"\n    # Check if already done\n    if already_processed(data):\n        return get_cached_result(data)\n    \n    # Process\n    result = process(data)\n    \n    # Mark as done\n    mark_processed(data)\n    \n    return result\n```\n\n### 3. Add Checkpoints\n\n```python\nimport json\nimport os\n\ndef long_running_workflow(data, steps):\n    \"\"\"Save progress at checkpoints\"\"\"\n    checkpoint_file = \"workflow_checkpoint.json\"\n    \n    # Load checkpoint if exists\n    if os.path.exists(checkpoint_file):\n        with open(checkpoint_file) as f:\n            state = json.load(f)\n            start_from = state['step']\n            data = state['data']\n    else:\n        start_from = 0\n    \n    # Process with checkpoints, feeding each result into the next step\n    for i, step in enumerate(steps[start_from:], start=start_from):\n        data = step(data)\n        \n        # Save checkpoint (data must be JSON-serializable)\n        with open(checkpoint_file, 'w') as f:\n            json.dump({'step': i + 1, 'data': data}, f)\n    \n    # Clean up\n    if os.path.exists(checkpoint_file):\n        os.remove(checkpoint_file)\n    return data\n```\n\n### 4. 
Test Thoroughly\n\n```python\ndef test_automation():\n    \"\"\"Test automation with mock data\"\"\"\n    test_data = create_mock_data()\n    \n    # Dry run\n    result = automation(test_data, dry_run=True)\n    \n    # Validate\n    assert result['status'] == 'success'\n    assert len(result['output']) == expected_count\n    \n    print(\"All tests passed!\")\n```\n\n### 5. Document Everything\n\n```python\ndef automated_task(config):\n    \"\"\"\n    Process daily sales data and generate report.\n    \n    Args:\n        config: Dict with keys:\n            - input_dir: Directory with CSV files\n            - output_file: Path for output report\n            - notify: Email to notify on completion\n    \n    Returns:\n        Dict with keys:\n            - status: 'success' or 'error'\n            - records_processed: Number of records\n            - output_file: Path to generated report\n    \n    Example:\n        result = automated_task({\n            'input_dir': '/data/sales',\n            'output_file': '/reports/daily.csv',\n            'notify': 'team@company.com'\n        })\n    \"\"\"\n    # Implementation...\n```\n\n## Common Use Cases\n\n### Daily Report Automation\n\n```python\ndef daily_report():\n    # 1. Fetch data\n    data = fetch_from_sources()\n    \n    # 2. Process\n    processed = process_data(data)\n    \n    # 3. Generate report\n    report = generate_report(processed)\n    \n    # 4. Distribute\n    send_email(report)\n    upload_to_slack(report)\n    \n    # 5. Archive\n    archive_report(report)\n```\n\n### Data Synchronization\n\n```python\ndef sync_data():\n    # 1. Get last sync state\n    last_sync = get_last_sync_time()\n    \n    # 2. Fetch changes\n    changes = fetch_changes_since(last_sync)\n    \n    # 3. Apply changes\n    for change in changes:\n        apply_change(change)\n    \n    # 4. Update sync state\n    update_sync_time()\n```\n\n### Cleanup Automation\n\n```python\ndef cleanup():\n    # 1. 
Remove old files\n    remove_old_files(days=30)\n    \n    # 2. Clear temp directories\n    clear_temp_dirs()\n    \n    # 3. Archive old logs\n    archive_logs()\n    \n    # 4. Optimize database\n    optimize_database()\n```\n","topics":["Data Processing","Automate Workflow","Batch","Cron"],"tags":{"automation":"1.0.0","batch":"1.0.0","cron":"1.0.0","latest":"1.0.0","scheduling":"1.0.0","scripts":"1.0.0","workflow":"1.0.0"},"stats":{"comments":0,"downloads":1120,"installsAllTime":43,"installsCurrent":2,"stars":0,"versions":1},"createdAt":1773678504372,"updatedAt":1778491953611},"latestVersion":{"version":"1.0.0","createdAt":1773678504372,"changelog":"Initial release. Automate repetitive tasks with scripts, workflows, and schedules. File operations, data processing, API calls, and scheduled jobs.","license":"MIT-0"},"metadata":null,"owner":{"handle":"engsathiago","userId":"s1748ca82aw6dtzhdqmtcdaj1h83fxeq","displayName":"engsathiago","image":"https://avatars.githubusercontent.com/u/40508117?v=4"},"moderation":null}