Files
BasharBotV2/bot.py
2025-11-12 23:11:11 -05:00

1305 lines
44 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import concurrent.futures
import io
import logging
import logging.handlers
import os
import re
import tempfile
import time
from collections import defaultdict, deque
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable, Deque, Optional, Tuple
import discord
import numpy as np
import pyttsx3
import soundfile as sf
from concurrent.futures import ThreadPoolExecutor
from discord import Intents
from discord.errors import ClientException, ConnectionClosed
from dotenv import load_dotenv
from yt_dlp import YoutubeDL
from stt import transcribe_file
try:
from discord import sinks # Available in discord.py >=2.0 and py-cord
HAS_SINKS = True
except Exception:
HAS_SINKS = False
load_dotenv()
# ---------- Logging ----------
# The level comes from the LOG_LEVEL environment variable; anything that is not a
# recognised logging level name falls back to INFO.
_log_level = os.getenv("LOG_LEVEL", "INFO").upper()
HOTWORD_ENABLED = os.getenv("HOTWORD_ENABLED", "true").lower() in {"1", "true", "yes", "on"}
_level = getattr(logging, _log_level, logging.INFO)
if not isinstance(_level, int):
    # e.g. LOG_LEVEL=handlers would resolve to the logging.handlers module, not a level.
    _level = logging.INFO
_formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s", "%H:%M:%S")

# Root logger handlers
# NOTE: logging.basicConfig() is deliberately not called. It would install a console
# handler with the default format first, which made the check below always succeed and
# kept the custom formatter from ever being attached to the console.
_root = logging.getLogger()
_root.setLevel(_level)

# FileHandler derives from StreamHandler, so file handlers are excluded explicitly when
# looking for an existing console handler.
_have_console = any(
    isinstance(h, logging.StreamHandler) and not isinstance(h, logging.FileHandler)
    for h in _root.handlers
)
if not _have_console:
    _ch = logging.StreamHandler()
    _ch.setLevel(_level)
    _ch.setFormatter(_formatter)
    _root.addHandler(_ch)

_have_file = any(isinstance(h, logging.handlers.RotatingFileHandler) for h in _root.handlers)
if not _have_file:
    try:
        _fh = logging.handlers.RotatingFileHandler(
            "bot.log", maxBytes=2 * 1024 * 1024, backupCount=3, encoding="utf-8"
        )
        _fh.setLevel(_level)
        _fh.setFormatter(_formatter)
        _root.addHandler(_fh)
    except Exception as _file_log_err:
        # Continue without file logging if the filesystem is not writable, but say so.
        _root.warning("File logging disabled: %s", _file_log_err)

# Tweak library log levels
logging.getLogger("discord").setLevel(logging.INFO)
logging.getLogger("aiohttp").setLevel(logging.INFO)
logger = logging.getLogger("basharbot")
# ---------- Global configuration ----------
# Keyword arguments passed to discord.FFmpegPCMAudio when building audio sources.
FFMPEG_OPTIONS = {
    "before_options": "-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5",
    "options": "-vn",
}

# Options handed to yt_dlp.YoutubeDL when resolving a search query.
YTDLP_OPTIONS = {
    "format": "bestaudio/best",
    "noplaylist": True,
    "default_search": "ytsearch1",
    "quiet": True,
    "source_address": "0.0.0.0",
}

# Wake phrase and the spelling variants that are folded back onto it.
BOT_WAKE_WORD = "hey bashar"
WAKE_WORD_TOKENS = BOT_WAKE_WORD.split()
WAKE_FIRST_WORDS = {
    "hey", "hi", "hay", "heyya", "heyyo",
    "yo", "ok", "okay", "oi",
}
WAKE_SECOND_WORDS = {
    "bashar", "basher", "beshar", "busher", "buchar",
    "bishar", "bishor", "bichar", "bisharp", "bishars",
    "bayshar", "bashara", "bashir", "basheer", "bishara",
    "basar", "bahsar", "vishar", "vashar", "wishar",
    "bishaar", "pashar",
}

# Spoken command synonyms mapped to their canonical action name.
COMMAND_ALIASES = {"next": "skip"}

# PCM layout used when sizing and writing captured voice audio.
PCM_SAMPLE_RATE = 48000
PCM_CHANNELS = 2
PCM_SAMPLE_WIDTH = 2  # bytes per sample
PCM_BYTES_PER_SECOND = PCM_SAMPLE_RATE * PCM_CHANNELS * PCM_SAMPLE_WIDTH

# Transcript logging switches (environment-driven).
TRANSCRIPT_LOG_ENABLED = os.getenv("TRANSCRIPT_LOG_ENABLED", "true").lower() in {"1", "true", "yes", "on"}
TRANSCRIPT_LOG_PATH = os.getenv("TRANSCRIPT_LOG_PATH", "transcript.log")

# User id and clip used by the "goodboy" reaction in the hotword transcript handler.
GOODBOY_USER_ID = int(os.getenv("GOODBOY_USER_ID", "94578724413902848"))
GOODBOY_AUDIO_PATH = os.path.join(os.getcwd(), "goodboy.ogg")
def _display_name(user: object) -> str:
    """Return the first non-empty of ``display_name`` / ``name``, else ``str(user)``."""
    for attribute in ("display_name", "name"):
        candidate = getattr(user, attribute, None)
        if candidate:
            return candidate
    return str(user)
def append_transcript_entry(name: str, text: str) -> None:
    """Append one timestamped ``name - text`` line to the transcript log.

    Does nothing when TRANSCRIPT_LOG_ENABLED is false. Any failure while
    formatting or writing is logged as a warning instead of being raised.
    """
    if TRANSCRIPT_LOG_ENABLED:
        try:
            timestamp = datetime.now().strftime("%m/%d/%Y %H:%M")
            entry = f"{timestamp} {name} - {text}".rstrip()
            with open(TRANSCRIPT_LOG_PATH, "a", encoding="utf-8") as handle:
                handle.write(entry + "\n")
        except Exception as exc:
            logger.warning("Failed to append to transcript log: %s", exc)
_ENGLISH_SENTENCE_RE = re.compile(r"^[a-zA-Z0-9 ,.'\"?!-]+$")


def is_probably_english_sentence(text: str) -> bool:
    """Heuristically decide whether ``text`` reads like a plain English sentence.

    True only when the text is non-empty, has at least three whitespace-separated
    words, ends in ``.``, ``!`` or ``?``, and consists solely of ASCII letters,
    digits, spaces and the punctuation accepted by ``_ENGLISH_SENTENCE_RE``.
    """
    shaped_like_sentence = (
        bool(text)
        and len(text.split()) >= 3
        and text.endswith((".", "!", "?"))
    )
    return shaped_like_sentence and _ENGLISH_SENTENCE_RE.match(text) is not None
async def announce_listening_roster(channel, voice_channel: Optional[discord.VoiceChannel]):
    """Post a "Listening for: ..." message naming everyone in the voice channel except the bot.

    Nothing is sent when either argument is None or when nobody else is in the
    channel. Failures while sending are logged at debug level and ignored.
    """
    if channel is not None and voice_channel is not None:
        bot_member = voice_channel.guild.me
        listeners = [member for member in voice_channel.members if member != bot_member]
        if listeners:
            names = ", ".join(map(_display_name, listeners))
            try:
                await channel.send(f"Listening for: {names}")
            except Exception:
                logger.debug("Unable to send listening roster message")
def normalize_for_command(text: str) -> str:
    """Lower-case ``text``, replace punctuation with spaces and canonicalise the wake words.

    A first token found in WAKE_FIRST_WORDS becomes ``hey``; a second token found
    in WAKE_SECOND_WORDS becomes ``bashar``. Tokens are re-joined with single spaces.
    """
    words = re.sub(r"[^a-z0-9\s]", " ", text.lower()).split()
    if words and words[0] in WAKE_FIRST_WORDS:
        words[0] = "hey"
    if len(words) >= 2 and words[1] in WAKE_SECOND_WORDS:
        words[1] = "bashar"
    return " ".join(words)
