Install
openclaw skills install etl-pipeline-generator
Generate automated Extract-Transform-Load (ETL) pipelines for knowledge graph construction: transform raw data and load it into graph databases or knowledge graphs.
This skill generates structured ETL pipelines that move raw data from files, APIs, databases, and streams into graph-ready datasets or knowledge graph storage systems. It orchestrates the ingestion, transformation, and loading steps that turn messy input data into structured graph data.
Simple Customer Data Pipeline:
name: customer_graph_ingestion
description: Ingest customer data into knowledge graph
extract:
  source_type: csv
  location: data/customers.csv
  format: csv
transform:
  operations:
    - normalize_entities
    - detect_relationships
    - validate_schema
load:
  target: neo4j
  database: customer_graph
  method: bulk_import
Generated Pipeline Flow:
Extract CSV → Normalize Entities → Detect Relationships →
Validate Data → Load to Neo4j → Verify Load
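The flow above can be sketched as an ordered list of named operations applied in turn. This is a minimal sketch, assuming the YAML config has already been parsed into a Python dict and that each operation name maps to a plain function. The registry, the operation bodies, and the in-memory list that stands in for Neo4j are hypothetical, not the skill's generated code.

```python
from typing import Callable, Dict, List

Record = Dict[str, str]
Operation = Callable[[List[Record]], List[Record]]

def normalize_entities(records: List[Record]) -> List[Record]:
    # Hypothetical rule: trim whitespace and title-case names.
    return [{**r, "name": r["name"].strip().title()} for r in records]

def detect_relationships(records: List[Record]) -> List[Record]:
    # Hypothetical rule: a non-empty "company" column implies a WORKS_AT edge.
    return [{**r, "rel": "WORKS_AT"} if r.get("company") else r for r in records]

def validate_schema(records: List[Record]) -> List[Record]:
    # Keep only records that carry the required keys.
    return [r for r in records if r.get("id") and r.get("name")]

OPERATIONS: Dict[str, Operation] = {
    "normalize_entities": normalize_entities,
    "detect_relationships": detect_relationships,
    "validate_schema": validate_schema,
}

def run_pipeline(config: dict, raw: List[Record], sink: List[Record]) -> int:
    """Apply the configured operations in order, then 'load' into sink."""
    records = raw
    for name in config["transform"]["operations"]:
        records = OPERATIONS[name](records)  # unknown names raise KeyError
    sink.extend(records)                     # stand-in for the Neo4j load step
    return len(sink)                         # stand-in for the verify step

config = {
    "name": "customer_graph_ingestion",
    "transform": {"operations": ["normalize_entities", "detect_relationships", "validate_schema"]},
    "load": {"target": "neo4j", "database": "customer_graph", "method": "bulk_import"},
}
raw = [
    {"id": "1", "name": "  ada lovelace ", "company": "Analytical Engines"},
    {"id": "", "name": "no id", "company": ""},
]
sink: List[Record] = []
loaded = run_pipeline(config, raw, sink)
```

A registry keyed by operation name keeps the config declarative. Adding a transform means registering one function, not editing the runner.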
Extract Stage:
Purpose: Retrieve raw data from external sources.
Supported Sources: CSV files, JSON files, APIs, databases, plain text, and data streams.
Configuration:
extract:
  source_type: csv|json|api|database|text|stream
  location: /path/to/file or endpoint
  format: format_specifier
  authentication: credentials (if needed)
  filtering: filter criteria (optional)
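The extract configuration might dispatch on source_type as sketched below. The sketch handles only the csv and json types and assumes a simple equality filter. The extract function and the dict shape of filtering are illustrative assumptions. Authentication, APIs, and databases are omitted.

```python
import csv
import io
import json
from typing import Dict, Iterable, List, Optional, TextIO

def _matches(record: dict, filtering: Optional[Dict[str, str]]) -> bool:
    # Hypothetical filter format: every listed field must equal the given value.
    if not filtering:
        return True
    return all(str(record.get(k)) == v for k, v in filtering.items())

def extract(config: dict, stream: TextIO) -> List[dict]:
    """Read records from an open text stream according to config["source_type"]."""
    source_type = config["source_type"]
    if source_type == "csv":
        rows: Iterable[dict] = csv.DictReader(stream)  # header row supplies the keys
    elif source_type == "json":
        data = json.load(stream)
        rows = data if isinstance(data, list) else [data]  # one object or an array
    else:
        # api, database, text, and stream sources are out of scope for this sketch
        raise ValueError(f"unsupported source_type in this sketch: {source_type}")
    return [r for r in rows if _matches(r, config.get("filtering"))]

csv_source = io.StringIO("id,name,country\n1,Ada,UK\n2,Grace,US\n")
uk_rows = extract({"source_type": "csv", "filtering": {"country": "UK"}}, csv_source)

json_source = io.StringIO('[{"id": 1, "name": "Ada"}, {"id": 2, "name": "Grace"}]')
all_rows = extract({"source_type": "json"}, json_source)
```

In a real run, the stream would be opened from location. In-memory streams keep the example self-contained.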
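Transform Stage:
Purpose: Convert raw data into graph-ready structures.
Transformation Operations: entity normalization, relationship inference, date and type conversion, deduplication, validation, and enrichment with external data.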
Example Transform:
transform:
  operations:
    - normalize_entity_names
    - infer_relationships_from_columns
    - convert_dates_to_iso8601
    - deduplicate_entities
    - validate_required_fields
    - enrich_with_external_data
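Four of the listed operations can be sketched as small pure functions over lists of dicts. The function bodies, the accepted input date formats, and the required field names are assumptions for illustration. infer_relationships_from_columns and enrich_with_external_data are omitted because they depend on the source schema and on external services.

```python
from datetime import datetime
from typing import Dict, List, Sequence

Record = Dict[str, str]
INPUT_DATE_FORMATS = ("%Y-%m-%d", "%d/%m/%Y", "%m-%d-%Y")  # assumed source formats

def normalize_entity_names(records: List[Record]) -> List[Record]:
    # Collapse internal whitespace and title-case the name.
    return [{**r, "name": " ".join(r["name"].split()).title()} for r in records]

def convert_dates_to_iso8601(records: List[Record], field: str = "signup_date") -> List[Record]:
    out = []
    for r in records:
        for fmt in INPUT_DATE_FORMATS:
            try:
                r = {**r, field: datetime.strptime(r[field], fmt).date().isoformat()}
                break
            except ValueError:
                continue
        out.append(r)  # unparseable dates stay unchanged for validation to catch
    return out

def deduplicate_entities(records: List[Record], key: str = "id") -> List[Record]:
    # Exact-match deduplication: keep the first record seen for each key.
    seen, unique = set(), []
    for r in records:
        if r[key] not in seen:
            seen.add(r[key])
            unique.append(r)
    return unique

def validate_required_fields(records: List[Record],
                             required: Sequence[str] = ("id", "name")) -> List[Record]:
    # Drop records where any required field is missing or empty.
    return [r for r in records if all(r.get(f) for f in required)]

raw = [
    {"id": "1", "name": "ada   LOVELACE", "signup_date": "09/04/2024"},
    {"id": "1", "name": "Ada Lovelace", "signup_date": "2024-04-09"},
    {"id": "2", "name": "", "signup_date": "2024-04-10"},
]
records = normalize_entity_names(raw)
records = convert_dates_to_iso8601(records)
records = deduplicate_entities(records)
records = validate_required_fields(records)
```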
Load Stage:
Purpose: Load transformed data into the target system.
Supported Targets: Neo4j, RDF stores, ArangoDB, TigerGraph, generic property graphs, and flat files.
Configuration:
load:
  target: neo4j|rdf|arangodb|tigergraph|property_graph|file
  connection_params:
    host: localhost
    port: 7687
    database: graph_database
  method: bulk_import|streaming|batch
  batch_size: 1000
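With the batch method, batch_size caps how many records go into a single write. The sketch below assumes a caller-supplied execute(query, params) callable, for example a thin wrapper around a Neo4j driver session. The Customer label, the id merge key, and the Cypher text are illustrative assumptions.

```python
from typing import Callable, Dict, Iterator, List

Record = Dict[str, object]

# Illustrative Cypher: upsert one node per row; MERGE keeps reruns idempotent.
UPSERT_CUSTOMERS = (
    "UNWIND $rows AS row "
    "MERGE (c:Customer {id: row.id}) "
    "SET c += row"
)

def batches(records: List[Record], batch_size: int) -> Iterator[List[Record]]:
    # Slice the record list into consecutive chunks of at most batch_size.
    for start in range(0, len(records), batch_size):
        yield records[start:start + batch_size]

def batch_load(records: List[Record], execute: Callable[[str, dict], None],
               batch_size: int = 1000) -> int:
    """Send records in batches of batch_size; return the number of batches sent."""
    sent = 0
    for chunk in batches(records, batch_size):
        execute(UPSERT_CUSTOMERS, {"rows": chunk})  # one parameterized write per batch
        sent += 1
    return sent

calls: List[int] = []
records = [{"id": i, "name": f"customer-{i}"} for i in range(2500)]
n_batches = batch_load(records, lambda q, p: calls.append(len(p["rows"])), batch_size=1000)
```

Passing the rows as a single parameter, instead of formatting them into the query text, keeps the query string constant across batches.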
Data Source Connectors:
CSV Extractor
- delimiter detection
- header parsing
- encoding handling
- chunk processing
JSON Extractor
- nested object handling
- array flattening
- schema inference
- streaming JSON support
API Extractor
- authentication handling
- pagination support
- rate limiting
- response transformation
Database Extractor
- SQL query execution
- connection pooling
- partition handling
- transaction management
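The CSV Extractor features map onto Python's standard csv module:

- csv.Sniffer for delimiter detection
- DictReader for header parsing
- an explicit encoding when opening the file
- itertools.islice for chunk processing

This is a sketch. The 4 KB sniff sample, the utf-8-sig default, the candidate delimiters, and the read_csv_chunks name are assumptions.

```python
import csv
import itertools
import os
import tempfile
from typing import Dict, Iterator, List

def read_csv_chunks(path: str, chunk_size: int = 1000,
                    encoding: str = "utf-8-sig") -> Iterator[List[Dict[str, str]]]:
    """Yield lists of row dicts, detecting the delimiter from a sample."""
    # First pass: sniff the dialect from the start of the file.
    with open(path, newline="", encoding=encoding) as f:  # utf-8-sig strips a BOM
        dialect = csv.Sniffer().sniff(f.read(4096), delimiters=",;\t|")
    # Second pass: stream rows in fixed-size chunks.
    with open(path, newline="", encoding=encoding) as f:
        reader = csv.DictReader(f, dialect=dialect)  # first row becomes the header
        while True:
            chunk = list(itertools.islice(reader, chunk_size))
            if not chunk:
                break
            yield chunk

# Usage: a semicolon-delimited file that starts with a byte-order mark.
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False,
                                 encoding="utf-8-sig", newline="") as tmp:
    tmp.write("id;name\n1;Ada\n2;Grace\n3;Edsger\n")
    path = tmp.name

chunks = list(read_csv_chunks(path, chunk_size=2))
os.remove(path)
```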
Data Processing Operations:
Normalization
- case normalization
- whitespace trimming
- special character handling
- URL encoding
Type Conversion
- string → integer/float
- string → datetime
- string → boolean
- safe type conversions
Deduplication
- exact match detection
- fuzzy matching
- identity resolution
- merge strategies
Validation
- required field checks
- data type validation
- range validation
- pattern matching
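Safe type conversion and fuzzy matching can both be sketched with the standard library. The conversions return a fallback instead of raising, and difflib.SequenceMatcher scores name similarity. The 0.85 threshold, the accepted boolean spellings, and the keep-first merge strategy are assumptions. Production identity resolution usually needs blocking keys and stronger similarity measures.

```python
from difflib import SequenceMatcher
from typing import List, Optional

TRUE_STRINGS = {"true", "yes", "y", "1"}   # assumed accepted spellings
FALSE_STRINGS = {"false", "no", "n", "0"}

def to_int(value: str, default: Optional[int] = None) -> Optional[int]:
    # Safe conversion: return default instead of raising on bad input.
    try:
        return int(value.strip())
    except (ValueError, AttributeError):
        return default

def to_bool(value: str, default: Optional[bool] = None) -> Optional[bool]:
    v = value.strip().lower()
    if v in TRUE_STRINGS:
        return True
    if v in FALSE_STRINGS:
        return False
    return default

def similar(a: str, b: str, threshold: float = 0.85) -> bool:
    # Case-insensitive similarity ratio in [0, 1].
    return SequenceMatcher(None, a.lower(), b.lower()).ratio() >= threshold

def fuzzy_dedupe(names: List[str], threshold: float = 0.85) -> List[str]:
    # Merge strategy: keep the first spelling of each cluster of near-identical names.
    kept: List[str] = []
    for name in names:
        if not any(similar(name, k, threshold) for k in kept):
            kept.append(name)
    return kept
```

For example, fuzzy_dedupe(["Acme Corporation", "ACME Corporation", "Acme Corporatoin", "Globex Inc"]) keeps only "Acme Corporation" and "Globex Inc". The pairwise comparison is quadratic, which is one reason large datasets need blocking.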
Data Loading Methods:
Bulk Import
- CSV nodes/edges files
- Batch processing
- High throughput
- Transactional rollback
Streaming Load
- Real-time ingestion
- Event-based
- Lower latency
- Connection streaming
Batch Load
- Scheduled runs
- Configurable batch sizes
- Progress tracking
- Failure recovery
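Bulk import typically works from separate node and relationship files. The sketch below writes them using the header convention of Neo4j's neo4j-admin import tool (:ID, :LABEL, :START_ID, :END_ID, :TYPE). The Customer and Company labels, the WORKS_AT type, the ID prefixes, and the in-memory output are illustrative assumptions. Running the import itself is not shown.

```python
import csv
import io
from typing import Dict, List, Tuple

def write_import_files(customers: List[Dict[str, str]]) -> Tuple[str, str]:
    """Return (nodes_csv, relationships_csv) text for a bulk import."""
    nodes, rels = io.StringIO(), io.StringIO()
    node_writer = csv.writer(nodes, lineterminator="\n")
    rel_writer = csv.writer(rels, lineterminator="\n")
    node_writer.writerow(["id:ID", "name", ":LABEL"])
    rel_writer.writerow([":START_ID", ":END_ID", ":TYPE"])
    companies = set()
    for c in customers:
        cid = f"customer-{c['id']}"  # prefixes keep IDs unique across node types
        node_writer.writerow([cid, c["name"], "Customer"])
        if c.get("company"):
            company_id = f"company-{c['company'].lower().replace(' ', '-')}"
            if company_id not in companies:  # emit each company node once
                companies.add(company_id)
                node_writer.writerow([company_id, c["company"], "Company"])
            rel_writer.writerow([cid, company_id, "WORKS_AT"])
    return nodes.getvalue(), rels.getvalue()

nodes_csv, rels_csv = write_import_files([
    {"id": "1", "name": "Ada", "company": "Acme Corp"},
    {"id": "2", "name": "Grace", "company": "Acme Corp"},
])
```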
Execution Modes:
Synchronous Execution
- Sequential execution of all stages
- Immediate feedback on success/failure
- Best for: Small to medium datasets
Asynchronous Execution
- Non-blocking pipeline execution
- Async progress monitoring
- Best for: Large datasets, scheduled runs
Streaming Execution
- Continuous data flow
- Real-time processing
- Best for: Live data feeds, event streams
Scheduled Execution
- Cron-based or scheduled triggers
- Repeatable, idempotent operations
- Best for: Regular data refreshes
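Python generators can sketch how streaming execution differs from synchronous execution. In streaming mode, each record flows through every stage as soon as it arrives. In synchronous mode, each stage finishes the whole dataset before the next stage starts. The stage functions and the event source below are hypothetical.

```python
from typing import Callable, Dict, Iterable, Iterator, List

Event = Dict[str, str]

def normalize(events: Iterable[Event]) -> Iterator[Event]:
    # Hypothetical stage: lower-case and trim names, one event at a time.
    for e in events:
        yield {**e, "name": e["name"].strip().lower()}

def validate(events: Iterable[Event]) -> Iterator[Event]:
    # Hypothetical stage: drop events without an id.
    for e in events:
        if e.get("id"):
            yield e

def run_streaming(source: Iterable[Event], load: Callable[[Event], None]) -> int:
    """Push each event through normalize -> validate -> load as it arrives."""
    count = 0
    for event in validate(normalize(source)):
        load(event)  # per-record load keeps latency low
        count += 1
    return count

order: List[str] = []

def event_source() -> Iterator[Event]:
    # Simulated feed; the second event has no id and will be dropped.
    for i, name in enumerate([" Ada ", "Grace", " Edsger"]):
        order.append(f"read {i}")
        yield {"id": str(i) if i != 1 else "", "name": name}

loaded = run_streaming(event_source(), lambda e: order.append(f"load {e['id']}"))
```

The order list shows reads and loads interleaving. Each record is loaded before the next one is read.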
Monitoring and Alerts:
alerts:
  - condition: execution_time > 3600s
    action: notify_admin
  - condition: error_count > 10
    action: pause_pipeline
  - condition: data_quality_score < 0.9
    action: log_warning
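Each alert condition compares a run metric against a threshold. Below is a minimal evaluator. It assumes conditions use the simple `metric op value` form shown, with an optional trailing s for seconds. It also assumes actions are returned as names for the caller to dispatch. The parser and the triggered_actions name are assumptions, not the skill's rule engine.

```python
import operator
import re
from typing import Dict, List

OPS = {">": operator.gt, "<": operator.lt, ">=": operator.ge,
       "<=": operator.le, "==": operator.eq}
# Assumed grammar: metric name, comparison operator, number, optional "s" suffix.
CONDITION = re.compile(r"^\s*(\w+)\s*(>=|<=|==|>|<)\s*([\d.]+)s?\s*$")

def triggered_actions(alerts: List[Dict[str, str]], metrics: Dict[str, float]) -> List[str]:
    actions = []
    for alert in alerts:
        match = CONDITION.match(alert["condition"])
        if not match:
            raise ValueError(f"cannot parse condition: {alert['condition']}")
        metric, op, value = match.groups()
        # Metrics absent from this run never trigger an alert.
        if metric in metrics and OPS[op](metrics[metric], float(value)):
            actions.append(alert["action"])
    return actions

alerts = [
    {"condition": "execution_time > 3600s", "action": "notify_admin"},
    {"condition": "error_count > 10", "action": "pause_pipeline"},
    {"condition": "data_quality_score < 0.9", "action": "log_warning"},
]
metrics = {"execution_time": 330, "error_count": 12, "data_quality_score": 0.87}
actions = triggered_actions(alerts, metrics)
```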
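Error Handling Strategies:
- Fail Fast: Stop on the first error
- Fail Safe: Continue processing and log errors
- Retry: Retry failed operations with exponential backoff
- Skip: Skip failed records and continue
- Dead Letter: Route failed records to an error queue
Recovery Mechanisms:
- Checkpoint/Resume: Save state and resume from the last checkpoint
- Rollback: Undo changes on failure
- Compensation: Execute cleanup actions
- Retry with Backoff: Retry automatically with increasing delays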
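Retry with exponential backoff, skip, and dead-letter routing can be combined per record. The sketch uses assumed defaults: 3 attempts, a 0.5 s base delay, and doubling after each failure. The injectable sleep parameter exists only so the example runs instantly. The flaky loader is hypothetical.

```python
import time
from typing import Callable, Dict, List, Tuple

Record = Dict[str, str]

def with_retry(fn: Callable[[Record], None], record: Record, attempts: int = 3,
               base_delay: float = 0.5, sleep: Callable[[float], None] = time.sleep) -> None:
    """Call fn(record); after the n-th failure wait base_delay * 2**n, then retry."""
    for attempt in range(attempts):
        try:
            fn(record)
            return
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: let the caller decide
            sleep(base_delay * 2 ** attempt)

def load_all(records: List[Record], fn: Callable[[Record], None],
             sleep: Callable[[float], None] = time.sleep) -> Tuple[int, List[Record]]:
    """Skip-and-continue loading: records that still fail go to a dead-letter list."""
    loaded, dead_letter = 0, []
    for record in records:
        try:
            with_retry(fn, record, sleep=sleep)
            loaded += 1
        except Exception as exc:
            dead_letter.append({**record, "error": str(exc)})
    return loaded, dead_letter

failures = {"2": 1}  # hypothetical: record "2" fails once, then succeeds

def flaky_load(record: Record) -> None:
    if record["id"] == "bad":
        raise ValueError("schema violation")
    if failures.get(record["id"], 0) > 0:
        failures[record["id"]] -= 1
        raise ConnectionError("transient")

delays: List[float] = []
loaded, dead = load_all([{"id": "1"}, {"id": "2"}, {"id": "bad"}], flaky_load,
                        sleep=delays.append)
```

For simplicity, the sketch retries every exception. A real pipeline would typically retry only transient errors, such as connection failures, and send permanent ones, such as schema violations, straight to the dead-letter queue.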
Pipeline Configuration Template:
name: pipeline_name
description: pipeline description
extract:
  source_type: csv
  location: data/file.csv
transform:
  operations:
    - normalize_entities
    - validate_data
load:
  target: neo4j
  database: my_graph
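A config based on this template can be checked before a pipeline is generated from it. The sketch assumes the YAML has already been parsed into a dict, for example with a YAML library. The allowed values come from the configuration options listed earlier. Treating location as required, the validate_config name, and the message wording are assumptions.

```python
from typing import List

SOURCE_TYPES = {"csv", "json", "api", "database", "text", "stream"}
TARGETS = {"neo4j", "rdf", "arangodb", "tigergraph", "property_graph", "file"}
METHODS = {"bulk_import", "streaming", "batch"}

def validate_config(cfg: dict) -> List[str]:
    """Return a list of problems; an empty list means the config looks usable."""
    errors = []
    for key in ("name", "extract", "transform", "load"):
        if key not in cfg:
            errors.append(f"missing top-level key: {key}")
    extract, load = cfg.get("extract", {}), cfg.get("load", {})
    if extract.get("source_type") not in SOURCE_TYPES:
        errors.append(f"unsupported source_type: {extract.get('source_type')}")
    if not extract.get("location"):
        errors.append("extract.location is required")
    if not cfg.get("transform", {}).get("operations"):
        errors.append("transform.operations must list at least one operation")
    if load.get("target") not in TARGETS:
        errors.append(f"unsupported target: {load.get('target')}")
    if "method" in load and load["method"] not in METHODS:
        errors.append(f"unsupported load method: {load['method']}")
    return errors

template = {
    "name": "pipeline_name",
    "description": "pipeline description",
    "extract": {"source_type": "csv", "location": "data/file.csv"},
    "transform": {"operations": ["normalize_entities", "validate_data"]},
    "load": {"target": "neo4j", "database": "my_graph"},
}
ok = validate_config(template)
bad = validate_config({"name": "x", "extract": {"source_type": "xml"}, "load": {"target": "neo4j"}})
```

Collecting every problem in one pass, rather than stopping at the first, lets a user fix a config in a single edit.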
Extract(csv) → Parse → Normalize →
Validate → Deduplicate → Load(Neo4j) → Verify
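# Generated pipeline skeleton. The stage functions are assumed to be
# defined elsewhere in the generated pipeline for the configured source and target.
def pipeline_execution(location="data/file.csv"):
    data = extract_from_csv(location)   # Extract: read raw records
    data = normalize_entities(data)     # Transform: clean names and values
    data = validate_data(data)          # Transform: reject invalid records
    data = deduplicate_entities(data)   # Transform: merge duplicate entities
    load_to_neo4j(data)                 # Load: write nodes and relationships
    return verify_load()                # Verify: confirm the load succeeded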
Example execution report:
{
  "pipeline_id": "customer_ingestion_001",
  "start_time": "2024-04-09T10:00:00Z",
  "end_time": "2024-04-09T10:05:30Z",
  "duration_seconds": 330,
  "stages": [
    {"name": "extract", "duration": 45, "records": 10000},
    {"name": "transform", "duration": 120, "records": 9900},
    {"name": "load", "duration": 150, "records": 9900}
  ],
  "status": "SUCCESS"
}
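A report in this shape can be produced by timing each stage and counting the records it returns. The sketch assumes stages are plain functions applied in sequence. The stage bodies are placeholders and run_with_report is a hypothetical name. Timestamps come from the wall clock, and stage durations from time.perf_counter.

```python
import json
import time
from datetime import datetime, timezone
from typing import Callable, List, Tuple

Stage = Tuple[str, Callable[[list], list]]

def _utc(ts: datetime) -> str:
    # ISO 8601 in UTC with a "Z" suffix, matching the report format.
    return ts.isoformat(timespec="seconds").replace("+00:00", "Z")

def run_with_report(pipeline_id: str, stages: List[Stage], data: list) -> dict:
    start = datetime.now(timezone.utc)
    stage_reports, status = [], "SUCCESS"
    for name, fn in stages:
        t0 = time.perf_counter()
        try:
            data = fn(data)
        except Exception as exc:
            stage_reports.append({"name": name, "error": str(exc)})
            status = "FAILED"
            break
        stage_reports.append({"name": name,
                              "duration": round(time.perf_counter() - t0, 3),
                              "records": len(data)})
    end = datetime.now(timezone.utc)
    return {
        "pipeline_id": pipeline_id,
        "start_time": _utc(start),
        "end_time": _utc(end),
        "duration_seconds": round((end - start).total_seconds(), 3),
        "stages": stage_reports,
        "status": status,
    }

rows = [{"id": i % 9900} for i in range(10000)]  # 100 duplicate ids
stages = [
    ("extract", lambda d: d),                                          # placeholder
    ("transform", lambda d: list({r["id"]: r for r in d}.values())),  # dedupe by id
    ("load", lambda d: d),                                             # placeholder
]
report = run_with_report("customer_ingestion_001", stages, rows)
print(json.dumps(report, indent=2))
```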
The ETL Pipeline Generator orchestrates these skills:
Best Practices:
✓ Idempotency – Pipelines can be rerun safely
✓ Data Validation – Validate at each stage
✓ Error Handling – Graceful error management
✓ Monitoring – Track pipeline execution
✓ Documentation – Document data lineage
✓ Testing – Test with sample data first
✓ Version Control – Track pipeline changes
✓ Scalability – Design for data growth
✓ Security – Secure credentials and data
✓ Performance – Optimize transformation logic
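Keyed upserts illustrate the idempotency practice: loading the same batch twice leaves the target unchanged, which is what makes reruns safe. The sketch uses an in-memory dict as a stand-in for the graph store. In Neo4j, the usual equivalent is MERGE on a unique key rather than CREATE. The upsert function and its statistics are hypothetical.

```python
from typing import Dict, List

Store = Dict[str, dict]

def upsert(store: Store, records: List[dict], key: str = "id") -> Dict[str, int]:
    """Insert new records, update changed ones, and skip unchanged ones."""
    stats = {"inserted": 0, "updated": 0, "unchanged": 0}
    for r in records:
        k = str(r[key])
        if k not in store:
            stats["inserted"] += 1
        elif store[k] == r:  # identical content: nothing to write
            stats["unchanged"] += 1
            continue
        else:
            stats["updated"] += 1
        store[k] = dict(r)  # store a copy so later caller mutations don't leak in
    return stats

store: Store = {}
batch = [{"id": 1, "name": "Ada"}, {"id": 2, "name": "Grace"}]
first = upsert(store, batch)
second = upsert(store, batch)  # rerun of the same batch
third = upsert(store, [{"id": 2, "name": "Grace Hopper"}])
```

The second call reports every record as unchanged and the store contents stay the same. An append-only insert would have duplicated both records.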
See pipeline-patterns.md for detailed ETL pipeline patterns and example-pipelines.md for complete real-world pipeline examples.
Version: 1.0.0