from __future__ import annotations import json import sqlite3 from datetime import datetime, timezone from pathlib import Path from typing import Any from zoneinfo import ZoneInfo DEFAULT_THREAD_ID = "default" def utc_now() -> datetime: return datetime.now(timezone.utc) def iso_now() -> str: return utc_now().isoformat() def iso_now_in_zone(zone: ZoneInfo) -> str: return utc_now().astimezone(zone).isoformat() def parse_iso(value: str) -> datetime: parsed = datetime.fromisoformat(value) if parsed.tzinfo is None: return parsed.replace(tzinfo=timezone.utc) return parsed def unix_to_iso(value: Any) -> str | None: try: timestamp = int(value) except (TypeError, ValueError): return None if timestamp <= 0: return None return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat() def time_since(value: str, now: datetime | None = None) -> str: then = parse_iso(value) current = now or utc_now() if current.tzinfo is None: current = current.replace(tzinfo=timezone.utc) seconds = max(0, int((current - then).total_seconds())) if seconds < 60: return f"{seconds} seconds ago" minutes = seconds // 60 if minutes < 60: return _plural(minutes, "minute") + " ago" hours = minutes // 60 if hours < 24: return _plural(hours, "hour") + " ago" days = hours // 24 return _plural(days, "day") + " ago" def _plural(value: int, unit: str) -> str: suffix = "" if value == 1 else "s" return f"{value} {unit}{suffix}" class MemoryStore: def __init__(self, path: str) -> None: self.path = Path(path).expanduser().resolve() self.path.parent.mkdir(parents=True, exist_ok=True) self._init_db() def _connect(self) -> sqlite3.Connection: connection = sqlite3.connect(self.path) connection.row_factory = sqlite3.Row return connection def _init_db(self) -> None: with self._connect() as db: db.executescript( """ CREATE TABLE IF NOT EXISTS chat_threads ( id TEXT PRIMARY KEY, title TEXT NOT NULL, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS conversations ( id INTEGER PRIMARY KEY AUTOINCREMENT, thread_id TEXT, role TEXT NOT NULL, content TEXT NOT NULL, created_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS memories ( id INTEGER PRIMARY KEY AUTOINCREMENT, kind TEXT NOT NULL, content TEXT NOT NULL, importance INTEGER NOT NULL DEFAULT 3, metadata TEXT NOT NULL DEFAULT '{}', created_at TEXT NOT NULL, updated_at TEXT NOT NULL ); CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5( content, kind UNINDEXED, content='memories', content_rowid='id' ); CREATE TRIGGER IF NOT EXISTS memories_ai AFTER INSERT ON memories BEGIN INSERT INTO memories_fts(rowid, content, kind) VALUES (new.id, new.content, new.kind); END; CREATE TRIGGER IF NOT EXISTS memories_ad AFTER DELETE ON memories BEGIN INSERT INTO memories_fts(memories_fts, rowid, content, kind) VALUES('delete', old.id, old.content, old.kind); END; CREATE TRIGGER IF NOT EXISTS memories_au AFTER UPDATE ON memories BEGIN INSERT INTO memories_fts(memories_fts, rowid, content, kind) VALUES('delete', old.id, old.content, old.kind); INSERT INTO memories_fts(rowid, content, kind) VALUES (new.id, new.content, new.kind); END; CREATE TABLE IF NOT EXISTS user_profile ( key TEXT PRIMARY KEY, value TEXT NOT NULL, updated_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS scheduled_jobs ( id TEXT PRIMARY KEY, prompt TEXT NOT NULL, trigger_type TEXT NOT NULL, trigger_value TEXT NOT NULL, next_run_at TEXT, created_at TEXT NOT NULL, last_run_at TEXT, enabled INTEGER NOT NULL DEFAULT 1 ); CREATE TABLE IF NOT EXISTS outbox ( id INTEGER PRIMARY KEY AUTOINCREMENT, content TEXT NOT NULL, created_at TEXT NOT NULL, delivered_at TEXT ); CREATE TABLE IF NOT EXISTS negotiation_threads ( negotiation_hash TEXT PRIMARY KEY, uex_negotiation_id INTEGER, listing_id INTEGER, listing_slug TEXT, title TEXT, counterparty_username TEXT, status TEXT NOT NULL DEFAULT 'open', last_message_at TEXT, last_synced_at TEXT NOT NULL, last_notification_id INTEGER, last_notification_at TEXT, unread_count INTEGER NOT NULL DEFAULT 0, closed_at TEXT, metadata_json TEXT NOT NULL DEFAULT '{}' ); CREATE TABLE IF NOT EXISTS negotiation_messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, negotiation_hash TEXT NOT NULL, uex_message_id INTEGER, author TEXT, author_username TEXT, is_me INTEGER NOT NULL DEFAULT 0, body TEXT NOT NULL, sent_at TEXT, source TEXT, raw_json TEXT NOT NULL DEFAULT '{}' ); CREATE TABLE IF NOT EXISTS negotiation_ratings ( negotiation_hash TEXT PRIMARY KEY, deal_closed INTEGER NOT NULL, deal_value REAL, currency TEXT, clarity_rating INTEGER, speed_rating INTEGER, respect_rating INTEGER, fairness_rating INTEGER, comment TEXT, submitted_at TEXT NOT NULL, raw_json TEXT NOT NULL DEFAULT '{}' ); CREATE TABLE IF NOT EXISTS negotiation_sync_state ( key TEXT PRIMARY KEY, value TEXT NOT NULL, updated_at TEXT NOT NULL ); """ ) self._ensure_column(db, "conversations", "thread_id", "TEXT") now = iso_now() db.execute( """ INSERT INTO chat_threads(id, title, created_at, updated_at) VALUES (?, ?, ?, ?) ON CONFLICT(id) DO NOTHING """, (DEFAULT_THREAD_ID, "New chat", now, now), ) db.execute( "UPDATE conversations SET thread_id = ? WHERE thread_id IS NULL OR thread_id = ''", (DEFAULT_THREAD_ID,), ) @staticmethod def _ensure_column(db: sqlite3.Connection, table: str, column: str, definition: str) -> None: columns = {row["name"] for row in db.execute(f"PRAGMA table_info({table})").fetchall()} if column not in columns: db.execute(f"ALTER TABLE {table} ADD COLUMN {column} {definition}") def ensure_thread(self, thread_id: str | None = None, title: str | None = None) -> dict[str, Any]: now = iso_now() resolved_id = (thread_id or DEFAULT_THREAD_ID).strip() or DEFAULT_THREAD_ID resolved_title = (title or "New chat").strip() or "New chat" with self._connect() as db: db.execute( """ INSERT INTO chat_threads(id, title, created_at, updated_at) VALUES (?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET updated_at=excluded.updated_at """, (resolved_id, resolved_title, now, now), ) row = db.execute( "SELECT id, title, created_at, updated_at FROM chat_threads WHERE id = ?", (resolved_id,), ).fetchone() return dict(row) def create_thread(self, title: str | None = None) -> dict[str, Any]: import uuid thread_id = f"chat-{uuid.uuid4()}" return self.ensure_thread(thread_id, title or "New chat") def list_threads(self) -> list[dict[str, Any]]: with self._connect() as db: rows = db.execute( """ SELECT t.id, t.title, t.created_at, t.updated_at, COUNT(c.id) AS message_count, MAX(c.created_at) AS last_message_at FROM chat_threads t LEFT JOIN conversations c ON c.thread_id = t.id GROUP BY t.id ORDER BY COALESCE(MAX(c.created_at), t.updated_at) DESC """ ).fetchall() return [dict(row) for row in rows] def delete_thread(self, thread_id: str) -> bool: with self._connect() as db: db.execute("DELETE FROM conversations WHERE thread_id = ?", (thread_id,)) cursor = db.execute("DELETE FROM chat_threads WHERE id = ?", (thread_id,)) return cursor.rowcount > 0 def rename_thread(self, thread_id: str, title: str) -> dict[str, Any] | None: clean_title = self._clean_thread_title(title) if not clean_title: return None now = iso_now() with self._connect() as db: db.execute( "UPDATE chat_threads SET title = ?, updated_at = ? WHERE id = ?", (clean_title, now, thread_id), ) row = db.execute( "SELECT id, title, created_at, updated_at FROM chat_threads WHERE id = ?", (thread_id,), ).fetchone() return dict(row) if row else None def get_thread(self, thread_id: str) -> dict[str, Any] | None: with self._connect() as db: row = db.execute( "SELECT id, title, created_at, updated_at FROM chat_threads WHERE id = ?", (thread_id,), ).fetchone() return dict(row) if row else None def add_conversation(self, role: str, content: str, thread_id: str | None = DEFAULT_THREAD_ID) -> None: resolved_thread_id = (thread_id or DEFAULT_THREAD_ID).strip() or DEFAULT_THREAD_ID self.ensure_thread(resolved_thread_id) now = iso_now() with self._connect() as db: db.execute( "INSERT INTO conversations(thread_id, role, content, created_at) VALUES (?, ?, ?, ?)", (resolved_thread_id, role, content, now), ) db.execute( "UPDATE chat_threads SET updated_at = ? WHERE id = ?", (now, resolved_thread_id), ) def last_interaction(self, thread_id: str | None = None) -> dict[str, Any] | None: with self._connect() as db: if thread_id: row = db.execute( """ SELECT thread_id, role, content, created_at FROM conversations WHERE thread_id = ? ORDER BY id DESC LIMIT 1 """, (thread_id,), ).fetchone() else: row = db.execute( "SELECT thread_id, role, content, created_at FROM conversations ORDER BY id DESC LIMIT 1" ).fetchone() return dict(row) if row else None def recent_conversation(self, limit: int = 8, thread_id: str | None = None) -> list[dict[str, Any]]: with self._connect() as db: if thread_id: rows = db.execute( """ SELECT id, thread_id, role, content, created_at FROM conversations WHERE thread_id = ? ORDER BY id DESC LIMIT ? """, (thread_id, limit), ).fetchall() else: rows = db.execute( "SELECT id, thread_id, role, content, created_at FROM conversations ORDER BY id DESC LIMIT ?", (limit,), ).fetchall() return [dict(row) for row in reversed(rows)] def remember(self, kind: str, content: str, importance: int = 3, metadata: dict[str, Any] | None = None) -> dict[str, Any]: now = iso_now() with self._connect() as db: cursor = db.execute( """ INSERT INTO memories(kind, content, importance, metadata, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?) """, (kind, content, importance, json.dumps(metadata or {}), now, now), ) memory_id = cursor.lastrowid return {"id": memory_id, "kind": kind, "content": content, "importance": importance, "created_at": now} def recall(self, query: str, limit: int = 6) -> list[dict[str, Any]]: if not query.strip(): return self.top_memories(limit) with self._connect() as db: try: rows = db.execute( """ SELECT m.id, m.kind, m.content, m.importance, m.metadata, m.created_at, bm25(memories_fts) AS rank FROM memories_fts JOIN memories m ON m.id = memories_fts.rowid WHERE memories_fts MATCH ? ORDER BY rank, m.importance DESC LIMIT ? """, (self._fts_query(query), limit), ).fetchall() except sqlite3.OperationalError: rows = db.execute( """ SELECT id, kind, content, importance, metadata, created_at FROM memories WHERE content LIKE ? ORDER BY importance DESC, id DESC LIMIT ? """, (f"%{query}%", limit), ).fetchall() return [self._memory_row(row) for row in rows] def top_memories(self, limit: int = 6) -> list[dict[str, Any]]: with self._connect() as db: rows = db.execute( """ SELECT id, kind, content, importance, metadata, created_at FROM memories ORDER BY importance DESC, updated_at DESC LIMIT ? """, (limit,), ).fetchall() return [self._memory_row(row) for row in rows] def inspect(self, limit: int = 50) -> dict[str, Any]: with self._connect() as db: memories = db.execute( """ SELECT id, kind, content, importance, metadata, created_at, updated_at FROM memories ORDER BY importance DESC, updated_at DESC LIMIT ? """, (limit,), ).fetchall() conversations = db.execute( """ SELECT id, thread_id, role, content, created_at FROM conversations ORDER BY id DESC LIMIT ? """, (limit,), ).fetchall() threads = db.execute( """ SELECT id, title, created_at, updated_at FROM chat_threads ORDER BY updated_at DESC LIMIT ? """, (limit,), ).fetchall() profile_rows = db.execute( "SELECT key, value, updated_at FROM user_profile ORDER BY key" ).fetchall() jobs = db.execute( "SELECT * FROM scheduled_jobs ORDER BY enabled DESC, next_run_at" ).fetchall() outbox = db.execute( "SELECT id, content, created_at, delivered_at FROM outbox ORDER BY id DESC LIMIT ?", (limit,), ).fetchall() negotiation_threads = db.execute( """ SELECT negotiation_hash, title, counterparty_username, status, unread_count, last_message_at, last_synced_at FROM negotiation_threads ORDER BY COALESCE(last_message_at, last_synced_at) DESC LIMIT ? """, (limit,), ).fetchall() negotiation_messages = db.execute( """ SELECT negotiation_hash, author_username, is_me, body, sent_at FROM negotiation_messages ORDER BY COALESCE(sent_at, '') DESC, id DESC LIMIT ? """, (limit,), ).fetchall() profile = [] for row in profile_rows: item = dict(row) try: item["value"] = json.loads(item["value"]) except json.JSONDecodeError: pass profile.append(item) return { "path": str(self.path), "memories": [self._memory_row(row) for row in memories], "chat_threads": [dict(row) for row in threads], "conversations": [dict(row) for row in conversations], "profile": profile, "scheduled_jobs": [dict(row) for row in jobs], "outbox": [dict(row) for row in outbox], "negotiation_threads": [dict(row) for row in negotiation_threads], "negotiation_messages": [dict(row) for row in negotiation_messages], } def clear( self, include_memories: bool = True, include_conversations: bool = True, include_profile: bool = False, include_jobs: bool = False, include_outbox: bool = True, ) -> dict[str, int]: deleted: dict[str, int] = {} with self._connect() as db: if include_memories: deleted["memories"] = db.execute("DELETE FROM memories").rowcount db.execute("INSERT INTO memories_fts(memories_fts) VALUES('rebuild')") if include_conversations: deleted["conversations"] = db.execute("DELETE FROM conversations").rowcount if include_profile: deleted["profile"] = db.execute("DELETE FROM user_profile").rowcount if include_jobs: deleted["scheduled_jobs"] = db.execute("DELETE FROM scheduled_jobs").rowcount if include_outbox: deleted["outbox"] = db.execute("DELETE FROM outbox").rowcount deleted["negotiation_threads"] = db.execute("DELETE FROM negotiation_threads").rowcount deleted["negotiation_messages"] = db.execute("DELETE FROM negotiation_messages").rowcount deleted["negotiation_ratings"] = db.execute("DELETE FROM negotiation_ratings").rowcount deleted["negotiation_sync_state"] = db.execute("DELETE FROM negotiation_sync_state").rowcount return deleted def set_profile(self, key: str, value: Any) -> None: with self._connect() as db: db.execute( """ INSERT INTO user_profile(key, value, updated_at) VALUES (?, ?, ?) ON CONFLICT(key) DO UPDATE SET value=excluded.value, updated_at=excluded.updated_at """, (key, json.dumps(value), iso_now()), ) def get_profile(self) -> dict[str, Any]: with self._connect() as db: rows = db.execute("SELECT key, value FROM user_profile").fetchall() profile = {} for row in rows: try: profile[row["key"]] = json.loads(row["value"]) except json.JSONDecodeError: profile[row["key"]] = row["value"] return profile def add_job( self, job_id: str, prompt: str, trigger_type: str, trigger_value: str, next_run_at: str | None = None, ) -> dict[str, Any]: with self._connect() as db: db.execute( """ INSERT INTO scheduled_jobs(id, prompt, trigger_type, trigger_value, next_run_at, created_at, enabled) VALUES (?, ?, ?, ?, ?, ?, 1) ON CONFLICT(id) DO UPDATE SET prompt=excluded.prompt, trigger_type=excluded.trigger_type, trigger_value=excluded.trigger_value, next_run_at=excluded.next_run_at, enabled=1 """, (job_id, prompt, trigger_type, trigger_value, next_run_at, iso_now()), ) return { "id": job_id, "prompt": prompt, "trigger_type": trigger_type, "trigger_value": trigger_value, "next_run_at": next_run_at, } def list_jobs(self) -> list[dict[str, Any]]: with self._connect() as db: rows = db.execute( "SELECT * FROM scheduled_jobs WHERE enabled = 1 ORDER BY next_run_at IS NULL, next_run_at" ).fetchall() return [dict(row) for row in rows] def mark_job_run(self, job_id: str, next_run_at: str | None = None, enabled: bool = True) -> None: with self._connect() as db: db.execute( "UPDATE scheduled_jobs SET last_run_at = ?, next_run_at = ?, enabled = ? WHERE id = ?", (iso_now(), next_run_at, 1 if enabled else 0, job_id), ) def add_outbox(self, content: str) -> None: with self._connect() as db: db.execute("INSERT INTO outbox(content, created_at) VALUES (?, ?)", (content, iso_now())) def list_outbox(self, limit: int = 100) -> list[dict[str, Any]]: with self._connect() as db: rows = db.execute( "SELECT id, content, created_at, delivered_at FROM outbox ORDER BY id DESC LIMIT ?", (limit,), ).fetchall() return [dict(row) for row in rows] def get_outbox(self, inbox_id: int) -> dict[str, Any] | None: with self._connect() as db: row = db.execute( "SELECT id, content, created_at, delivered_at FROM outbox WHERE id = ?", (inbox_id,), ).fetchone() return dict(row) if row else None def delete_outbox(self, inbox_id: int) -> bool: with self._connect() as db: cursor = db.execute("DELETE FROM outbox WHERE id = ?", (inbox_id,)) return cursor.rowcount > 0 def undelivered_outbox(self) -> list[dict[str, Any]]: now = iso_now() with self._connect() as db: rows = db.execute( "SELECT id, content, created_at FROM outbox WHERE delivered_at IS NULL ORDER BY id" ).fetchall() db.execute( "UPDATE outbox SET delivered_at = ? WHERE delivered_at IS NULL", (now,), ) return [dict(row) for row in rows] @staticmethod def _thread_title(content: str) -> str: text = " ".join(content.strip().split()) if not text: return "New chat" return text[:42] + ("..." if len(text) > 42 else "") @staticmethod def _clean_thread_title(title: str) -> str: text = " ".join(title.strip().strip('"').strip("'").split()) return text[:64] @staticmethod def _fts_query(query: str) -> str: tokens = [token.replace('"', "") for token in query.split() if token.strip()] return " OR ".join(f'"{token}"' for token in tokens) or '""' @staticmethod def _memory_row(row: sqlite3.Row) -> dict[str, Any]: data = dict(row) if "metadata" in data: try: data["metadata"] = json.loads(data["metadata"]) except json.JSONDecodeError: data["metadata"] = {} return data def set_negotiation_sync_state(self, key: str, value: Any) -> None: with self._connect() as db: db.execute( """ INSERT INTO negotiation_sync_state(key, value, updated_at) VALUES (?, ?, ?) ON CONFLICT(key) DO UPDATE SET value=excluded.value, updated_at=excluded.updated_at """, (key, json.dumps(value), iso_now()), ) def get_negotiation_sync_state(self, key: str, default: Any = None) -> Any: with self._connect() as db: row = db.execute( "SELECT value FROM negotiation_sync_state WHERE key = ?", (key,), ).fetchone() if not row: return default try: return json.loads(row["value"]) except json.JSONDecodeError: return default def upsert_negotiation( self, negotiation_hash: str, *, uex_negotiation_id: int | None = None, listing_id: int | None = None, listing_slug: str | None = None, title: str | None = None, counterparty_username: str | None = None, status: str = "open", last_message_at: str | None = None, last_synced_at: str | None = None, last_notification_id: int | None = None, last_notification_at: str | None = None, unread_count: int | None = None, closed_at: str | None = None, metadata: dict[str, Any] | None = None, ) -> None: if not negotiation_hash.strip(): return now = last_synced_at or iso_now() with self._connect() as db: existing = db.execute( """ SELECT unread_count, metadata_json FROM negotiation_threads WHERE negotiation_hash = ? """, (negotiation_hash,), ).fetchone() current_unread = int(existing["unread_count"]) if existing else 0 merged_metadata = {} if existing: try: merged_metadata = json.loads(existing["metadata_json"]) except json.JSONDecodeError: merged_metadata = {} if metadata: merged_metadata.update(metadata) db.execute( """ INSERT INTO negotiation_threads( negotiation_hash, uex_negotiation_id, listing_id, listing_slug, title, counterparty_username, status, last_message_at, last_synced_at, last_notification_id, last_notification_at, unread_count, closed_at, metadata_json ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(negotiation_hash) DO UPDATE SET uex_negotiation_id = COALESCE(excluded.uex_negotiation_id, negotiation_threads.uex_negotiation_id), listing_id = COALESCE(excluded.listing_id, negotiation_threads.listing_id), listing_slug = COALESCE(excluded.listing_slug, negotiation_threads.listing_slug), title = COALESCE(excluded.title, negotiation_threads.title), counterparty_username = COALESCE(excluded.counterparty_username, negotiation_threads.counterparty_username), status = COALESCE(excluded.status, negotiation_threads.status), last_message_at = COALESCE(excluded.last_message_at, negotiation_threads.last_message_at), last_synced_at = excluded.last_synced_at, last_notification_id = COALESCE(excluded.last_notification_id, negotiation_threads.last_notification_id), last_notification_at = COALESCE(excluded.last_notification_at, negotiation_threads.last_notification_at), unread_count = COALESCE(excluded.unread_count, negotiation_threads.unread_count), closed_at = COALESCE(excluded.closed_at, negotiation_threads.closed_at), metadata_json = excluded.metadata_json """, ( negotiation_hash.strip(), uex_negotiation_id, listing_id, listing_slug, title, counterparty_username, status or "open", last_message_at, now, last_notification_id, last_notification_at, current_unread if unread_count is None else max(0, int(unread_count)), closed_at, json.dumps(merged_metadata), ), ) def replace_negotiation_messages( self, negotiation_hash: str, messages: list[dict[str, Any]], *, mark_read: bool = False, ) -> None: if not negotiation_hash.strip(): return normalized = [self._normalize_negotiation_message(negotiation_hash, item) for item in messages] normalized = [item for item in normalized if item] with self._connect() as db: db.execute("DELETE FROM negotiation_messages WHERE negotiation_hash = ?", (negotiation_hash,)) for item in normalized: db.execute( """ INSERT INTO negotiation_messages( negotiation_hash, uex_message_id, author, author_username, is_me, body, sent_at, source, raw_json ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( negotiation_hash, item["uex_message_id"], item["author"], item["author_username"], 1 if item["is_me"] else 0, item["body"], item["sent_at"], item["source"], json.dumps(item["raw_json"]), ), ) last_message_at = normalized[-1]["sent_at"] if normalized else None db.execute( """ UPDATE negotiation_threads SET last_message_at = COALESCE(?, last_message_at), last_synced_at = ?, unread_count = CASE WHEN ? THEN 0 ELSE unread_count END WHERE negotiation_hash = ? """, (last_message_at, iso_now(), 1 if mark_read else 0, negotiation_hash), ) def mark_negotiation_notified( self, negotiation_hash: str, *, notification_id: int | None = None, notification_at: str | None = None, ) -> None: with self._connect() as db: db.execute( """ UPDATE negotiation_threads SET unread_count = unread_count + 1, last_notification_id = COALESCE(?, last_notification_id), last_notification_at = COALESCE(?, last_notification_at) WHERE negotiation_hash = ? """, (notification_id, notification_at, negotiation_hash), ) def mark_negotiation_read(self, negotiation_hash: str) -> None: with self._connect() as db: db.execute( "UPDATE negotiation_threads SET unread_count = 0 WHERE negotiation_hash = ?", (negotiation_hash,), ) def store_negotiation_rating(self, negotiation_hash: str, payload: dict[str, Any], raw_json: dict[str, Any] | None = None) -> None: now = iso_now() with self._connect() as db: db.execute( """ INSERT INTO negotiation_ratings( negotiation_hash, deal_closed, deal_value, currency, clarity_rating, speed_rating, respect_rating, fairness_rating, comment, submitted_at, raw_json ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(negotiation_hash) DO UPDATE SET deal_closed = excluded.deal_closed, deal_value = excluded.deal_value, currency = excluded.currency, clarity_rating = excluded.clarity_rating, speed_rating = excluded.speed_rating, respect_rating = excluded.respect_rating, fairness_rating = excluded.fairness_rating, comment = excluded.comment, submitted_at = excluded.submitted_at, raw_json = excluded.raw_json """, ( negotiation_hash, 1 if payload.get("deal_closed") else 0, payload.get("deal_value"), payload.get("currency"), payload.get("clarity_rating"), payload.get("speed_rating"), payload.get("respect_rating"), payload.get("fairness_rating"), payload.get("comment"), now, json.dumps(raw_json or payload), ), ) def list_negotiations( self, *, status: str = "all", unread_only: bool = False, search: str = "", limit: int = 50, ) -> list[dict[str, Any]]: status_filter = str(status or "all").strip().casefold() search_filter = f"%{search.strip().casefold()}%" if search.strip() else None clauses = [] params: list[Any] = [] if status_filter not in {"", "all"}: clauses.append("status = ?") params.append(status_filter) if unread_only: clauses.append("unread_count > 0") if search_filter: clauses.append( """ ( lower(COALESCE(title, '')) LIKE ? OR lower(COALESCE(counterparty_username, '')) LIKE ? OR lower(COALESCE(listing_slug, '')) LIKE ? ) """ ) params.extend([search_filter, search_filter, search_filter]) where = f"WHERE {' AND '.join(clauses)}" if clauses else "" with self._connect() as db: rows = db.execute( f""" SELECT negotiation_hash, uex_negotiation_id, listing_id, listing_slug, title, counterparty_username, status, last_message_at, last_synced_at, last_notification_id, last_notification_at, unread_count, closed_at, metadata_json FROM negotiation_threads {where} ORDER BY unread_count DESC, COALESCE(last_message_at, last_notification_at, last_synced_at) DESC LIMIT ? """, (*params, max(1, min(limit, 500))), ).fetchall() return [self._negotiation_thread_row(row) for row in rows] def get_negotiation(self, negotiation_hash: str) -> dict[str, Any] | None: with self._connect() as db: thread = db.execute( """ SELECT negotiation_hash, uex_negotiation_id, listing_id, listing_slug, title, counterparty_username, status, last_message_at, last_synced_at, last_notification_id, last_notification_at, unread_count, closed_at, metadata_json FROM negotiation_threads WHERE negotiation_hash = ? """, (negotiation_hash,), ).fetchone() if not thread: return None messages = db.execute( """ SELECT uex_message_id, author, author_username, is_me, body, sent_at, source, raw_json FROM negotiation_messages WHERE negotiation_hash = ? ORDER BY COALESCE(sent_at, '') ASC, id ASC """, (negotiation_hash,), ).fetchall() rating = db.execute( """ SELECT deal_closed, deal_value, currency, clarity_rating, speed_rating, respect_rating, fairness_rating, comment, submitted_at, raw_json FROM negotiation_ratings WHERE negotiation_hash = ? """, (negotiation_hash,), ).fetchone() result = self._negotiation_thread_row(thread) result["messages"] = [self._negotiation_message_row(row) for row in messages] result["rating"] = self._negotiation_rating_row(rating) if rating else None return result def search_negotiation_messages(self, query: str, limit: int = 8) -> list[dict[str, Any]]: q = query.strip() if not q: return [] with self._connect() as db: rows = db.execute( """ SELECT m.negotiation_hash, t.title, t.counterparty_username, m.author_username, m.is_me, m.body, m.sent_at FROM negotiation_messages m JOIN negotiation_threads t ON t.negotiation_hash = m.negotiation_hash WHERE lower(m.body) LIKE ? ORDER BY COALESCE(m.sent_at, '') DESC, m.id DESC LIMIT ? """, (f"%{q.casefold()}%", max(1, min(limit, 50))), ).fetchall() return [dict(row) for row in rows] @staticmethod def _normalize_negotiation_message(negotiation_hash: str, item: dict[str, Any]) -> dict[str, Any] | None: if not isinstance(item, dict): return None body = str(item.get("body") or item.get("message") or item.get("content") or item.get("text") or item.get("event") or "").strip() if not body: return None return { "negotiation_hash": negotiation_hash, "uex_message_id": MemoryStore._int_or_none(item.get("id") or item.get("id_message")), "author": str(item.get("user_name") or item.get("author") or item.get("sender") or item.get("user_username") or "UEX"), "author_username": str(item.get("user_username") or item.get("author_username") or item.get("username") or "").strip() or None, "is_me": bool(item.get("is_me")), "body": body, "sent_at": unix_to_iso(item.get("date_added")) or str(item.get("sent_at") or "").strip() or None, "source": str(item.get("api_name") or item.get("source") or "uex"), "raw_json": item, } @staticmethod def _negotiation_thread_row(row: sqlite3.Row | dict[str, Any]) -> dict[str, Any]: data = dict(row) try: data["metadata"] = json.loads(data.pop("metadata_json")) except (KeyError, json.JSONDecodeError): data["metadata"] = {} data["hash"] = data.pop("negotiation_hash") return data @staticmethod def _negotiation_message_row(row: sqlite3.Row | dict[str, Any]) -> dict[str, Any]: data = dict(row) try: data["raw_json"] = json.loads(data["raw_json"]) except (KeyError, json.JSONDecodeError): data["raw_json"] = {} data["is_me"] = bool(data.get("is_me")) return data @staticmethod def _negotiation_rating_row(row: sqlite3.Row | dict[str, Any]) -> dict[str, Any]: data = dict(row) try: data["raw_json"] = json.loads(data["raw_json"]) except (KeyError, json.JSONDecodeError): data["raw_json"] = {} data["deal_closed"] = bool(data.get("deal_closed")) return data @staticmethod def _int_or_none(value: Any) -> int | None: try: return int(value) except (TypeError, ValueError): return None