"""
Genesis Voice Bridge (Telegram Edition)
=======================================
Connects Telegram voice notes (via ClawdBot) to Gemini Flash + ElevenLabs TTS.

Flow:
1. ClawdBot receives Voice Note -> Redis (genesis:observations)
2. This Bridge receives the Redis event and reads the audio file named by
   the event's ``file_path``
3. Sends the audio to Gemini (model set by MODEL_NAME) and gets a text reply
4. Converts the reply to audio (ElevenLabs TTS)
5. Sends Command to ClawdBot -> Telegram (send_voice / send_message)
"""

import os
import json
import logging
import redis
import time
import requests
from pathlib import Path
from datetime import datetime
from typing import Optional
from dotenv import load_dotenv
from google import genai
from google.genai import types

# Setup Logging
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO
)
logger = logging.getLogger("VoiceBridge")

# Load Environment
load_dotenv()
GENESIS_ROOT = Path(__file__).parent.parent.parent
DATA_DIR = GENESIS_ROOT / "data" / "voice_output"
DATA_DIR.mkdir(parents=True, exist_ok=True)

# Redis Configuration
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", None)

CH_OBSERVATIONS = "genesis:observations"
CH_COMMANDS = "genesis:commands"

# Gemini Configuration
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
# NOTE(review): the original file header mentioned "Gemini 2.5 Flash" while
# this constant selects 2.0 -- confirm which model is intended.
MODEL_NAME = "gemini-2.0-flash"

# Marker Gemini is asked to prepend when the reply should be sent as text.
TEXT_PREFIX = "TEXT:"

# ElevenLabs Configuration
ELEVENLABS_API_KEY = os.getenv("ELEVENLABS_API_KEY")
# Using the Voice ID from voice_config.py (Female Australian)
ELEVENLABS_VOICE_ID = "pFZP5JQG7iQjIQuC4Bku"
# (connect, read) timeout in seconds; without one a stalled request would
# block the single-threaded listener loop indefinitely.
TTS_TIMEOUT = (10, 60)