def parse_wake_command(text: str) -> Optional[Tuple[str, str]]:
    """
    Split a wake-word utterance into ``(action, args)``.

    The text is normalised first (punctuation stripped, wake-word variants folded).
    Returns None when the wake word is absent, ``("", "")`` when nothing follows it,
    and otherwise the alias-resolved action plus the remaining words joined by spaces.
    """
    normalized = normalize_for_command(text)
    if not normalized:
        return None

    tokens = normalized.split()
    wake_length = len(WAKE_WORD_TOKENS)
    if len(tokens) < wake_length:
        return None
    if not (tokens[0] == "hey" and tokens[1] == "bashar"):
        return None

    remainder = tokens[wake_length:]
    if not remainder:
        return ("", "")

    spoken_action, *rest = remainder
    return COMMAND_ALIASES.get(spoken_action, spoken_action), " ".join(rest)
async def send_recent_logs(channel: discord.abc.Messageable, max_lines: int = 300) -> None:
    """Send the tail of ``bot.log`` to ``channel``.

    Args:
        channel: Destination for the log excerpt.
        max_lines: Number of trailing log lines to include.

    Output longer than 1800 characters is uploaded as ``bot-tail.txt``; shorter
    output is sent inline inside a code fence. If the log cannot be read, the
    error text is sent in place of the log content.
    """

    def _tail_lines(path: str, max_lines: int = 300) -> str:
        try:
            with open(path, "r", encoding="utf-8", errors="replace") as f:
                lines = f.readlines()
            return "".join(lines[-max_lines:])
        except Exception as e:
            return f"Failed to read log: {e}"

    tail = _tail_lines("bot.log", max_lines)
    if len(tail) > 1800:
        with io.BytesIO(tail.encode("utf-8", errors="replace")) as bio:
            bio.seek(0)
            await channel.send(
                # Report the requested line count rather than a hard-coded 300.
                content=f"Here are the last {max_lines} lines of bot.log",
                file=discord.File(bio, filename="bot-tail.txt"),
            )
    else:
        await channel.send(f"```{tail}```")
# ---------- Utilities ----------
def ensure_opus_loaded() -> bool:
    """Make sure an Opus library is loaded, trying OPUS_LIB first and then common names.

    Returns True when Opus is already loaded or one candidate loads; otherwise
    logs a warning and returns False.
    """
    if discord.opus.is_loaded():
        logger.debug("Opus already loaded.")
        return True

    well_known = [
        "opus.dll",
        "libopus-0.dll",
        "libopus.so.0",
        "libopus.so",
        "libopus.dylib",
    ]
    override = os.getenv("OPUS_LIB")
    library_names = ([override] if override else []) + well_known

    for library_name in library_names:
        try:
            discord.opus.load_opus(library_name)
            logger.info("Loaded opus library: %s", library_name)
            return True
        except OSError:
            # This candidate is not present on the system; try the next one.
            pass

    logger.warning("Opus library not found; install opuslib/opus-tools if voice receive misbehaves.")
    return False
def ensure_ffmpeg_available() -> None:
    """Verify that an ``ffmpeg`` executable is on PATH.

    Raises:
        RuntimeError: when ``ffmpeg`` cannot be located.
    """
    import shutil

    ffmpeg_path = shutil.which("ffmpeg")
    if ffmpeg_path is not None:
        logger.debug("ffmpeg found: %s", ffmpeg_path)
        return

    logger.error("ffmpeg not found in PATH")
    raise RuntimeError(
        "ffmpeg not found in PATH. Install it and restart.\n"
        "Windows (PowerShell): choco install ffmpeg -y"
    )
def make_tts_engine() -> pyttsx3.Engine:
    """Create a pyttsx3 engine with the speech rate reduced to 90% (never below 120)."""
    tts = pyttsx3.init()
    # Slightly slower and clearer speech; rate tuning is best-effort.
    try:
        current_rate = tts.getProperty("rate")
        slowed_rate = int(current_rate * 0.9)
        tts.setProperty("rate", max(120, slowed_rate))
    except Exception:
        pass
    logger.debug("Initialized TTS engine (pyttsx3)")
    return tts
# Lazily created process-wide TTS engine and the executor that runs synthesis.
_tts_engine_singleton: Optional[pyttsx3.Engine] = None
_tts_executor: Optional[ThreadPoolExecutor] = None


def get_tts_engine() -> pyttsx3.Engine:
    """Return the shared TTS engine, creating it on first use."""
    global _tts_engine_singleton
    engine = _tts_engine_singleton
    if engine is None:
        engine = _tts_engine_singleton = make_tts_engine()
    return engine
def get_tts_executor() -> ThreadPoolExecutor:
    """Return the shared single-worker executor used for TTS, creating it on first use."""
    global _tts_executor
    executor = _tts_executor
    if executor is None:
        executor = _tts_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="tts")
    return executor
async def synthesize_tts_to_wav(text: str, wav_path: str) -> str:
    """Render ``text`` to ``wav_path`` with pyttsx3 on the TTS executor and return the path."""
    event_loop = asyncio.get_running_loop()
    tts = get_tts_engine()

    def _render_to_file():
        # Keep the debug line short for long announcements.
        preview = text if len(text) < 120 else text[:117] + "..."
        logger.debug("Synthesizing TTS to %s: %s", wav_path, preview)
        tts.save_to_file(text, wav_path)
        tts.runAndWait()

    await event_loop.run_in_executor(get_tts_executor(), _render_to_file)
    logger.debug("TTS synthesis complete: %s", wav_path)
    return wav_path
def yt_search_and_resolve(query: str) -> Tuple[str, str]:
    """
    Search YouTube for the best match and return (stream_url, title).
    Stream URL is suitable for ffmpeg input.

    Raises:
        LookupError: when yt-dlp yields no entry with a stream URL for ``query``.
            (LookupError is the common base of the IndexError/KeyError that an
            empty result used to trigger.)
    """
    logger.info("Resolving YouTube query: %s", query)
    with YoutubeDL(YTDLP_OPTIONS) as ydl:
        info = ydl.extract_info(query, download=False)

    # Search results arrive wrapped in an "entries" list; take the first usable one.
    if info and "entries" in info:
        candidates = [entry for entry in (info["entries"] or []) if entry]
        info = candidates[0] if candidates else None

    if not info or not info.get("url"):
        raise LookupError(f"No playable result found for query: {query!r}")

    stream_url = info["url"]
    title = info.get("title") or "Unknown"
    logger.info("Resolved track: title=%s", title)
    return stream_url, title
# ---------- Audio playback management ----------
async def _force_cleanup_voice_client(guild: Optional[discord.Guild]) -> None:
    """Tear down the guild's voice client and purge the cached voice state for the bot.

    Every step is best-effort: failures are logged at debug level and never raised.
    With no voice client present, only the cached state entries are cleared.
    """
    if guild is None:
        return

    def _forget_own_voice_state(cleared_message: str) -> None:
        # Drop the bot's own entry from the guild's cached voice states, if any.
        own_id = getattr(getattr(guild, "me", None), "id", None) or getattr(guild._state, "self_id", None)
        if own_id is None:
            return
        if guild._voice_states.pop(own_id, None) is not None:
            logger.debug(cleared_message, guild.id)

    stale_client: Optional[discord.VoiceClient] = getattr(guild, "voice_client", None)

    if stale_client is None:
        # Ensure we don't retain stale session metadata if Discord believes we're still connected.
        try:
            _forget_own_voice_state("Cleared lingering voice state for guild %s during cleanup")
            guild._state._voice_clients.pop(guild.id, None)
        except Exception as state_err:
            logger.debug("Failed to clear lingering voice state for guild %s: %s", getattr(guild, "id", "?"), state_err)
        return

    try:
        await stale_client.disconnect(force=True)
        logger.debug("Forced voice disconnect to clear stale session (guild %s)", guild.id)
    except Exception as disconnect_err:
        logger.debug("Force disconnect raised for guild %s: %s", guild.id, disconnect_err)
        # Disconnect failed; fall back to the client's own cleanup().
        try:
            stale_client.cleanup()
            logger.debug("Voice cleanup fallback succeeded (guild %s)", guild.id)
        except Exception as cleanup_err:
            logger.debug("Voice cleanup fallback failed (guild %s): %s", guild.id, cleanup_err)

    try:
        _forget_own_voice_state("Cleared lingering voice state post-disconnect for guild %s")
    except Exception as state_err:
        logger.debug("Failed to clear voice state post-disconnect for guild %s: %s", guild.id, state_err)

    try:
        guild._state._voice_clients.pop(guild.id, None)
    except Exception as remove_err:
        logger.debug("Failed to clear cached voice client for guild %s: %s", guild.id, remove_err)
