from __future__ import annotations import json import sqlite3 from datetime import datetime, timezone from pathlib import Path from typing import Any from zoneinfo import ZoneInfo DEFAULT_THREAD_ID = "default" def utc_now() -> datetime: return datetime.now(timezone.utc) def iso_now() -> str: return utc_now().isoformat() def iso_now_in_zone(zone: ZoneInfo) -> str: return utc_now().astimezone(zone).isoformat() def parse_iso(value: str) -> datetime: parsed = datetime.fromisoformat(value) if parsed.tzinfo is None: return parsed.replace(tzinfo=timezone.utc) return parsed def time_since(value: str, now: datetime | None = None) -> str: then = parse_iso(value) current = now or utc_now() if current.tzinfo is None: current = current.replace(tzinfo=timezone.utc) seconds = max(0, int((current - then).total_seconds())) if seconds < 60: return f"{seconds} seconds ago" minutes = seconds // 60 if minutes < 60: return _plural(minutes, "minute") + " ago" hours = minutes // 60 if hours < 24: return _plural(hours, "hour") + " ago" days = hours // 24 return _plural(days, "day") + " ago" def _plural(value: int, unit: str) -> str: suffix = "" if value == 1 else "s" return f"{value} {unit}{suffix}" class MemoryStore: def __init__(self, path: str) -> None: self.path = Path(path).expanduser().resolve() self.path.parent.mkdir(parents=True, exist_ok=True) self._init_db() def _connect(self) -> sqlite3.Connection: connection = sqlite3.connect(self.path) connection.row_factory = sqlite3.Row return connection def _init_db(self) -> None: with self._connect() as db: db.executescript( """ CREATE TABLE IF NOT EXISTS chat_threads ( id TEXT PRIMARY KEY, title TEXT NOT NULL, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS conversations ( id INTEGER PRIMARY KEY AUTOINCREMENT, thread_id TEXT, role TEXT NOT NULL, content TEXT NOT NULL, created_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS memories ( id INTEGER PRIMARY KEY AUTOINCREMENT, kind TEXT NOT NULL, content TEXT NOT NULL, importance INTEGER NOT NULL DEFAULT 3, metadata TEXT NOT NULL DEFAULT '{}', created_at TEXT NOT NULL, updated_at TEXT NOT NULL ); CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5( content, kind UNINDEXED, content='memories', content_rowid='id' ); CREATE TRIGGER IF NOT EXISTS memories_ai AFTER INSERT ON memories BEGIN INSERT INTO memories_fts(rowid, content, kind) VALUES (new.id, new.content, new.kind); END; CREATE TRIGGER IF NOT EXISTS memories_ad AFTER DELETE ON memories BEGIN INSERT INTO memories_fts(memories_fts, rowid, content, kind) VALUES('delete', old.id, old.content, old.kind); END; CREATE TRIGGER IF NOT EXISTS memories_au AFTER UPDATE ON memories BEGIN INSERT INTO memories_fts(memories_fts, rowid, content, kind) VALUES('delete', old.id, old.content, old.kind); INSERT INTO memories_fts(rowid, content, kind) VALUES (new.id, new.content, new.kind); END; CREATE TABLE IF NOT EXISTS user_profile ( key TEXT PRIMARY KEY, value TEXT NOT NULL, updated_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS scheduled_jobs ( id TEXT PRIMARY KEY, prompt TEXT NOT NULL, trigger_type TEXT NOT NULL, trigger_value TEXT NOT NULL, next_run_at TEXT, created_at TEXT NOT NULL, last_run_at TEXT, enabled INTEGER NOT NULL DEFAULT 1 ); CREATE TABLE IF NOT EXISTS outbox ( id INTEGER PRIMARY KEY AUTOINCREMENT, content TEXT NOT NULL, created_at TEXT NOT NULL, delivered_at TEXT ); """ ) self._ensure_column(db, "conversations", "thread_id", "TEXT") now = iso_now() db.execute( """ INSERT INTO chat_threads(id, title, created_at, updated_at) VALUES (?, ?, ?, ?) ON CONFLICT(id) DO NOTHING """, (DEFAULT_THREAD_ID, "New chat", now, now), ) db.execute( "UPDATE conversations SET thread_id = ? WHERE thread_id IS NULL OR thread_id = ''", (DEFAULT_THREAD_ID,), ) @staticmethod def _ensure_column(db: sqlite3.Connection, table: str, column: str, definition: str) -> None: columns = {row["name"] for row in db.execute(f"PRAGMA table_info({table})").fetchall()} if column not in columns: db.execute(f"ALTER TABLE {table} ADD COLUMN {column} {definition}") def ensure_thread(self, thread_id: str | None = None, title: str | None = None) -> dict[str, Any]: now = iso_now() resolved_id = (thread_id or DEFAULT_THREAD_ID).strip() or DEFAULT_THREAD_ID resolved_title = (title or "New chat").strip() or "New chat" with self._connect() as db: db.execute( """ INSERT INTO chat_threads(id, title, created_at, updated_at) VALUES (?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET updated_at=excluded.updated_at """, (resolved_id, resolved_title, now, now), ) row = db.execute( "SELECT id, title, created_at, updated_at FROM chat_threads WHERE id = ?", (resolved_id,), ).fetchone() return dict(row) def create_thread(self, title: str | None = None) -> dict[str, Any]: import uuid thread_id = f"chat-{uuid.uuid4()}" return self.ensure_thread(thread_id, title or "New chat") def list_threads(self) -> list[dict[str, Any]]: with self._connect() as db: rows = db.execute( """ SELECT t.id, t.title, t.created_at, t.updated_at, COUNT(c.id) AS message_count, MAX(c.created_at) AS last_message_at FROM chat_threads t LEFT JOIN conversations c ON c.thread_id = t.id GROUP BY t.id ORDER BY COALESCE(MAX(c.created_at), t.updated_at) DESC """ ).fetchall() return [dict(row) for row in rows] def delete_thread(self, thread_id: str) -> bool: with self._connect() as db: db.execute("DELETE FROM conversations WHERE thread_id = ?", (thread_id,)) cursor = db.execute("DELETE FROM chat_threads WHERE id = ?", (thread_id,)) return cursor.rowcount > 0 def rename_thread(self, thread_id: str, title: str) -> dict[str, Any] | None: clean_title = self._clean_thread_title(title) if not clean_title: return None now = iso_now() with self._connect() as db: db.execute( "UPDATE chat_threads SET title = ?, updated_at = ? WHERE id = ?", (clean_title, now, thread_id), ) row = db.execute( "SELECT id, title, created_at, updated_at FROM chat_threads WHERE id = ?", (thread_id,), ).fetchone() return dict(row) if row else None def get_thread(self, thread_id: str) -> dict[str, Any] | None: with self._connect() as db: row = db.execute( "SELECT id, title, created_at, updated_at FROM chat_threads WHERE id = ?", (thread_id,), ).fetchone() return dict(row) if row else None def add_conversation(self, role: str, content: str, thread_id: str | None = DEFAULT_THREAD_ID) -> None: resolved_thread_id = (thread_id or DEFAULT_THREAD_ID).strip() or DEFAULT_THREAD_ID self.ensure_thread(resolved_thread_id) now = iso_now() with self._connect() as db: db.execute( "INSERT INTO conversations(thread_id, role, content, created_at) VALUES (?, ?, ?, ?)", (resolved_thread_id, role, content, now), ) db.execute( "UPDATE chat_threads SET updated_at = ? WHERE id = ?", (now, resolved_thread_id), ) def last_interaction(self, thread_id: str | None = None) -> dict[str, Any] | None: with self._connect() as db: if thread_id: row = db.execute( """ SELECT thread_id, role, content, created_at FROM conversations WHERE thread_id = ? ORDER BY id DESC LIMIT 1 """, (thread_id,), ).fetchone() else: row = db.execute( "SELECT thread_id, role, content, created_at FROM conversations ORDER BY id DESC LIMIT 1" ).fetchone() return dict(row) if row else None def recent_conversation(self, limit: int = 8, thread_id: str | None = None) -> list[dict[str, Any]]: with self._connect() as db: if thread_id: rows = db.execute( """ SELECT id, thread_id, role, content, created_at FROM conversations WHERE thread_id = ? ORDER BY id DESC LIMIT ? """, (thread_id, limit), ).fetchall() else: rows = db.execute( "SELECT id, thread_id, role, content, created_at FROM conversations ORDER BY id DESC LIMIT ?", (limit,), ).fetchall() return [dict(row) for row in reversed(rows)] def remember(self, kind: str, content: str, importance: int = 3, metadata: dict[str, Any] | None = None) -> dict[str, Any]: now = iso_now() with self._connect() as db: cursor = db.execute( """ INSERT INTO memories(kind, content, importance, metadata, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?) """, (kind, content, importance, json.dumps(metadata or {}), now, now), ) memory_id = cursor.lastrowid return {"id": memory_id, "kind": kind, "content": content, "importance": importance, "created_at": now} def recall(self, query: str, limit: int = 6) -> list[dict[str, Any]]: if not query.strip(): return self.top_memories(limit) with self._connect() as db: try: rows = db.execute( """ SELECT m.id, m.kind, m.content, m.importance, m.metadata, m.created_at, bm25(memories_fts) AS rank FROM memories_fts JOIN memories m ON m.id = memories_fts.rowid WHERE memories_fts MATCH ? ORDER BY rank, m.importance DESC LIMIT ? """, (self._fts_query(query), limit), ).fetchall() except sqlite3.OperationalError: rows = db.execute( """ SELECT id, kind, content, importance, metadata, created_at FROM memories WHERE content LIKE ? ORDER BY importance DESC, id DESC LIMIT ? """, (f"%{query}%", limit), ).fetchall() return [self._memory_row(row) for row in rows] def top_memories(self, limit: int = 6) -> list[dict[str, Any]]: with self._connect() as db: rows = db.execute( """ SELECT id, kind, content, importance, metadata, created_at FROM memories ORDER BY importance DESC, updated_at DESC LIMIT ? """, (limit,), ).fetchall() return [self._memory_row(row) for row in rows] def inspect(self, limit: int = 50) -> dict[str, Any]: with self._connect() as db: memories = db.execute( """ SELECT id, kind, content, importance, metadata, created_at, updated_at FROM memories ORDER BY importance DESC, updated_at DESC LIMIT ? """, (limit,), ).fetchall() conversations = db.execute( """ SELECT id, thread_id, role, content, created_at FROM conversations ORDER BY id DESC LIMIT ? """, (limit,), ).fetchall() threads = db.execute( """ SELECT id, title, created_at, updated_at FROM chat_threads ORDER BY updated_at DESC LIMIT ? """, (limit,), ).fetchall() profile_rows = db.execute( "SELECT key, value, updated_at FROM user_profile ORDER BY key" ).fetchall() jobs = db.execute( "SELECT * FROM scheduled_jobs ORDER BY enabled DESC, next_run_at" ).fetchall() outbox = db.execute( "SELECT id, content, created_at, delivered_at FROM outbox ORDER BY id DESC LIMIT ?", (limit,), ).fetchall() profile = [] for row in profile_rows: item = dict(row) try: item["value"] = json.loads(item["value"]) except json.JSONDecodeError: pass profile.append(item) return { "path": str(self.path), "memories": [self._memory_row(row) for row in memories], "chat_threads": [dict(row) for row in threads], "conversations": [dict(row) for row in conversations], "profile": profile, "scheduled_jobs": [dict(row) for row in jobs], "outbox": [dict(row) for row in outbox], } def clear( self, include_memories: bool = True, include_conversations: bool = True, include_profile: bool = False, include_jobs: bool = False, include_outbox: bool = True, ) -> dict[str, int]: deleted: dict[str, int] = {} with self._connect() as db: if include_memories: deleted["memories"] = db.execute("DELETE FROM memories").rowcount db.execute("INSERT INTO memories_fts(memories_fts) VALUES('rebuild')") if include_conversations: deleted["conversations"] = db.execute("DELETE FROM conversations").rowcount if include_profile: deleted["profile"] = db.execute("DELETE FROM user_profile").rowcount if include_jobs: deleted["scheduled_jobs"] = db.execute("DELETE FROM scheduled_jobs").rowcount if include_outbox: deleted["outbox"] = db.execute("DELETE FROM outbox").rowcount return deleted def set_profile(self, key: str, value: Any) -> None: with self._connect() as db: db.execute( """ INSERT INTO user_profile(key, value, updated_at) VALUES (?, ?, ?) ON CONFLICT(key) DO UPDATE SET value=excluded.value, updated_at=excluded.updated_at """, (key, json.dumps(value), iso_now()), ) def get_profile(self) -> dict[str, Any]: with self._connect() as db: rows = db.execute("SELECT key, value FROM user_profile").fetchall() profile = {} for row in rows: try: profile[row["key"]] = json.loads(row["value"]) except json.JSONDecodeError: profile[row["key"]] = row["value"] return profile def add_job( self, job_id: str, prompt: str, trigger_type: str, trigger_value: str, next_run_at: str | None = None, ) -> dict[str, Any]: with self._connect() as db: db.execute( """ INSERT INTO scheduled_jobs(id, prompt, trigger_type, trigger_value, next_run_at, created_at, enabled) VALUES (?, ?, ?, ?, ?, ?, 1) ON CONFLICT(id) DO UPDATE SET prompt=excluded.prompt, trigger_type=excluded.trigger_type, trigger_value=excluded.trigger_value, next_run_at=excluded.next_run_at, enabled=1 """, (job_id, prompt, trigger_type, trigger_value, next_run_at, iso_now()), ) return { "id": job_id, "prompt": prompt, "trigger_type": trigger_type, "trigger_value": trigger_value, "next_run_at": next_run_at, } def list_jobs(self) -> list[dict[str, Any]]: with self._connect() as db: rows = db.execute( "SELECT * FROM scheduled_jobs WHERE enabled = 1 ORDER BY next_run_at IS NULL, next_run_at" ).fetchall() return [dict(row) for row in rows] def mark_job_run(self, job_id: str, next_run_at: str | None = None, enabled: bool = True) -> None: with self._connect() as db: db.execute( "UPDATE scheduled_jobs SET last_run_at = ?, next_run_at = ?, enabled = ? WHERE id = ?", (iso_now(), next_run_at, 1 if enabled else 0, job_id), ) def add_outbox(self, content: str) -> None: with self._connect() as db: db.execute("INSERT INTO outbox(content, created_at) VALUES (?, ?)", (content, iso_now())) def list_outbox(self, limit: int = 100) -> list[dict[str, Any]]: with self._connect() as db: rows = db.execute( "SELECT id, content, created_at, delivered_at FROM outbox ORDER BY id DESC LIMIT ?", (limit,), ).fetchall() return [dict(row) for row in rows] def get_outbox(self, inbox_id: int) -> dict[str, Any] | None: with self._connect() as db: row = db.execute( "SELECT id, content, created_at, delivered_at FROM outbox WHERE id = ?", (inbox_id,), ).fetchone() return dict(row) if row else None def delete_outbox(self, inbox_id: int) -> bool: with self._connect() as db: cursor = db.execute("DELETE FROM outbox WHERE id = ?", (inbox_id,)) return cursor.rowcount > 0 def undelivered_outbox(self) -> list[dict[str, Any]]: now = iso_now() with self._connect() as db: rows = db.execute( "SELECT id, content, created_at FROM outbox WHERE delivered_at IS NULL ORDER BY id" ).fetchall() db.execute( "UPDATE outbox SET delivered_at = ? WHERE delivered_at IS NULL", (now,), ) return [dict(row) for row in rows] @staticmethod def _thread_title(content: str) -> str: text = " ".join(content.strip().split()) if not text: return "New chat" return text[:42] + ("..." if len(text) > 42 else "") @staticmethod def _clean_thread_title(title: str) -> str: text = " ".join(title.strip().strip('"').strip("'").split()) return text[:64] @staticmethod def _fts_query(query: str) -> str: tokens = [token.replace('"', "") for token in query.split() if token.strip()] return " OR ".join(f'"{token}"' for token in tokens) or '""' @staticmethod def _memory_row(row: sqlite3.Row) -> dict[str, Any]: data = dict(row) if "metadata" in data: try: data["metadata"] = json.loads(data["metadata"]) except json.JSONDecodeError: data["metadata"] = {} return data