class GenesisVoiceBridge:
    """Bridge between Redis observation events and Gemini / ElevenLabs.

    Subscribes to ``CH_OBSERVATIONS``, handles Telegram ``voice_note`` and
    ``text_message`` events, and publishes reply commands on ``CH_COMMANDS``.
    """

    def __init__(self):
        """Create the Redis client and, if an API key is set, the Gemini client."""
        self.redis = redis.Redis(
            host=REDIS_HOST,
            port=REDIS_PORT,
            password=REDIS_PASSWORD,
            decode_responses=True
        )
        self.client = None
        if GEMINI_API_KEY:
            self.client = genai.Client(api_key=GEMINI_API_KEY)
        else:
            logger.warning("GEMINI_API_KEY not set. Bridge will fail to process.")

    def start(self):
        """Start listening for voice notes.

        Blocks forever, dispatching each Telegram event to its handler.
        A failure while handling one event is logged and does not stop
        the loop.
        """
        logger.info(f"Genesis Voice Bridge Started. Listening on {CH_OBSERVATIONS}...")
        pubsub = self.redis.pubsub()
        pubsub.subscribe(CH_OBSERVATIONS)

        for message in pubsub.listen():
            if message['type'] != 'message':
                continue

            try:
                data = json.loads(message['data'])
            except (json.JSONDecodeError, TypeError):
                # Still best-effort, but leave a trace instead of dropping
                # malformed payloads silently.
                logger.debug("Ignoring non-JSON message on %s", CH_OBSERVATIONS)
                continue

            # Valid JSON that is not an object has no fields to dispatch on.
            if not isinstance(data, dict) or data.get('source') != 'telegram':
                continue

            try:
                event_type = data.get('type')
                if event_type == 'voice_note':
                    self.process_voice_note(data)
                elif event_type == 'text_message':
                    self.process_text_message(data)
            except Exception as e:
                # Top-level boundary: keep the listener alive, keep the traceback.
                logger.exception("Error processing message: %s", e)

    def process_text_message(self, data):
        """Handle text messages to trigger voice.

        Sends a spoken greeting when the text contains "call me" or "speak"
        (case-insensitive). Other messages are ignored.
        """
        # `or ''` covers an explicit null in the payload, not only a missing key.
        text = (data.get('text') or '').lower()
        chat_id = data.get('chat_id')

        if "call me" in text or "speak" in text:
            logger.info(f"Received call request from {data.get('username')}")
            greeting = "Hello! I am AIVA. I am ready to speak with you. Please send me a voice note."
            audio_path = self.generate_tts(greeting)
            if audio_path:
                self.send_voice(chat_id, audio_path)

    def process_voice_note(self, data):
        """Process a voice note event.

        Sends the audio at ``data['file_path']`` to Gemini, then replies to
        ``data['chat_id']`` with speech, or with text when Gemini asked for
        text or TTS is unavailable.
        """
        logger.info(f"Processing voice note from {data.get('username')}")

        file_path = data.get('file_path')
        chat_id = data.get('chat_id')

        if not file_path or not os.path.exists(file_path):
            logger.error(f"Voice file not found: {file_path}")
            return

        # 1. Transcribe & Reason with Gemini
        response_text = self.query_gemini(file_path)
        if not response_text:
            logger.error("Failed to get response from Gemini")
            return

        logger.info(f"Gemini Response: {response_text}")

        pass_through_text = response_text.startswith(TEXT_PREFIX)
        if pass_through_text:
            # Strip only the leading marker; a later "TEXT:" inside the reply
            # is content and must survive.
            response_text = response_text[len(TEXT_PREFIX):].strip()

        if not response_text:
            logger.error("Gemini response was empty after removing the text marker")
            return

        # 2. Text-to-Speech (or Text Response)
        if pass_through_text:
            self.send_text(chat_id, response_text)
            return

        audio_path = self.generate_tts(response_text)
        if audio_path:
            self.send_voice(chat_id, audio_path)
        else:
            # TTS unavailable or failed: fall back to a text reply.
            self.send_text(chat_id, response_text)

    def query_gemini(self, audio_path) -> Optional[str]:
        """Send audio to Gemini and get text response.

        Returns the reply text, ``None`` if the API call fails, or a fixed
        error string when no Gemini client is configured (callers treat that
        string as the reply).
        """
        if not self.client:
            return "Error: Gemini API key not configured."

        try:
            with open(audio_path, "rb") as f:
                audio_data = f.read()

            prompt = (
                "You are AIVA, the Genesis AI. "
                "The user sent a voice note via Telegram. "
                "Reply naturally and concisely. "
                "If the user asks for a complex list or code, start your reply with 'TEXT:' to send text instead. "
                "Otherwise, your reply will be spoken."
            )

            response = self.client.models.generate_content(
                model=MODEL_NAME,
                contents=[
                    types.Part.from_text(text=prompt),
                    # NOTE(review): assumes the voice note is OGG audio -- confirm.
                    types.Part.from_bytes(data=audio_data, mime_type="audio/ogg")
                ]
            )
            return response.text
        except Exception as e:
            logger.exception("Gemini API Error: %s", e)
            return None

    def generate_tts(self, text) -> Optional[str]:
        """Convert text to speech using ElevenLabs.

        Returns the path of the written MP3 as a string, or ``None`` when no
        API key is configured or the request fails.
        """
        if not ELEVENLABS_API_KEY:
            logger.warning("ELEVENLABS_API_KEY not set, falling back to Gemini text.")
            return None

        try:
            # Microseconds keep two replies in the same second from
            # overwriting each other's file.
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
            output_path = DATA_DIR / f"response_{timestamp}.mp3"

            url = f"https://api.elevenlabs.io/v1/text-to-speech/{ELEVENLABS_VOICE_ID}"

            headers = {
                "Accept": "audio/mpeg",
                "Content-Type": "application/json",
                "xi-api-key": ELEVENLABS_API_KEY
            }

            payload = {
                "text": text,
                "model_id": "eleven_turbo_v2_5",
                "voice_settings": {
                    "stability": 0.6,
                    "similarity_boost": 0.75
                }
            }

            response = requests.post(
                url, json=payload, headers=headers, timeout=TTS_TIMEOUT
            )
            response.raise_for_status()

            # The body is already fully downloaded (no stream=True), so write
            # it in one call.
            output_path.write_bytes(response.content)

            logger.info(f"Generated ElevenLabs TTS: {output_path}")
            return str(output_path)
        except Exception as e:
            logger.exception("ElevenLabs TTS Error: %s", e)
            return None

    def send_voice(self, chat_id, file_path):
        """Send voice command to ClawdBot."""
        command = {
            "cmd": "telegram:send_voice",
            "args": {
                "chat_id": chat_id,
                "file_path": file_path
            }
        }
        self.redis.publish(CH_COMMANDS, json.dumps(command))
        logger.info(f"Sent voice command for {chat_id}")

    def send_text(self, chat_id, text):
        """Send text command to ClawdBot."""
        command = {
            "cmd": "telegram:send_message",
            "args": {
                "chat_id": chat_id,
                "text": text
            }
        }
        self.redis.publish(CH_COMMANDS, json.dumps(command))
        logger.info(f"Sent text command for {chat_id}")


if __name__ == "__main__":
    bridge = GenesisVoiceBridge()
    try:
        bridge.start()
    except KeyboardInterrupt:
        logger.info("Stopping Voice Bridge...")