Open Data Integrator
Integrate open construction datasets. Combine open data sources for enhanced analysis
MIT-0 · Free to use, modify, and redistribute. No attribution required.
⭐ 0 · 908 · 0 current installs · 0 all-time installs
MIT-0
Security Scan
OpenClaw
Benign
medium confidencePurpose & Capability
Name/description match the requested capabilities. Integrating open datasets reasonably requires network access (to fetch data) and filesystem access (to read user-provided files). The skill only declares python3 as a required binary; the SKILL.md shows use of Python libraries (e.g., requests) but no install spec — a minor mismatch in declared runtime dependencies, not a security red flag.
Instruction Scope
Runtime instructions and the included code samples are focused on fetching, processing, and enriching open datasets. The instructions constrain the agent to use only user-provided data or referenced sources. Connectors optionally accept API keys as parameters (expected for some data sources); there are no instructions to read arbitrary unrelated system files or environment variables.
Install Mechanism
This is an instruction-only skill (no install spec, no code files to execute). That minimizes install-time risk. Note: SKILL.md uses Python libraries (requests) which are not declared in a dependency manifest — users should ensure required Python packages are available in the runtime.
Credentials
The skill does not request any environment variables or credentials in its metadata. Optional API keys appear as connector parameters in the code (appropriate for some third‑party APIs). There are no unrelated credential requests.
Persistence & Privilege
The skill is not marked always:true and is user-invocable only. The manifest grants filesystem and network permissions, which are proportionate to integrating datasets. Autonomous invocation is allowed by default (platform behavior) but is not combined with unusually broad credential access.
Scan Findings in Context
[no_regex_findings] expected: The static scanner found no code patterns because this is an instruction-only skill with no executable code files. That absence is expected but not proof of safety.
Assessment
This skill appears to be what it says: a helper for integrating open construction data. Before installing, confirm you trust the skill source/homepage (https://datadrivenconstruction.io), ensure required Python packages (e.g., requests) are available in the runtime, and be cautious when providing any API keys or file paths: only supply keys for services you intend to use and only allow the skill to read files you explicitly choose to share. If you need higher assurance, ask the publisher for a dependency list or a signed release, or run the skill in an isolated environment. If you want the agent to fetch data from specific APIs, review which endpoints it will contact and avoid sharing unrelated credentials.Like a lobster shell, security has layers — review code before you run it.
Current versionv2.1.0
Download ziplatest
License
MIT-0
Free to use, modify, and redistribute. No attribution required.
Runtime requirements
🌐 Clawdis
OSmacOS · Linux · Windows
Binspython3
SKILL.md
Open Data Integrator
Overview
Based on DDC methodology (Chapter 2.2), this skill integrates open construction datasets from various sources like government databases, industry benchmarks, weather services, and geospatial data.
Book Reference: "Доминирование открытых данных" / "Open Data Dominance"
Quick Start
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Dict, Optional, Any, Callable
from datetime import datetime, date
import json
import requests
from abc import ABC, abstractmethod
class DataSourceType(Enum):
"""Types of open data sources"""
GOVERNMENT = "government" # Government statistics
INDUSTRY_BENCHMARK = "benchmark" # Industry benchmarks
WEATHER = "weather" # Weather data
GEOSPATIAL = "geospatial" # Geographic data
MATERIAL_PRICES = "material_prices" # Material cost indices
LABOR_RATES = "labor_rates" # Labor cost data
BUILDING_PERMITS = "permits" # Permit data
ENERGY = "energy" # Energy prices/data
ECONOMIC = "economic" # Economic indicators
class UpdateFrequency(Enum):
"""Data update frequency"""
REALTIME = "realtime"
HOURLY = "hourly"
DAILY = "daily"
WEEKLY = "weekly"
MONTHLY = "monthly"
QUARTERLY = "quarterly"
ANNUAL = "annual"
@dataclass
class OpenDataSource:
"""Definition of an open data source"""
id: str
name: str
source_type: DataSourceType
url: str
api_key_required: bool = False
update_frequency: UpdateFrequency = UpdateFrequency.DAILY
format: str = "json"
license: str = "open"
description: Optional[str] = None
fields: List[str] = field(default_factory=list)
@dataclass
class DataRecord:
"""A single data record from a source"""
source_id: str
timestamp: datetime
data: Dict[str, Any]
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class IntegrationResult:
"""Result of data integration"""
source: str
records_fetched: int
records_processed: int
errors: List[str]
last_updated: datetime
sample_data: List[Dict]
@dataclass
class EnrichedData:
"""Data enriched with open data"""
original_data: Dict[str, Any]
enrichments: Dict[str, Any]
sources_used: List[str]
confidence: float
class OpenDataConnector(ABC):
"""Base class for open data connectors"""
@abstractmethod
def fetch(self, params: Dict) -> List[DataRecord]:
pass
@abstractmethod
def get_metadata(self) -> Dict:
pass
class WeatherDataConnector(OpenDataConnector):
"""Connector for weather data (e.g., OpenWeatherMap)"""
def __init__(self, api_key: Optional[str] = None):
self.api_key = api_key
self.base_url = "https://api.openweathermap.org/data/2.5"
def fetch(
self,
params: Dict
) -> List[DataRecord]:
"""Fetch weather data for location"""
lat = params.get("lat")
lon = params.get("lon")
start_date = params.get("start_date")
end_date = params.get("end_date")
# Simulate API call (in production, use actual API)
records = []
# Generate sample historical data
current = start_date
while current <= end_date:
records.append(DataRecord(
source_id="openweathermap",
timestamp=datetime.combine(current, datetime.min.time()),
data={
"date": current.isoformat(),
"temp_max": 25.0,
"temp_min": 15.0,
"precipitation": 0.0,
"wind_speed": 10.0,
"weather_code": "clear"
},
metadata={"lat": lat, "lon": lon}
))
current = date(current.year, current.month, current.day + 1) if current.day < 28 else date(current.year, current.month + 1 if current.month < 12 else 1, 1)
return records[:30] # Limit for demo
def get_metadata(self) -> Dict:
return {
"source": "OpenWeatherMap",
"type": DataSourceType.WEATHER.value,
"frequency": UpdateFrequency.HOURLY.value,
"fields": ["temp_max", "temp_min", "precipitation", "wind_speed"]
}
class MaterialPriceConnector(OpenDataConnector):
"""Connector for material price indices"""
def __init__(self, region: str = "US"):
self.region = region
self.price_indices = self._load_indices()
def _load_indices(self) -> Dict[str, Dict]:
"""Load material price indices"""
return {
"concrete": {"base": 100, "current": 125, "trend": "up"},
"steel": {"base": 100, "current": 145, "trend": "up"},
"lumber": {"base": 100, "current": 180, "trend": "stable"},
"copper": {"base": 100, "current": 135, "trend": "up"},
"asphalt": {"base": 100, "current": 115, "trend": "stable"},
"gypsum": {"base": 100, "current": 110, "trend": "stable"},
"glass": {"base": 100, "current": 105, "trend": "down"},
"cement": {"base": 100, "current": 120, "trend": "up"},
}
def fetch(self, params: Dict) -> List[DataRecord]:
"""Fetch material price data"""
materials = params.get("materials", list(self.price_indices.keys()))
records = []
for material in materials:
if material in self.price_indices:
records.append(DataRecord(
source_id="material_prices",
timestamp=datetime.now(),
data={
"material": material,
"region": self.region,
**self.price_indices[material]
}
))
return records
def get_metadata(self) -> Dict:
return {
"source": "Material Price Index",
"type": DataSourceType.MATERIAL_PRICES.value,
"frequency": UpdateFrequency.MONTHLY.value,
"materials": list(self.price_indices.keys())
}
class LaborRateConnector(OpenDataConnector):
"""Connector for labor rate data"""
def __init__(self, region: str = "US"):
self.region = region
self.labor_rates = self._load_rates()
def _load_rates(self) -> Dict[str, Dict]:
"""Load labor rates by trade"""
return {
"carpenter": {"hourly": 45.00, "burden_rate": 1.35},
"electrician": {"hourly": 55.00, "burden_rate": 1.40},
"plumber": {"hourly": 52.00, "burden_rate": 1.38},
"ironworker": {"hourly": 58.00, "burden_rate": 1.42},
"laborer": {"hourly": 32.00, "burden_rate": 1.30},
"operator": {"hourly": 48.00, "burden_rate": 1.35},
"mason": {"hourly": 50.00, "burden_rate": 1.36},
"painter": {"hourly": 38.00, "burden_rate": 1.32},
"hvac_tech": {"hourly": 54.00, "burden_rate": 1.38},
"welder": {"hourly": 52.00, "burden_rate": 1.40},
}
def fetch(self, params: Dict) -> List[DataRecord]:
"""Fetch labor rate data"""
trades = params.get("trades", list(self.labor_rates.keys()))
records = []
for trade in trades:
if trade in self.labor_rates:
rate_data = self.labor_rates[trade]
records.append(DataRecord(
source_id="labor_rates",
timestamp=datetime.now(),
data={
"trade": trade,
"region": self.region,
"hourly_rate": rate_data["hourly"],
"burden_rate": rate_data["burden_rate"],
"fully_loaded": rate_data["hourly"] * rate_data["burden_rate"]
}
))
return records
def get_metadata(self) -> Dict:
return {
"source": "Labor Rate Database",
"type": DataSourceType.LABOR_RATES.value,
"frequency": UpdateFrequency.QUARTERLY.value,
"trades": list(self.labor_rates.keys())
}
class BuildingPermitConnector(OpenDataConnector):
"""Connector for building permit data"""
def __init__(self, jurisdiction: str = "default"):
self.jurisdiction = jurisdiction
def fetch(self, params: Dict) -> List[DataRecord]:
"""Fetch permit data"""
# Simulate permit data
permit_types = ["new_construction", "renovation", "addition", "demolition"]
records = []
for ptype in permit_types:
records.append(DataRecord(
source_id="building_permits",
timestamp=datetime.now(),
data={
"permit_type": ptype,
"jurisdiction": self.jurisdiction,
"count_ytd": 150,
"total_value": 25000000,
"avg_processing_days": 21
}
))
return records
def get_metadata(self) -> Dict:
return {
"source": "Building Permit Database",
"type": DataSourceType.BUILDING_PERMITS.value,
"frequency": UpdateFrequency.DAILY.value
}
class OpenDataIntegrator:
"""
Integrate open construction datasets.
Based on DDC methodology Chapter 2.2.
"""
def __init__(self, region: str = "US"):
self.region = region
self.connectors: Dict[str, OpenDataConnector] = {}
self.cache: Dict[str, List[DataRecord]] = {}
self.cache_expiry: Dict[str, datetime] = {}
self._register_default_connectors()
def _register_default_connectors(self):
"""Register default data connectors"""
self.register_connector("weather", WeatherDataConnector())
self.register_connector("material_prices", MaterialPriceConnector(self.region))
self.register_connector("labor_rates", LaborRateConnector(self.region))
self.register_connector("permits", BuildingPermitConnector())
def register_connector(
self,
name: str,
connector: OpenDataConnector
):
"""Register a data connector"""
self.connectors[name] = connector
def fetch_data(
self,
source: str,
params: Optional[Dict] = None,
use_cache: bool = True
) -> IntegrationResult:
"""
Fetch data from a source.
Args:
source: Name of the data source
params: Query parameters
use_cache: Whether to use cached data
Returns:
Integration result with fetched data
"""
if source not in self.connectors:
return IntegrationResult(
source=source,
records_fetched=0,
records_processed=0,
errors=[f"Unknown source: {source}"],
last_updated=datetime.now(),
sample_data=[]
)
# Check cache
cache_key = f"{source}_{json.dumps(params or {}, sort_keys=True)}"
if use_cache and cache_key in self.cache:
expiry = self.cache_expiry.get(cache_key)
if expiry and expiry > datetime.now():
cached = self.cache[cache_key]
return IntegrationResult(
source=source,
records_fetched=len(cached),
records_processed=len(cached),
errors=[],
last_updated=expiry,
sample_data=[r.data for r in cached[:5]]
)
# Fetch fresh data
connector = self.connectors[source]
errors = []
try:
records = connector.fetch(params or {})
# Cache the results
self.cache[cache_key] = records
self.cache_expiry[cache_key] = datetime.now()
return IntegrationResult(
source=source,
records_fetched=len(records),
records_processed=len(records),
errors=errors,
last_updated=datetime.now(),
sample_data=[r.data for r in records[:5]]
)
except Exception as e:
errors.append(str(e))
return IntegrationResult(
source=source,
records_fetched=0,
records_processed=0,
errors=errors,
last_updated=datetime.now(),
sample_data=[]
)
def enrich_project_data(
self,
project_data: Dict[str, Any],
enrichment_sources: Optional[List[str]] = None
) -> EnrichedData:
"""
Enrich project data with open data.
Args:
project_data: Original project data
enrichment_sources: Sources to use for enrichment
Returns:
Enriched data
"""
sources = enrichment_sources or ["material_prices", "labor_rates", "weather"]
enrichments = {}
sources_used = []
# Material price enrichment
if "material_prices" in sources and "materials" in project_data:
materials = project_data["materials"]
result = self.fetch_data("material_prices", {"materials": materials})
if result.records_fetched > 0:
enrichments["material_price_indices"] = result.sample_data
sources_used.append("material_prices")
# Labor rate enrichment
if "labor_rates" in sources and "trades" in project_data:
trades = project_data["trades"]
result = self.fetch_data("labor_rates", {"trades": trades})
if result.records_fetched > 0:
enrichments["labor_rates"] = result.sample_data
sources_used.append("labor_rates")
# Weather enrichment
if "weather" in sources and "location" in project_data:
loc = project_data["location"]
params = {
"lat": loc.get("lat"),
"lon": loc.get("lon"),
"start_date": project_data.get("start_date", date.today()),
"end_date": project_data.get("end_date", date.today())
}
result = self.fetch_data("weather", params)
if result.records_fetched > 0:
enrichments["weather_forecast"] = result.sample_data
sources_used.append("weather")
# Calculate confidence based on enrichment success
confidence = len(sources_used) / len(sources) if sources else 0
return EnrichedData(
original_data=project_data,
enrichments=enrichments,
sources_used=sources_used,
confidence=confidence
)
def get_cost_indices(
self,
materials: Optional[List[str]] = None,
trades: Optional[List[str]] = None
) -> Dict:
"""Get current cost indices"""
indices = {
"timestamp": datetime.now().isoformat(),
"region": self.region
}
if materials:
result = self.fetch_data("material_prices", {"materials": materials})
indices["materials"] = result.sample_data
if trades:
result = self.fetch_data("labor_rates", {"trades": trades})
indices["labor"] = result.sample_data
return indices
def get_weather_risk(
self,
lat: float,
lon: float,
start_date: date,
end_date: date
) -> Dict:
"""Assess weather risk for project period"""
result = self.fetch_data("weather", {
"lat": lat,
"lon": lon,
"start_date": start_date,
"end_date": end_date
})
if result.records_fetched == 0:
return {"error": "No weather data available"}
# Calculate risk metrics
rain_days = sum(1 for d in result.sample_data
if d.get("precipitation", 0) > 5)
extreme_temp_days = sum(1 for d in result.sample_data
if d.get("temp_max", 0) > 35 or d.get("temp_min", 0) < 0)
total_days = len(result.sample_data)
risk_score = (rain_days + extreme_temp_days) / total_days if total_days > 0 else 0
return {
"total_days": total_days,
"rain_days": rain_days,
"extreme_temperature_days": extreme_temp_days,
"risk_score": risk_score,
"risk_level": "high" if risk_score > 0.3 else "medium" if risk_score > 0.1 else "low"
}
def list_sources(self) -> List[Dict]:
"""List all available data sources"""
sources = []
for name, connector in self.connectors.items():
meta = connector.get_metadata()
sources.append({
"name": name,
**meta
})
return sources
def generate_report(self) -> str:
"""Generate data availability report"""
output = """
# Open Data Integration Report
## Available Sources
"""
for source in self.list_sources():
output += f"""
### {source['name'].title()}
- **Type:** {source['type']}
- **Update Frequency:** {source['frequency']}
"""
output += """
## Cache Status
"""
for key, expiry in self.cache_expiry.items():
status = "valid" if expiry > datetime.now() else "expired"
output += f"- {key}: {status}\n"
return output
Common Use Cases
Fetch Material Prices
integrator = OpenDataIntegrator(region="US")
# Get material price indices
result = integrator.fetch_data("material_prices", {
"materials": ["concrete", "steel", "lumber"]
})
print(f"Fetched: {result.records_fetched} records")
for record in result.sample_data:
print(f" {record['material']}: index={record['current']}, trend={record['trend']}")
Enrich Project Data
project = {
"name": "Office Building",
"materials": ["concrete", "steel", "glass"],
"trades": ["carpenter", "electrician", "plumber"],
"location": {"lat": 40.7128, "lon": -74.0060},
"start_date": date(2024, 6, 1),
"end_date": date(2024, 12, 31)
}
enriched = integrator.enrich_project_data(project)
print(f"Sources used: {enriched.sources_used}")
print(f"Confidence: {enriched.confidence:.0%}")
print(f"Material indices: {enriched.enrichments.get('material_price_indices')}")
Assess Weather Risk
risk = integrator.get_weather_risk(
lat=40.7128,
lon=-74.0060,
start_date=date(2024, 6, 1),
end_date=date(2024, 8, 31)
)
print(f"Risk Level: {risk['risk_level']}")
print(f"Rain Days: {risk['rain_days']}")
Quick Reference
| Component | Purpose |
|---|---|
OpenDataIntegrator | Main integration engine |
OpenDataConnector | Base connector class |
WeatherDataConnector | Weather API connector |
MaterialPriceConnector | Material price indices |
LaborRateConnector | Labor rate data |
EnrichedData | Enriched data result |
Resources
- Book: "Data-Driven Construction" by Artem Boiko, Chapter 2.2
- Website: https://datadrivenconstruction.io
Next Steps
- Use ontology-mapper for semantic mapping
- Use cost-prediction with indices
- Use weather-impact-analysis for scheduling
Files
3 totalSelect a file
Select a file to preview.
Comments
Loading comments…
