{"skill":{"slug":"data-silo-detection","displayName":"Data Silo Detection","summary":"Detect and map data silos in construction organizations. Identify disconnected data sources and integration opportunities","description":"---\r\nname: \"data-silo-detection\"\r\ndescription: \"Detect and map data silos in construction organizations. Identify disconnected data sources and integration opportunities\"\r\nhomepage: \"https://datadrivenconstruction.io\"\r\nmetadata: {\"openclaw\": {\"emoji\": \"🔗\", \"os\": [\"win32\"], \"homepage\": \"https://datadrivenconstruction.io\", \"requires\": {\"bins\": [\"python3\"]}}}\r\n---\r\n# Data Silo Detection\r\n\r\n## Overview\r\n\r\nBased on DDC methodology (Chapter 1.2), this skill detects and maps data silos in construction organizations, identifying disconnected data sources, duplicate data, and integration opportunities.\r\n\r\n**Book Reference:** \"Технологии и системы управления в современном строительстве\" / \"Technologies and Management Systems in Modern Construction\"\r\n\r\n## Quick Start\r\n\r\n```python\r\nfrom dataclasses import dataclass, field\r\nfrom enum import Enum\r\nfrom typing import List, Dict, Optional, Set, Tuple\r\nfrom datetime import datetime\r\nimport json\r\nfrom collections import defaultdict\r\n\r\nclass DataDomain(Enum):\r\n    \"\"\"Construction data domains\"\"\"\r\n    DESIGN = \"design\"\r\n    COST = \"cost\"\r\n    SCHEDULE = \"schedule\"\r\n    QUALITY = \"quality\"\r\n    SAFETY = \"safety\"\r\n    PROCUREMENT = \"procurement\"\r\n    SITE = \"site\"\r\n    DOCUMENT = \"document\"\r\n    FINANCIAL = \"financial\"\r\n    HR = \"hr\"\r\n\r\nclass SiloSeverity(Enum):\r\n    \"\"\"Severity level of data silo\"\"\"\r\n    CRITICAL = \"critical\"      # Major business impact\r\n    HIGH = \"high\"              # Significant inefficiency\r\n    MEDIUM = \"medium\"          # Noticeable issues\r\n    LOW = \"low\"                # Minor inconvenience\r\n\r\nclass DataSourceType(Enum):\r\n    
\"\"\"Types of data sources\"\"\"\r\n    DATABASE = \"database\"\r\n    SPREADSHEET = \"spreadsheet\"\r\n    FILE_SHARE = \"file_share\"\r\n    CLOUD_APP = \"cloud_app\"\r\n    DESKTOP_APP = \"desktop_app\"\r\n    PAPER = \"paper\"\r\n    EMAIL = \"email\"\r\n    PERSONAL = \"personal\"\r\n\r\n@dataclass\r\nclass DataSource:\r\n    \"\"\"Represents a data source in the organization\"\"\"\r\n    id: str\r\n    name: str\r\n    type: DataSourceType\r\n    domain: DataDomain\r\n    owner: str\r\n    department: str\r\n    users: List[str]\r\n    data_entities: List[str]\r\n    connections: List[str] = field(default_factory=list)\r\n    update_frequency: str = \"unknown\"\r\n    access_level: str = \"department\"  # personal, department, organization\r\n    has_api: bool = False\r\n    last_modified: Optional[datetime] = None\r\n\r\n@dataclass\r\nclass DataSilo:\r\n    \"\"\"Detected data silo\"\"\"\r\n    id: str\r\n    sources: List[DataSource]\r\n    domain: DataDomain\r\n    severity: SiloSeverity\r\n    issue_type: str\r\n    description: str\r\n    impact: str\r\n    affected_users: int\r\n    affected_processes: List[str]\r\n    recommendations: List[str]\r\n    estimated_cost: Optional[float] = None\r\n\r\n@dataclass\r\nclass DuplicateData:\r\n    \"\"\"Detected duplicate data across sources\"\"\"\r\n    entity_name: str\r\n    sources: List[str]\r\n    discrepancy_rate: float  # 0-1\r\n    master_source: Optional[str] = None\r\n    issues: List[str] = field(default_factory=list)\r\n\r\n@dataclass\r\nclass SiloAnalysis:\r\n    \"\"\"Complete silo analysis results\"\"\"\r\n    organization: str\r\n    analysis_date: datetime\r\n    total_sources: int\r\n    silos_detected: List[DataSilo]\r\n    duplicates: List[DuplicateData]\r\n    connectivity_score: float\r\n    data_flow_gaps: List[Dict]\r\n    priority_actions: List[str]\r\n    integration_roadmap: Dict\r\n\r\n\r\nclass DataSiloDetector:\r\n    \"\"\"\r\n    Detect and analyze data silos in construction 
organizations.\r\n    Based on DDC methodology Chapter 1.2.\r\n    \"\"\"\r\n\r\n    def __init__(self):\r\n        self.domain_relationships = self._define_domain_relationships()\r\n        self.critical_entities = self._define_critical_entities()\r\n\r\n    def _define_domain_relationships(self) -> Dict[DataDomain, List[DataDomain]]:\r\n        \"\"\"Define expected relationships between domains\"\"\"\r\n        return {\r\n            DataDomain.DESIGN: [\r\n                DataDomain.COST, DataDomain.SCHEDULE,\r\n                DataDomain.PROCUREMENT, DataDomain.QUALITY\r\n            ],\r\n            DataDomain.COST: [\r\n                DataDomain.DESIGN, DataDomain.SCHEDULE,\r\n                DataDomain.FINANCIAL, DataDomain.PROCUREMENT\r\n            ],\r\n            DataDomain.SCHEDULE: [\r\n                DataDomain.DESIGN, DataDomain.COST,\r\n                DataDomain.SITE, DataDomain.HR\r\n            ],\r\n            DataDomain.PROCUREMENT: [\r\n                DataDomain.COST, DataDomain.DESIGN,\r\n                DataDomain.SITE, DataDomain.FINANCIAL\r\n            ],\r\n            DataDomain.SITE: [\r\n                DataDomain.SCHEDULE, DataDomain.SAFETY,\r\n                DataDomain.QUALITY, DataDomain.HR\r\n            ],\r\n            DataDomain.QUALITY: [\r\n                DataDomain.DESIGN, DataDomain.SITE,\r\n                DataDomain.DOCUMENT\r\n            ],\r\n            DataDomain.SAFETY: [\r\n                DataDomain.SITE, DataDomain.HR,\r\n                DataDomain.DOCUMENT\r\n            ],\r\n            DataDomain.FINANCIAL: [\r\n                DataDomain.COST, DataDomain.PROCUREMENT,\r\n                DataDomain.HR\r\n            ]\r\n        }\r\n\r\n    def _define_critical_entities(self) -> Dict[str, List[DataDomain]]:\r\n        \"\"\"Define entities that should be shared across domains\"\"\"\r\n        return {\r\n            \"project\": [DataDomain.DESIGN, DataDomain.COST, DataDomain.SCHEDULE],\r\n         
   \"budget\": [DataDomain.COST, DataDomain.FINANCIAL, DataDomain.PROCUREMENT],\r\n            \"schedule\": [DataDomain.SCHEDULE, DataDomain.SITE, DataDomain.PROCUREMENT],\r\n            \"material\": [DataDomain.DESIGN, DataDomain.COST, DataDomain.PROCUREMENT],\r\n            \"labor\": [DataDomain.HR, DataDomain.COST, DataDomain.SCHEDULE],\r\n            \"subcontractor\": [DataDomain.PROCUREMENT, DataDomain.COST, DataDomain.SCHEDULE],\r\n            \"rfi\": [DataDomain.DESIGN, DataDomain.DOCUMENT, DataDomain.SITE],\r\n            \"change_order\": [DataDomain.COST, DataDomain.DESIGN, DataDomain.SCHEDULE]\r\n        }\r\n\r\n    def detect_silos(\r\n        self,\r\n        organization: str,\r\n        data_sources: List[DataSource],\r\n        process_flows: Optional[List[Dict]] = None\r\n    ) -> SiloAnalysis:\r\n        \"\"\"\r\n        Detect data silos in the organization.\r\n\r\n        Args:\r\n            organization: Organization name\r\n            data_sources: List of data sources to analyze\r\n            process_flows: Optional business process flows\r\n\r\n        Returns:\r\n            Complete silo analysis\r\n        \"\"\"\r\n        # Build connectivity graph\r\n        connectivity = self._build_connectivity_graph(data_sources)\r\n\r\n        # Detect isolated sources\r\n        isolated_silos = self._detect_isolated_sources(\r\n            data_sources, connectivity\r\n        )\r\n\r\n        # Detect domain silos\r\n        domain_silos = self._detect_domain_silos(data_sources)\r\n\r\n        # Detect duplicate data\r\n        duplicates = self._detect_duplicates(data_sources)\r\n\r\n        # Detect data flow gaps\r\n        flow_gaps = self._detect_flow_gaps(\r\n            data_sources, process_flows\r\n        )\r\n\r\n        # Calculate connectivity score\r\n        connectivity_score = self._calculate_connectivity_score(\r\n            data_sources, connectivity\r\n        )\r\n\r\n        # Combine all silos\r\n        
all_silos = isolated_silos + domain_silos\r\n\r\n        # Prioritize silos\r\n        prioritized_silos = self._prioritize_silos(all_silos)\r\n\r\n        # Generate priority actions\r\n        priority_actions = self._generate_priority_actions(\r\n            prioritized_silos, duplicates\r\n        )\r\n\r\n        # Create integration roadmap\r\n        roadmap = self._create_integration_roadmap(\r\n            prioritized_silos, flow_gaps\r\n        )\r\n\r\n        return SiloAnalysis(\r\n            organization=organization,\r\n            analysis_date=datetime.now(),\r\n            total_sources=len(data_sources),\r\n            silos_detected=prioritized_silos,\r\n            duplicates=duplicates,\r\n            connectivity_score=connectivity_score,\r\n            data_flow_gaps=flow_gaps,\r\n            priority_actions=priority_actions,\r\n            integration_roadmap=roadmap\r\n        )\r\n\r\n    def _build_connectivity_graph(\r\n        self,\r\n        sources: List[DataSource]\r\n    ) -> Dict[str, Set[str]]:\r\n        \"\"\"Build graph of source connections\"\"\"\r\n        graph = defaultdict(set)\r\n\r\n        for source in sources:\r\n            for connection in source.connections:\r\n                graph[source.id].add(connection)\r\n                graph[connection].add(source.id)\r\n\r\n        return graph\r\n\r\n    def _detect_isolated_sources(\r\n        self,\r\n        sources: List[DataSource],\r\n        connectivity: Dict[str, Set[str]]\r\n    ) -> List[DataSilo]:\r\n        \"\"\"Detect sources with no connections\"\"\"\r\n        silos = []\r\n\r\n        for source in sources:\r\n            connections = len(connectivity.get(source.id, set()))\r\n\r\n            if connections == 0:\r\n                severity = SiloSeverity.CRITICAL if source.domain in [\r\n                    DataDomain.COST, DataDomain.SCHEDULE\r\n                ] else SiloSeverity.HIGH\r\n\r\n                silos.append(DataSilo(\r\n            
        id=f\"isolated_{source.id}\",\r\n                    sources=[source],\r\n                    domain=source.domain,\r\n                    severity=severity,\r\n                    issue_type=\"isolated_source\",\r\n                    description=f\"{source.name} has no connections to other systems\",\r\n                    impact=\"Data must be manually transferred, risking errors and delays\",\r\n                    affected_users=len(source.users),\r\n                    affected_processes=self._get_affected_processes(source.domain),\r\n                    recommendations=[\r\n                        f\"Connect {source.name} via API or ETL to related systems\",\r\n                        \"Establish data synchronization schedule\",\r\n                        \"Define master data source for shared entities\"\r\n                    ]\r\n                ))\r\n            elif connections == 1 and source.access_level == \"personal\":\r\n                silos.append(DataSilo(\r\n                    id=f\"personal_{source.id}\",\r\n                    sources=[source],\r\n                    domain=source.domain,\r\n                    severity=SiloSeverity.MEDIUM,\r\n                    issue_type=\"personal_silo\",\r\n                    description=f\"{source.name} is a personal data store with limited access\",\r\n                    impact=\"Data not accessible to team, knowledge loss risk\",\r\n                    affected_users=1,\r\n                    affected_processes=self._get_affected_processes(source.domain),\r\n                    recommendations=[\r\n                        \"Move data to shared organizational repository\",\r\n                        \"Implement access controls instead of isolation\",\r\n                        \"Document data structure and usage\"\r\n                    ]\r\n                ))\r\n\r\n        return silos\r\n\r\n    def _detect_domain_silos(\r\n        self,\r\n        sources: List[DataSource]\r\n    ) -> 
List[DataSilo]:\r\n        \"\"\"Detect silos between domains that should be connected\"\"\"\r\n        silos = []\r\n\r\n        # Group sources by domain\r\n        domain_sources = defaultdict(list)\r\n        for source in sources:\r\n            domain_sources[source.domain].append(source)\r\n\r\n        # Check for missing domain connections\r\n        for domain, related_domains in self.domain_relationships.items():\r\n            domain_srcs = domain_sources.get(domain, [])\r\n\r\n            for related in related_domains:\r\n                related_srcs = domain_sources.get(related, [])\r\n\r\n                if domain_srcs and related_srcs:\r\n                    # Check if any connections exist between domains\r\n                    has_connection = False\r\n                    for src in domain_srcs:\r\n                        for rel_src in related_srcs:\r\n                            if rel_src.id in src.connections:\r\n                                has_connection = True\r\n                                break\r\n\r\n                    if not has_connection:\r\n                        silos.append(DataSilo(\r\n                            id=f\"domain_gap_{domain.value}_{related.value}\",\r\n                            sources=domain_srcs + related_srcs,\r\n                            domain=domain,\r\n                            severity=SiloSeverity.HIGH,\r\n                            issue_type=\"domain_disconnect\",\r\n                            description=f\"No data flow between {domain.value} and {related.value}\",\r\n                            impact=\"Related information not synchronized, decision delays\",\r\n                            affected_users=sum(len(s.users) for s in domain_srcs + related_srcs),\r\n                            affected_processes=self._get_affected_processes(domain) +\r\n                                              self._get_affected_processes(related),\r\n                            recommendations=[\r\n     
                           f\"Establish integration between {domain.value} and {related.value} systems\",\r\n                                \"Define shared data entities and master sources\",\r\n                                \"Implement automated data synchronization\"\r\n                            ]\r\n                        ))\r\n\r\n        return silos\r\n\r\n    def _detect_duplicates(\r\n        self,\r\n        sources: List[DataSource]\r\n    ) -> List[DuplicateData]:\r\n        \"\"\"Detect duplicate data across sources\"\"\"\r\n        duplicates = []\r\n\r\n        # Map entities to sources\r\n        entity_sources = defaultdict(list)\r\n        for source in sources:\r\n            for entity in source.data_entities:\r\n                entity_sources[entity].append(source.id)\r\n\r\n        # Find duplicates\r\n        for entity, source_ids in entity_sources.items():\r\n            if len(source_ids) > 1:\r\n                # Check if it's a critical entity\r\n                is_critical = entity.lower() in self.critical_entities\r\n\r\n                duplicate = DuplicateData(\r\n                    entity_name=entity,\r\n                    sources=source_ids,\r\n                    discrepancy_rate=0.0,  # Would need actual data to calculate\r\n                    issues=[]\r\n                )\r\n\r\n                if is_critical and len(source_ids) > 2:\r\n                    duplicate.issues.append(\r\n                        \"Critical entity duplicated in multiple systems\"\r\n                    )\r\n\r\n                if not any(s for s in sources if s.id in source_ids and \"master\" in s.name.lower()):\r\n                    duplicate.issues.append(\"No clear master source defined\")\r\n\r\n                duplicates.append(duplicate)\r\n\r\n        return duplicates\r\n\r\n    def _detect_flow_gaps(\r\n        self,\r\n        sources: List[DataSource],\r\n        process_flows: Optional[List[Dict]]\r\n    ) -> List[Dict]:\r\n      
  \"\"\"Detect gaps in expected data flows\"\"\"\r\n        gaps = []\r\n\r\n        # Check critical entity coverage\r\n        for entity, required_domains in self.critical_entities.items():\r\n            entity_domains = set()\r\n            for source in sources:\r\n                if entity in [e.lower() for e in source.data_entities]:\r\n                    entity_domains.add(source.domain)\r\n\r\n            missing = set(required_domains) - entity_domains\r\n            if missing:\r\n                gaps.append({\r\n                    \"entity\": entity,\r\n                    \"missing_domains\": [d.value for d in missing],\r\n                    \"impact\": f\"{entity} data not available in {len(missing)} domains\"\r\n                })\r\n\r\n        return gaps\r\n\r\n    def _calculate_connectivity_score(\r\n        self,\r\n        sources: List[DataSource],\r\n        connectivity: Dict[str, Set[str]]\r\n    ) -> float:\r\n        \"\"\"Calculate overall connectivity score\"\"\"\r\n        if not sources:\r\n            return 0.0\r\n\r\n        # Calculate average connections per source\r\n        total_connections = sum(len(conns) for conns in connectivity.values())\r\n        avg_connections = total_connections / len(sources)\r\n\r\n        # Ideal connections per source\r\n        ideal_connections = 3\r\n\r\n        # Score based on average connections\r\n        connection_score = min(1.0, avg_connections / ideal_connections)\r\n\r\n        # Penalize for isolated sources\r\n        isolated = sum(1 for s in sources if s.id not in connectivity or not connectivity[s.id])\r\n        isolation_penalty = isolated / len(sources)\r\n\r\n        # API availability bonus\r\n        api_count = sum(1 for s in sources if s.has_api)\r\n        api_bonus = (api_count / len(sources)) * 0.2\r\n\r\n        return max(0, min(1.0, connection_score - isolation_penalty + api_bonus))\r\n\r\n    def _get_affected_processes(self, domain: DataDomain) -> 
List[str]:\r\n        \"\"\"Get business processes affected by domain\"\"\"\r\n        process_map = {\r\n            DataDomain.DESIGN: [\"Design Review\", \"RFI Processing\", \"Drawing Distribution\"],\r\n            DataDomain.COST: [\"Budgeting\", \"Cost Tracking\", \"Invoice Processing\"],\r\n            DataDomain.SCHEDULE: [\"Planning\", \"Progress Tracking\", \"Resource Allocation\"],\r\n            DataDomain.PROCUREMENT: [\"Vendor Selection\", \"Purchase Orders\", \"Material Tracking\"],\r\n            DataDomain.SITE: [\"Daily Reports\", \"Progress Photos\", \"Issue Management\"],\r\n            DataDomain.QUALITY: [\"Inspections\", \"Defect Tracking\", \"Compliance\"],\r\n            DataDomain.SAFETY: [\"Incident Reporting\", \"Safety Inspections\", \"Training\"],\r\n            DataDomain.FINANCIAL: [\"Billing\", \"Payments\", \"Financial Reporting\"],\r\n            DataDomain.HR: [\"Timekeeping\", \"Resource Management\", \"Certifications\"]\r\n        }\r\n        return process_map.get(domain, [])\r\n\r\n    def _prioritize_silos(\r\n        self,\r\n        silos: List[DataSilo]\r\n    ) -> List[DataSilo]:\r\n        \"\"\"Prioritize silos by severity and impact\"\"\"\r\n        severity_order = {\r\n            SiloSeverity.CRITICAL: 0,\r\n            SiloSeverity.HIGH: 1,\r\n            SiloSeverity.MEDIUM: 2,\r\n            SiloSeverity.LOW: 3\r\n        }\r\n\r\n        return sorted(\r\n            silos,\r\n            key=lambda s: (severity_order[s.severity], -s.affected_users)\r\n        )\r\n\r\n    def _generate_priority_actions(\r\n        self,\r\n        silos: List[DataSilo],\r\n        duplicates: List[DuplicateData]\r\n    ) -> List[str]:\r\n        \"\"\"Generate prioritized action items\"\"\"\r\n        actions = []\r\n\r\n        # Critical silos first\r\n        critical_silos = [s for s in silos if s.severity == SiloSeverity.CRITICAL]\r\n        for silo in critical_silos[:3]:\r\n            actions.append(f\"URGENT: 
{silo.recommendations[0]}\")\r\n\r\n        # Duplicate data issues\r\n        critical_dups = [d for d in duplicates if d.issues]\r\n        for dup in critical_dups[:2]:\r\n            actions.append(\r\n                f\"Define master source for '{dup.entity_name}' \"\r\n                f\"(currently in {len(dup.sources)} sources)\"\r\n            )\r\n\r\n        # High priority silos\r\n        high_silos = [s for s in silos if s.severity == SiloSeverity.HIGH]\r\n        for silo in high_silos[:3]:\r\n            if silo.recommendations:\r\n                actions.append(silo.recommendations[0])\r\n\r\n        return actions[:10]\r\n\r\n    def _create_integration_roadmap(\r\n        self,\r\n        silos: List[DataSilo],\r\n        gaps: List[Dict]\r\n    ) -> Dict:\r\n        \"\"\"Create phased integration roadmap\"\"\"\r\n        roadmap = {\r\n            \"Phase 1 - Quick Wins (0-3 months)\": [],\r\n            \"Phase 2 - Core Integration (3-6 months)\": [],\r\n            \"Phase 3 - Advanced Integration (6-12 months)\": [],\r\n            \"Phase 4 - Optimization (12+ months)\": []\r\n        }\r\n\r\n        # Phase 1: Address personal silos and easy integrations\r\n        for silo in silos:\r\n            if silo.issue_type == \"personal_silo\":\r\n                roadmap[\"Phase 1 - Quick Wins (0-3 months)\"].append(\r\n                    f\"Migrate {silo.sources[0].name} to shared repository\"\r\n                )\r\n\r\n        # Phase 2: Core domain integrations\r\n        domain_gaps = [s for s in silos if s.issue_type == \"domain_disconnect\"]\r\n        for silo in domain_gaps[:3]:\r\n            roadmap[\"Phase 2 - Core Integration (3-6 months)\"].append(\r\n                silo.recommendations[0] if silo.recommendations else silo.description\r\n            )\r\n\r\n        # Phase 3: Critical entity master data\r\n        roadmap[\"Phase 3 - Advanced Integration (6-12 months)\"].extend([\r\n            \"Implement master data management 
for shared entities\",\r\n            \"Deploy integration middleware/ESB\",\r\n            \"Establish data governance policies\"\r\n        ])\r\n\r\n        # Phase 4: Optimization\r\n        roadmap[\"Phase 4 - Optimization (12+ months)\"].extend([\r\n            \"Implement real-time data synchronization\",\r\n            \"Deploy integration monitoring and alerting\",\r\n            \"Continuous improvement based on metrics\"\r\n        ])\r\n\r\n        return roadmap\r\n\r\n    def generate_report(self, analysis: SiloAnalysis) -> str:\r\n        \"\"\"Generate silo analysis report\"\"\"\r\n        report = f\"\"\"\r\n# Data Silo Analysis Report\r\n## {analysis.organization}\r\n\r\n**Analysis Date:** {analysis.analysis_date.strftime('%Y-%m-%d')}\r\n**Data Sources Analyzed:** {analysis.total_sources}\r\n**Connectivity Score:** {analysis.connectivity_score:.0%}\r\n\r\n## Executive Summary\r\n\r\nDetected **{len(analysis.silos_detected)}** data silos and **{len(analysis.duplicates)}** duplicate data issues.\r\n\r\n### Silos by Severity\r\n\"\"\"\r\n        severity_counts = defaultdict(int)\r\n        for silo in analysis.silos_detected:\r\n            severity_counts[silo.severity.value] += 1\r\n\r\n        for severity in [\"critical\", \"high\", \"medium\", \"low\"]:\r\n            count = severity_counts.get(severity, 0)\r\n            if count > 0:\r\n                report += f\"- **{severity.title()}**: {count}\\n\"\r\n\r\n        report += \"\\n## Priority Actions\\n\\n\"\r\n        for i, action in enumerate(analysis.priority_actions, 1):\r\n            report += f\"{i}. 
{action}\\n\"\r\n\r\n        report += \"\\n## Detected Silos\\n\\n\"\r\n        for silo in analysis.silos_detected[:5]:\r\n            report += f\"\"\"\r\n### {silo.id}\r\n- **Type:** {silo.issue_type}\r\n- **Severity:** {silo.severity.value}\r\n- **Impact:** {silo.impact}\r\n- **Affected Users:** {silo.affected_users}\r\n\"\"\"\r\n\r\n        report += \"\\n## Integration Roadmap\\n\"\r\n        for phase, items in analysis.integration_roadmap.items():\r\n            report += f\"\\n### {phase}\\n\"\r\n            for item in items:\r\n                report += f\"- {item}\\n\"\r\n\r\n        return report\r\n```\r\n\r\n## Common Use Cases\r\n\r\n### Detect Data Silos\r\n\r\n```python\r\ndetector = DataSiloDetector()\r\n\r\n# Define data sources\r\nsources = [\r\n    DataSource(\r\n        id=\"revit\",\r\n        name=\"Revit Models\",\r\n        type=DataSourceType.DESKTOP_APP,\r\n        domain=DataDomain.DESIGN,\r\n        owner=\"Design Team\",\r\n        department=\"Engineering\",\r\n        users=[\"architect1\", \"engineer1\", \"engineer2\"],\r\n        data_entities=[\"building_model\", \"drawings\", \"schedules\"],\r\n        connections=[\"navisworks\"],\r\n        has_api=True\r\n    ),\r\n    DataSource(\r\n        id=\"excel_estimates\",\r\n        name=\"Excel Cost Estimates\",\r\n        type=DataSourceType.SPREADSHEET,\r\n        domain=DataDomain.COST,\r\n        owner=\"Estimator\",\r\n        department=\"Pre-construction\",\r\n        users=[\"estimator1\"],\r\n        data_entities=[\"costs\", \"quantities\", \"labor_rates\"],\r\n        connections=[],  # No connections - silo!\r\n        access_level=\"personal\"\r\n    ),\r\n    DataSource(\r\n        id=\"procore\",\r\n        name=\"Procore\",\r\n        type=DataSourceType.CLOUD_APP,\r\n        domain=DataDomain.SITE,\r\n        owner=\"Project Manager\",\r\n        department=\"Operations\",\r\n        users=[\"pm1\", \"pm2\", \"super1\"],\r\n        
data_entities=[\"daily_reports\", \"photos\", \"punch_list\"],\r\n        connections=[\"primavera\"],\r\n        has_api=True\r\n    )\r\n]\r\n\r\nanalysis = detector.detect_silos(\r\n    organization=\"ABC Construction\",\r\n    data_sources=sources\r\n)\r\n\r\nprint(f\"Silos detected: {len(analysis.silos_detected)}\")\r\nprint(f\"Connectivity score: {analysis.connectivity_score:.0%}\")\r\n```\r\n\r\n### Generate Silo Report\r\n\r\n```python\r\nreport = detector.generate_report(analysis)\r\nprint(report)\r\n\r\n# Save to file (explicit UTF-8 so non-ASCII organization names write correctly on Windows)\r\nwith open(\"silo_report.md\", \"w\", encoding=\"utf-8\") as f:\r\n    f.write(report)\r\n```\r\n\r\n### View Priority Actions\r\n\r\n```python\r\nprint(\"Priority Actions:\")\r\nfor i, action in enumerate(analysis.priority_actions, 1):\r\n    print(f\"{i}. {action}\")\r\n\r\nprint(\"\\nIntegration Roadmap:\")\r\nfor phase, items in analysis.integration_roadmap.items():\r\n    print(f\"\\n{phase}:\")\r\n    for item in items:\r\n        print(f\"  - {item}\")\r\n```\r\n\r\n## Quick Reference\r\n\r\n| Component | Purpose |\r\n|-----------|---------|\r\n| `DataSiloDetector` | Main detection engine |\r\n| `DataSource` | Data source definition |\r\n| `DataSilo` | Detected silo with details |\r\n| `DuplicateData` | Entity duplicated across sources |\r\n| `SiloAnalysis` | Complete analysis results |\r\n| `SiloSeverity` | Severity classification |\r\n| `DataDomain` | Construction data domains |\r\n| `DataSourceType` | Types of data sources |\r\n\r\n## Resources\r\n\r\n- **Book**: \"Data-Driven Construction\" by Artem Boiko, Chapter 1.2\r\n- **Website**: https://datadrivenconstruction.io\r\n\r\n## Next Steps\r\n\r\n- Use [erp-integration-analysis](../erp-integration-analysis/SKILL.md) for system integration\r\n- Use [data-evolution-analysis](../../Chapter-1.1/data-evolution-analysis/SKILL.md) for maturity assessment\r\n- Use [etl-pipeline](../../Chapter-4.2/etl-pipeline/SKILL.md) to connect 
silos\r\n","tags":{"latest":"2.1.0"},"stats":{"comments":0,"downloads":1316,"installsAllTime":49,"installsCurrent":0,"stars":0,"versions":2},"createdAt":1770475342659,"updatedAt":1778486073009},"latestVersion":{"version":"2.1.0","createdAt":1771168185264,"changelog":"- Added detailed domain models and Enum classes for data sources, silos, duplication, and analysis in construction organizations.\n- Introduced organization-wide detection of data silos, duplicate data, and integration gaps with customizable severity and recommendations.\n- Implemented comprehensive APIs for detecting isolated data sources, domain silos, duplicate data, flow gaps, and generating integration roadmaps.\n- Enhanced connectivity scoring and prioritization of actions to improve data integration opportunities.\n- Documentation now references the DDC methodology and relevant literature for domain alignment.","license":null},"metadata":{"setup":[],"os":["win32"],"systems":null},"owner":{"handle":"datadrivenconstruction","userId":"s1774mv3t1cm8r1kgs9hccdnmn8852nb","displayName":"datadrivenconstruction","image":"https://avatars.githubusercontent.com/u/94158709?v=4"},"moderation":null}