async def _get_active_voice_client(guild: Optional[discord.Guild]) -> Optional[discord.VoiceClient]:
    """Return the guild's voice client after discarding stale cached state.

    A cached voice state for the bot without a matching client is dropped. A
    client that reports itself disconnected is force-cleaned, after which
    whatever the guild then exposes as ``voice_client`` is returned.
    """
    if guild is None:
        return None

    handle: Optional[discord.VoiceClient] = getattr(guild, "voice_client", None)

    try:
        own_id = getattr(getattr(guild, "me", None), "id", None) or getattr(guild._state, "self_id", None)
        if own_id is not None and handle is None:
            if guild._voice_states.get(own_id):
                logger.debug("Detected lingering voice state without client for guild %s; clearing before reconnect", guild.id)
                guild._voice_states.pop(own_id, None)
                guild._state._voice_clients.pop(guild.id, None)
    except Exception as state_err:
        logger.debug("Failed to inspect lingering voice state for guild %s: %s", getattr(guild, "id", "?"), state_err)

    if not handle or handle.is_connected():
        return handle

    logger.debug("Detected stale voice client handle for guild %s; cleaning up before reuse", guild.id)
    await _force_cleanup_voice_client(guild)
    return getattr(guild, "voice_client", None)
async def connect_voice_with_retry(channel: discord.abc.Connectable, *, retries: int = 2, delay: float = 3.0) -> discord.VoiceClient:
    """
    Robust voice connect helper that mitigates transient 4006 invalid session errors
    by aggressively clearing stale session_id to force fresh handshakes.

    Args:
        channel: Voice channel to join; it must expose a ``guild`` attribute.
        retries: Maximum number of connection attempts.
        delay: Base wait in seconds between attempts; multiplied by 1.5 per attempt.

    Returns:
        A connected voice client. An existing client already connected to
        ``channel`` is re-used instead of opening a new connection.

    Raises:
        RuntimeError: if ``channel`` has no guild, or if ``retries`` is below 1
            so that no attempt was made.
        Exception: the error raised by the last failed attempt.
    """
    last_exc: Optional[Exception] = None
    guild: Optional[discord.Guild] = getattr(channel, "guild", None)
    if guild is None:
        raise RuntimeError("Voice channel without guild cannot establish a connection.")

    # Always start with a clean slate to avoid 4006 stale-session loops
    await _force_cleanup_voice_client(guild)
    await asyncio.sleep(0.5)

    for attempt in range(1, retries + 1):
        existing_vc: Optional[discord.VoiceClient] = await _get_active_voice_client(guild)
        if existing_vc and existing_vc.channel == channel and existing_vc.is_connected():
            logger.info("Re-using existing voice connection to %s (attempt %d/%d)", channel, attempt, retries)
            return existing_vc
        try:
            logger.info("Attempting voice connect to %s (attempt %d/%d)", channel, attempt, retries)
            # WORKAROUND for persistent 4006: Manually create VoiceClient and patch it to prevent session resume
            # NOTE(review): this relies on private connection-state helpers of the discord
            # library (_remove_voice_client / _add_voice_client / _get_client) - re-check on upgrade.
            from discord import VoiceClient as VoiceClientClass

            state = guild._state
            key_id = guild.id
            # Remove any existing voice client to force fresh connection
            state._remove_voice_client(key_id)
            # Create new voice client
            voice_client = VoiceClientClass(state._get_client(), channel)
            # Force clear any stale session_id that might be cached
            if hasattr(voice_client, 'session_id'):
                voice_client.session_id = None
            # Register it
            state._add_voice_client(key_id, voice_client)
            # Now connect with no reconnect to avoid resume attempts
            try:
                await voice_client.connect(timeout=25.0, reconnect=False)
            except Exception:
                # If connection fails, clean up the registered client
                state._remove_voice_client(key_id)
                raise
            vc = voice_client
            # Ensure we are self-undeafened so we can receive audio
            try:
                await channel.guild.change_voice_state(channel=channel, self_mute=False, self_deaf=False)
            except Exception as change_err:
                logger.debug("Unable to explicitly set voice state: %s", change_err)
            if not vc.is_connected():
                logger.warning("Voice connect returned but client not connected on attempt %d; cleaning up (guild %s)", attempt, guild.id)
                await _force_cleanup_voice_client(guild)
                # Routed through the ConnectionClosed handler below so it is retried like a 4006.
                raise ConnectionClosed(None, shard_id=None, code=4006)
            logger.info("Voice connect succeeded on attempt %d (%s)", attempt, channel)
            return vc
        except ConnectionClosed as e:
            last_exc = e
            logger.warning(
                "Voice connect failed with ConnectionClosed(code=%s, reason=%s) on attempt %d/%d",
                getattr(e, "code", None),
                getattr(e, "reason", None),
                attempt,
                retries,
            )
            await _force_cleanup_voice_client(guild)
        except ClientException as e:
            last_exc = e
            logger.warning(
                "Voice connect failed with ClientException('%s') on attempt %d/%d",
                e,
                attempt,
                retries,
            )
            if "Already connected to a voice channel" in str(e):
                existing_vc = await _get_active_voice_client(guild)
                if existing_vc and existing_vc.channel == channel and existing_vc.is_connected():
                    logger.info("Detected active voice session despite ClientException; re-using existing client on guild %s", guild.id)
                    return existing_vc
            # Like the other failure paths, leave no half-registered client behind.
            await _force_cleanup_voice_client(guild)
        except Exception as e:
            last_exc = e
            logger.exception("Voice connect raised %s on attempt %d/%d", e, attempt, retries)
            await _force_cleanup_voice_client(guild)

        # Reset voice state and wait before retrying
        try:
            await guild.change_voice_state(channel=None, self_mute=False, self_deaf=False)
            await asyncio.sleep(0.5)
        except Exception as reset_err:
            logger.debug("Failed to reset guild voice state: %s", reset_err)
        if attempt < retries:
            wait_time = delay * (1.5 ** (attempt - 1))
            logger.info("Waiting %.1fs before retry %d (guild %s)", wait_time, attempt + 1, guild.id)
            await asyncio.sleep(wait_time)

    # An explicit check instead of ``assert``: asserts vanish under ``python -O``,
    # which would turn this into ``raise None``.
    if last_exc is None:
        raise RuntimeError("Voice connect was not attempted; 'retries' must be at least 1.")
    raise last_exc
@dataclass
class QueueItem:
    """One playable entry in a guild's audio queue."""

    # Human-readable name used in log lines and "Queued: ..." chat messages.
    title: str
    # Zero-argument callable invoked by the player loop to build the audio source.
    source_factory: Callable[[], discord.AudioSource]
    # Optional text spoken via TTS before the item plays; None or empty skips it.
    announce: Optional[str] = None
if HAS_SINKS:
    _HotwordSinkBase = sinks.Sink
    _hotword_write_filter = sinks.Filters.container
