Install
openclaw skills install telethon-masterРабота с Telegram через Telethon MCP: отправка сообщений, парсинг истории, управление группами, скачивание медиа и генерация голосовых сообщений.
openclaw skills install telethon-masterСкилл для работы с Telegram MTProto через Telethon MCP. Предоставляет полный доступ к 8 инструментам telegram_*, генерации голосовых сообщений через TTS+FFmpeg, парсингу истории чатов и управлению группами.
Перед началом убедись, что MCP-сервер Telethon запущен. Доступны следующие инструменты:
| Инструмент | Описание | Возвращает |
|---|---|---|
telegram_send_message | Отправка текстового сообщения | {ok: true, id: msg_id} |
telegram_get_history | Получение истории сообщений | Список сообщений с метаданными |
telegram_get_dialog | Информация о диалоге | Инфо о чате/канале/пользователе |
telegram_download_media | Скачивание медиафайла | Путь к файлу |
telegram_upload_media | Загрузка медиафайла | ID загруженного файла |
telegram_create_group | Создание группы | ID группы |
telegram_add_user | Добавление пользователя в группу | Результат операции |
telegram_set_group_title | Изменение названия группы | {ok: true} |
Отправка сообщения:
import asyncio
import json
async def send_telegram_message(text: str, chat_id: str | int) -> dict:
"""Отправляет сообщение через MCP Telethon."""
payload = {
"chat_id": str(chat_id),
"text": text,
"parse_mode": "markdown",
"disable_web_page_preview": False,
"disable_notification": False,
}
# вызов инструмента telegram_send_message
result = await call_mcp_tool("telegram_send_message", payload)
return json.loads(result)
Форматирование текста:
| Стиль | Маркдаун | HTML |
|---|---|---|
| Жирный | **текст** | <b>текст</b> |
| Курсив | __текст__ | <i>текст</i> |
| Моноширный | `код` | <code>код</code> |
| Спойлер | ` | |
| Ссылка | [текст](url) | <a href="url">текст</a> |
Парсинг чата:
async def get_chat_history(
chat_id: str | int,
limit: int = 100,
offset_id: int = 0,
min_id: int = 0,
) -> list[dict]:
"""Получает историю сообщений чата."""
payload = {
"chat_id": str(chat_id),
"limit": min(limit, 1000),
}
if offset_id:
payload["offset_id"] = offset_id
if min_id:
payload["min_id"] = min_id
raw = await call_mcp_tool("telegram_get_history", payload)
messages = json.loads(raw)
result = []
for msg in messages:
result.append(
{
"id": msg.get("id"),
"date": msg.get("date"),
"from_id": msg.get("from_id"),
"text": msg.get("text", ""),
"media": msg.get("media"),
"reply_to": msg.get("reply_to"),
"views": msg.get("views"),
"forwards": msg.get("forwards"),
}
)
return result
Фильтрация сообщений:
def filter_messages(
messages: list[dict],
keywords: list[str] | None = None,
from_user: str | int | None = None,
after_date: str | None = None,
has_media: bool | None = None,
) -> list[dict]:
"""Фильтрует сообщения по различным критериям."""
result = messages
if keywords:
result = [
m for m in result
if any(k.lower() in m["text"].lower() for k in keywords)
]
if from_user:
result = [
m for m in result
if str(m.get("from_id", "")) == str(from_user)
]
if after_date:
result = [
m for m in result
if m.get("date", "") >= after_date
]
if has_media is not None:
result = [
m for m in result
if (m.get("media") is not None) == has_media
]
return result
Генерация голосового сообщения из текста:
import asyncio
import os
import tempfile
from pathlib import Path
async def text_to_voice_note(
text: str,
chat_id: str | int,
tts_provider: str = "edge",
voice: str = "ru-RU-SvetlanaNeural",
speed: float = 1.0,
quality: str = "low",
) -> dict:
"""
Генерирует voice note из текста и отправляет в чат.
Поддерживаемые TTS-провайдеры:
- edge: Microsoft Edge TTS (бесплатно, ~50 голосов)
- elevenlabs: ElevenLabs API (качественно, нужен ключ)
- silero: локальный Silero TTS (офлайн, только русский)
Требования: FFmpeg должен быть установлен в системе.
"""
audio_path = None
ogg_path = None
try:
if tts_provider == "edge":
audio_path = await _edge_tts(text, voice, speed)
elif tts_provider == "elevenlabs":
audio_path = await _elevenlabs_tts(text, voice, speed)
elif tts_provider == "silero":
audio_path = await _silero_tts(text, voice, speed)
else:
raise ValueError(f"Unknown TTS provider: {tts_provider}")
# конвертация в OGG Opus (формат голосовых сообщений)
ogg_path = audio_path.with_suffix(".ogg")
bitrate = "16k" if quality == "low" else "48k"
proc = await asyncio.create_subprocess_exec(
"ffmpeg",
"-i", str(audio_path),
"-c:a", "libopus",
"-b:a", bitrate,
"-ar", "24000",
"-ac", "1",
"-y",
str(ogg_path),
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL,
)
await proc.wait()
if proc.returncode != 0:
raise RuntimeError(f"FFmpeg failed with code {proc.returncode}")
# загрузка в Telegram
upload_result = await call_mcp_tool(
"telegram_upload_media",
{
"file_path": str(ogg_path),
"caption": "",
},
)
# отправка как voice note
send_result = await call_mcp_tool(
"telegram_send_message",
{
"chat_id": str(chat_id),
"text": "🎤",
"media_id": json.loads(upload_result).get("id"),
"media_type": "voice",
},
)
return json.loads(send_result)
finally:
for p in [audio_path, ogg_path]:
if p and p.exists():
os.unlink(p)
async def _edge_tts(text: str, voice: str, speed: float) -> Path:
"""Microsoft Edge TTS (через edge-tts библиотеку)."""
import edge_tts
tmp = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False)
tmp_path = Path(tmp.name)
tmp.close()
communicate = edge_tts.Communicate(text, voice=voice, rate=f"{int((speed - 1.0) * 100):+d}%")
await communicate.save(str(tmp_path))
return tmp_path
async def _elevenlabs_tts(text: str, voice: str, speed: float) -> Path:
"""ElevenLabs TTS (через HTTP API)."""
import aiohttp
api_key = os.environ.get("ELEVENLABS_API_KEY", "")
if not api_key:
raise RuntimeError("ELEVENLABS_API_KEY not set")
url = f"https://api.elevenlabs.io/v1/text-to-speech/{voice}"
headers = {
"Accept": "audio/mpeg",
"Content-Type": "application/json",
"xi-api-key": api_key,
}
data = {
"text": text,
"model_id": "eleven_monolingual_v1",
"voice_settings": {
"stability": 0.5,
"similarity_boost": 0.75,
"speed": speed,
},
}
async with aiohttp.ClientSession() as session:
async with session.post(url, json=data, headers=headers) as resp:
if resp.status != 200:
raise RuntimeError(f"ElevenLabs API error: {resp.status}")
tmp = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False)
tmp_path = Path(tmp.name)
tmp.close()
with open(tmp_path, "wb") as f:
f.write(await resp.read())
return tmp_path
async def _silero_tts(text: str, voice: str, speed: float) -> Path:
"""Локальный Silero TTS (офлайн, русский)."""
import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model, _ = torch.hub.load(
repo_or_dir="snakers4/silero-models",
model="silero_tts",
language="ru",
speaker="v3_1_ru",
)
model.to(device)
tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
tmp_path = Path(tmp.name)
tmp.close()
audio = model.apply_tts(text=text, speaker=voice, sample_rate=24000)
import scipy.io.wavfile as wav
wav.write(str(tmp_path), 24000, audio.numpy())
return tmp_path
Создание и настройка группы:
async def create_telegram_group(
title: str,
description: str = "",
members: list[str] | None = None,
restricted: bool = False,
) -> dict:
"""Создаёт Telegram-группу с описанием и участниками."""
result = await call_mcp_tool(
"telegram_create_group",
{
"title": title,
"members": members or [],
},
)
group_data = json.loads(result)
group_id = group_data.get("id")
if group_id and description:
await call_mcp_tool(
"telegram_send_message",
{
"chat_id": str(group_id),
"text": f"Описание группы:\n{description}",
},
)
return group_data
async def set_group_permissions(
group_id: str | int,
send_messages: bool = True,
send_media: bool = True,
send_stickers: bool = True,
send_polls: bool = False,
add_users: bool = False,
pin_messages: bool = False,
change_info: bool = False,
) -> dict:
"""Устанавливает права участников группы (требуется админ)."""
payload = {
"chat_id": str(group_id),
"permissions": {
"can_send_messages": send_messages,
"can_send_media_messages": send_media,
"can_send_stickers": send_stickers,
"can_send_polls": send_polls,
"can_add_web_page_previews": send_media,
"can_invite_users": add_users,
"can_pin_messages": pin_messages,
"can_change_info": change_info,
},
}
# используем set_permissions через сырой MTProto вызов
return await call_mcp_tool("telegram_call_method", {
"method": "messages.editChatDefaultBannedRights",
"params": {"peer": str(group_id), "banned_rights": _banned_rights(payload["permissions"])},
})
async def download_media(
chat_id: str | int,
message_id: int,
output_dir: str = "./downloads",
) -> Path:
"""Скачивает медиафайл из сообщения."""
result = await call_mcp_tool(
"telegram_download_media",
{
"chat_id": str(chat_id),
"message_id": message_id,
"output_dir": output_dir,
},
)
data = json.loads(result)
return Path(data.get("path", ""))
async def send_photo(
chat_id: str | int,
file_path: str,
caption: str = "",
) -> dict:
"""Отправляет фото с подписью."""
upload = await call_mcp_tool(
"telegram_upload_media",
{
"file_path": file_path,
"caption": caption,
},
)
upload_data = json.loads(upload)
result = await call_mcp_tool(
"telegram_send_message",
{
"chat_id": str(chat_id),
"text": caption,
"media_id": upload_data.get("id"),
"media_type": "photo",
},
)
return json.loads(result)
Поиск по всем диалогам:
async def search_all_chats(keyword: str, limit: int = 10) -> list[dict]:
"""Ищет сообщения по ключевому слову во всех доступных чатах."""
dialogs_raw = await call_mcp_tool("telegram_get_dialog", {"limit": 50})
dialogs = json.loads(dialogs_raw)
results = []
for dialog in dialogs:
chat_id = dialog.get("id")
history = await get_chat_history(chat_id, limit=100)
for msg in history:
if keyword.lower() in msg["text"].lower():
results.append(
{
"chat": dialog.get("title", dialog.get("name", "")),
"chat_id": chat_id,
"message_id": msg["id"],
"text": msg["text"][:200],
"date": msg["date"],
}
)
if len(results) >= limit:
return results
return results
Анализ активности в группе:
async def analyze_group_activity(chat_id: str | int, days: int = 7) -> dict:
"""Анализирует активность участников группы за N дней."""
history = await get_chat_history(chat_id, limit=1000)
from collections import Counter
from datetime import datetime, timedelta
cutoff = datetime.now() - timedelta(days=days)
active_users = Counter()
hourly_activity = Counter()
media_count = 0
total = 0
for msg in history:
msg_date = msg.get("date", "")
if msg_date < cutoff.isoformat():
continue
total += 1
if msg.get("media"):
media_count += 1
user_id = str(msg.get("from_id", "unknown"))
active_users[user_id] += 1
try:
hour = datetime.fromisoformat(msg_date).hour
hourly_activity[hour] += 1
except (ValueError, TypeError):
pass
return {
"total_messages": total,
"unique_users": len(active_users),
"media_percentage": round(media_count / total * 100, 1) if total else 0,
"top_users": active_users.most_common(10),
"peak_hours": [h for h, _ in hourly_activity.most_common(5)],
"avg_messages_per_day": round(total / days, 1),
}
| Правило | Описание |
|---|---|
| Никакой спам | Не отправлять рекламные сообщения без явного запроса пользователя |
| Чувствительные данные | Не логировать содержимое сообщений, только метаданные |
| Rate limiting | Не более 30 сообщений в минуту через одного юзера |
| Размер файлов | FFmpeg конвертация: не превышать 50 MB финальный файл |
| Удаление | Не удалять сообщения пользователей без подтверждения |
| Приватные чаты | Не читать ЛС без явного разрешения владельца аккаунта |
| Ситуация | Действие |
|---|---|
FLOOD_WAIT_X | Подождать X секунд, повторить. X > 300 — сообщить пользователю |
PHONE_NUMBER_INVALID | Проверить формат номера (+ и код страны) |
USER_PRIVACY_RESTRICTED | Нельзя добавить пользователя — сообщить, предложить пригласительную ссылку |
CHAT_ADMIN_REQUIRED | Нет прав администратора — сообщить |
MEDIA_EMPTY | Сообщение не содержит медиа — предложить получить историю |
FILE_REFERENCE_EXPIRED | Перезапросить историю и повторить скачивание |
def _banned_rights(permissions: dict) -> dict:
"""Конвертирует права в banned_rights формат MTProto."""
mapping = {
"can_send_messages": "send_messages",
"can_send_media_messages": "send_media",
"can_send_stickers": "send_stickers",
"can_send_polls": "send_polls",
"can_add_web_page_previews": "embed_links",
"can_invite_users": "invite_users",
"can_pin_messages": "pin_messages",
"can_change_info": "change_info",
}
return {
flag: not permissions.get(perm, True)
for perm, flag in mapping.items()
}
async def call_mcp_tool(tool_name: str, params: dict) -> str:
"""Вызывает MCP-инструмент Telethon через стандартный протокол."""
import subprocess
import json
cmd = [
"python3", "-m", "openclaw", "call",
"--tool", tool_name,
"--params", json.dumps(params),
]
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
raise RuntimeError(f"MCP call failed: {stderr.decode()}")
return stdout.decode()
def format_telegram_user(user_data: dict) -> str:
"""Форматирует данные пользователя для отображения."""
parts = [
user_data.get("first_name", ""),
user_data.get("last_name", ""),
]
name = " ".join(filter(None, parts)).strip()
username = user_data.get("username", "")
user_id = user_data.get("id", "")
result = name or "No name"
if username:
result += f" (@{username})"
result += f" — ID: {user_id}"
return result
https://docs.telethon.dev/https://core.telegram.org/mtprotohttps://ffmpeg.org/documentation.htmlhttps://github.com/rany2/edge-ttshttps://github.com/snakers4/silero-modelshttps://core.telegram.org/constructor/chatBannedRights