import asyncio import concurrent.futures import io import logging import logging.handlers import os import re import tempfile import time from collections import defaultdict, deque from dataclasses import dataclass, field from datetime import datetime from typing import Callable, Deque, Optional, Tuple import discord import numpy as np import pyttsx3 import soundfile as sf from concurrent.futures import ThreadPoolExecutor from discord import Intents from discord.errors import ClientException, ConnectionClosed from dotenv import load_dotenv from yt_dlp import YoutubeDL from stt import transcribe_file try: from discord import sinks # Available in discord.py >=2.0 and py-cord HAS_SINKS = True except Exception: HAS_SINKS = False load_dotenv() # ---------- Logging ---------- _log_level = os.getenv("LOG_LEVEL", "INFO").upper() HOTWORD_ENABLED = os.getenv("HOTWORD_ENABLED", "true").lower() in {"1", "true", "yes", "on"} logging.basicConfig(level=getattr(logging, _log_level, logging.INFO)) _formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s", "%H:%M:%S") # Root logger handlers _root = logging.getLogger() _root.setLevel(getattr(logging, _log_level, logging.INFO)) _have_console = any(isinstance(h, logging.StreamHandler) for h in _root.handlers) if not _have_console: _ch = logging.StreamHandler() _ch.setLevel(getattr(logging, _log_level, logging.INFO)) _ch.setFormatter(_formatter) _root.addHandler(_ch) _have_file = any(isinstance(h, logging.handlers.RotatingFileHandler) for h in _root.handlers) if not _have_file: try: _fh = logging.handlers.RotatingFileHandler("bot.log", maxBytes=2 * 1024 * 1024, backupCount=3, encoding="utf-8") _fh.setLevel(getattr(logging, _log_level, logging.DEBUG)) _fh.setFormatter(_formatter) _root.addHandler(_fh) except Exception: # Continue without file logging if filesystem not writable pass # Tweak library log levels logging.getLogger("discord").setLevel(logging.INFO) logging.getLogger("aiohttp").setLevel(logging.INFO) logger = logging.getLogger("basharbot") # ---------- Global configuration ---------- FFMPEG_OPTIONS = { "before_options": "-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5", "options": "-vn" } YTDLP_OPTIONS = { "format": "bestaudio/best", "noplaylist": True, "default_search": "ytsearch1", "quiet": True, "source_address": "0.0.0.0", } BOT_WAKE_WORD = "hey bashar" WAKE_WORD_TOKENS = BOT_WAKE_WORD.split() WAKE_FIRST_WORDS = {"hey", "hi", "hay", "heyya", "heyyo", "yo", "ok", "okay", "oi"} WAKE_SECOND_WORDS = { "bashar", "basher", "beshar", "busher", "buchar", "bishar", "bishor", "bichar", "bisharp", "bishars", "bisharp", "bayshar", "bashara", "bashir", "basheer", "bishara", "basar", "bahsar", "vishar", "vashar", "wishar", "bishaar", "pashar", } COMMAND_ALIASES = { "next": "skip", } PCM_SAMPLE_RATE = 48000 PCM_CHANNELS = 2 PCM_SAMPLE_WIDTH = 2 # bytes per sample PCM_BYTES_PER_SECOND = PCM_SAMPLE_RATE * PCM_CHANNELS * PCM_SAMPLE_WIDTH TRANSCRIPT_LOG_ENABLED = os.getenv("TRANSCRIPT_LOG_ENABLED", "true").lower() in {"1", "true", "yes", "on"} TRANSCRIPT_LOG_PATH = os.getenv("TRANSCRIPT_LOG_PATH", "transcript.log") GOODBOY_USER_ID = int(os.getenv("GOODBOY_USER_ID", "94578724413902848")) GOODBOY_AUDIO_PATH = os.path.join(os.getcwd(), "goodboy.ogg") def _display_name(user: object) -> str: name = getattr(user, "display_name", None) or getattr(user, "name", None) if not name: name = str(user) return name def append_transcript_entry(name: str, text: str) -> None: if not TRANSCRIPT_LOG_ENABLED: return try: timestamp = datetime.now().strftime("%m/%d/%Y %H:%M") line = f"{timestamp} {name} - {text}".rstrip() + "\n" with open(TRANSCRIPT_LOG_PATH, "a", encoding="utf-8") as log_file: log_file.write(line) except Exception as exc: logger.warning("Failed to append to transcript log: %s", exc) _ENGLISH_SENTENCE_RE = re.compile(r"^[a-zA-Z0-9 ,.'\"?!-]+$") def is_probably_english_sentence(text: str) -> bool: if not text: return False if len(text.split()) < 3: return False if not text.endswith((".", "!", "?")): return False return bool(_ENGLISH_SENTENCE_RE.match(text)) async def announce_listening_roster(channel, voice_channel: Optional[discord.VoiceChannel]): if channel is None or voice_channel is None: return others = [m for m in voice_channel.members if m != voice_channel.guild.me] if not others: return names = ", ".join(_display_name(m) for m in others) try: await channel.send(f"Listening for: {names}") except Exception: logger.debug("Unable to send listening roster message") def normalize_for_command(text: str) -> str: cleaned = re.sub(r"[^a-z0-9\s]", " ", text.lower()) tokens = cleaned.split() if tokens: if tokens[0] in WAKE_FIRST_WORDS: tokens[0] = "hey" if len(tokens) > 1 and tokens[1] in WAKE_SECOND_WORDS: tokens[1] = "bashar" return " ".join(tokens) def parse_wake_command(text: str) -> Optional[Tuple[str, str]]: """ Return (action, args) if the text starts with the wake word, after stripping punctuation. Args may be empty. Returns None if wake word not detected. """ normalized = normalize_for_command(text) if not normalized: return None tokens = normalized.split() if len(tokens) < len(WAKE_WORD_TOKENS): return None if tokens[0] != "hey" or tokens[1] != "bashar": return None remainder = tokens[len(WAKE_WORD_TOKENS):] if not remainder: return ("", "") action = remainder[0] action = COMMAND_ALIASES.get(action, action) args = " ".join(remainder[1:]) if len(remainder) > 1 else "" return action, args async def send_recent_logs(channel: discord.abc.Messageable, max_lines: int = 300) -> None: def _tail_lines(path: str, max_lines: int = 300) -> str: try: with open(path, "r", encoding="utf-8", errors="replace") as f: lines = f.readlines() return "".join(lines[-max_lines:]) except Exception as e: return f"Failed to read log: {e}" tail = _tail_lines("bot.log", max_lines) if len(tail) > 1800: with io.BytesIO(tail.encode("utf-8", errors="replace")) as bio: bio.seek(0) await channel.send(content="Here are the last 300 lines of bot.log", file=discord.File(bio, filename="bot-tail.txt")) else: await channel.send(f"```{tail}```") # ---------- Utilities ---------- def ensure_opus_loaded() -> bool: """Attempt to load the Opus library for voice receive/playback.""" if discord.opus.is_loaded(): logger.debug("Opus already loaded.") return True candidates = [] env_candidate = os.getenv("OPUS_LIB") if env_candidate: candidates.append(env_candidate) candidates.extend([ "opus.dll", "libopus-0.dll", "libopus.so.0", "libopus.so", "libopus.dylib", ]) for name in candidates: try: discord.opus.load_opus(name) logger.info("Loaded opus library: %s", name) return True except OSError: continue logger.warning("Opus library not found; install opuslib/opus-tools if voice receive misbehaves.") return False def ensure_ffmpeg_available() -> None: """Raise a clear error if ffmpeg is not in PATH.""" from shutil import which path = which("ffmpeg") if path is None: logger.error("ffmpeg not found in PATH") raise RuntimeError( "ffmpeg not found in PATH. Install it and restart.\n" "Windows (PowerShell): choco install ffmpeg -y" ) logger.debug("ffmpeg found: %s", path) def make_tts_engine() -> pyttsx3.Engine: engine = pyttsx3.init() # Slightly slower and clearer speech try: rate = engine.getProperty("rate") engine.setProperty("rate", max(120, int(rate * 0.9))) except Exception: pass logger.debug("Initialized TTS engine (pyttsx3)") return engine _tts_engine_singleton: Optional[pyttsx3.Engine] = None _tts_executor: Optional[ThreadPoolExecutor] = None def get_tts_engine() -> pyttsx3.Engine: global _tts_engine_singleton if _tts_engine_singleton is None: _tts_engine_singleton = make_tts_engine() return _tts_engine_singleton def get_tts_executor() -> ThreadPoolExecutor: global _tts_executor if _tts_executor is None: _tts_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="tts") return _tts_executor async def synthesize_tts_to_wav(text: str, wav_path: str) -> str: """Generate TTS to a WAV file using pyttsx3 in a background thread.""" loop = asyncio.get_running_loop() engine = get_tts_engine() def _save(): logger.debug("Synthesizing TTS to %s: %s", wav_path, (text if len(text) < 120 else text[:117] + "...")) engine.save_to_file(text, wav_path) engine.runAndWait() await loop.run_in_executor(get_tts_executor(), _save) logger.debug("TTS synthesis complete: %s", wav_path) return wav_path def yt_search_and_resolve(query: str) -> Tuple[str, str]: """ Search YouTube for the best match and return (stream_url, title). Stream URL is suitable for ffmpeg input. """ logger.info("Resolving YouTube query: %s", query) with YoutubeDL(YTDLP_OPTIONS) as ydl: info = ydl.extract_info(query, download=False) if "entries" in info: info = info["entries"][0] stream_url = info["url"] title = info.get("title", "Unknown") logger.info("Resolved track: title=%s", title) return stream_url, title # ---------- Audio playback management ---------- async def _force_cleanup_voice_client(guild: Optional[discord.Guild]) -> None: if guild is None: return voice_client: Optional[discord.VoiceClient] = getattr(guild, "voice_client", None) if voice_client is None: # Ensure we don't retain stale session metadata if Discord believes we're still connected. try: bot_id = getattr(getattr(guild, "me", None), "id", None) or getattr(guild._state, "self_id", None) if bot_id is not None: removed = guild._voice_states.pop(bot_id, None) if removed is not None: logger.debug("Cleared lingering voice state for guild %s during cleanup", guild.id) guild._state._voice_clients.pop(guild.id, None) except Exception as state_err: logger.debug("Failed to clear lingering voice state for guild %s: %s", getattr(guild, "id", "?"), state_err) return try: await voice_client.disconnect(force=True) logger.debug("Forced voice disconnect to clear stale session (guild %s)", guild.id) except Exception as disconnect_err: logger.debug("Force disconnect raised for guild %s: %s", guild.id, disconnect_err) try: voice_client.cleanup() logger.debug("Voice cleanup fallback succeeded (guild %s)", guild.id) except Exception as cleanup_err: logger.debug("Voice cleanup fallback failed (guild %s): %s", guild.id, cleanup_err) try: bot_id = getattr(getattr(guild, "me", None), "id", None) or getattr(guild._state, "self_id", None) if bot_id is not None: removed = guild._voice_states.pop(bot_id, None) if removed is not None: logger.debug("Cleared lingering voice state post-disconnect for guild %s", guild.id) except Exception as state_err: logger.debug("Failed to clear voice state post-disconnect for guild %s: %s", guild.id, state_err) try: guild._state._voice_clients.pop(guild.id, None) except Exception as remove_err: logger.debug("Failed to clear cached voice client for guild %s: %s", guild.id, remove_err) async def _get_active_voice_client(guild: Optional[discord.Guild]) -> Optional[discord.VoiceClient]: if guild is None: return None voice_client: Optional[discord.VoiceClient] = getattr(guild, "voice_client", None) try: bot_id = getattr(getattr(guild, "me", None), "id", None) or getattr(guild._state, "self_id", None) if bot_id is not None and voice_client is None and guild._voice_states.get(bot_id): logger.debug("Detected lingering voice state without client for guild %s; clearing before reconnect", guild.id) guild._voice_states.pop(bot_id, None) guild._state._voice_clients.pop(guild.id, None) except Exception as state_err: logger.debug("Failed to inspect lingering voice state for guild %s: %s", getattr(guild, "id", "?"), state_err) if voice_client and not voice_client.is_connected(): logger.debug("Detected stale voice client handle for guild %s; cleaning up before reuse", guild.id) await _force_cleanup_voice_client(guild) return getattr(guild, "voice_client", None) return voice_client async def connect_voice_with_retry(channel: discord.abc.Connectable) -> discord.VoiceClient: """ Standard, simplified voice connection helper. Uses standard Discord library methods without custom retry loops to avoid state conflicts. """ guild: Optional[discord.Guild] = getattr(channel, "guild", None) if guild is None: raise RuntimeError("Voice channel without guild cannot establish a connection.") # 1. Cleanup existing client if present try: old_vc = getattr(guild, "voice_client", None) if old_vc: if old_vc.channel == channel and old_vc.is_connected(): return old_vc await old_vc.disconnect(force=True) await asyncio.sleep(0.5) except Exception as e: logger.debug("Error cleaning up old voice client: %s", e) # 2. Connect using standard library method # Note: reconnect=True is the default and correct behavior for handling # transient session errors (like 4006) internally by the library. try: voice_client = await channel.connect(timeout=20.0, reconnect=True) return voice_client except Exception as e: logger.warning("Standard connect failed: %s", e) raise @dataclass class QueueItem: title: str source_factory: Callable[[], discord.AudioSource] announce: Optional[str] = None if HAS_SINKS: class HotwordStreamSink(sinks.Sink): def __init__( self, state: "GuildAudioState", text_channel: discord.abc.Messageable, loop: asyncio.AbstractEventLoop, min_chunk_seconds: float = 1.0, window_seconds: float = 4.5, inactivity_seconds: float = 1.0, ): super().__init__() self.state = state self.text_channel = text_channel self.loop = loop self.closed = False self.buffers: defaultdict[int, bytearray] = defaultdict(bytearray) self.last_activity: defaultdict[int, float] = defaultdict(lambda: 0.0) self.processing_users: set[int] = set() self.pending_tasks: dict[int, concurrent.futures.Future] = {} self.min_chunk_bytes = int(max(PCM_BYTES_PER_SECOND * min_chunk_seconds, PCM_BYTES_PER_SECOND * 0.5)) self.window_bytes = int(PCM_BYTES_PER_SECOND * window_seconds) self.inactivity_seconds = inactivity_seconds def close(self): self.closed = True for fut in list(self.pending_tasks.values()): try: fut.cancel() except Exception: pass self.pending_tasks.clear() self.buffers.clear() self.processing_users.clear() def update_text_channel(self, channel: discord.abc.Messageable): self.text_channel = channel def cleanup(self): self.closed = True for fut in list(self.pending_tasks.values()): try: fut.cancel() except Exception: pass self.pending_tasks.clear() return super().cleanup() @sinks.Filters.container def write(self, data, user): if self.closed or user is None: return try: user_id = int(user) except Exception: return buffer = self.buffers[user_id] buffer.extend(data) if len(buffer) > self.window_bytes: del buffer[: len(buffer) - int(self.window_bytes)] now = time.perf_counter() self.last_activity[user_id] = now if len(buffer) < self.min_chunk_bytes: return existing = self.pending_tasks.get(user_id) if existing and not existing.done(): existing.cancel() self.pending_tasks.pop(user_id, None) async def delayed_dispatch(uid: int, expected_time: float): try: await asyncio.sleep(self.inactivity_seconds) if self.closed: return last = self.last_activity.get(uid, 0.0) if abs(last - expected_time) > 1e-6: return buffer = self.buffers.get(uid) if not buffer or len(buffer) < self.min_chunk_bytes: return if uid in self.processing_users: return self.processing_users.add(uid) chunk = bytes(buffer) buffer.clear() try: await self.state.handle_hotword_buffer(uid, chunk, self.text_channel) finally: self.processing_users.discard(uid) except asyncio.CancelledError: return finally: self.pending_tasks.pop(uid, None) future = asyncio.run_coroutine_threadsafe(delayed_dispatch(user_id, now), self.loop) def _done_callback(fut, uid=user_id): if fut.cancelled(): return try: fut.result() except asyncio.CancelledError: return except Exception as exc: logger.exception("Hotword delayed dispatch failed for user %s: %s", uid, exc) finally: self.pending_tasks.pop(uid, None) future.add_done_callback(_done_callback) self.pending_tasks[user_id] = future else: class HotwordStreamSink: # type: ignore def __init__(self, *args, **kwargs): pass @dataclass class GuildAudioState: guild_id: int voice_client: Optional[discord.VoiceClient] = None queue: Deque[QueueItem] = field(default_factory=deque) current: Optional[QueueItem] = None player_task: Optional[asyncio.Task] = None queue_event: asyncio.Event = field(default_factory=asyncio.Event) listen_enabled: bool = False hotword_sink: Optional["HotwordStreamSink"] = None last_transcripts: dict[int, Tuple[str, float]] = field(default_factory=dict) async def ensure_player(self): if self.player_task is None or self.player_task.done(): logger.debug("Starting player loop for guild %s", self.guild_id) self.player_task = asyncio.create_task(self._player_loop()) def enqueue(self, item: QueueItem): self.queue.append(item) logger.info("Enqueued: %s (qsize=%d) on guild %s", item.title, len(self.queue), self.guild_id) self.queue_event.set() def skip_current(self): if self.voice_client and self.voice_client.is_playing(): logger.info("Skipping current track (guild %s)", self.guild_id) self.voice_client.stop() def stop_all(self): logger.info("Stopping playback and clearing queue (guild %s)", self.guild_id) self.queue.clear() if self.voice_client and self.voice_client.is_playing(): self.voice_client.stop() self.current = None self.queue_event.set() async def enqueue_local_clip(self, text_channel: discord.abc.Messageable, file_path: str): if not os.path.exists(file_path): await text_channel.send("Audio clip not found on server.") return if not self.voice_client or not self.voice_client.is_connected(): await text_channel.send("I'm not in a voice channel right now.") return def source_factory() -> discord.AudioSource: return discord.FFmpegPCMAudio(file_path, **FFMPEG_OPTIONS) item = QueueItem(title=os.path.basename(file_path), source_factory=source_factory, announce=None) self.enqueue(item) await self.ensure_player() async def _play_tts(self, text: str): if not self.voice_client or not self.voice_client.is_connected(): logger.debug("Skipping TTS; voice client not connected (guild %s)", self.guild_id) return with tempfile.TemporaryDirectory() as tmpdir: tts_path = os.path.join(tmpdir, "tts.wav") await synthesize_tts_to_wav(text, tts_path) source = discord.FFmpegPCMAudio(tts_path, **FFMPEG_OPTIONS) fut = asyncio.get_running_loop().create_future() def after_playback(_): logger.debug("Finished TTS playback (guild %s)", self.guild_id) if not fut.done(): fut.set_result(True) try: self.voice_client.play(source, after=after_playback) except Exception as e: logger.exception("Error starting TTS playback (guild %s): %s", self.guild_id, e) return await fut await asyncio.sleep(0.1) async def _player_loop(self): logger.debug("Player loop running (guild %s)", self.guild_id) while True: await self.queue_event.wait() self.queue_event.clear() if not self.voice_client or not self.voice_client.is_connected(): logger.debug("No connected voice client; waiting (guild %s)", self.guild_id) await asyncio.sleep(0.2) continue if self.voice_client.is_playing(): # Will trigger when current track finishes (after callback sets event) logger.debug("Voice already playing; yielding (guild %s)", self.guild_id) await asyncio.sleep(0.2) continue if not self.queue: # Nothing to play, wait logger.debug("Queue empty; waiting (guild %s)", self.guild_id) continue self.current = self.queue.popleft() logger.info("Starting next track: %s (remaining qsize=%d) guild %s", self.current.title, len(self.queue), self.guild_id) # Announce if needed if self.current.announce: try: await self._play_tts(self.current.announce) except Exception as e: # Announce failures shouldn't break playback logger.warning("TTS announce failed (guild %s): %s", self.guild_id, e) # Start playing audio source = self.current.source_factory() play_done = asyncio.get_running_loop().create_future() def _after_play(err: Optional[Exception]): # FFmpeg end or error – wake the loop if err: logger.error("Playback finished with error (guild %s): %s", self.guild_id, err) else: logger.debug("Playback finished normally (guild %s)", self.guild_id) if not play_done.done(): play_done.set_result(True) # Nudge the loop to consider the next item self.queue_event.set() try: self.voice_client.play(source, after=_after_play) logger.info("FFmpeg playback started (guild %s) on channel %s", self.guild_id, getattr(self.voice_client.channel, "name", "?")) except Exception as e: logger.exception("Failed to start playback (guild %s): %s", self.guild_id, e) self.current = None # Wake loop to attempt next or wait self.queue_event.set() continue await play_done self.current = None async def start_listening(self, text_channel: discord.abc.Messageable): if not HOTWORD_ENABLED: logger.debug("Hotword listening disabled by environment (guild %s)", self.guild_id) return if not HAS_SINKS: logger.warning("Hotword listening requested but sinks are unavailable on this stack.") try: await text_channel.send("Live hotword listening is unavailable on this install. Send a voice message instead.") except Exception: pass return if not self.voice_client or not self.voice_client.is_connected(): logger.debug("Cannot start listener without an active voice client (guild %s)", self.guild_id) return self.listen_enabled = True self.last_transcripts.clear() if self.hotword_sink and not self.hotword_sink.closed: self.hotword_sink.update_text_channel(text_channel) logger.debug("Hotword listener already running (guild %s)", self.guild_id) return # If another recording is running, stop it first if getattr(self.voice_client, "recording", False): try: self.voice_client.stop_recording() except Exception: pass loop = asyncio.get_running_loop() sink = HotwordStreamSink(self, text_channel, loop) self.hotword_sink = sink logger.info("Starting continuous hotword listener (guild %s)", self.guild_id) async def _finished_callback(sink_obj, *_): await self._on_sink_finished(sink_obj) self.voice_client.start_recording(sink, _finished_callback) channel = getattr(self.voice_client, "channel", None) if channel: others = [m for m in channel.members if m.id != client.user.id] if others: names = ", ".join(_display_name(m) for m in others) try: await text_channel.send(f"Listening for: {names}") except Exception: logger.debug("Unable to send listening roster message (guild %s)", self.guild_id) async def _on_sink_finished(self, sink_obj: "HotwordStreamSink"): sink_obj.close() if self.hotword_sink is sink_obj: self.hotword_sink = None self.listen_enabled = False logger.debug("Hotword sink finished (guild %s)", self.guild_id) async def stop_listening(self): if self.listen_enabled: logger.info("Stopping hotword listener (guild %s)", self.guild_id) self.listen_enabled = False sink = self.hotword_sink if sink: sink.close() if self.voice_client and getattr(self.voice_client, "recording", False): try: self.voice_client.stop_recording() except Exception: pass self.hotword_sink = None async def handle_hotword_buffer(self, user_id: int, pcm_bytes: bytes, text_channel: discord.abc.Messageable): if not pcm_bytes: return if not self.voice_client or not self.voice_client.is_connected(): return guild = client.get_guild(self.guild_id) if guild is None: return member = guild.get_member(int(user_id)) if member is None: return samples = np.frombuffer(pcm_bytes, dtype=np.int16) if samples.size == 0: return if PCM_CHANNELS > 1: usable = (samples.size // PCM_CHANNELS) * PCM_CHANNELS if usable == 0: return samples = samples[:usable].reshape(-1, PCM_CHANNELS) with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp: tmp_path = tmp.name try: sf.write(tmp_path, samples, PCM_SAMPLE_RATE, subtype="PCM_16") loop = asyncio.get_running_loop() text, score = await loop.run_in_executor(None, transcribe_file, tmp_path) except Exception as e: logger.warning("Transcription failed for speaker %s: %s", user_id, e) return finally: try: os.remove(tmp_path) except OSError: pass await self.handle_hotword_transcript(user_id, text, score, member, text_channel) async def handle_hotword_transcript(self, user_id: int, text: str, score: float, member: discord.Member, text_channel: discord.abc.Messageable): text_norm = (text or "").strip() logger.info( "Hotword transcript (stream) guild=%s speaker=%s text='%s' score=%.3f", self.guild_id, getattr(member, "display_name", member), text_norm, score, ) if not text_norm: return normalized = normalize_for_command(text_norm) if not normalized: return now = time.perf_counter() last = self.last_transcripts.get(user_id) if last and last[0] == normalized and (now - last[1]) < 2.0: logger.debug("Skipping duplicate transcript for speaker %s (guild %s)", member, self.guild_id) return append_transcript_entry(_display_name(member), text_norm) self.last_transcripts[user_id] = (normalized, now) if member.id == GOODBOY_USER_ID and is_probably_english_sentence(text_norm): await self.enqueue_local_clip(text_channel, GOODBOY_AUDIO_PATH) await route_transcribed_command_from_member(member.guild, member, text_channel, text_norm) # ---------- Bot setup ---------- intents = Intents.default() intents.message_content = True intents.voice_states = True client = discord.Client(intents=intents) audio_states: dict[int, GuildAudioState] = {} def get_state_for_guild(guild_id: int) -> GuildAudioState: state = audio_states.get(guild_id) if state is None: state = GuildAudioState(guild_id=guild_id) audio_states[guild_id] = state return state async def connect_to_author_channel(message: discord.Message) -> Optional[discord.VoiceClient]: if not isinstance(message.author, discord.Member): return None logger.debug("Connect requested by %s in guild %s", message.author, getattr(message.guild, "id", "?")) voice_state = message.author.voice if not voice_state or not voice_state.channel: logger.info("Author not in a voice channel; cannot join (guild %s)", getattr(message.guild, "id", "?")) await message.channel.send("Join a voice channel first, then say 'hey bashar join'.") return None channel = voice_state.channel vc = await _get_active_voice_client(message.guild) if vc and vc.channel == channel and vc.is_connected(): logger.debug("Already connected to requested channel: %s (guild %s)", channel, getattr(message.guild, "id", "?")) return vc if vc: try: logger.info("Moving voice client to channel: %s (guild %s)", channel, getattr(message.guild, "id", "?")) await vc.move_to(channel) await announce_listening_roster(message.channel, channel) return vc except Exception as e: logger.warning("Move failed; reconnecting fresh (guild %s): %s", getattr(message.guild, "id", "?"), e) try: await vc.disconnect(force=True) except Exception: pass try: vc = await connect_voice_with_retry(channel) await announce_listening_roster(message.channel, channel) except Exception as e: logger.exception("Voice connect retries exhausted (guild %s): %s", getattr(message.guild, "id", "?"), e) await message.channel.send("I couldn't join the voice channel (error 4006). Try again in a few seconds.") return None else: logger.info("Connecting to voice channel: %s (guild %s)", channel, getattr(message.guild, "id", "?")) try: vc = await connect_voice_with_retry(channel) await announce_listening_roster(message.channel, channel) except Exception as e: logger.exception("Voice connect retries exhausted (guild %s): %s", getattr(message.guild, "id", "?"), e) await message.channel.send("I couldn't join the voice channel (error 4006). Try again in a few seconds.") return None if vc and vc.is_connected(): logger.info("Connected to voice: %s (guild %s)", vc.channel, getattr(message.guild, "id", "?")) else: logger.error("Voice connect returned but not connected (guild %s)", getattr(message.guild, "id", "?")) return vc def make_ffmpeg_source(url: str) -> discord.AudioSource: logger.debug("Creating FFmpeg source for url: %s", url[:64] + ("..." if len(url) > 64 else "")) return discord.FFmpegPCMAudio(url, **FFMPEG_OPTIONS) async def handle_play_query(message: discord.Message, query: str): assert message.guild is not None logger.info("Play request: '%s' (guild %s)", query, message.guild.id) state = get_state_for_guild(message.guild.id) vc = await connect_to_author_channel(message) if vc is None: logger.info("Aborting play; no voice client (guild %s)", message.guild.id) return state.voice_client = vc # Resolve YouTube stream and title try: stream_url, title = await asyncio.get_running_loop().run_in_executor( None, yt_search_and_resolve, query ) except Exception as e: logger.exception("YouTube resolve failed for '%s' (guild %s): %s", query, message.guild.id, e) await message.channel.send(f"Couldn't find or load audio for: {query}") return # Enqueue TTS announcement + track item = QueueItem( title=title, source_factory=lambda: make_ffmpeg_source(stream_url), announce=f"Playing {title}" ) state.enqueue(item) await state.ensure_player() await message.channel.send(f"Queued: {title}") logger.info("Queued and scheduled: %s (guild %s)", title, message.guild.id) async def handle_play_for_member(guild: discord.Guild, member: discord.Member, text_channel: discord.abc.Messageable, query: str): logger.info("Play request (voice) by %s: '%s' (guild %s)", getattr(member, "display_name", member.id), query, guild.id) state = get_state_for_guild(guild.id) # Ensure voice connected to member channel if not member.voice or not member.voice.channel: await text_channel.send("Join a voice channel first, then say 'hey bashar play '.") return channel = member.voice.channel vc = await _get_active_voice_client(guild) try: if vc and vc.channel != channel and vc.is_connected(): try: await vc.move_to(channel) except Exception: try: await vc.disconnect(force=True) except Exception: pass vc = await connect_voice_with_retry(channel) elif not vc: vc = await connect_voice_with_retry(channel) except Exception as e: logger.exception("Voice connect retries exhausted (guild %s): %s", guild.id, e) await text_channel.send("I couldn't join the voice channel (error 4006). Try again in a few seconds.") return state.voice_client = vc # Resolve and enqueue try: stream_url, title = await asyncio.get_running_loop().run_in_executor(None, yt_search_and_resolve, query) except Exception as e: logger.exception("YouTube resolve failed for '%s' (guild %s): %s", query, guild.id, e) await text_channel.send(f"Couldn't find or load audio for: {query}") return item = QueueItem( title=title, source_factory=lambda: make_ffmpeg_source(stream_url), announce=f"Playing {title}" ) state.enqueue(item) await state.ensure_player() await text_channel.send(f"Queued: {title}") logger.info("Queued and scheduled (voice): %s (guild %s)", title, guild.id) @client.event async def on_ready(): logger.info("Logged in as %s (id=%s)", client.user, client.user.id) ensure_ffmpeg_available() ensure_opus_loaded() logger.info("Startup checks OK") if HOTWORD_ENABLED and HAS_SINKS: logger.info("Hotword listening: ENABLED (sinks available and HOTWORD_ENABLED=True)") elif HOTWORD_ENABLED and not HAS_SINKS: logger.info("Hotword listening: DISABLED (HOTWORD_ENABLED=True but sinks unavailable)") else: logger.info("Hotword listening: DISABLED (HOTWORD_ENABLED unset/false)") @client.event async def on_message(message: discord.Message): # Ignore our own messages if message.author.id == client.user.id: return # "hey bashar logs" -> send last lines of bot.log try: content_raw = message.content or "" if content_raw.lower().strip() == f"{BOT_WAKE_WORD} logs": def _tail_lines(path: str, max_lines: int = 300) -> str: try: with open(path, "r", encoding="utf-8", errors="replace") as f: lines = f.readlines() return "".join(lines[-max_lines:]) except Exception as e: return f"Failed to read log: {e}" tail = _tail_lines("bot.log", 300) if len(tail) > 1800: with io.BytesIO(tail.encode("utf-8", errors="replace")) as bio: bio.seek(0) await message.channel.send(content="Here are the last 300 lines of bot.log", file=discord.File(bio, filename="bot-tail.txt")) else: await message.channel.send(f"```{tail}```") return except Exception as e: logger.exception("Failed handling 'logs' command: %s", e) # fall through # Handle audio/voice-message attachments for STT if message.attachments: for att in message.attachments: ct = (att.content_type or "").lower() filename = (att.filename or "").lower() is_audio = ct.startswith("audio/") or any(filename.endswith(ext) for ext in (".wav", ".mp3", ".m4a", ".ogg", ".oga", ".opus", ".webm")) if not is_audio: continue logger.info("Downloading audio attachment for STT: %s (%s)", att.filename, att.content_type) with tempfile.TemporaryDirectory() as tmpdir: tmp_path = os.path.join(tmpdir, att.filename or "audio_input") try: await att.save(tmp_path) logger.debug("Saved attachment to %s", tmp_path) except Exception as e: logger.exception("Failed to save attachment: %s", e) continue loop = asyncio.get_running_loop() try: text, score = await loop.run_in_executor(None, transcribe_file, tmp_path) except Exception as e: logger.exception("Transcription failed: %s", e) continue text_norm = (text or "").strip() if not text_norm: await message.channel.send("I couldn't hear anything useful in that audio.") continue logger.info("Transcribed: %s (score=%.3f)", text_norm, score) append_transcript_entry(_display_name(message.author), text_norm) await route_transcribed_command(message, text_norm) # Only process first audio attachment per message return content = message.content.strip() parsed = parse_wake_command(content) if not parsed: return action, args = parsed logger.debug("Wake word detected. Raw='%s' | action='%s' | args='%s'", content, action, args) append_transcript_entry(_display_name(message.author), content) if action == "logs": try: await send_recent_logs(message.channel) except Exception as e: logger.exception("Failed handling 'logs' command: %s", e) await message.channel.send("Couldn't fetch logs.") return # "hey bashar join" if action == "join": vc = await connect_to_author_channel(message) if vc: state = get_state_for_guild(message.guild.id) state.voice_client = vc await message.channel.send("Joined your voice channel. Say 'hey bashar play ' here.") logger.info("Joined voice channel for guild %s", message.guild.id) # Auto-start hotword listener await state.start_listening(message.channel) return # "hey bashar leave" if action == "leave": state = get_state_for_guild(message.guild.id) await state.stop_listening() if state.voice_client and state.voice_client.is_connected(): await message.channel.send("Leaving voice channel.") logger.info("Disconnecting from voice (guild %s)", message.guild.id) await state.voice_client.disconnect(force=True) return # "hey bashar play " if action == "play": if not args: await message.channel.send("Say 'hey bashar play '.") return await handle_play_query(message, args) return if action == "skip": state = get_state_for_guild(message.guild.id) state.skip_current() await message.channel.send("Skipped the current track.") return if action == "stop": state = get_state_for_guild(message.guild.id) state.stop_all() await message.channel.send("Stopped playback and cleared the queue.") return # Unknown await message.channel.send("Commands: 'hey bashar join', 'hey bashar play ', 'hey bashar skip', 'hey bashar stop', 'hey bashar leave'.") logger.debug("Sent help for unknown command") async def route_transcribed_command(message: discord.Message, text: str): """Route transcribed text to existing handlers if it starts with the wake word.""" parsed = parse_wake_command(text) if not parsed: logger.debug("Ignoring transcript without wake word: %s", text) return action, args = parsed if action == "join": vc = await connect_to_author_channel(message) if vc: state = get_state_for_guild(message.guild.id) state.voice_client = vc await message.channel.send("Joined your voice channel. Say 'hey bashar play ' here.") return if action == "leave": state = get_state_for_guild(message.guild.id) if state.voice_client and state.voice_client.is_connected(): await message.channel.send("Leaving voice channel.") await state.voice_client.disconnect(force=True) return if action == "play": if not args: await message.channel.send("Say 'hey bashar play '.") return await handle_play_query(message, args) return if action == "skip": state = get_state_for_guild(message.guild.id) state.skip_current() await message.channel.send("Skipped the current track.") return if action == "stop": state = get_state_for_guild(message.guild.id) state.stop_all() await message.channel.send("Stopped playback and cleared the queue.") return await message.channel.send("Commands: 'hey bashar join', 'hey bashar play ', 'hey bashar skip', 'hey bashar stop', 'hey bashar leave'.") async def route_transcribed_command_from_member(guild: discord.Guild, member: discord.Member, text_channel: discord.abc.Messageable, text: str): logger.debug( "Routing voice transcript (guild %s, speaker=%s): %s", guild.id, getattr(member, "display_name", member), text, ) parsed = parse_wake_command(text) if not parsed: return action, args = parsed if action == "join": # Connect to the member's voice channel if not member.voice or not member.voice.channel: await text_channel.send("Join a voice channel first, then say 'hey bashar join'.") return state = get_state_for_guild(guild.id) vc = await _get_active_voice_client(guild) try: if vc and vc.channel != member.voice.channel and vc.is_connected(): try: await vc.move_to(member.voice.channel) except Exception: try: await vc.disconnect(force=True) except Exception: pass vc = await connect_voice_with_retry(member.voice.channel) elif not vc: vc = await connect_voice_with_retry(member.voice.channel) except Exception as e: logger.exception("Voice connect retries exhausted (guild %s): %s", guild.id, e) await text_channel.send("I couldn't join the voice channel (error 4006). Try again in a few seconds.") return state.voice_client = vc await text_channel.send("Joined your voice channel. Say 'hey bashar play ' here.") # Start listening if not already await state.start_listening(text_channel) return if action == "leave": state = get_state_for_guild(guild.id) await state.stop_listening() if state.voice_client and state.voice_client.is_connected(): await text_channel.send("Leaving voice channel.") await state.voice_client.disconnect(force=True) return if action == "play": if not args: await text_channel.send("Say 'hey bashar play '.") return await handle_play_for_member(guild, member, text_channel, args) return if action == "skip": state = get_state_for_guild(guild.id) state.skip_current() await text_channel.send("Skipped the current track.") return if action == "stop": state = get_state_for_guild(guild.id) state.stop_all() await text_channel.send("Stopped playback and cleared the queue.") return await text_channel.send("Commands: 'hey bashar join', 'hey bashar play ', 'hey bashar skip', 'hey bashar stop', 'hey bashar leave'.") @client.event async def on_voice_state_update(member: discord.Member, before: discord.VoiceState, after: discord.VoiceState): # Log only the bot's own state changes to avoid noise if client.user and member.id == client.user.id: logger.info( "Voice state update (guild %s): %s -> %s (self_mute=%s deaf=%s)", member.guild.id, getattr(before.channel, "name", None), getattr(after.channel, "name", None), after.self_mute, after.self_deaf, ) @client.event async def on_error(event_method: str, /, *args, **kwargs): logger.exception("Unhandled error in event '%s'", event_method) if __name__ == "__main__": token = os.getenv("DISCORD_TOKEN") if not token: raise SystemExit("Missing DISCORD_TOKEN in environment. Create a .env file or set the variable.") client.run(token)