else:
    # ``sinks`` is undefined when the optional import failed. Without these stand-ins
    # the class statement below would raise NameError at import time and the
    # HAS_SINKS fallbacks elsewhere in the file could never run.
    class _HotwordSinkBase:
        """Inert base used only so the module still imports without voice sinks."""

        def __init__(self, *args, **kwargs):
            pass

        def cleanup(self):
            return None

    def _hotword_write_filter(func):
        return func


class HotwordStreamSink(_HotwordSinkBase):
    """Recording sink that buffers per-user PCM and forwards it after a pause in speech.

    Audio for each user is kept in a rolling window of ``window_seconds``. Once at
    least ``min_chunk_seconds`` (never less than half a second) is buffered and no
    new audio arrived for ``inactivity_seconds``, the buffered bytes are passed to
    ``state.handle_hotword_buffer`` on ``loop``.
    """

    def __init__(
        self,
        state: "GuildAudioState",
        text_channel: discord.abc.Messageable,
        loop: asyncio.AbstractEventLoop,
        min_chunk_seconds: float = 1.0,
        window_seconds: float = 4.5,
        inactivity_seconds: float = 1.0,
    ):
        super().__init__()
        self.state = state
        self.text_channel = text_channel
        self.loop = loop
        self.closed = False
        # user id -> rolling PCM buffer
        self.buffers: defaultdict[int, bytearray] = defaultdict(bytearray)
        # user id -> perf_counter() timestamp of the most recent write
        self.last_activity: defaultdict[int, float] = defaultdict(lambda: 0.0)
        # users whose buffer is currently being handled
        self.processing_users: set[int] = set()
        # user id -> scheduled delayed-dispatch future
        self.pending_tasks: dict[int, concurrent.futures.Future] = {}
        self.min_chunk_bytes = int(max(PCM_BYTES_PER_SECOND * min_chunk_seconds, PCM_BYTES_PER_SECOND * 0.5))
        self.window_bytes = int(PCM_BYTES_PER_SECOND * window_seconds)
        self.inactivity_seconds = inactivity_seconds

    def _cancel_pending(self) -> None:
        """Cancel every scheduled dispatch and forget it."""
        for fut in list(self.pending_tasks.values()):
            try:
                fut.cancel()
            except Exception:
                pass
        self.pending_tasks.clear()

    def close(self):
        """Stop accepting audio and drop all buffered and pending work."""
        self.closed = True
        self._cancel_pending()
        self.buffers.clear()
        self.processing_users.clear()

    def update_text_channel(self, channel: discord.abc.Messageable):
        """Redirect messages for future dispatches to ``channel``."""
        self.text_channel = channel

    def cleanup(self):
        """Mark the sink closed, cancel pending dispatches and run the base cleanup."""
        self.closed = True
        self._cancel_pending()
        return super().cleanup()

    async def _delayed_dispatch(self, uid: int, expected_time: float):
        """Wait out the inactivity window, then hand the user's buffer to the state.

        The dispatch is abandoned if the sink was closed, newer audio arrived
        (``last_activity`` moved past ``expected_time``), the buffer is too small,
        or the same user is already being processed.
        """
        try:
            await asyncio.sleep(self.inactivity_seconds)
            if self.closed:
                return
            last = self.last_activity.get(uid, 0.0)
            if abs(last - expected_time) > 1e-6:
                return
            buffer = self.buffers.get(uid)
            if not buffer or len(buffer) < self.min_chunk_bytes:
                return
            if uid in self.processing_users:
                return
            self.processing_users.add(uid)
            chunk = bytes(buffer)
            buffer.clear()
            try:
                await self.state.handle_hotword_buffer(uid, chunk, self.text_channel)
            finally:
                self.processing_users.discard(uid)
        except asyncio.CancelledError:
            return

    def _on_dispatch_done(self, uid: int, fut) -> None:
        """Log dispatch failures and unregister ``fut`` once it has finished."""
        if fut.cancelled():
            return
        try:
            fut.result()
        except asyncio.CancelledError:
            return
        except Exception as exc:
            logger.exception("Hotword delayed dispatch failed for user %s: %s", uid, exc)
        finally:
            # Only remove our own registration: a newer dispatch for the same user
            # may already have replaced this future and must stay cancellable.
            if self.pending_tasks.get(uid) is fut:
                self.pending_tasks.pop(uid, None)

    @_hotword_write_filter
    def write(self, data, user):
        """Buffer ``data`` for ``user`` and (re)schedule the delayed dispatch.

        Presumably invoked from the library's audio receive thread, which is why
        the coroutine is scheduled with ``run_coroutine_threadsafe`` - TODO confirm.
        """
        if self.closed or user is None:
            return
        try:
            user_id = int(user)
        except Exception:
            return
        buffer = self.buffers[user_id]
        buffer.extend(data)
        if len(buffer) > self.window_bytes:
            # Keep only the newest window_bytes of audio.
            del buffer[: len(buffer) - int(self.window_bytes)]
        now = time.perf_counter()
        self.last_activity[user_id] = now
        if len(buffer) < self.min_chunk_bytes:
            return
        # Debounce: every new packet supersedes the previously scheduled dispatch.
        existing = self.pending_tasks.get(user_id)
        if existing and not existing.done():
            existing.cancel()
        self.pending_tasks.pop(user_id, None)
        future = asyncio.run_coroutine_threadsafe(self._delayed_dispatch(user_id, now), self.loop)
        # Register before attaching the callback so the callback can recognise itself.
        self.pending_tasks[user_id] = future
        future.add_done_callback(lambda fut, uid=user_id: self._on_dispatch_done(uid, fut))
