Install
openclaw skills install sql-query-generatorGenerate secure SQL queries with validation, pagination helpers, risk analysis, and audit-focused safeguards.
openclaw skills install sql-query-generatorThis skill enables AI agents to generate accurate, optimized SQL queries from natural language descriptions. It supports multiple database systems and follows best practices for query construction, security, and performance.
# Clone or download the repository
git clone https://github.com/yourusername/sql-query-generator.git
cd sql-query-generator
# No external dependencies required for core functionality
python sql_query_generator.py
# Copy sql_query_generator.py to your project
cp sql_query_generator.py /path/to/your/project/
# Import in your code
from sql_query_generator import SQLQueryGenerator, DatabaseType
For AI agents using this skill:
Install only the drivers you need:
# PostgreSQL
pip install psycopg2-binary
# MySQL
pip install mysql-connector-python
# SQL Server
pip install pyodbc
# Oracle
pip install cx_Oracle
# For testing and development
pip install pytest pytest-cov
When generating SQL queries, follow these steps:
Understand the Request
Generate Base Query
-- Example structure
SELECT
column1,
column2,
aggregate_function(column3) AS alias
FROM
table1
JOIN
table2 ON table1.id = table2.foreign_id
WHERE
condition1 = value1
AND condition2 > value2
GROUP BY
column1, column2
HAVING
aggregate_condition
ORDER BY
column1 DESC
LIMIT 100;
Apply Security Measures
-- Natural language: "Get all users who registered after January 1, 2024"
SELECT
id,
username,
email,
registration_date
FROM
users
WHERE
registration_date > $1 -- Parameterized
ORDER BY
registration_date DESC;
-- Natural language: "Show total orders by customer in 2024"
SELECT
c.customer_name,
c.email,
COUNT(o.order_id) AS total_orders,
SUM(o.total_amount) AS total_spent
FROM
customers c
INNER JOIN
orders o ON c.customer_id = o.customer_id
WHERE
EXTRACT(YEAR FROM o.order_date) = $1
GROUP BY
c.customer_id,
c.customer_name,
c.email
HAVING
COUNT(o.order_id) > 5
ORDER BY
total_spent DESC;
-- Natural language: "Find products with above-average prices"
SELECT
product_name,
price,
category
FROM
products
WHERE
price > (
SELECT AVG(price)
FROM products
)
ORDER BY
price DESC;
-- Natural language: "Get top 3 products per category by sales"
WITH product_sales AS (
SELECT
p.product_id,
p.product_name,
p.category_id,
c.category_name,
SUM(oi.quantity * oi.unit_price) AS total_sales,
ROW_NUMBER() OVER (
PARTITION BY p.category_id
ORDER BY SUM(oi.quantity * oi.unit_price) DESC
) AS rank_in_category
FROM
products p
JOIN
order_items oi ON p.product_id = oi.product_id
JOIN
categories c ON p.category_id = c.category_id
GROUP BY
p.product_id,
p.product_name,
p.category_id,
c.category_name
)
SELECT
category_name,
product_name,
total_sales,
rank_in_category
FROM
product_sales
WHERE
rank_in_category <= 3
ORDER BY
category_name,
rank_in_category;
-- Natural language: "Show running total of sales per day"
SELECT
sale_date,
daily_total,
SUM(daily_total) OVER (
ORDER BY sale_date
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS running_total,
AVG(daily_total) OVER (
ORDER BY sale_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) AS moving_average_7days
FROM (
SELECT
DATE(order_date) AS sale_date,
SUM(total_amount) AS daily_total
FROM
orders
GROUP BY
DATE(order_date)
) daily_sales
ORDER BY
sale_date;
THESE RULES ARE NON-NEGOTIABLE AND MUST ALWAYS BE FOLLOWED:
NEVER CONCATENATE USER INPUT INTO SQL
# WRONG - CRITICAL SECURITY VULNERABILITY
query = f"SELECT * FROM users WHERE username = '{user_input}'"
# CORRECT - Always use parameters
query = "SELECT * FROM users WHERE username = %s"
cursor.execute(query, (user_input,))
ALL VALUES MUST BE PARAMETERIZED
VALIDATE AND SANITIZE ALL INPUTS
# Whitelist validation
VALID_STATUSES = ['active', 'inactive', 'pending']
if status not in VALID_STATUSES:
raise ValueError("Invalid status")
# Type validation
if not isinstance(user_id, int):
raise TypeError("user_id must be integer")
# Length validation
if len(username) > 50:
raise ValueError("username too long")
ESCAPE DYNAMIC IDENTIFIERS PROPERLY
from psycopg2 import sql
# For table/column names that must be dynamic
query = sql.SQL("SELECT * FROM {} WHERE id = %s").format(
sql.Identifier(table_name)
)
cursor.execute(query, (user_id,))
import re
from typing import Any, List, Optional
class SQLInputValidator:
"""Comprehensive input validation for SQL queries"""
@staticmethod
def validate_identifier(identifier: str, max_length: int = 63) -> str:
"""Validate table/column names"""
# Check length
if len(identifier) > max_length:
raise ValueError(f"Identifier too long: {len(identifier)} > {max_length}")
# Only alphanumeric and underscore
if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', identifier):
raise ValueError(f"Invalid identifier: {identifier}")
# Prevent SQL keywords as identifiers
SQL_KEYWORDS = {
'SELECT', 'INSERT', 'UPDATE', 'DELETE', 'DROP', 'CREATE',
'ALTER', 'TRUNCATE', 'UNION', 'JOIN', 'WHERE', 'FROM'
}
if identifier.upper() in SQL_KEYWORDS:
raise ValueError(f"SQL keyword not allowed: {identifier}")
return identifier
@staticmethod
def validate_integer(value: Any, min_val: Optional[int] = None,
max_val: Optional[int] = None) -> int:
"""Validate integer values"""
try:
int_value = int(value)
except (ValueError, TypeError):
raise ValueError(f"Invalid integer: {value}")
if min_val is not None and int_value < min_val:
raise ValueError(f"Value {int_value} below minimum {min_val}")
if max_val is not None and int_value > max_val:
raise ValueError(f"Value {int_value} above maximum {max_val}")
return int_value
@staticmethod
def validate_string(value: str, max_length: int = 255,
allow_empty: bool = False) -> str:
"""Validate string values"""
if not isinstance(value, str):
raise TypeError("Value must be string")
if not allow_empty and len(value) == 0:
raise ValueError("Empty string not allowed")
if len(value) > max_length:
raise ValueError(f"String too long: {len(value)} > {max_length}")
# Check for null bytes
if '\x00' in value:
raise ValueError("Null bytes not allowed in string")
return value
@staticmethod
def validate_email(email: str) -> str:
"""Validate email format"""
email = SQLInputValidator.validate_string(email, max_length=254)
# Basic email validation
if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email):
raise ValueError(f"Invalid email format: {email}")
return email
@staticmethod
def validate_date(date_str: str) -> str:
"""Validate date format (YYYY-MM-DD)"""
if not re.match(r'^\d{4}-\d{2}-\d{2}$', date_str):
raise ValueError(f"Invalid date format: {date_str}")
return date_str
@staticmethod
def validate_enum(value: str, allowed_values: List[str]) -> str:
"""Validate value against whitelist"""
if value not in allowed_values:
raise ValueError(f"Invalid value: {value}. Allowed: {allowed_values}")
return value
# Detect common SQL injection patterns
INJECTION_PATTERNS = [
r"('|(\\')|(--)|(\#)|(%23)|(;))", # Basic SQL injection
r"((\%27)|(\'))", # Single quote variations
r"(union.*select)", # UNION-based injection
r"(insert.*into)", # INSERT injection
r"(update.*set)", # UPDATE injection
r"(delete.*from)", # DELETE injection
r"(drop.*table)", # DROP TABLE
r"(exec(\s|\+)+(s|x)p\w+)", # Stored procedure execution
r"(script.*>)", # XSS attempts
]
def detect_injection_attempt(value: str) -> bool:
"""Detect potential SQL injection attempts"""
value_lower = value.lower()
for pattern in INJECTION_PATTERNS:
if re.search(pattern, value_lower):
return True
return False
class SecureQueryBuilder:
"""Build SQL queries with mandatory security checks"""
def __init__(self, db_type: DatabaseType):
self.db_type = db_type
self.validator = SQLInputValidator()
self.params = []
def build_select(self, table: str, columns: List[str],
conditions: dict) -> tuple:
"""Build SELECT query with validation"""
# Validate table name
table = self.validator.validate_identifier(table)
# Validate columns
validated_columns = [
self.validator.validate_identifier(col)
for col in columns
]
# Build query
query = f"SELECT {', '.join(validated_columns)} FROM {table}"
# Add WHERE clause with parameters
if conditions:
where_parts = []
for key, value in conditions.items():
key = self.validator.validate_identifier(key)
where_parts.append(f"{key} = %s")
self.params.append(value)
query += " WHERE " + " AND ".join(where_parts)
return query, tuple(self.params)
import ssl
from typing import Optional
class SecureConnection:
"""Secure database connection configuration"""
@staticmethod
def get_postgresql_ssl_config() -> dict:
"""PostgreSQL SSL configuration"""
return {
'sslmode': 'require', # or 'verify-full' for production
'sslrootcert': '/path/to/ca-cert.pem',
'sslcert': '/path/to/client-cert.pem',
'sslkey': '/path/to/client-key.pem'
}
@staticmethod
def get_connection_timeout() -> dict:
"""Connection timeout settings"""
return {
'connect_timeout': 10,
'command_timeout': 30,
'keepalives': 1,
'keepalives_idle': 30,
'keepalives_interval': 10,
'keepalives_count': 5
}
@staticmethod
def create_secure_connection(database_url: str) -> Any:
"""Create connection with security settings"""
import psycopg2
# Parse connection string securely
# NEVER log the connection string (contains credentials)
conn = psycopg2.connect(
database_url,
**SecureConnection.get_postgresql_ssl_config(),
**SecureConnection.get_connection_timeout()
)
# Set session security parameters
cursor = conn.cursor()
cursor.execute("SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE")
cursor.execute("SET statement_timeout = 30000") # 30 seconds
cursor.close()
return conn
import time
from collections import defaultdict
from threading import Lock
class RateLimiter:
"""Prevent query flooding attacks"""
def __init__(self, max_requests: int = 100, window_seconds: int = 60):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.requests = defaultdict(list)
self.lock = Lock()
def is_allowed(self, identifier: str) -> bool:
"""Check if request is allowed"""
with self.lock:
now = time.time()
window_start = now - self.window_seconds
# Clean old requests
self.requests[identifier] = [
req_time for req_time in self.requests[identifier]
if req_time > window_start
]
# Check limit
if len(self.requests[identifier]) >= self.max_requests:
return False
# Add new request
self.requests[identifier].append(now)
return True
import logging
import json
from datetime import datetime
from typing import Any, Dict
class SecurityAuditLogger:
"""Log all database operations for security auditing"""
def __init__(self, log_file: str = '/var/log/sql_audit.log'):
self.logger = logging.getLogger('sql_audit')
handler = logging.FileHandler(log_file)
handler.setFormatter(logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
))
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
def log_query(self, query: str, params: tuple, user_id: str,
ip_address: str, result_count: int = None):
"""Log query execution"""
log_entry = {
'timestamp': datetime.utcnow().isoformat(),
'user_id': user_id,
'ip_address': ip_address,
'query': query,
'param_count': len(params),
'result_count': result_count
}
self.logger.info(json.dumps(log_entry))
def log_security_event(self, event_type: str, details: Dict[str, Any],
severity: str = 'WARNING'):
"""Log security events"""
log_entry = {
'timestamp': datetime.utcnow().isoformat(),
'event_type': event_type,
'severity': severity,
'details': details
}
if severity == 'CRITICAL':
self.logger.critical(json.dumps(log_entry))
elif severity == 'ERROR':
self.logger.error(json.dumps(log_entry))
else:
self.logger.warning(json.dumps(log_entry))
from typing import Dict, Any
import hashlib
class PreparedStatementPool:
"""Reuse prepared statements for better performance and security"""
def __init__(self, connection):
self.connection = connection
self.statements: Dict[str, Any] = {}
def get_statement(self, query: str):
"""Get or create prepared statement"""
# Create hash of query for lookup
query_hash = hashlib.sha256(query.encode()).hexdigest()[:16]
if query_hash not in self.statements:
# Create new prepared statement
cursor = self.connection.cursor()
statement_name = f"stmt_{query_hash}"
cursor.execute(f"PREPARE {statement_name} AS {query}")
self.statements[query_hash] = statement_name
return self.statements[query_hash]
def execute(self, query: str, params: tuple):
"""Execute using prepared statement"""
stmt_name = self.get_statement(query)
cursor = self.connection.cursor()
param_list = ', '.join(['%s'] * len(params))
cursor.execute(f"EXECUTE {stmt_name}({param_list})", params)
return cursor
PostgreSQL/Python (psycopg2)
# CORRECT - Parameterized
cursor.execute(
"SELECT * FROM users WHERE email = %s AND status = %s",
(user_email, status)
)
# WRONG - String concatenation (SQL injection risk)
cursor.execute(
f"SELECT * FROM users WHERE email = '{user_email}'"
)
MySQL/Python (mysql-connector)
# CORRECT
cursor.execute(
"SELECT * FROM products WHERE price > %s",
(min_price,)
)
SQLite/Python
# CORRECT
cursor.execute(
"SELECT * FROM orders WHERE order_date > ?",
(start_date,)
)
Node.js (PostgreSQL)
// CORRECT
const result = await client.query(
'SELECT * FROM users WHERE id = $1',
[userId]
);
PostgreSQL
$1, $2, $3 for parametersRETURNING clause for INSERT/UPDATE/DELETEMySQL
? for parametersLIMIT offset, countSQL Server
@param1, @param2 for parametersSQLite
? for parametersWhen generating queries, include error handling recommendations:
import psycopg2
from psycopg2 import sql
try:
cursor.execute(
sql.SQL("SELECT * FROM {} WHERE id = %s").format(
sql.Identifier('users')
),
(user_id,)
)
results = cursor.fetchall()
except psycopg2.Error as e:
print(f"Database error: {e}")
# Log error, return appropriate response
finally:
cursor.close()
Before providing a query, verify:
When responding to a query request, provide:
### SQL Query
```sql
-- Get active users with their order counts
SELECT
u.user_id,
u.username,
u.email,
COUNT(o.order_id) AS order_count,
COALESCE(SUM(o.total_amount), 0) AS lifetime_value
FROM
users u
LEFT JOIN
orders o ON u.user_id = o.user_id
WHERE
u.status = $1
AND u.created_at >= $2
GROUP BY
u.user_id,
u.username,
u.email
HAVING
COUNT(o.order_id) >= $3
ORDER BY
lifetime_value DESC
LIMIT $4;
$1: status (string, e.g., 'active')$2: created_at (date, e.g., '2024-01-01')$3: min_orders (integer, e.g., 5)$4: limit (integer, e.g., 100)This query retrieves active users who joined after a specified date and have placed a minimum number of orders. It calculates their total order count and lifetime value, sorted by highest spending customers first.
| user_id | username | order_count | lifetime_value | |
|---|---|---|---|---|
| 123 | john_doe | john@example.com | 15 | 2500.00 |
users.status and users.created_atorders.user_idcursor.execute(query, ('active', '2024-01-01', 5, 100))
results = cursor.fetchall()
## Advanced Topics
### 1. Query Optimization Techniques
- Use EXPLAIN ANALYZE to understand query plans
- Create covering indexes
- Partition large tables
- Use materialized views for complex aggregations
- Implement query result caching
### 2. Complex Scenarios
- Recursive CTEs for hierarchical data
- Pivot/Unpivot operations
- Full-text search
- Geospatial queries
- Time-series analysis
### 3. Migration Support
- Generate queries for data migration
- Schema comparison queries
- Data validation queries
- Backup and restore scripts
## Testing Recommendations
Always suggest testing generated queries with:
1. Small dataset first
2. EXPLAIN or EXPLAIN ANALYZE
3. Various edge cases (NULL values, empty sets)
4. Performance benchmarks
5. Security scanning tools
## Common Pitfalls to Avoid
1. **N+1 Query Problem**: Use JOINs instead of multiple queries
2. **SELECT ***: Specify needed columns explicitly
3. **Missing Indexes**: Recommend indexes on filter/join columns
4. **Cartesian Products**: Ensure proper JOIN conditions
5. **Implicit Type Conversions**: Cast explicitly when needed
6. **Timezone Issues**: Always use timezone-aware timestamps
## Integration Examples
### REST API
```python
from flask import Flask, request, jsonify
import psycopg2
@app.route('/api/users', methods=['GET'])
def get_users():
status = request.args.get('status', 'active')
# Validate input
if status not in ['active', 'inactive', 'suspended']:
return jsonify({'error': 'Invalid status'}), 400
try:
cursor.execute(
"SELECT id, username, email FROM users WHERE status = %s",
(status,)
)
users = cursor.fetchall()
return jsonify(users)
except Exception as e:
return jsonify({'error': str(e)}), 500
const resolvers = {
Query: {
users: async (_, { status, limit }, { db }) => {
const result = await db.query(
'SELECT * FROM users WHERE status = $1 LIMIT $2',
[status, limit]
);
return result.rows;
}
}
};
This skill provides comprehensive SQL query generation capabilities with a focus on security, performance, and best practices. Always prioritize parameterized queries and provide clear documentation with generated SQL.