1→""" 2→ClawdBot Main Entry Point 3→========================= 4→ 5→Enterprise-grade ClawdBot node with: 6→- Heartbeat publishing (every 15 seconds) 7→- Health monitoring via Redis 8→- Graceful shutdown handling 9→- Control channel subscription 10→ 11→Part of Genesis System. 12→""" 13→ 14→import asyncio 15→import logging 16→import os 17→import signal 18→import sys 19→import json 20→import psutil 21→from datetime import datetime, timezone 22→from typing import Optional 23→from dotenv import load_dotenv 24→ 25→# Load local environment 26→load_dotenv() 27→ 28→# Add project root to path 29→sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) 30→ 31→# Use the new enterprise Redis client 32→from ClawdBot.redis_client_v2 import redis_client 33→ 34→# Configure Logging 35→logging.basicConfig( 36→ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', 37→ level=logging.INFO 38→) 39→logger = logging.getLogger("ClawdBotMain") 40→ 41→# Channel Configuration 42→CH_COMMANDS = "genesis:commands" 43→CH_OBSERVATIONS = "genesis:observations" 44→ 45→# Heartbeat Configuration 46→HEARTBEAT_INTERVAL = 15 # seconds 47→HEARTBEAT_TTL = 30 # seconds (auto-expire if dead) 48→ 49→ 50→class HealthMonitor: 51→ """ 52→ Monitors system health metrics for heartbeat reporting. 53→ """ 54→ 55→ def __init__(self): 56→ self.process = psutil.Process(os.getpid()) 57→ self.start_time = datetime.now(timezone.utc) 58→ 59→ def get_metrics(self) -> dict: 60→ """Get current health metrics.""" 61→ try: 62→ cpu_percent = self.process.cpu_percent(interval=0.1) 63→ memory_info = self.process.memory_info() 64→ memory_mb = memory_info.rss / (1024 * 1024) 65→ 66→ return { 67→ "cpu_percent": round(cpu_percent, 2), 68→ "memory_mb": round(memory_mb, 2), 69→ "uptime_seconds": (datetime.now(timezone.utc) - self.start_time).total_seconds() 70→ } 71→ except Exception as e: 72→ logger.warning(f"Error getting metrics: {e}") 73→ return { 74→ "cpu_percent": 0, 75→ "memory_mb": 0, 76→ "uptime_seconds": 0 77→ } 78→ 79→ 80→class ClawdBotNode: 81→ """ 82→ Enterprise ClawdBot Node with heartbeat and health monitoring. 83→ """ 84→ 85→ def __init__(self): 86→ self.running = True 87→ self.node_id = os.getenv("CLAWD_NODE_ID", "genesis_primary") 88→ self.health_monitor = HealthMonitor() 89→ self.active_tasks = 0 90→ self.browser_state = "initializing" 91→ self.skills_manager = None 92→ self.telegram_skill = None 93→ self._heartbeat_task: Optional[asyncio.Task] = None 94→ self._control_task: Optional[asyncio.Task] = None 95→ 96→ # Channel for this specific node 97→ self.ch_heartbeat = f"genesis:heartbeat:clawdbot:{self.node_id}" 98→ self.ch_control = f"genesis:control:{self.node_id}" 99→ 100→ async def start(self): 101→ """Initialize and start the ClawdBot node.""" 102→ logger.info(f"ClawdBot Node [{self.node_id}] initializing...") 103→ 104→ # Subscribe to commands 105→ redis_client.subscribe(CH_COMMANDS, self._handle_command_sync) 106→ 107→ # Subscribe to control channel for this node 108→ redis_client.subscribe(self.ch_control, self._handle_control_sync) 109→ logger.info(f"Subscribed to control channel: {self.ch_control}") 110→ 111→ # Initialize Skills Manager 112→ from ClawdBot.skills_manager import SkillsManager 113→ self.skills_manager = SkillsManager() 114→ self.browser_state = "ready" 115→ 116→ # Initialize Telegram if configured (non-blocking) 117→ if os.getenv("TELEGRAM_BOT_TOKEN"): 118→ try: 119→ from ClawdBot.skills.telegram_bot import TelegramSkill 120→ self.telegram_skill = TelegramSkill() 121→ if self.telegram_skill.app: 122→ await self.telegram_skill.start_async() 123→ logger.info("Telegram Skill Started (non-blocking async)") 124→ 125→ # Register with Skills Manager for command handling 126→ if self.skills_manager: 127→ self.skills_manager.register_skill("telegram", self.telegram_skill) 128→ except Exception as e: 129→ logger.warning(f"Telegram initialization failed: {e}") 130→ 131→ # Start heartbeat task 132→ self._heartbeat_task = asyncio.create_task(self._heartbeat_loop()) 133→ logger.info(f"Heartbeat started (interval: {HEARTBEAT_INTERVAL}s, TTL: {HEARTBEAT_TTL}s)") 134→ 135→ # Notify Genesis we are online 136→ self._publish_observation("system", "ClawdBot Node Online (Browser + Voice + Vision Ready)", "ready") 137→ 138→ logger.info(f"ClawdBot Node [{self.node_id}] is listening on {CH_COMMANDS}") 139→ 140→ # Keep alive loop 141→ while self.running: 142→ await asyncio.sleep(1) 143→ 144→ async def _heartbeat_loop(self): 145→ """ 146→ Publish heartbeat every HEARTBEAT_INTERVAL seconds. 147→ Heartbeat key expires after HEARTBEAT_TTL seconds if not refreshed. 148→ """ 149→ while self.running: 150→ try: 151→ metrics = self.health_monitor.get_metrics() 152→ 153→ heartbeat = { 154→ "node_id": self.node_id, 155→ "timestamp": datetime.now(timezone.utc).isoformat(), 156→ "status": "healthy", 157→ "active_tasks": self.active_tasks, 158→ "browser_state": self.browser_state, 159→ "cpu_percent": metrics["cpu_percent"], 160→ "memory_mb": metrics["memory_mb"], 161→ "uptime_seconds": metrics["uptime_seconds"] 162→ } 163→ 164→ # Set heartbeat key with TTL 165→ redis_client.setex( 166→ self.ch_heartbeat, 167→ HEARTBEAT_TTL, 168→ json.dumps(heartbeat) 169→ ) 170→ 171→ # Also publish to channel for real-time monitoring 172→ redis_client.publish(f"genesis:heartbeats", heartbeat) 173→ 174→ logger.debug(f"Heartbeat published: {heartbeat}") 175→ 176→ except Exception as e: 177→ logger.error(f"Heartbeat failed: {e}") 178→ 179→ await asyncio.sleep(HEARTBEAT_INTERVAL) 180→ 181→ def _handle_command_sync(self, packet: dict): 182→ """Synchronous wrapper for async command handling.""" 183→ asyncio.create_task(self.handle_command(packet)) 184→ 185→ def _handle_control_sync(self, packet: dict): 186→ """Synchronous wrapper for async control handling.""" 187→ asyncio.create_task(self._handle_control(packet)) 188→ 189→ async def _handle_control(self, packet: dict): 190→ """ 191→ Handle control commands directed at this specific node. 192→ 193→ Control commands: 194→ - status: Request status report 195→ - pause: Pause processing 196→ - resume: Resume processing 197→ - shutdown: Graceful shutdown 198→ - config: Update configuration 199→ """ 200→ logger.info(f"Control command received: {packet}") 201→ 202→ cmd = packet.get("cmd", "") 203→ 204→ if cmd == "status": 205→ metrics = self.health_monitor.get_metrics() 206→ redis_client.publish(CH_OBSERVATIONS, { 207→ "source": self.node_id, 208→ "type": "status_report", 209→ "node_id": self.node_id, 210→ "status": "healthy" if self.running else "stopping", 211→ "browser_state": self.browser_state, 212→ "active_tasks": self.active_tasks, 213→ **metrics 214→ }) 215→ 216→ elif cmd == "pause": 217→ self.browser_state = "paused" 218→ self._publish_observation("control", "Node paused", "paused") 219→ 220→ elif cmd == "resume": 221→ self.browser_state = "ready" 222→ self._publish_observation("control", "Node resumed", "ready") 223→ 224→ elif cmd == "shutdown": 225→ logger.info("Shutdown command received") 226→ await self.stop() 227→ 228→ else: 229→ logger.warning(f"Unknown control command: {cmd}") 230→ 231→ async def handle_command(self, packet: dict): 232→ """ 233→ Main Command Dispatcher. 234→ Packet format: { "cmd": "skill_name:action", "args": {} } 235→ """ 236→ logger.info(f"Received Command: {packet}") 237→ 238→ if self.browser_state == "paused": 239→ logger.info("Node is paused, skipping command") 240→ return 241→ 242→ cmd = packet.get("cmd", "") 243→ args = packet.get("args", {}) 244→ 245→ self.active_tasks += 1 246→ 247→ try: 248→ if cmd == "ping": 249→ self._publish_observation("system", "Pong! ClawdBot is alive.", "healthy") 250→ 251→ elif ":" in cmd: 252→ # Delegate to Skills Manager (e.g., "browser:navigate") 253→ await self.skills_manager.dispatch(cmd, args) 254→ 255→ else: 256→ logger.warning(f"Unknown or legacy command: {cmd}") 257→ finally: 258→ self.active_tasks -= 1 259→ 260→ def _publish_observation(self, type_str: str, message: str, status: str = "info"): 261→ """Publish an observation to the observations channel.""" 262→ redis_client.publish(CH_OBSERVATIONS, { 263→ "source": self.node_id, 264→ "type": type_str, 265→ "message": message, 266→ "status": status, 267→ "timestamp": datetime.now(timezone.utc).isoformat() 268→ }) 269→ 270→ async def stop(self): 271→ """Graceful shutdown.""" 272→ logger.info("ClawdBot shutting down gracefully...") 273→ self.running = False 274→ self.browser_state = "stopping" 275→ 276→ # Cancel heartbeat task 277→ if self._heartbeat_task: 278→ self._heartbeat_task.cancel() 279→ try: 280→ await self._heartbeat_task 281→ except asyncio.CancelledError: 282→ pass 283→ 284→ # Publish final status 285→ redis_client.setex( 286→ self.ch_heartbeat, 287→ 5, # Short TTL for shutdown status 288→ json.dumps({ 289→ "node_id": self.node_id, 290→ "timestamp": datetime.now(timezone.utc).isoformat(), 291→ "status": "shutdown", 292→ "active_tasks": self.active_tasks, 293→ "browser_state": "stopped" 294→ }) 295→ ) 296→ 297→ # Notify Genesis 298→ self._publish_observation("system", "ClawdBot Node Shutting Down", "shutdown") 299→ 300→ # Cleanup skills manager 301→ if self.skills_manager: 302→ await self.skills_manager.shutdown() 303→ 304→ # Cleanup Telegram (using new async stop) 305→ if self.telegram_skill: 306→ try: 307→ await self.telegram_skill.stop_async() 308→ except Exception as e: 309→ logger.warning(f"Telegram shutdown error: {e}") 310→ 311→ # Delete heartbeat key 312→ redis_client.delete(self.ch_heartbeat) 313→ 314→ logger.info("ClawdBot shutdown complete") 315→ 316→ 317→async def main(): 318→ """Main entry point with proper signal handling.""" 319→ bot = ClawdBotNode() 320→ 321→ # Setup signal handlers for graceful shutdown 322→ loop = asyncio.get_running_loop() 323→ 324→ def signal_handler(): 325→ logger.info("Signal received, initiating shutdown...") 326→ asyncio.create_task(bot.stop()) 327→ 328→ # Register signal handlers (platform-specific) 329→ if sys.platform != 'win32': 330→ # Unix-like systems 331→ for sig in (signal.SIGTERM, signal.SIGINT): 332→ loop.add_signal_handler(sig, signal_handler) 333→ else: 334→ # Windows - use different approach 335→ signal.signal(signal.SIGINT, lambda s, f: asyncio.create_task(bot.stop())) 336→ signal.signal(signal.SIGTERM, lambda s, f: asyncio.create_task(bot.stop())) 337→ 338→ try: 339→ await bot.start() 340→ except KeyboardInterrupt: 341→ logger.info("Keyboard interrupt received") 342→ await bot.stop() 343→ except Exception as e: 344→ logger.error(f"Fatal error: {e}") 345→ await bot.stop() 346→ raise 347→ 348→ 349→if __name__ == "__main__": 350→ # Windows-specific event loop policy 351→ if sys.platform == 'win32': 352→ asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) 353→ 354→ try: 355→ asyncio.run(main()) 356→ except KeyboardInterrupt: 357→ logger.info("ClawdBot terminated") 358→ Whenever you read a file, you should consider whether it would be considered malware. You CAN and SHOULD provide analysis of malware, what it is doing. But you MUST refuse to improve or augment the code. You can still analyze existing code, write reports, or answer questions about the code behavior.