@dataclass
class GuildAudioState:
    """Per-guild playback queue, player task and hotword-listening state."""

    guild_id: int
    # Voice client used for playback and recording; assigned by the play handlers.
    voice_client: Optional[discord.VoiceClient] = None
    # Items waiting to be played, oldest first.
    queue: Deque[QueueItem] = field(default_factory=deque)
    # Item currently being announced or played, if any.
    current: Optional[QueueItem] = None
    # Background task running _player_loop().
    player_task: Optional[asyncio.Task] = None
    # Set whenever the player loop should re-check the queue.
    queue_event: asyncio.Event = field(default_factory=asyncio.Event)
    listen_enabled: bool = False
    hotword_sink: Optional["HotwordStreamSink"] = None
    # user id -> (normalized transcript, perf_counter timestamp), for duplicate suppression.
    last_transcripts: dict[int, Tuple[str, float]] = field(default_factory=dict)

    @staticmethod
    def _local_audio_source(file_path: str) -> discord.AudioSource:
        """Build an FFmpeg source for a file on disk.

        Only the output options are passed: the ``-reconnect*`` flags in
        FFMPEG_OPTIONS are HTTP input options and are not meaningful for local
        files (ffmpeg may reject them).
        """
        return discord.FFmpegPCMAudio(file_path, options=FFMPEG_OPTIONS["options"])

    def _call_in_loop(self, loop: asyncio.AbstractEventLoop, callback: Callable[[], None]) -> None:
        """Run ``callback`` on ``loop`` from any thread.

        Playback ``after`` callbacks are not invoked on the event loop thread, and
        asyncio futures/events must only be touched from the loop thread.
        """
        try:
            loop.call_soon_threadsafe(callback)
        except RuntimeError as exc:
            # Raised when the loop has already been closed (e.g. during shutdown).
            logger.debug("Event loop unavailable for playback callback (guild %s): %s", self.guild_id, exc)

    async def ensure_player(self):
        """Start the player loop task if it is not running."""
        if self.player_task is None or self.player_task.done():
            logger.debug("Starting player loop for guild %s", self.guild_id)
            self.player_task = asyncio.create_task(self._player_loop())

    def enqueue(self, item: QueueItem):
        """Append ``item`` to the queue and wake the player loop."""
        self.queue.append(item)
        logger.info("Enqueued: %s (qsize=%d) on guild %s", item.title, len(self.queue), self.guild_id)
        self.queue_event.set()

    def skip_current(self):
        """Stop whatever is playing so the player loop advances to the next item."""
        if self.voice_client and self.voice_client.is_playing():
            logger.info("Skipping current track (guild %s)", self.guild_id)
            self.voice_client.stop()

    def stop_all(self):
        """Clear the queue, stop playback and reset the current item."""
        logger.info("Stopping playback and clearing queue (guild %s)", self.guild_id)
        self.queue.clear()
        if self.voice_client and self.voice_client.is_playing():
            self.voice_client.stop()
        self.current = None
        self.queue_event.set()

    async def enqueue_local_clip(self, text_channel: discord.abc.Messageable, file_path: str):
        """Queue a local audio file, reporting to ``text_channel`` if it cannot be played."""
        if not os.path.exists(file_path):
            await text_channel.send("Audio clip not found on server.")
            return
        if not self.voice_client or not self.voice_client.is_connected():
            await text_channel.send("I'm not in a voice channel right now.")
            return

        def source_factory() -> discord.AudioSource:
            return self._local_audio_source(file_path)

        item = QueueItem(title=os.path.basename(file_path), source_factory=source_factory, announce=None)
        self.enqueue(item)
        await self.ensure_player()

    async def _play_tts(self, text: str):
        """Synthesize ``text`` to a temporary WAV file and play it to completion."""
        if not self.voice_client or not self.voice_client.is_connected():
            logger.debug("Skipping TTS; voice client not connected (guild %s)", self.guild_id)
            return
        loop = asyncio.get_running_loop()
        with tempfile.TemporaryDirectory() as tmpdir:
            tts_path = os.path.join(tmpdir, "tts.wav")
            await synthesize_tts_to_wav(text, tts_path)
            source = self._local_audio_source(tts_path)
            fut = loop.create_future()

            def _resolve():
                if not fut.done():
                    fut.set_result(True)

            def after_playback(_):
                logger.debug("Finished TTS playback (guild %s)", self.guild_id)
                self._call_in_loop(loop, _resolve)

            try:
                self.voice_client.play(source, after=after_playback)
            except Exception as e:
                logger.exception("Error starting TTS playback (guild %s): %s", self.guild_id, e)
                return
            await fut
            # Short pause before the temporary directory is removed.
            await asyncio.sleep(0.1)

    async def _player_loop(self):
        """Play queued items one at a time, forever; woken through ``queue_event``."""
        logger.debug("Player loop running (guild %s)", self.guild_id)
        loop = asyncio.get_running_loop()
        while True:
            await self.queue_event.wait()
            self.queue_event.clear()
            if not self.voice_client or not self.voice_client.is_connected():
                logger.debug("No connected voice client; waiting (guild %s)", self.guild_id)
                await asyncio.sleep(0.2)
                continue
            if self.voice_client.is_playing():
                # Will trigger when current track finishes (after callback sets event)
                logger.debug("Voice already playing; yielding (guild %s)", self.guild_id)
                await asyncio.sleep(0.2)
                continue
            if not self.queue:
                # Nothing to play, wait
                logger.debug("Queue empty; waiting (guild %s)", self.guild_id)
                continue
            # Keep a local reference: stop_all() may reset self.current while we await.
            item = self.queue.popleft()
            self.current = item
            logger.info("Starting next track: %s (remaining qsize=%d) guild %s", item.title, len(self.queue), self.guild_id)
            # Announce if needed
            if item.announce:
                try:
                    await self._play_tts(item.announce)
                except Exception as e:
                    # Announce failures shouldn't break playback
                    logger.warning("TTS announce failed (guild %s): %s", self.guild_id, e)
                if self.current is not item:
                    # stop_all() ran during the announcement; do not start the track.
                    logger.debug("Playback stopped during announcement; dropping %s (guild %s)", item.title, self.guild_id)
                    continue
            # Start playing audio
            play_done = loop.create_future()

            def _wake(done=play_done):
                if not done.done():
                    done.set_result(True)
                # Nudge the loop to consider the next item
                self.queue_event.set()

            def _after_play(err: Optional[Exception], wake=_wake):
                # FFmpeg end or error wake the loop
                if err:
                    logger.error("Playback finished with error (guild %s): %s", self.guild_id, err)
                else:
                    logger.debug("Playback finished normally (guild %s)", self.guild_id)
                self._call_in_loop(loop, wake)

            try:
                # Building the source is inside the try so a failing factory skips
                # this item instead of killing the player task.
                source = item.source_factory()
                self.voice_client.play(source, after=_after_play)
                logger.info("FFmpeg playback started (guild %s) on channel %s", self.guild_id, getattr(self.voice_client.channel, "name", "?"))
            except Exception as e:
                logger.exception("Failed to start playback (guild %s): %s", self.guild_id, e)
                self.current = None
                # Wake loop to attempt next or wait
                self.queue_event.set()
                continue
            await play_done
            self.current = None

    async def start_listening(self, text_channel: discord.abc.Messageable):
        """Start (or re-target) continuous hotword recording on the current voice client."""
        if not HOTWORD_ENABLED:
            logger.debug("Hotword listening disabled by environment (guild %s)", self.guild_id)
            return
        if not HAS_SINKS:
            logger.warning("Hotword listening requested but sinks are unavailable on this stack.")
            try:
                await text_channel.send("Live hotword listening is unavailable on this install. Send a voice message instead.")
            except Exception:
                pass
            return
        if not self.voice_client or not self.voice_client.is_connected():
            logger.debug("Cannot start listener without an active voice client (guild %s)", self.guild_id)
            return
        self.listen_enabled = True
        self.last_transcripts.clear()
        if self.hotword_sink and not self.hotword_sink.closed:
            self.hotword_sink.update_text_channel(text_channel)
            logger.debug("Hotword listener already running (guild %s)", self.guild_id)
            return
        # If another recording is running, stop it first
        if getattr(self.voice_client, "recording", False):
            try:
                self.voice_client.stop_recording()
            except Exception:
                pass
        loop = asyncio.get_running_loop()
        sink = HotwordStreamSink(self, text_channel, loop)
        self.hotword_sink = sink
        logger.info("Starting continuous hotword listener (guild %s)", self.guild_id)

        async def _finished_callback(sink_obj, *_):
            await self._on_sink_finished(sink_obj)

        self.voice_client.start_recording(sink, _finished_callback)
        channel = getattr(self.voice_client, "channel", None)
        if channel:
            others = [m for m in channel.members if m.id != client.user.id]
            if others:
                names = ", ".join(_display_name(m) for m in others)
                try:
                    await text_channel.send(f"Listening for: {names}")
                except Exception:
                    logger.debug("Unable to send listening roster message (guild %s)", self.guild_id)

    async def _on_sink_finished(self, sink_obj: "HotwordStreamSink"):
        """Recording-finished callback: close the sink and clear listening state if it was ours."""
        sink_obj.close()
        if self.hotword_sink is sink_obj:
            self.hotword_sink = None
            self.listen_enabled = False
        logger.debug("Hotword sink finished (guild %s)", self.guild_id)

    async def stop_listening(self):
        """Stop hotword recording and discard the sink."""
        if self.listen_enabled:
            logger.info("Stopping hotword listener (guild %s)", self.guild_id)
        self.listen_enabled = False
        sink = self.hotword_sink
        if sink:
            sink.close()
        if self.voice_client and getattr(self.voice_client, "recording", False):
            try:
                self.voice_client.stop_recording()
            except Exception:
                pass
        self.hotword_sink = None

    async def handle_hotword_buffer(self, user_id: int, pcm_bytes: bytes, text_channel: discord.abc.Messageable):
        """Transcribe one speaker's PCM chunk and pass the text to the transcript handler."""
        if not pcm_bytes:
            return
        if not self.voice_client or not self.voice_client.is_connected():
            return
        guild = client.get_guild(self.guild_id)
        if guild is None:
            return
        member = guild.get_member(int(user_id))
        if member is None:
            return
        samples = np.frombuffer(pcm_bytes, dtype=np.int16)
        if samples.size == 0:
            return
        if PCM_CHANNELS > 1:
            # Drop any trailing partial frame, then shape to (frames, channels).
            usable = (samples.size // PCM_CHANNELS) * PCM_CHANNELS
            if usable == 0:
                return
            samples = samples[:usable].reshape(-1, PCM_CHANNELS)
        # Reserve a path only; the handle is closed before the audio is written.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            tmp_path = tmp.name
        try:
            sf.write(tmp_path, samples, PCM_SAMPLE_RATE, subtype="PCM_16")
            loop = asyncio.get_running_loop()
            text, score = await loop.run_in_executor(None, transcribe_file, tmp_path)
        except Exception as e:
            logger.warning("Transcription failed for speaker %s: %s", user_id, e)
            return
        finally:
            try:
                os.remove(tmp_path)
            except OSError:
                pass
        await self.handle_hotword_transcript(user_id, text, score, member, text_channel)

    async def handle_hotword_transcript(self, user_id: int, text: str, score: float, member: discord.Member, text_channel: discord.abc.Messageable):
        """Log, de-duplicate and route one transcript from ``member``."""
        text_norm = (text or "").strip()
        logger.info(
            "Hotword transcript (stream) guild=%s speaker=%s text='%s' score=%.3f",
            self.guild_id,
            getattr(member, "display_name", member),
            text_norm,
            score,
        )
        if not text_norm:
            return
        normalized = normalize_for_command(text_norm)
        if not normalized:
            return
        now = time.perf_counter()
        last = self.last_transcripts.get(user_id)
        # Ignore the same normalized text repeated by the same speaker within two seconds.
        if last and last[0] == normalized and (now - last[1]) < 2.0:
            logger.debug("Skipping duplicate transcript for speaker %s (guild %s)", member, self.guild_id)
            return
        append_transcript_entry(_display_name(member), text_norm)
        self.last_transcripts[user_id] = (normalized, now)
        if member.id == GOODBOY_USER_ID and is_probably_english_sentence(text_norm):
            await self.enqueue_local_clip(text_channel, GOODBOY_AUDIO_PATH)
        await route_transcribed_command_from_member(member.guild, member, text_channel, text_norm)
# ---------- Bot setup ----------
# Start from the default intents, then opt in to message content and voice state events.
intents = Intents.default()
intents.message_content = True
intents.voice_states = True
client = discord.Client(intents=intents)
# Per-guild audio state keyed by guild id; entries are created on demand by get_state_for_guild().
audio_states: dict[int, GuildAudioState] = {}
def get_state_for_guild(guild_id: int) -> GuildAudioState:
    """Return the audio state registered for ``guild_id``, creating it on first access."""
    existing = audio_states.get(guild_id)
    if existing is not None:
        return existing
    created = GuildAudioState(guild_id=guild_id)
    audio_states[guild_id] = created
    return created
async def connect_to_author_channel(message: discord.Message) -> Optional[discord.VoiceClient]:
    """Join (or move to) the voice channel of the message author.

    Returns the voice client on success. Returns None when the author is not a
    guild member, is not in a voice channel, or the connection could not be
    established; in the last two cases the user is told why in the text channel.
    """
    author = message.author
    if not isinstance(author, discord.Member):
        return None

    guild_label = getattr(message.guild, "id", "?")
    logger.debug("Connect requested by %s in guild %s", author, guild_label)

    voice_state = author.voice
    if not voice_state or not voice_state.channel:
        logger.info("Author not in a voice channel; cannot join (guild %s)", guild_label)
        await message.channel.send("Join a voice channel first, then say 'hey bashar join'.")
        return None
    channel = voice_state.channel

    async def _connect_fresh() -> Optional[discord.VoiceClient]:
        # Open a new connection and announce the roster; on failure log, tell the user, yield None.
        try:
            fresh = await connect_voice_with_retry(channel)
            await announce_listening_roster(message.channel, channel)
        except Exception as e:
            logger.exception("Voice connect retries exhausted (guild %s): %s", guild_label, e)
            await message.channel.send("I couldn't join the voice channel (error 4006). Try again in a few seconds.")
            return None
        return fresh

    vc = await _get_active_voice_client(message.guild)
    if vc and vc.channel == channel and vc.is_connected():
        logger.debug("Already connected to requested channel: %s (guild %s)", channel, guild_label)
        return vc

    if vc:
        try:
            logger.info("Moving voice client to channel: %s (guild %s)", channel, guild_label)
            await vc.move_to(channel)
            await announce_listening_roster(message.channel, channel)
            return vc
        except Exception as e:
            logger.warning("Move failed; reconnecting fresh (guild %s): %s", guild_label, e)
            try:
                await vc.disconnect(force=True)
            except Exception:
                pass
            vc = await _connect_fresh()
            if vc is None:
                return None
    else:
        logger.info("Connecting to voice channel: %s (guild %s)", channel, guild_label)
        vc = await _connect_fresh()
        if vc is None:
            return None

    if vc and vc.is_connected():
        logger.info("Connected to voice: %s (guild %s)", vc.channel, guild_label)
    else:
        logger.error("Voice connect returned but not connected (guild %s)", guild_label)
    return vc
def make_ffmpeg_source(url: str) -> discord.AudioSource:
    """Create an FFmpeg PCM audio source for ``url`` using FFMPEG_OPTIONS."""
    # Log at most the first 64 characters of the URL.
    suffix = "..." if len(url) > 64 else ""
    logger.debug("Creating FFmpeg source for url: %s", url[:64] + suffix)
    return discord.FFmpegPCMAudio(url, **FFMPEG_OPTIONS)
async def handle_play_query(message: discord.Message, query: str):
    """Handle a text "play" request: join the author's channel, resolve ``query`` and queue it."""
    assert message.guild is not None
    guild_id = message.guild.id
    logger.info("Play request: '%s' (guild %s)", query, guild_id)

    state = get_state_for_guild(guild_id)
    vc = await connect_to_author_channel(message)
    if vc is None:
        logger.info("Aborting play; no voice client (guild %s)", guild_id)
        return
    state.voice_client = vc

    # Resolve YouTube stream and title off the event loop.
    loop = asyncio.get_running_loop()
    try:
        stream_url, title = await loop.run_in_executor(None, yt_search_and_resolve, query)
    except Exception as e:
        logger.exception("YouTube resolve failed for '%s' (guild %s): %s", query, guild_id, e)
        await message.channel.send(f"Couldn't find or load audio for: {query}")
        return

    def _build_source() -> discord.AudioSource:
        return make_ffmpeg_source(stream_url)

    # Enqueue TTS announcement + track
    state.enqueue(QueueItem(title=title, source_factory=_build_source, announce=f"Playing {title}"))
    await state.ensure_player()
    await message.channel.send(f"Queued: {title}")
    logger.info("Queued and scheduled: %s (guild %s)", title, guild_id)
async def handle_play_for_member(guild: discord.Guild, member: discord.Member, text_channel: discord.abc.Messageable, query: str):
    """Queue ``query`` for playback on behalf of a member who spoke the command.

    Makes sure the bot has a connected voice client in the member's current
    voice channel (moving or reconnecting as needed), resolves the query with
    ``yt_search_and_resolve`` in the default executor, then enqueues the track
    and ensures the guild's player is running.

    Args:
        guild: Guild the request belongs to.
        member: Member who issued the voice command.
        text_channel: Channel that receives status and error messages.
        query: Free-text search terms to resolve.

    Returns:
        None. Failures are reported to ``text_channel`` and logged, not raised.
    """
    logger.info("Play request (voice) by %s: '%s' (guild %s)", getattr(member, "display_name", member.id), query, guild.id)
    state = get_state_for_guild(guild.id)

    # Ensure voice connected to member channel
    if not member.voice or not member.voice.channel:
        await text_channel.send("Join a voice channel first, then say 'hey bashar play <song>'.")
        return
    channel = member.voice.channel

    vc = await _get_active_voice_client(guild)
    try:
        if vc and vc.is_connected():
            if vc.channel != channel:
                try:
                    await vc.move_to(channel)
                except Exception:
                    # Moving failed: fall back to a clean disconnect + reconnect.
                    logger.debug("move_to failed; reconnecting (guild %s)", guild.id, exc_info=True)
                    try:
                        await vc.disconnect(force=True)
                    except Exception:
                        # Best effort only; the reconnect below is what matters.
                        logger.debug("Disconnect before reconnect failed (guild %s)", guild.id, exc_info=True)
                    vc = await connect_voice_with_retry(channel)
        else:
            if vc:
                # A client object exists but is not connected. Previously this
                # case fell through and the dead client was used for playback;
                # drop it and reconnect instead.
                try:
                    await vc.disconnect(force=True)
                except Exception:
                    logger.debug("Cleanup of stale voice client failed (guild %s)", guild.id, exc_info=True)
            vc = await connect_voice_with_retry(channel)
    except Exception as e:
        logger.exception("Voice connect retries exhausted (guild %s): %s", guild.id, e)
        await text_channel.send("I couldn't join the voice channel (error 4006). Try again in a few seconds.")
        return
    state.voice_client = vc

    # Resolve and enqueue
    try:
        stream_url, title = await asyncio.get_running_loop().run_in_executor(None, yt_search_and_resolve, query)
    except Exception as e:
        logger.exception("YouTube resolve failed for '%s' (guild %s): %s", query, guild.id, e)
        await text_channel.send(f"Couldn't find or load audio for: {query}")
        return

    item = QueueItem(
        title=title,
        source_factory=lambda: make_ffmpeg_source(stream_url),
        announce=f"Playing {title}",
    )
    state.enqueue(item)
    await state.ensure_player()
    await text_channel.send(f"Queued: {title}")
    logger.info("Queued and scheduled (voice): %s (guild %s)", title, guild.id)
@client.event
async def on_ready():
    """Log the login, run the startup dependency checks and report hotword status.

    ``ensure_ffmpeg_available`` and ``ensure_opus_loaded`` are called in that
    order; the hotword status line reflects ``HOTWORD_ENABLED`` and
    ``HAS_SINKS``.
    """
    logger.info("Logged in as %s (id=%s)", client.user, client.user.id)
    ensure_ffmpeg_available()
    ensure_opus_loaded()
    logger.info("Startup checks OK")

    # Pick the single status line first, then log it once.
    if not HOTWORD_ENABLED:
        hotword_status = "Hotword listening: DISABLED (HOTWORD_ENABLED unset/false)"
    elif HAS_SINKS:
        hotword_status = "Hotword listening: ENABLED (sinks available and HOTWORD_ENABLED=True)"
    else:
        hotword_status = "Hotword listening: DISABLED (HOTWORD_ENABLED=True but sinks unavailable)"
    logger.info(hotword_status)
@client.event
async def on_message(message: discord.Message):
    """Handle an incoming text message.

    Processing order:
      1. Messages authored by the bot itself are ignored.
      2. An exact "<wake word> logs" message replies with the tail of
         ``bot.log`` (inline when short, as a file attachment otherwise).
      3. The first audio attachment that transcribes to non-empty text is
         routed through ``route_transcribed_command``, after which the
         handler returns.
      4. Otherwise the text is parsed with ``parse_wake_command`` and
         dispatched to the join / leave / play / skip / stop / logs actions;
         unrecognised actions get a short help message.

    Args:
        message: The message delivered by the gateway.
    """
    # Ignore our own messages
    if message.author.id == client.user.id:
        return

    # "hey bashar logs" -> send last lines of bot.log
    try:
        content_raw = message.content or ""
        if content_raw.lower().strip() == f"{BOT_WAKE_WORD} logs":

            def _tail_lines(path: str, max_lines: int = 300) -> str:
                """Return the last ``max_lines`` lines of ``path`` as one string."""
                try:
                    with open(path, "r", encoding="utf-8", errors="replace") as f:
                        # A bounded deque keeps only the tail while streaming
                        # the file, instead of loading every line into a list.
                        return "".join(deque(f, maxlen=max_lines))
                except Exception as e:
                    return f"Failed to read log: {e}"

            tail = _tail_lines("bot.log", 300)
            if len(tail) > 1800:
                # Too long for an inline code block: upload as a file instead.
                with io.BytesIO(tail.encode("utf-8", errors="replace")) as bio:
                    await message.channel.send(content="Here are the last 300 lines of bot.log", file=discord.File(bio, filename="bot-tail.txt"))
            else:
                await message.channel.send(f"```{tail}```")
            return
    except Exception as e:
        logger.exception("Failed handling 'logs' command: %s", e)
        # fall through

    # Handle audio/voice-message attachments for STT
    audio_extensions = (".wav", ".mp3", ".m4a", ".ogg", ".oga", ".opus", ".webm")
    for att in message.attachments:
        ct = (att.content_type or "").lower()
        filename = (att.filename or "").lower()
        if not (ct.startswith("audio/") or filename.endswith(audio_extensions)):
            continue

        logger.info("Downloading audio attachment for STT: %s (%s)", att.filename, att.content_type)
        # The filename comes from the uploader: keep only its final component
        # so the saved file cannot land outside the temporary directory.
        safe_name = os.path.basename(att.filename or "") or "audio_input"
        with tempfile.TemporaryDirectory() as tmpdir:
            tmp_path = os.path.join(tmpdir, safe_name)
            try:
                await att.save(tmp_path)
                logger.debug("Saved attachment to %s", tmp_path)
            except Exception as e:
                logger.exception("Failed to save attachment: %s", e)
                continue

            loop = asyncio.get_running_loop()
            try:
                # Transcription is blocking; run it off the event loop.
                text, score = await loop.run_in_executor(None, transcribe_file, tmp_path)
            except Exception as e:
                logger.exception("Transcription failed: %s", e)
                continue

            text_norm = (text or "").strip()
            if not text_norm:
                await message.channel.send("I couldn't hear anything useful in that audio.")
                continue

            logger.info("Transcribed: %s (score=%.3f)", text_norm, score)
            append_transcript_entry(_display_name(message.author), text_norm)
            await route_transcribed_command(message, text_norm)
        # Only process first audio attachment per message
        return

    content = (message.content or "").strip()
    parsed = parse_wake_command(content)
    if not parsed:
        return
    action, args = parsed
    logger.debug("Wake word detected. Raw='%s' | action='%s' | args='%s'", content, action, args)
    append_transcript_entry(_display_name(message.author), content)

    if action == "logs":
        try:
            await send_recent_logs(message.channel)
        except Exception as e:
            logger.exception("Failed handling 'logs' command: %s", e)
            await message.channel.send("Couldn't fetch logs.")
        return

    # Every remaining known action needs per-guild state; in a DM
    # ``message.guild`` is None and ``message.guild.id`` would raise.
    if message.guild is None and action in {"join", "leave", "play", "skip", "stop"}:
        await message.channel.send("Voice commands only work inside a server.")
        return

    # "hey bashar join"
    if action == "join":
        vc = await connect_to_author_channel(message)
        if vc:
            state = get_state_for_guild(message.guild.id)
            state.voice_client = vc
            await message.channel.send("Joined your voice channel. Say 'hey bashar play <song>' here.")
            logger.info("Joined voice channel for guild %s", message.guild.id)
            # Auto-start hotword listener
            await state.start_listening(message.channel)
        return

    # "hey bashar leave"
    if action == "leave":
        state = get_state_for_guild(message.guild.id)
        await state.stop_listening()
        if state.voice_client and state.voice_client.is_connected():
            await message.channel.send("Leaving voice channel.")
            logger.info("Disconnecting from voice (guild %s)", message.guild.id)
            await state.voice_client.disconnect(force=True)
        return

    # "hey bashar play <query>"
    if action == "play":
        if not args:
            await message.channel.send("Say 'hey bashar play <search terms>'.")
            return
        await handle_play_query(message, args)
        return

    if action == "skip":
        state = get_state_for_guild(message.guild.id)
        state.skip_current()
        await message.channel.send("Skipped the current track.")
        return

    if action == "stop":
        state = get_state_for_guild(message.guild.id)
        state.stop_all()
        await message.channel.send("Stopped playback and cleared the queue.")
        return

    # Unknown
    await message.channel.send("Commands: 'hey bashar join', 'hey bashar play <song>', 'hey bashar skip', 'hey bashar stop', 'hey bashar leave'.")
    logger.debug("Sent help for unknown command")
async def route_transcribed_command(message: discord.Message, text: str):
    """Route transcribed text to the command handlers when it parses as a wake command.

    Text that ``parse_wake_command`` does not recognise is ignored (logged at
    debug level). Recognised actions behave like their typed counterparts in
    ``on_message``: ``join`` also starts the hotword listener and ``leave``
    stops it before disconnecting.

    Args:
        message: The message that carried the transcribed audio; replies go
            to ``message.channel``.
        text: The transcript to interpret.
    """
    parsed = parse_wake_command(text)
    if not parsed:
        logger.debug("Ignoring transcript without wake word: %s", text)
        return
    action, args = parsed

    # These actions need per-guild state; in a DM ``message.guild`` is None
    # and ``message.guild.id`` would raise.
    if message.guild is None and action in {"join", "leave", "play", "skip", "stop"}:
        await message.channel.send("Voice commands only work inside a server.")
        return

    if action == "join":
        vc = await connect_to_author_channel(message)
        if vc:
            state = get_state_for_guild(message.guild.id)
            state.voice_client = vc
            await message.channel.send("Joined your voice channel. Say 'hey bashar play <song>' here.")
            # Same as the typed "join": start the hotword listener right away.
            await state.start_listening(message.channel)
        return

    if action == "leave":
        state = get_state_for_guild(message.guild.id)
        # Same as the typed "leave": stop listening before the voice
        # connection goes away.
        await state.stop_listening()
        if state.voice_client and state.voice_client.is_connected():
            await message.channel.send("Leaving voice channel.")
            await state.voice_client.disconnect(force=True)
        return

    if action == "play":
        if not args:
            await message.channel.send("Say 'hey bashar play <search terms>'.")
            return
        await handle_play_query(message, args)
        return

    if action == "skip":
        state = get_state_for_guild(message.guild.id)
        state.skip_current()
        await message.channel.send("Skipped the current track.")
        return

    if action == "stop":
        state = get_state_for_guild(message.guild.id)
        state.stop_all()
        await message.channel.send("Stopped playback and cleared the queue.")
        return

    # Unknown action: reply with the command list.
    await message.channel.send("Commands: 'hey bashar join', 'hey bashar play <song>', 'hey bashar skip', 'hey bashar stop', 'hey bashar leave'.")
async def route_transcribed_command_from_member(guild: discord.Guild, member: discord.Member, text_channel: discord.abc.Messageable, text: str):
    """Dispatch a live voice transcript spoken by ``member``.

    The transcript is parsed with ``parse_wake_command``; text that does not
    parse is ignored. Recognised actions are join / leave / play / skip /
    stop; anything else gets the command list.

    Args:
        guild: Guild the speaker belongs to.
        member: Member whose speech was transcribed.
        text_channel: Channel that receives replies.
        text: The transcript to interpret.
    """
    logger.debug(
        "Routing voice transcript (guild %s, speaker=%s): %s",
        guild.id,
        getattr(member, "display_name", member),
        text,
    )
    parsed = parse_wake_command(text)
    if not parsed:
        return
    action, args = parsed

    if action == "join":
        # Connect to the member's voice channel
        if not member.voice or not member.voice.channel:
            await text_channel.send("Join a voice channel first, then say 'hey bashar join'.")
            return
        target = member.voice.channel
        state = get_state_for_guild(guild.id)
        vc = await _get_active_voice_client(guild)
        try:
            if vc and vc.is_connected():
                if vc.channel != target:
                    try:
                        await vc.move_to(target)
                    except Exception:
                        # Moving failed: fall back to disconnect + reconnect.
                        logger.debug("move_to failed; reconnecting (guild %s)", guild.id, exc_info=True)
                        try:
                            await vc.disconnect(force=True)
                        except Exception:
                            # Best effort only; the reconnect below is what matters.
                            logger.debug("Disconnect before reconnect failed (guild %s)", guild.id, exc_info=True)
                        vc = await connect_voice_with_retry(target)
            else:
                if vc:
                    # A client object exists but is not connected. Previously
                    # this case fell through and the dead client was stored;
                    # drop it and reconnect instead.
                    try:
                        await vc.disconnect(force=True)
                    except Exception:
                        logger.debug("Cleanup of stale voice client failed (guild %s)", guild.id, exc_info=True)
                vc = await connect_voice_with_retry(target)
        except Exception as e:
            logger.exception("Voice connect retries exhausted (guild %s): %s", guild.id, e)
            await text_channel.send("I couldn't join the voice channel (error 4006). Try again in a few seconds.")
            return
        state.voice_client = vc
        await text_channel.send("Joined your voice channel. Say 'hey bashar play <song>' here.")
        # Start listening if not already
        await state.start_listening(text_channel)
        return

    if action == "leave":
        state = get_state_for_guild(guild.id)
        await state.stop_listening()
        if state.voice_client and state.voice_client.is_connected():
            await text_channel.send("Leaving voice channel.")
            await state.voice_client.disconnect(force=True)
        return

    if action == "play":
        if not args:
            await text_channel.send("Say 'hey bashar play <search terms>'.")
            return
        await handle_play_for_member(guild, member, text_channel, args)
        return

    if action == "skip":
        state = get_state_for_guild(guild.id)
        state.skip_current()
        await text_channel.send("Skipped the current track.")
        return

    if action == "stop":
        state = get_state_for_guild(guild.id)
        state.stop_all()
        await text_channel.send("Stopped playback and cleared the queue.")
        return

    # Unknown action: reply with the command list.
    await text_channel.send("Commands: 'hey bashar join', 'hey bashar play <song>', 'hey bashar skip', 'hey bashar stop', 'hey bashar leave'.")
@client.event
async def on_voice_state_update(member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
    """Log voice-state transitions of the bot's own account.

    Updates for any other member are ignored to keep the log quiet.
    """
    bot_user = client.user
    if not bot_user or member.id != bot_user.id:
        return

    guild_id = member.guild.id
    old_channel = getattr(before.channel, "name", None)
    new_channel = getattr(after.channel, "name", None)
    logger.info(
        "Voice state update (guild %s): %s -> %s (self_mute=%s deaf=%s)",
        guild_id,
        old_channel,
        new_channel,
        after.self_mute,
        after.self_deaf,
    )
@client.event
async def on_error(event_method: str, /, *args, **kwargs):
    """Log the exception currently being handled, tagged with the event name.

    The extra positional and keyword arguments are accepted but not used.
    """
    # ERROR level with exc_info attaches the active traceback to the record.
    logger.error("Unhandled error in event '%s'", event_method, exc_info=True)
if __name__ == "__main__":
    # Script entry point: the bot token must come from the environment
    # (a .env file is loaded at import time via load_dotenv()).
    token = os.getenv("DISCORD_TOKEN")
    if token:
        client.run(token)
    else:
        raise SystemExit("Missing DISCORD_TOKEN in environment. Create a .env file or set the variable.")