1→""" 2→ClawdBot Telegram Skill 3→======================== 4→ 5→Non-blocking Telegram bot integration for Genesis system. 6→ 7→Features: 8→- Async-compatible with main event loop 9→- Voice note reception and forwarding 10→- Text message handling 11→- Graceful shutdown support 12→ 13→Usage: 14→ The TelegramSkill is initialized by ClawdBot main.py if TELEGRAM_BOT_TOKEN 15→ is set in the environment. Messages are forwarded to Genesis via Redis. 16→""" 17→ 18→import logging 19→import os 20→import asyncio 21→from datetime import datetime, timezone 22→from typing import Optional, Dict, Any 23→from telegram import Update 24→from telegram.ext import ( 25→ ApplicationBuilder, 26→ ContextTypes, 27→ CommandHandler, 28→ MessageHandler, 29→ filters 30→) 31→ 32→# Use the new enterprise Redis client 33→import sys 34→sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) 35→from ClawdBot.redis_client_v2 import redis_client 36→ 37→logger = logging.getLogger("ClawdBotTelegram") 38→ 39→CH_OBSERVATIONS = "genesis:observations" 40→CH_COMMANDS = "genesis:commands" 41→ 42→ 43→class TelegramSkill: 44→ """ 45→ Non-blocking Telegram bot skill for ClawdBot. 46→ 47→ Key improvements over previous version: 48→ - Uses async polling that doesn't block the event loop 49→ - Proper graceful shutdown handling 50→ - Uses enterprise Redis client (v2) 51→ - Better error handling and logging 52→ """ 53→ 54→ def __init__(self): 55→ self.token = os.getenv("TELEGRAM_BOT_TOKEN") 56→ self.app = None 57→ self._running = False 58→ 59→ if not self.token: 60→ logger.warning("TELEGRAM_BOT_TOKEN not found. Telegram Skill disabled.") 61→ return 62→ 63→ try: 64→ self.app = ApplicationBuilder().token(self.token).build() 65→ self._setup_handlers() 66→ logger.info("Telegram Skill initialized") 67→ except Exception as e: 68→ logger.error(f"Failed to initialize Telegram: {e}") 69→ self.app = None 70→ 71→ def _setup_handlers(self): 72→ """Register message handlers.""" 73→ if not self.app: 74→ return 75→ 76→ # Command handlers 77→ self.app.add_handler(CommandHandler('start', self._handle_start)) 78→ self.app.add_handler(CommandHandler('status', self._handle_status)) 79→ self.app.add_handler(CommandHandler('help', self._handle_help)) 80→ 81→ # Message handlers 82→ self.app.add_handler(MessageHandler(filters.VOICE, self._handle_voice)) 83→ self.app.add_handler(MessageHandler(filters.AUDIO, self._handle_audio)) 84→ self.app.add_handler(MessageHandler(filters.PHOTO, self._handle_photo)) 85→ self.app.add_handler(MessageHandler( 86→ filters.TEXT & (~filters.COMMAND), 87→ self._handle_text 88→ )) 89→ 90→ # Error handler 91→ self.app.add_error_handler(self._handle_error) 92→ 93→ async def _handle_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE): 94→ """Handle /start command.""" 95→ user = update.effective_user 96→ await context.bot.send_message( 97→ chat_id=update.effective_chat.id, 98→ text=( 99→ f"Greetings, {user.first_name}. I am ClawdBot, the Genesis Interface Agent.\n\n" 100→ "I can receive:\n" 101→ "- Voice messages (forwarded to Genesis)\n" 102→ "- Text commands\n" 103→ "- Screenshots\n\n" 104→ "Use /help for available commands." 105→ ) 106→ ) 107→ 108→ # Log connection 109→ self._publish_observation("system", { 110→ "event": "telegram_user_connected", 111→ "user_id": user.id, 112→ "username": user.username, 113→ "first_name": user.first_name 114→ }) 115→ 116→ async def _handle_status(self, update: Update, context: ContextTypes.DEFAULT_TYPE): 117→ """Handle /status command.""" 118→ # Check Redis connection 119→ redis_ok = redis_client.is_connected() 120→ 121→ status_text = ( 122→ "**ClawdBot Status**\n" 123→ f"- Redis: {'Connected' if redis_ok else 'Disconnected'}\n" 124→ f"- Telegram: Connected\n" 125→ f"- Time: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}" 126→ ) 127→ 128→ await context.bot.send_message( 129→ chat_id=update.effective_chat.id, 130→ text=status_text, 131→ parse_mode='Markdown' 132→ ) 133→ 134→ async def _handle_help(self, update: Update, context: ContextTypes.DEFAULT_TYPE): 135→ """Handle /help command.""" 136→ help_text = ( 137→ "**ClawdBot Commands**\n\n" 138→ "/start - Initialize connection\n" 139→ "/status - Check system status\n" 140→ "/help - Show this help\n\n" 141→ "**Message Types**\n" 142→ "- Send voice notes for processing\n" 143→ "- Send text messages as commands\n" 144→ "- Send photos for analysis" 145→ ) 146→ 147→ await context.bot.send_message( 148→ chat_id=update.effective_chat.id, 149→ text=help_text, 150→ parse_mode='Markdown' 151→ ) 152→ 153→ async def _handle_voice(self, update: Update, context: ContextTypes.DEFAULT_TYPE): 154→ """Handle voice notes.""" 155→ voice_file = await update.message.voice.get_file() 156→ user = update.effective_user 157→ 158→ logger.info(f"Received voice note from {user.first_name} ({voice_file.file_size} bytes)") 159→ 160→ "mime_type": update.message.voice.mime_type 161→ }) 162→ 163→ # Download file locally 164→ try: 165→ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 166→ save_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "voice_input") 167→ os.makedirs(save_dir, exist_ok=True) 168→ 169→ file_path = os.path.join(save_dir, f"voice_{user.id}_{timestamp}.ogg") 170→ await voice_file.download_to_drive(file_path) 171→ 172→ # Update observation with file path 173→ self._publish_observation("voice_note", { 174→ "user_id": user.id, 175→ "username": user.username, 176→ "first_name": user.first_name, 177→ "file_path": file_path, # Critical for processing 178→ "file_id": voice_file.file_id, 179→ "duration": update.message.voice.duration, 180→ "chat_id": update.effective_chat.id 181→ }) 182→ 183→ logger.info(f"Voice note saved to {file_path}") 184→ 185→ except Exception as e: 186→ logger.error(f"Failed to download voice note: {e}") 187→ 188→ await context.bot.send_message( 189→ chat_id=update.effective_chat.id, 190→ text="Voice note received. Transmitting to Genesis Cortex..." 191→ ) 192→ 193→ async def _handle_audio(self, update: Update, context: ContextTypes.DEFAULT_TYPE): 194→ """Handle audio files.""" 195→ audio = update.message.audio 196→ audio_file = await audio.get_file() 197→ user = update.effective_user 198→ 199→ logger.info(f"Received audio from {user.first_name}: {audio.title or 'Untitled'}") 200→ 201→ self._publish_observation("audio", { 202→ "user_id": user.id, 203→ "first_name": user.first_name, 204→ "file_id": audio_file.file_id, 205→ "title": audio.title, 206→ "performer": audio.performer, 207→ "duration": audio.duration, 208→ "mime_type": audio.mime_type 209→ }) 210→ 211→ await context.bot.send_message( 212→ chat_id=update.effective_chat.id, 213→ text="Audio received and forwarded to Genesis." 214→ ) 215→ 216→ async def _handle_photo(self, update: Update, context: ContextTypes.DEFAULT_TYPE): 217→ """Handle photos.""" 218→ # Get the largest photo 219→ photo = update.message.photo[-1] 220→ photo_file = await photo.get_file() 221→ user = update.effective_user 222→ 223→ logger.info(f"Received photo from {user.first_name} ({photo.width}x{photo.height})") 224→ 225→ self._publish_observation("photo", { 226→ "user_id": user.id, 227→ "first_name": user.first_name, 228→ "file_id": photo_file.file_id, 229→ "width": photo.width, 230→ "height": photo.height, 231→ "file_size": photo.file_size 232→ }) 233→ 234→ await context.bot.send_message( 235→ chat_id=update.effective_chat.id, 236→ text="Photo received and forwarded for analysis." 237→ ) 238→ 239→ async def _handle_text(self, update: Update, context: ContextTypes.DEFAULT_TYPE): 240→ """Handle text messages.""" 241→ user = update.effective_user 242→ text = update.message.text 243→ 244→ logger.info(f"Received text from {user.first_name}: {text[:50]}...") 245→ 246→ self._publish_observation("text_message", { 247→ "user_id": user.id, 248→ "username": user.username, 249→ "first_name": user.first_name, 250→ "text": text, 251→ "chat_id": update.effective_chat.id 252→ }) 253→ 254→ # Check if it looks like a command for Genesis 255→ if text.startswith("!") or text.startswith("/genesis"): 256→ await context.bot.send_message( 257→ chat_id=update.effective_chat.id, 258→ text="Command forwarded to Genesis orchestrator." 259→ ) 260→ else: 261→ await context.bot.send_message( 262→ chat_id=update.effective_chat.id, 263→ text="Message received and logged." 264→ ) 265→ 266→ async def _handle_error(self, update: object, context: ContextTypes.DEFAULT_TYPE): 267→ """Handle errors.""" 268→ logger.error(f"Telegram error: {context.error}") 269→ 270→ self._publish_observation("error", { 271→ "error": str(context.error), 272→ "update": str(update) if update else None 273→ }) 274→ 275→ def _publish_observation(self, obs_type: str, data: dict): 276→ """Publish observation to Redis.""" 277→ try: 278→ redis_client.publish(CH_OBSERVATIONS, { 279→ "source": "telegram", 280→ "type": obs_type, 281→ "timestamp": datetime.now(timezone.utc).isoformat(), 282→ **data 283→ }) 284→ except Exception as e: 285→ logger.error(f"Failed to publish observation: {e}") 286→ 287→ async def start_async(self): 288→ """ 289→ Start the Telegram bot in a non-blocking way. 290→ 291→ This method initializes and starts polling without blocking 292→ the main asyncio event loop. 293→ """ 294→ if not self.app: 295→ logger.warning("Telegram app not initialized, cannot start") 296→ return 297→ 298→ self._running = True 299→ 300→ try: 301→ await self.app.initialize() 302→ await self.app.start() 303→ await self.app.updater.start_polling( 304→ allowed_updates=Update.ALL_TYPES, 305→ drop_pending_updates=True 306→ ) 307→ logger.info("Telegram bot polling started (non-blocking)") 308→ except Exception as e: 309→ logger.error(f"Failed to start Telegram polling: {e}") 310→ self._running = False 311→ 312→ async def stop_async(self): 313→ """Stop the Telegram bot gracefully.""" 314→ if not self.app or not self._running: 315→ return 316→ 317→ self._running = False 318→ 319→ try: 320→ await self.app.updater.stop() 321→ await self.app.stop() 322→ await self.app.shutdown() 323→ logger.info("Telegram bot stopped gracefully") 324→ except Exception as e: 325→ logger.error(f"Error stopping Telegram: {e}") 326→ 327→ def run(self): 328→ """ 329→ Blocking run for standalone execution. 330→ 331→ Note: This blocks the event loop. Use start_async() for 332→ integration with ClawdBot main event loop. 333→ """ 334→ if not self.app: 335→ logger.error("Cannot run - Telegram app not initialized") 336→ return 337→ 338→ logger.info("Starting Telegram Bot Polling (blocking mode)...") 339→ self.app.run_polling() 340→ 341→ @property 342→ def is_running(self) -> bool: 343→ """Check if bot is running.""" 344→ return self._running 345→ 346→ async def execute(self, action: str, args: Dict[str, Any]): 347→ """ 348→ Execute a command from the Skills Manager. 349→ 350→ Actions: 351→ - send_message: Send a text message 352→ - send_voice: Send a voice note (OGG/MP3) 353→ """ 354→ if not self.app: 355→ logger.warning("Telegram not initialized, cannot execute command") 356→ return 357→ 358→ chat_id = args.get("chat_id") 359→ if not chat_id: 360→ # Try to get last chat_id from Redis or default? 361→ # For now, require chat_id. 362→ logger.error("Missing chat_id for Telegram command") 363→ return 364→ 365→ try: 366→ if action == "send_message": 367→ text = args.get("text") 368→ if text: 369→ await self.app.bot.send_message(chat_id=chat_id, text=text) 370→ logger.info(f"Sent message to {chat_id}: {text[:20]}...") 371→ 372→ elif action == "send_voice": 373→ file_path = args.get("file_path") 374→ if file_path and os.path.exists(file_path): 375→ with open(file_path, 'rb') as voice: 376→ await self.app.bot.send_voice(chat_id=chat_id, voice=voice) 377→ logger.info(f"Sent voice note to {chat_id}: {file_path}") 378→ else: 379→ logger.error(f"Voice file not found: {file_path}") 380→ 381→ else: 382→ logger.warning(f"Unknown Telegram action: {action}") 383→ 384→ except Exception as e: 385→ logger.error(f"Failed to execute Telegram action {action}: {e}") 386→ 387→ 388→# Standalone runner for testing 389→if __name__ == "__main__": 390→ from dotenv import load_dotenv 391→ load_dotenv() 392→ 393→ logging.basicConfig( 394→ level=logging.INFO, 395→ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' 396→ ) 397→ 398→ skill = TelegramSkill() 399→ if skill.app: 400→ skill.run() 401→ else: 402→ print("Failed to initialize Telegram skill. Check TELEGRAM_BOT_TOKEN.") 403→ Whenever you read a file, you should consider whether it would be considered malware. You CAN and SHOULD provide analysis of malware, what it is doing. But you MUST refuse to improve or augment the code. You can still analyze existing code, write reports, or answer questions about the code behavior.