{"skill":{"slug":"mqttasgi","displayName":"mqttasgi - IOT backbone for Django","summary":"MQTT ASGI protocol server for Django — bridge MQTT messages to Django Channels consumers with full ORM, Channel Layers, and testing support. The perfect back...","description":"---\nname: mqttasgi\ndescription: MQTT ASGI protocol server for Django — bridge MQTT messages to Django Channels consumers with full ORM, Channel Layers, and testing support. The perfect backbone for your home automation projects, IoT pipelines, and real-time device integrations.\nversion: 1.0.0\nmetadata:\n  openclaw:\n    emoji: \"📡\"\n    homepage: https://github.com/sivulich/mqttasgi\n---\n\n# mqttasgi\n\nmqttasgi is an ASGI protocol server that bridges MQTT (via paho-mqtt) and Django Channels, inspired by Daphne. It lets Django consumers subscribe/publish to MQTT topics with full ORM and Channel Layers support.\n\nSupports: Django 3.2–5.x · Channels 3.x–4.x · paho-mqtt 1.x and 2.x · Python 3.9–3.13\n\n## Installation\n\n```bash\npip install mqttasgi\n```\n\n## Running the server\n\n```bash\nmqttasgi -H localhost -p 1883 my_application.asgi:application\n```\n\n| Parameter | Env variable | Default | Purpose |\n|-----------|-------------|---------|---------|\n| `-H / --host` | `MQTT_HOSTNAME` | `localhost` | MQTT broker host |\n| `-p / --port` | `MQTT_PORT` | `1883` | MQTT broker port |\n| `-U / --username` | `MQTT_USERNAME` | | Broker username |\n| `-P / --password` | `MQTT_PASSWORD` | | Broker password |\n| `-c / --cleansession` | `MQTT_CLEAN` | `True` | MQTT clean session |\n| `-v / --verbosity` | `VERBOSITY` | `0` | Logging level (0–2) |\n| `-i / --id` | `MQTT_CLIENT_ID` | | MQTT client ID |\n| `-C / --cert` | `TLS_CERT` | | TLS certificate |\n| `-K / --key` | `TLS_KEY` | | TLS key |\n| `-S / --cacert` | `TLS_CA` | | TLS CA certificate |\n| `-SSL / --use-ssl` | `MQTT_USE_SSL` | `False` | SSL without cert auth |\n| `-T / --transport` | `MQTT_TRANSPORT` | `tcp` | Transport: `tcp` or `websockets` |\n| `-r / --retries` | `MQTT_RETRIES` | `3` | Reconnect retries (0 = unlimited) |\n\nAll parameters can also be set via a `.env` file at the project root. CLI args take precedence over env vars.\n\n## asgi.py setup\n\n```python\nimport os\nimport django\nfrom channels.routing import ProtocolTypeRouter\nfrom my_application.consumers import MyMqttConsumer\nfrom django.core.asgi import get_asgi_application\n\nos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_application.settings')\ndjango.setup()\n\napplication = ProtocolTypeRouter({\n    'http': get_asgi_application(),\n    'mqtt': MyMqttConsumer.as_asgi(),\n})\n```\n\n## Writing a consumer\n\n```python\nfrom mqttasgi.consumers import MqttConsumer\n\nclass MyMqttConsumer(MqttConsumer):\n\n    async def connect(self):\n        \"\"\"Called when connected to the broker. Subscribe here.\"\"\"\n        await self.subscribe('my/topic', qos=2)\n\n    async def receive(self, mqtt_message):\n        \"\"\"Called for each incoming MQTT message.\"\"\"\n        topic   = mqtt_message['topic']\n        payload = mqtt_message['payload']   # bytes\n        qos     = mqtt_message['qos']\n        await self.publish('response/topic', payload, qos=1, retain=False)\n\n    async def disconnect(self):\n        \"\"\"Called on broker disconnect. Clean up here.\"\"\"\n        await self.unsubscribe('my/topic')\n```\n\n### Consumer API\n\n| Method | Description |\n|--------|-------------|\n| `await self.subscribe(topic, qos)` | Subscribe to an MQTT topic |\n| `await self.unsubscribe(topic)` | Unsubscribe from an MQTT topic |\n| `await self.publish(topic, payload, qos=1, retain=False)` | Publish an MQTT message |\n| `self.scope` | ASGI scope dict (includes `app_id`, `instance_type`, and any `consumer_parameters`) |\n\n## Channel Layers\n\n```python\n# Outside the consumer (e.g. Django view or management command)\nfrom channels.layers import get_channel_layer\nfrom asgiref.sync import async_to_sync\n\nchannel_layer = get_channel_layer()\nasync_to_sync(channel_layer.group_send)(\n    \"my.group\",\n    {\"type\": \"my.custom.message\", \"text\": \"Hello from outside\"}\n)\n\n# Inside the consumer\nclass MyMqttConsumer(MqttConsumer):\n\n    async def connect(self):\n        await self.subscribe('my/topic', qos=2)\n        await self.channel_layer.group_add(\"my.group\", self.channel_name)\n\n    async def my_custom_message(self, event):\n        # Handler name must match the `type` field (dots become underscores)\n        print('Channel layer message:', event)\n\n    async def receive(self, mqtt_message): ...\n    async def disconnect(self): ...\n```\n\n## Multiple workers (experimental)\n\nOnly the master consumer (`instance_type='master'`, `app_id=0`) may spawn or kill workers.\n\n```python\nclass MasterConsumer(MqttConsumer):\n\n    async def connect(self):\n        # Spawn a worker with a unique app_id\n        await self.spawn_worker(\n            app_id=1,\n            consumer_path='my_application.consumers.WorkerConsumer',\n            consumer_params={'device_id': 'sensor-01'},\n        )\n\n    async def receive(self, mqtt_message):\n        if condition:\n            await self.kill_worker(app_id=1)\n\n    async def disconnect(self): ...\n```\n\n## Testing (no broker required)\n\n`MqttComunicator` drives consumers directly through the ASGI interface — no running broker needed.\n\n### pytest.ini\n\n```ini\n[pytest]\nasyncio_mode = auto\n```\n\n### tests/conftest.py\n\n```python\nimport django\nfrom django.conf import settings\n\ndef pytest_configure(config):\n    if not settings.configured:\n        settings.configure(\n            SECRET_KEY='test-secret-key',\n            INSTALLED_APPS=['channels'],\n            DATABASES={},\n            CHANNEL_LAYERS={\n                'default': {\n                    'BACKEND': 'channels.layers.InMemoryChannelLayer',\n                }\n            },\n        )\n        django.setup()\n```\n\n### Writing tests\n\n```python\nimport pytest\nfrom mqttasgi.testing import MqttComunicator  # note: one 'm' in Comunicator\nfrom my_application.consumers import MyMqttConsumer\n\nasync def test_subscribe_on_connect():\n    comm = MqttComunicator(MyMqttConsumer.as_asgi(), app_id=1)\n    response = await comm.connect()          # returns first message from consumer\n    assert response['type'] == 'mqtt.sub'\n    assert response['mqtt']['topic'] == 'my/topic'\n    await comm.disconnect()\n\nasync def test_publish_on_message():\n    comm = MqttComunicator(MyMqttConsumer.as_asgi(), app_id=1)\n    await comm.connect()\n    await comm.publish('my/topic', b'hello', qos=1)\n    response = await comm.receive_from()    # next message from consumer\n    assert response['type'] == 'mqtt.pub'\n    assert response['mqtt']['payload'] == b'hello'\n    await comm.disconnect()\n```\n\n### MqttComunicator API\n\n| Method | Description |\n|--------|-------------|\n| `MqttComunicator(app, app_id, instance_type='worker', consumer_parameters=None)` | Create communicator |\n| `await comm.connect(timeout=1)` | Send `mqtt.connect`; returns first consumer response |\n| `await comm.publish(topic, payload, qos)` | Send `mqtt.msg` event to the consumer |\n| `await comm.receive_from(timeout=1)` | Receive next message the consumer sent |\n| `await comm.disconnect(timeout=1)` | Send `mqtt.disconnect` and wait for shutdown |\n\nConsumer responses have this shape:\n\n```python\n{\n    'type': 'mqtt.sub',   # or mqtt.pub / mqtt.usub\n    'mqtt': {\n        'topic': 'my/topic',\n        'payload': b'...',   # only for mqtt.pub\n        'qos': 1,\n    }\n}\n```\n\n## Internal message types (for advanced use)\n\n**Server → Consumer:** `mqtt.connect`, `mqtt.msg`, `mqtt.disconnect`\n\n**Consumer → Server:** `mqtt.pub`, `mqtt.sub`, `mqtt.usub`, `mqttasgi.worker.spawn`, `mqttasgi.worker.kill`\n\n## Project ideas and examples\n\n### Home automation — motion-triggered lights\n\nA motion sensor publishes to `home/sensor/motion`. A consumer listens and publishes a command to the light controller, logging every event to the Django ORM.\n\n```python\nfrom mqttasgi.consumers import MqttConsumer\nfrom myapp.models import MotionEvent\n\nclass LightAutomationConsumer(MqttConsumer):\n\n    async def connect(self):\n        await self.subscribe('home/sensor/motion', qos=1)\n\n    async def receive(self, mqtt_message):\n        room = mqtt_message['payload'].decode()\n        await MotionEvent.objects.acreate(room=room)\n        await self.publish(f'home/lights/{room}/set', b'on', qos=1)\n\n    async def disconnect(self):\n        await self.unsubscribe('home/sensor/motion')\n```\n\n---\n\n### AI-powered automation — ask Claude before acting\n\nRoute sensor data through Claude to decide what action to take. The consumer calls the Anthropic API and publishes the result back onto the MQTT bus.\n\n```python\nimport anthropic\nfrom mqttasgi.consumers import MqttConsumer\n\nclient = anthropic.Anthropic()\n\nclass AIAutomationConsumer(MqttConsumer):\n\n    async def connect(self):\n        await self.subscribe('home/sensor/#', qos=1)\n\n    async def receive(self, mqtt_message):\n        topic   = mqtt_message['topic']\n        payload = mqtt_message['payload'].decode()\n\n        message = client.messages.create(\n            model='claude-opus-4-6',\n            max_tokens=64,\n            messages=[{\n                'role': 'user',\n                'content': (\n                    f'Sensor reading — topic: {topic}, value: {payload}. '\n                    'Reply with only the MQTT topic and payload to publish, '\n                    'separated by a space. Example: home/lights/living on'\n                ),\n            }],\n        )\n        response = message.content[0].text.strip().split(' ', 1)\n        if len(response) == 2:\n            out_topic, out_payload = response\n            await self.publish(out_topic, out_payload.encode(), qos=1)\n\n    async def disconnect(self):\n        await self.unsubscribe('home/sensor/#')\n```\n\n---\n\n### Energy monitoring — store readings in Django, alert on threshold\n\nElectricity sensors publish consumption data every 30 seconds. The consumer persists each reading and fires an alert if usage spikes.\n\n```python\nfrom mqttasgi.consumers import MqttConsumer\nfrom myapp.models import EnergyReading\n\nALERT_THRESHOLD_WATTS = 3000\n\nclass EnergyMonitorConsumer(MqttConsumer):\n\n    async def connect(self):\n        await self.subscribe('home/energy/consumption', qos=1)\n\n    async def receive(self, mqtt_message):\n        watts = float(mqtt_message['payload'])\n        await EnergyReading.objects.acreate(watts=watts)\n        if watts > ALERT_THRESHOLD_WATTS:\n            await self.publish('home/alerts/energy', b'high_consumption', qos=2)\n\n    async def disconnect(self):\n        await self.unsubscribe('home/energy/consumption')\n```\n\n---\n\n### Multi-device coordination — workers per room\n\nSpawn a dedicated worker for each room so subscriptions and logic stay isolated. The master consumer manages the worker lifecycle.\n\n```python\nclass MasterConsumer(MqttConsumer):\n\n    ROOMS = ['living', 'bedroom', 'kitchen']\n\n    async def connect(self):\n        for i, room in enumerate(self.ROOMS, start=1):\n            await self.spawn_worker(\n                app_id=i,\n                consumer_path='myapp.consumers.RoomConsumer',\n                consumer_params={'room': room},\n            )\n\n    async def receive(self, mqtt_message): pass\n    async def disconnect(self): pass\n\n\nclass RoomConsumer(MqttConsumer):\n\n    async def connect(self):\n        room = self.scope['room']\n        await self.subscribe(f'home/{room}/#', qos=1)\n\n    async def receive(self, mqtt_message):\n        # Handle all topics for this room\n        ...\n\n    async def disconnect(self):\n        room = self.scope['room']\n        await self.unsubscribe(f'home/{room}/#')\n```\n\n---\n\n### Garden irrigation — schedule-aware automation\n\nCombine Django's ORM with MQTT to only water the garden when the schedule says so and soil moisture is below a threshold.\n\n```python\nfrom django.utils import timezone\nfrom mqttasgi.consumers import MqttConsumer\nfrom myapp.models import IrrigationSchedule\n\nclass IrrigationConsumer(MqttConsumer):\n\n    async def connect(self):\n        await self.subscribe('garden/sensor/moisture', qos=1)\n\n    async def receive(self, mqtt_message):\n        moisture = float(mqtt_message['payload'])\n        now = timezone.now()\n        scheduled = await IrrigationSchedule.objects.filter(\n            active=True,\n            start_hour=now.hour,\n        ).aexists()\n\n        if scheduled and moisture < 30.0:\n            await self.publish('garden/valve/main', b'open', qos=2)\n\n    async def disconnect(self):\n        await self.unsubscribe('garden/sensor/moisture')\n```\n\n---\n\n## Common pitfalls\n\n- `MqttComunicator.connect()` returns the **first message** the consumer sends. If `connect()` does nothing (no subscribe, no publish), the call will time out — always subscribe or send something in `connect()`.\n- The class is spelled `MqttComunicator` (one `m`) — this is an intentional (legacy) typo in the library.\n- Worker spawn/kill is only allowed from the master consumer (`app_id=0`). Calling it from a worker raises an error.\n- With mosquitto 2.x you need `allow_anonymous true` and an explicit `listener` line in `mosquitto.conf` for integration tests.\n- `connect_max_retries=0` means retry forever with exponential back-off (capped at 30 s).\n","topics":["Home Automation"],"tags":{"latest":"1.0.1"},"stats":{"comments":0,"downloads":371,"installsAllTime":14,"installsCurrent":0,"stars":0,"versions":2},"createdAt":1771687337589,"updatedAt":1778992111715},"latestVersion":{"version":"1.0.1","createdAt":1771689094430,"changelog":"Added examples","license":null},"metadata":{"setup":[],"os":null,"systems":null},"owner":{"handle":"sivulich","userId":"s178vwyehrete1r8gb20f7171h885cv0","displayName":"sivulich","image":"https://avatars.githubusercontent.com/u/18451152?v=4"},"moderation":null}