Telethon Master

API key required
MCP Tools

Работа с Telegram через Telethon MCP: отправка сообщений, парсинг истории, управление группами, скачивание медиа и генерация голосовых сообщений.

Install

openclaw skills install telethon-master

Telethon Master

Скилл для работы с Telegram MTProto через Telethon MCP. Предоставляет полный доступ к 8 инструментам telegram_*, генерации голосовых сообщений через TTS+FFmpeg, парсингу истории чатов и управлению группами.

Когда использовать

  • Пользователь хочет отправить сообщение через Telegram (не через бота)
  • Нужно получить историю чата или канала
  • Требуется управлять группами (создание, добавление участников, права)
  • Пользователь хочет скачать медиафайлы из Telegram
  • Нужно сгенерировать voice note из текста через TTS
  • Требуется автоматизировать рассылку по подписчикам
  • Пользователь спрашивает о работе MCP-инструментов Telethon

Инструкции

1. Доступные инструменты

Перед началом убедись, что MCP-сервер Telethon запущен. Доступны следующие инструменты:

ИнструментОписаниеВозвращает
telegram_send_messageОтправка текстового сообщения{ok: true, id: msg_id}
telegram_get_historyПолучение истории сообщенийСписок сообщений с метаданными
telegram_get_dialogИнформация о диалогеИнфо о чате/канале/пользователе
telegram_download_mediaСкачивание медиафайлаПуть к файлу
telegram_upload_mediaЗагрузка медиафайлаID загруженного файла
telegram_create_groupСоздание группыID группы
telegram_add_userДобавление пользователя в группуРезультат операции
telegram_set_group_titleИзменение названия группы{ok: true}

2. Базовые операции

Отправка сообщения:

import asyncio
import json

async def send_telegram_message(text: str, chat_id: str | int) -> dict:
    """Отправляет сообщение через MCP Telethon."""
    payload = {
        "chat_id": str(chat_id),
        "text": text,
        "parse_mode": "markdown",
        "disable_web_page_preview": False,
        "disable_notification": False,
    }
    # вызов инструмента telegram_send_message
    result = await call_mcp_tool("telegram_send_message", payload)
    return json.loads(result)

Форматирование текста:

СтильМаркдаунHTML
Жирный**текст**<b>текст</b>
Курсив__текст__<i>текст</i>
Моноширный`код`<code>код</code>
Спойлер`
Ссылка[текст](url)<a href="url">текст</a>

3. Получение истории

Парсинг чата:

async def get_chat_history(
    chat_id: str | int,
    limit: int = 100,
    offset_id: int = 0,
    min_id: int = 0,
) -> list[dict]:
    """Получает историю сообщений чата."""
    payload = {
        "chat_id": str(chat_id),
        "limit": min(limit, 1000),
    }
    if offset_id:
        payload["offset_id"] = offset_id
    if min_id:
        payload["min_id"] = min_id

    raw = await call_mcp_tool("telegram_get_history", payload)
    messages = json.loads(raw)

    result = []
    for msg in messages:
        result.append(
            {
                "id": msg.get("id"),
                "date": msg.get("date"),
                "from_id": msg.get("from_id"),
                "text": msg.get("text", ""),
                "media": msg.get("media"),
                "reply_to": msg.get("reply_to"),
                "views": msg.get("views"),
                "forwards": msg.get("forwards"),
            }
        )
    return result

Фильтрация сообщений:

def filter_messages(
    messages: list[dict],
    keywords: list[str] | None = None,
    from_user: str | int | None = None,
    after_date: str | None = None,
    has_media: bool | None = None,
) -> list[dict]:
    """Фильтрует сообщения по различным критериям."""
    result = messages

    if keywords:
        result = [
            m for m in result
            if any(k.lower() in m["text"].lower() for k in keywords)
        ]

    if from_user:
        result = [
            m for m in result
            if str(m.get("from_id", "")) == str(from_user)
        ]

    if after_date:
        result = [
            m for m in result
            if m.get("date", "") >= after_date
        ]

    if has_media is not None:
        result = [
            m for m in result
            if (m.get("media") is not None) == has_media
        ]

    return result

4. TTS → Voice Note (голосовые сообщения)

Генерация голосового сообщения из текста:

import asyncio
import os
import tempfile
from pathlib import Path


async def text_to_voice_note(
    text: str,
    chat_id: str | int,
    tts_provider: str = "edge",
    voice: str = "ru-RU-SvetlanaNeural",
    speed: float = 1.0,
    quality: str = "low",
) -> dict:
    """
    Генерирует voice note из текста и отправляет в чат.

    Поддерживаемые TTS-провайдеры:
    - edge: Microsoft Edge TTS (бесплатно, ~50 голосов)
    - elevenlabs: ElevenLabs API (качественно, нужен ключ)
    - silero: локальный Silero TTS (офлайн, только русский)

    Требования: FFmpeg должен быть установлен в системе.
    """
    audio_path = None
    ogg_path = None
    try:
        if tts_provider == "edge":
            audio_path = await _edge_tts(text, voice, speed)
        elif tts_provider == "elevenlabs":
            audio_path = await _elevenlabs_tts(text, voice, speed)
        elif tts_provider == "silero":
            audio_path = await _silero_tts(text, voice, speed)
        else:
            raise ValueError(f"Unknown TTS provider: {tts_provider}")

        # конвертация в OGG Opus (формат голосовых сообщений)
        ogg_path = audio_path.with_suffix(".ogg")
        bitrate = "16k" if quality == "low" else "48k"

        proc = await asyncio.create_subprocess_exec(
            "ffmpeg",
            "-i", str(audio_path),
            "-c:a", "libopus",
            "-b:a", bitrate,
            "-ar", "24000",
            "-ac", "1",
            "-y",
            str(ogg_path),
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.DEVNULL,
        )
        await proc.wait()

        if proc.returncode != 0:
            raise RuntimeError(f"FFmpeg failed with code {proc.returncode}")

        # загрузка в Telegram
        upload_result = await call_mcp_tool(
            "telegram_upload_media",
            {
                "file_path": str(ogg_path),
                "caption": "",
            },
        )

        # отправка как voice note
        send_result = await call_mcp_tool(
            "telegram_send_message",
            {
                "chat_id": str(chat_id),
                "text": "🎤",
                "media_id": json.loads(upload_result).get("id"),
                "media_type": "voice",
            },
        )

        return json.loads(send_result)

    finally:
        for p in [audio_path, ogg_path]:
            if p and p.exists():
                os.unlink(p)


async def _edge_tts(text: str, voice: str, speed: float) -> Path:
    """Microsoft Edge TTS (через edge-tts библиотеку)."""
    import edge_tts

    tmp = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False)
    tmp_path = Path(tmp.name)
    tmp.close()

    communicate = edge_tts.Communicate(text, voice=voice, rate=f"{int((speed - 1.0) * 100):+d}%")
    await communicate.save(str(tmp_path))
    return tmp_path


async def _elevenlabs_tts(text: str, voice: str, speed: float) -> Path:
    """ElevenLabs TTS (через HTTP API)."""
    import aiohttp

    api_key = os.environ.get("ELEVENLABS_API_KEY", "")
    if not api_key:
        raise RuntimeError("ELEVENLABS_API_KEY not set")

    url = f"https://api.elevenlabs.io/v1/text-to-speech/{voice}"

    headers = {
        "Accept": "audio/mpeg",
        "Content-Type": "application/json",
        "xi-api-key": api_key,
    }

    data = {
        "text": text,
        "model_id": "eleven_monolingual_v1",
        "voice_settings": {
            "stability": 0.5,
            "similarity_boost": 0.75,
            "speed": speed,
        },
    }

    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=data, headers=headers) as resp:
            if resp.status != 200:
                raise RuntimeError(f"ElevenLabs API error: {resp.status}")
            tmp = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False)
            tmp_path = Path(tmp.name)
            tmp.close()
            with open(tmp_path, "wb") as f:
                f.write(await resp.read())
            return tmp_path


async def _silero_tts(text: str, voice: str, speed: float) -> Path:
    """Локальный Silero TTS (офлайн, русский)."""
    import torch

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model, _ = torch.hub.load(
        repo_or_dir="snakers4/silero-models",
        model="silero_tts",
        language="ru",
        speaker="v3_1_ru",
    )
    model.to(device)

    tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    tmp_path = Path(tmp.name)
    tmp.close()

    audio = model.apply_tts(text=text, speaker=voice, sample_rate=24000)
    import scipy.io.wavfile as wav

    wav.write(str(tmp_path), 24000, audio.numpy())
    return tmp_path

5. Управление группами

Создание и настройка группы:

async def create_telegram_group(
    title: str,
    description: str = "",
    members: list[str] | None = None,
    restricted: bool = False,
) -> dict:
    """Создаёт Telegram-группу с описанием и участниками."""
    result = await call_mcp_tool(
        "telegram_create_group",
        {
            "title": title,
            "members": members or [],
        },
    )
    group_data = json.loads(result)
    group_id = group_data.get("id")

    if group_id and description:
        await call_mcp_tool(
            "telegram_send_message",
            {
                "chat_id": str(group_id),
                "text": f"Описание группы:\n{description}",
            },
        )

    return group_data


async def set_group_permissions(
    group_id: str | int,
    send_messages: bool = True,
    send_media: bool = True,
    send_stickers: bool = True,
    send_polls: bool = False,
    add_users: bool = False,
    pin_messages: bool = False,
    change_info: bool = False,
) -> dict:
    """Устанавливает права участников группы (требуется админ)."""
    payload = {
        "chat_id": str(group_id),
        "permissions": {
            "can_send_messages": send_messages,
            "can_send_media_messages": send_media,
            "can_send_stickers": send_stickers,
            "can_send_polls": send_polls,
            "can_add_web_page_previews": send_media,
            "can_invite_users": add_users,
            "can_pin_messages": pin_messages,
            "can_change_info": change_info,
        },
    }
    # используем set_permissions через сырой MTProto вызов
    return await call_mcp_tool("telegram_call_method", {
        "method": "messages.editChatDefaultBannedRights",
        "params": {"peer": str(group_id), "banned_rights": _banned_rights(payload["permissions"])},
    })

6. Скачивание и загрузка медиа

async def download_media(
    chat_id: str | int,
    message_id: int,
    output_dir: str = "./downloads",
) -> Path:
    """Скачивает медиафайл из сообщения."""
    result = await call_mcp_tool(
        "telegram_download_media",
        {
            "chat_id": str(chat_id),
            "message_id": message_id,
            "output_dir": output_dir,
        },
    )
    data = json.loads(result)
    return Path(data.get("path", ""))


async def send_photo(
    chat_id: str | int,
    file_path: str,
    caption: str = "",
) -> dict:
    """Отправляет фото с подписью."""
    upload = await call_mcp_tool(
        "telegram_upload_media",
        {
            "file_path": file_path,
            "caption": caption,
        },
    )
    upload_data = json.loads(upload)

    result = await call_mcp_tool(
        "telegram_send_message",
        {
            "chat_id": str(chat_id),
            "text": caption,
            "media_id": upload_data.get("id"),
            "media_type": "photo",
        },
    )
    return json.loads(result)

7. Поиск и анализ

Поиск по всем диалогам:

async def search_all_chats(keyword: str, limit: int = 10) -> list[dict]:
    """Ищет сообщения по ключевому слову во всех доступных чатах."""
    dialogs_raw = await call_mcp_tool("telegram_get_dialog", {"limit": 50})
    dialogs = json.loads(dialogs_raw)

    results = []
    for dialog in dialogs:
        chat_id = dialog.get("id")
        history = await get_chat_history(chat_id, limit=100)

        for msg in history:
            if keyword.lower() in msg["text"].lower():
                results.append(
                    {
                        "chat": dialog.get("title", dialog.get("name", "")),
                        "chat_id": chat_id,
                        "message_id": msg["id"],
                        "text": msg["text"][:200],
                        "date": msg["date"],
                    }
                )
                if len(results) >= limit:
                    return results
    return results

Анализ активности в группе:

async def analyze_group_activity(chat_id: str | int, days: int = 7) -> dict:
    """Анализирует активность участников группы за N дней."""
    history = await get_chat_history(chat_id, limit=1000)

    from collections import Counter
    from datetime import datetime, timedelta

    cutoff = datetime.now() - timedelta(days=days)

    active_users = Counter()
    hourly_activity = Counter()
    media_count = 0
    total = 0

    for msg in history:
        msg_date = msg.get("date", "")
        if msg_date < cutoff.isoformat():
            continue

        total += 1
        if msg.get("media"):
            media_count += 1

        user_id = str(msg.get("from_id", "unknown"))
        active_users[user_id] += 1

        try:
            hour = datetime.fromisoformat(msg_date).hour
            hourly_activity[hour] += 1
        except (ValueError, TypeError):
            pass

    return {
        "total_messages": total,
        "unique_users": len(active_users),
        "media_percentage": round(media_count / total * 100, 1) if total else 0,
        "top_users": active_users.most_common(10),
        "peak_hours": [h for h, _ in hourly_activity.most_common(5)],
        "avg_messages_per_day": round(total / days, 1),
    }

8. Безопасность и ограничения

ПравилоОписание
Никакой спамНе отправлять рекламные сообщения без явного запроса пользователя
Чувствительные данныеНе логировать содержимое сообщений, только метаданные
Rate limitingНе более 30 сообщений в минуту через одного юзера
Размер файловFFmpeg конвертация: не превышать 50 MB финальный файл
УдалениеНе удалять сообщения пользователей без подтверждения
Приватные чатыНе читать ЛС без явного разрешения владельца аккаунта

9. Обработка ошибок

СитуацияДействие
FLOOD_WAIT_XПодождать X секунд, повторить. X > 300 — сообщить пользователю
PHONE_NUMBER_INVALIDПроверить формат номера (+ и код страны)
USER_PRIVACY_RESTRICTEDНельзя добавить пользователя — сообщить, предложить пригласительную ссылку
CHAT_ADMIN_REQUIREDНет прав администратора — сообщить
MEDIA_EMPTYСообщение не содержит медиа — предложить получить историю
FILE_REFERENCE_EXPIREDПерезапросить историю и повторить скачивание

10. Вспомогательные функции

def _banned_rights(permissions: dict) -> dict:
    """Конвертирует права в banned_rights формат MTProto."""
    mapping = {
        "can_send_messages": "send_messages",
        "can_send_media_messages": "send_media",
        "can_send_stickers": "send_stickers",
        "can_send_polls": "send_polls",
        "can_add_web_page_previews": "embed_links",
        "can_invite_users": "invite_users",
        "can_pin_messages": "pin_messages",
        "can_change_info": "change_info",
    }
    return {
        flag: not permissions.get(perm, True)
        for perm, flag in mapping.items()
    }


async def call_mcp_tool(tool_name: str, params: dict) -> str:
    """Вызывает MCP-инструмент Telethon через стандартный протокол."""
    import subprocess
    import json

    cmd = [
        "python3", "-m", "openclaw", "call",
        "--tool", tool_name,
        "--params", json.dumps(params),
    ]
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(f"MCP call failed: {stderr.decode()}")
    return stdout.decode()


def format_telegram_user(user_data: dict) -> str:
    """Форматирует данные пользователя для отображения."""
    parts = [
        user_data.get("first_name", ""),
        user_data.get("last_name", ""),
    ]
    name = " ".join(filter(None, parts)).strip()
    username = user_data.get("username", "")
    user_id = user_data.get("id", "")

    result = name or "No name"
    if username:
        result += f" (@{username})"
    result += f" — ID: {user_id}"
    return result

Референсы

  • Документация Telethon: https://docs.telethon.dev/
  • MTProto API: https://core.telegram.org/mtproto
  • FFmpeg документация: https://ffmpeg.org/documentation.html
  • edge-tts: https://github.com/rany2/edge-tts
  • Silero TTS: https://github.com/snakers4/silero-models
  • Права чатов: https://core.telegram.org/constructor/chatBannedRights
  • Формат голосовых сообщений: OGG Opus, 24kHz, mono, bitrate 16-48k