from __future__ import annotations

import json
import uuid
from typing import Any

from traderai.memory import MemoryStore, iso_now

# Cadence stored on plans created without an explicit one. The value looks like
# a five-field cron expression; how it is interpreted is up to whatever
# scheduler reads the ``cadence`` column (not visible in this module).
DEFAULT_PLAN_CADENCE = "0 */6 * * *"


class ContinualPlanStore:
    """Persistence layer for continual plans and their child records.

    Five tables are managed here: plans, checklist items, marketplace
    candidates, events (an audit log) and negotiations. Every operation opens
    its own connection via ``memory._connect()`` used as a context manager.

    NOTE(review): the code relies on ``_connect()`` yielding a sqlite3-style
    connection (``executescript``, ``rowcount``, ``lastrowid``) whose rows can
    be indexed by column name and passed to ``dict()`` -- confirm against
    ``MemoryStore``.
    """

    def __init__(self, memory: MemoryStore) -> None:
        """Keep a reference to *memory* and make sure the tables exist."""
        self.memory = memory
        self._init_db()

    def _init_db(self) -> None:
        """Create all plan-related tables if they are not present yet."""
        with self.memory._connect() as db:
            db.executescript(
                """
                CREATE TABLE IF NOT EXISTS continual_plans (
                    id TEXT PRIMARY KEY,
                    title TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    status TEXT NOT NULL,
                    objective TEXT NOT NULL,
                    constraints TEXT NOT NULL DEFAULT '{}',
                    cadence TEXT NOT NULL,
                    next_run_at TEXT,
                    last_run_at TEXT,
                    created_at TEXT NOT NULL,
                    updated_at TEXT NOT NULL
                );
                CREATE TABLE IF NOT EXISTS continual_plan_items (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    plan_id TEXT NOT NULL,
                    item_name TEXT NOT NULL,
                    desired_quantity INTEGER NOT NULL DEFAULT 1,
                    max_unit_price REAL,
                    status TEXT NOT NULL DEFAULT 'active',
                    acquired_quantity INTEGER NOT NULL DEFAULT 0,
                    metadata TEXT NOT NULL DEFAULT '{}',
                    created_at TEXT NOT NULL,
                    updated_at TEXT NOT NULL
                );
                CREATE TABLE IF NOT EXISTS continual_plan_candidates (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    plan_id TEXT NOT NULL,
                    plan_item_id INTEGER NOT NULL,
                    listing_id TEXT,
                    listing_slug TEXT,
                    title TEXT,
                    seller TEXT,
                    price REAL,
                    currency TEXT,
                    stock INTEGER,
                    location TEXT,
                    score REAL,
                    first_seen_at TEXT NOT NULL,
                    last_seen_at TEXT NOT NULL,
                    status TEXT NOT NULL DEFAULT 'current',
                    metadata TEXT NOT NULL DEFAULT '{}',
                    UNIQUE(plan_item_id, listing_id)
                );
                CREATE TABLE IF NOT EXISTS continual_plan_events (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    plan_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    message TEXT NOT NULL,
                    metadata TEXT NOT NULL DEFAULT '{}',
                    created_at TEXT NOT NULL
                );
                CREATE TABLE IF NOT EXISTS continual_plan_negotiations (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    plan_id TEXT NOT NULL,
                    plan_item_id INTEGER,
                    candidate_id INTEGER,
                    listing_id TEXT,
                    listing_slug TEXT,
                    negotiation_id TEXT,
                    negotiation_hash TEXT,
                    status TEXT NOT NULL DEFAULT 'drafted',
                    created_at TEXT NOT NULL,
                    updated_at TEXT NOT NULL
                );
                """
            )

    def create_plan(
        self,
        title: str,
        kind: str = "buying",
        objective: str = "",
        items: list[dict[str, Any]] | None = None,
        constraints: dict[str, Any] | None = None,
        cadence: str | None = None,
        status: str | None = None,
    ) -> dict[str, Any]:
        """Insert a plan with its checklist items and log a creation event.

        Args:
            title: Display title; blank falls back to ``"Continual plan"``.
            kind: Plan kind, stored case-folded; blank falls back to ``"buying"``.
            objective: Free-text goal; blank falls back to the stored title.
            items: Checklist entries. Each is a dict naming the item under
                ``item_name`` or ``name``; ``desired_quantity``/``quantity``,
                ``max_unit_price``, ``acquired_quantity`` and ``metadata`` are
                optional. Entries that are not dicts or have no name are
                ignored.
            constraints: JSON-serialisable dict stored on the plan.
            cadence: Schedule string; blank falls back to
                ``DEFAULT_PLAN_CADENCE``.
            status: Explicit status. When omitted, a buying plan without items
                starts as ``"needs_input"`` and everything else as ``"active"``.

        Returns:
            The stored plan as returned by :meth:`get_plan`.

        Raises:
            ValueError: if a quantity field cannot be converted with ``int()``.
        """
        # Non-dict entries are skipped instead of failing on ``.get``.
        clean_items = [
            item
            for item in (items or [])
            if isinstance(item, dict) and str(item.get("item_name") or item.get("name") or "").strip()
        ]
        plan_id = f"plan-{uuid.uuid4()}"
        now = iso_now()
        clean_title = title.strip() or "Continual plan"
        clean_kind = (kind.strip() or "buying").casefold()
        missing_checklist = clean_kind == "buying" and not clean_items
        resolved_status = status or ("needs_input" if missing_checklist else "active")
        with self.memory._connect() as db:
            db.execute(
                """
                INSERT INTO continual_plans(id, title, kind, status, objective, constraints, cadence, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    plan_id,
                    clean_title,
                    clean_kind,
                    resolved_status,
                    # Fall back to the *stored* title so the objective is never
                    # empty while the title carries the default.
                    objective.strip() or clean_title,
                    json.dumps(constraints or {}),
                    (cadence or DEFAULT_PLAN_CADENCE).strip() or DEFAULT_PLAN_CADENCE,
                    now,
                    now,
                ),
            )
            for item in clean_items:
                db.execute(
                    """
                    INSERT INTO continual_plan_items(
                        plan_id, item_name, desired_quantity, max_unit_price, status,
                        acquired_quantity, metadata, created_at, updated_at
                    )
                    VALUES (?, ?, ?, ?, 'active', ?, ?, ?, ?)
                    """,
                    (
                        plan_id,
                        str(item.get("item_name") or item.get("name")).strip(),
                        max(1, int(item.get("desired_quantity") or item.get("quantity") or 1)),
                        item.get("max_unit_price"),
                        max(0, int(item.get("acquired_quantity") or 0)),
                        json.dumps(item.get("metadata") or {}),
                        now,
                        now,
                    ),
                )
        # Events are written after the block above has exited because
        # add_event() opens its own connection.
        if missing_checklist:
            self.add_event(
                plan_id,
                "needs_input",
                "Created plan, but no item checklist was provided. Add the required parts before it can run.",
            )
        elif clean_items:
            self.add_event(
                plan_id,
                "created",
                f"Created continual {clean_kind} plan with {len(clean_items)} checklist item(s).",
            )
        else:
            self.add_event(plan_id, "created", f"Created continual {clean_kind} plan.")
        return self.get_plan(plan_id) or {}

    def list_plans(self, include_inactive: bool = True) -> list[dict[str, Any]]:
        """Return plans ordered active, needs_input, paused, then the rest.

        Within each status group the most recently updated plan comes first.
        With ``include_inactive=False`` only ``status = 'active'`` rows are
        returned. Child records are not attached; see :meth:`get_plan`.
        """
        # Only fixed literals are interpolated here; no caller input reaches the SQL.
        where = "" if include_inactive else "WHERE status = 'active'"
        with self.memory._connect() as db:
            rows = db.execute(
                f"""
                SELECT * FROM continual_plans
                {where}
                ORDER BY
                    CASE status
                        WHEN 'active' THEN 0
                        WHEN 'needs_input' THEN 1
                        WHEN 'paused' THEN 2
                        ELSE 3
                    END,
                    updated_at DESC
                """
            ).fetchall()
        return [self._plan_row(row) for row in rows]

    def get_plan(self, plan_id: str) -> dict[str, Any] | None:
        """Return the plan with its items, candidates, negotiations and events.

        Returns ``None`` when no plan with *plan_id* exists.
        """
        with self.memory._connect() as db:
            plan = db.execute("SELECT * FROM continual_plans WHERE id = ?", (plan_id,)).fetchone()
        if not plan:
            return None
        data = self._plan_row(plan)
        data["items"] = self.list_items(plan_id)
        data["candidates"] = self.list_candidates(plan_id)
        data["negotiations"] = self.list_negotiations(plan_id)
        data["events"] = self.list_events(plan_id)
        return data

    def list_items(self, plan_id: str) -> list[dict[str, Any]]:
        """Return the plan's checklist items in insertion (id) order."""
        with self.memory._connect() as db:
            rows = db.execute(
                "SELECT * FROM continual_plan_items WHERE plan_id = ? ORDER BY id",
                (plan_id,),
            ).fetchall()
        return [self._json_row(row, "metadata") for row in rows]

    def list_candidates(self, plan_id: str, limit: int = 100) -> list[dict[str, Any]]:
        """Return up to *limit* candidates for the plan.

        Rows with status ``'current'`` come first, then by score and by
        ``last_seen_at``, both descending.
        """
        with self.memory._connect() as db:
            rows = db.execute(
                """
                SELECT * FROM continual_plan_candidates
                WHERE plan_id = ?
                ORDER BY status = 'current' DESC, score DESC, last_seen_at DESC
                LIMIT ?
                """,
                (plan_id, limit),
            ).fetchall()
        return [self._json_row(row, "metadata") for row in rows]

    def list_events(self, plan_id: str, limit: int = 50) -> list[dict[str, Any]]:
        """Return up to *limit* events for the plan, newest (highest id) first."""
        with self.memory._connect() as db:
            rows = db.execute(
                """
                SELECT * FROM continual_plan_events
                WHERE plan_id = ?
                ORDER BY id DESC
                LIMIT ?
                """,
                (plan_id, limit),
            ).fetchall()
        return [self._json_row(row, "metadata") for row in rows]

    def list_negotiations(self, plan_id: str) -> list[dict[str, Any]]:
        """Return the plan's negotiations, most recently updated first."""
        with self.memory._connect() as db:
            rows = db.execute(
                "SELECT * FROM continual_plan_negotiations WHERE plan_id = ? ORDER BY updated_at DESC",
                (plan_id,),
            ).fetchall()
        return [dict(row) for row in rows]

    def set_status(self, plan_id: str, status: str) -> dict[str, Any] | None:
        """Set the plan's status, log the change and return the updated plan.

        Returns ``None`` (and logs nothing) when *plan_id* does not exist.
        """
        with self.memory._connect() as db:
            updated = db.execute(
                "UPDATE continual_plans SET status = ?, updated_at = ? WHERE id = ?",
                (status, iso_now(), plan_id),
            ).rowcount
        if not updated:
            # No such plan: do not leave an orphan event row behind.
            return None
        self.add_event(plan_id, status, f"Plan status changed to {status}.")
        return self.get_plan(plan_id)

    def delete_plan(self, plan_id: str) -> bool:
        """Delete the plan and all of its child rows.

        Returns ``True`` when a plan row was removed, ``False`` when *plan_id*
        was unknown (child tables are left untouched in that case).
        """
        with self.memory._connect() as db:
            deleted = db.execute("DELETE FROM continual_plans WHERE id = ?", (plan_id,)).rowcount
            if not deleted:
                return False
            # The schema declares no foreign keys, so children are removed by hand.
            db.execute("DELETE FROM continual_plan_items WHERE plan_id = ?", (plan_id,))
            db.execute("DELETE FROM continual_plan_candidates WHERE plan_id = ?", (plan_id,))
            db.execute("DELETE FROM continual_plan_events WHERE plan_id = ?", (plan_id,))
            db.execute("DELETE FROM continual_plan_negotiations WHERE plan_id = ?", (plan_id,))
        return True

    def add_event(self, plan_id: str, kind: str, message: str, metadata: dict[str, Any] | None = None) -> dict[str, Any]:
        """Append an event to the plan's log.

        Returns a dict with the new row id, ``plan_id``, ``kind``, ``message``
        and ``created_at``. The stored metadata is not echoed back.
        """
        now = iso_now()
        with self.memory._connect() as db:
            cursor = db.execute(
                """
                INSERT INTO continual_plan_events(plan_id, kind, message, metadata, created_at)
                VALUES (?, ?, ?, ?, ?)
                """,
                (plan_id, kind, message, json.dumps(metadata or {}), now),
            )
        return {"id": cursor.lastrowid, "plan_id": plan_id, "kind": kind, "message": message, "created_at": now}

    def update_schedule(self, plan_id: str, next_run_at: str | None = None, last_run_at: str | None = None) -> None:
        """Write the plan's schedule timestamps.

        ``next_run_at`` is always written, so passing ``None`` clears it;
        callers that want to keep the current value must pass it back in.
        ``last_run_at`` is only written when it is not ``None``.
        """
        fields = ["next_run_at = ?", "updated_at = ?"]
        values: list[Any] = [next_run_at, iso_now()]
        if last_run_at is not None:
            # Keep the column list and the value list in the same order.
            fields.insert(1, "last_run_at = ?")
            values.insert(1, last_run_at)
        values.append(plan_id)
        with self.memory._connect() as db:
            # Only the fixed column fragments above are interpolated.
            db.execute(f"UPDATE continual_plans SET {', '.join(fields)} WHERE id = ?", values)

    def upsert_candidate(self, plan_id: str, plan_item_id: int, listing: dict[str, Any], score: float) -> dict[str, Any]:
        """Insert or refresh the candidate row for *listing* under a plan item.

        The row is keyed by ``(plan_item_id, listing_id)``. ``listing_id`` is
        the listing's ``id``, else ``listing_id``, else ``slug``, else a fresh
        UUID, so a listing with no identifier creates a new row on every call.
        On conflict everything except ``first_seen_at`` is overwritten and the
        status is reset to ``'current'``. The whole listing dict is stored as
        JSON in ``metadata``.

        Returns:
            The stored row as a dict, with ``metadata`` decoded.
        """
        now = iso_now()
        listing_id = str(listing.get("id") or listing.get("listing_id") or listing.get("slug") or uuid.uuid4())
        metadata = dict(listing)
        with self.memory._connect() as db:
            db.execute(
                """
                INSERT INTO continual_plan_candidates(
                    plan_id, plan_item_id, listing_id, listing_slug, title, seller, price,
                    currency, stock, location, score, first_seen_at, last_seen_at, status, metadata
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'current', ?)
                ON CONFLICT(plan_item_id, listing_id) DO UPDATE SET
                    listing_slug=excluded.listing_slug,
                    title=excluded.title,
                    seller=excluded.seller,
                    price=excluded.price,
                    currency=excluded.currency,
                    stock=excluded.stock,
                    location=excluded.location,
                    score=excluded.score,
                    last_seen_at=excluded.last_seen_at,
                    status='current',
                    metadata=excluded.metadata
                """,
                (
                    plan_id,
                    plan_item_id,
                    listing_id,
                    listing.get("slug"),
                    listing.get("title"),
                    listing.get("advertiser") or listing.get("user_username") or listing.get("seller"),
                    listing.get("price"),
                    listing.get("currency"),
                    listing.get("in_stock") or listing.get("stock"),
                    listing.get("location"),
                    score,
                    now,
                    now,
                    json.dumps(metadata),
                ),
            )
            row = db.execute(
                "SELECT * FROM continual_plan_candidates WHERE plan_item_id = ? AND listing_id = ?",
                (plan_item_id, listing_id),
            ).fetchone()
        return self._json_row(row, "metadata")

    def mark_stale_candidates(self, plan_item_id: int, seen_listing_ids: set[str]) -> int:
        """Flag ``'current'`` candidates of an item that were not seen this run.

        Args:
            plan_item_id: Checklist item whose candidates are checked.
            seen_listing_ids: Stored ``listing_id`` values observed in the
                latest search, as strings.

        Returns:
            The number of candidates switched to ``'stale'``.
        """
        with self.memory._connect() as db:
            rows = db.execute(
                "SELECT id, listing_id FROM continual_plan_candidates WHERE plan_item_id = ? AND status = 'current'",
                (plan_item_id,),
            ).fetchall()
            stale_ids = [row["id"] for row in rows if str(row["listing_id"]) not in seen_listing_ids]
            if stale_ids:
                # One bound parameter per id; only '?' marks are interpolated.
                placeholders = ",".join("?" for _ in stale_ids)
                db.execute(
                    f"UPDATE continual_plan_candidates SET status = 'stale', last_seen_at = ? WHERE id IN ({placeholders})",
                    (iso_now(), *stale_ids),
                )
        return len(stale_ids)

    def mark_candidate_drafted(self, candidate_id: int) -> None:
        """Set the candidate's status to ``'drafted'`` and refresh ``last_seen_at``."""
        with self.memory._connect() as db:
            db.execute(
                "UPDATE continual_plan_candidates SET status = 'drafted', last_seen_at = ? WHERE id = ?",
                (iso_now(), candidate_id),
            )

    def add_negotiation(self, plan_id: str, plan_item_id: int | None, candidate_id: int | None, metadata: dict[str, Any]) -> dict[str, Any]:
        """Record a negotiation for the plan and return the stored row.

        *metadata* is read for ``listing_id``, ``listing_slug``,
        ``id_negotiation``, ``hash`` and ``status`` (default ``"drafted"``);
        other keys are ignored.
        """
        now = iso_now()
        with self.memory._connect() as db:
            cursor = db.execute(
                """
                INSERT INTO continual_plan_negotiations(
                    plan_id, plan_item_id, candidate_id, listing_id, listing_slug,
                    negotiation_id, negotiation_hash, status, created_at, updated_at
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    plan_id,
                    plan_item_id,
                    candidate_id,
                    metadata.get("listing_id"),
                    metadata.get("listing_slug"),
                    metadata.get("id_negotiation"),
                    metadata.get("hash"),
                    metadata.get("status") or "drafted",
                    now,
                    now,
                ),
            )
            row = db.execute("SELECT * FROM continual_plan_negotiations WHERE id = ?", (cursor.lastrowid,)).fetchone()
        return dict(row)

    def has_negotiation_for_candidate(self, plan_id: str, plan_item_id: int, candidate: dict[str, Any]) -> bool:
        """Tell whether a negotiation already exists for this candidate.

        A negotiation of the same plan and item matches when it shares the
        candidate's row id, its ``listing_id`` or its ``listing_slug``.
        """
        with self.memory._connect() as db:
            row = db.execute(
                """
                SELECT id FROM continual_plan_negotiations
                WHERE plan_id = ?
                  AND plan_item_id = ?
                  AND (
                    candidate_id = ?
                    OR (listing_id IS NOT NULL AND listing_id = ?)
                    OR (listing_slug IS NOT NULL AND listing_slug = ?)
                  )
                LIMIT 1
                """,
                (
                    plan_id,
                    plan_item_id,
                    candidate.get("id"),
                    candidate.get("listing_id"),
                    candidate.get("listing_slug"),
                ),
            ).fetchone()
        return row is not None

    @staticmethod
    def _json_row(row: Any, *json_fields: str) -> dict[str, Any]:
        """Convert *row* to a dict and decode the named JSON text columns.

        A column that is empty, missing or not valid JSON decodes to ``{}``.
        """
        data = dict(row)
        for field in json_fields:
            try:
                data[field] = json.loads(data.get(field) or "{}")
            except (TypeError, json.JSONDecodeError):
                data[field] = {}
        return data

    @classmethod
    def _plan_row(cls, row: Any) -> dict[str, Any]:
        """Convert a ``continual_plans`` row to a dict with decoded ``constraints``."""
        return cls._json_row(row, "constraints")


class ContinualPlanRunner:
    """Executes one run of a continual plan and records what happened.

    Buying plans search the marketplace for each open checklist item and draft
    a negotiation opener for the best in-budget candidate. Every other kind is
    handed to the bound agent as a prompt.

    NOTE(review): ``tools`` must provide the async methods
    ``search_marketplace_listings`` and ``draft_negotiation_message``, ``agent``
    the async ``generate_wake_response``, and ``memory`` an ``add_outbox``
    method -- their definitions are outside this module.
    """

    def __init__(self, store: ContinualPlanStore, tools: Any, memory: MemoryStore, agent: Any | None = None) -> None:
        """Wire up the collaborators; *agent* may be bound later."""
        self.store = store
        self.tools = tools
        self.memory = memory
        self.agent = agent

    def bind_agent(self, agent: Any) -> None:
        """Attach (or replace) the agent used for non-buying plans."""
        self.agent = agent

    async def run_plan(self, plan_id: str) -> dict[str, Any]:
        """Run the plan once and return a result dict.

        Outcomes:
            * unknown plan: ``{"error": ...}``;
            * plan not ``"active"``: a ``"skipped"`` result and event;
            * success: the kind-specific result plus the refreshed ``plan``;
            * any exception: an ``"error"`` event, an outbox message and a
              result holding ``error`` and ``summary``.

        ``last_run_at`` is stamped on success and on failure; ``next_run_at``
        is written back with the value read at the start of the run.
        """
        plan = self.store.get_plan(plan_id)
        if not plan:
            return {"error": f"Plan not found: {plan_id}"}
        if plan["status"] != "active":
            message = f"Skipped {plan['title']} because status is {plan['status']}."
            self.store.add_event(plan_id, "skipped", message)
            return {"status": "skipped", "summary": message, "plan": self.store.get_plan(plan_id)}
        try:
            if plan["kind"] == "buying":
                result = await self._run_buying_plan(plan)
            else:
                result = await self._run_agent_plan(plan)
            self.store.update_schedule(plan_id, plan.get("next_run_at"), last_run_at=iso_now())
            self.memory.add_outbox(result["summary"])
            return {**result, "plan": self.store.get_plan(plan_id)}
        except Exception as exc:
            # Boundary handler: a failing run is reported through the event log
            # and the outbox instead of propagating to the caller.
            message = f"Continual plan failed: {exc}"
            self.store.add_event(plan_id, "error", message)
            self.memory.add_outbox(f"{plan['title']}: {message}")
            self.store.update_schedule(plan_id, plan.get("next_run_at"), last_run_at=iso_now())
            return {"error": str(exc), "summary": message, "plan": self.store.get_plan(plan_id)}

    async def _run_agent_plan(self, plan: dict[str, Any]) -> dict[str, Any]:
        """Run a non-buying plan by prompting the bound agent.

        Raises:
            RuntimeError: if no agent has been bound.
        """
        if self.agent is None:
            raise RuntimeError("No agent is bound to run generic continual plans.")
        prompt = self._agent_plan_prompt(plan)
        response = await self.agent.generate_wake_response(prompt)
        summary = f"{plan['title']}: {response}"
        self.store.add_event(plan["id"], "run", "Ran generic continual plan through the agent.", {"response": response})
        return {"status": "ok", "summary": summary, "checked": 0, "drafted": 0}

    async def _run_buying_plan(self, plan: dict[str, Any]) -> dict[str, Any]:
        """Search listings for each open checklist item and draft openers.

        For every item whose status is not ``"acquired"``:

        1. search the marketplace for sell listings matching the item name;
        2. upsert each listing (unless its seller is excluded) as a candidate;
        3. flag previously current candidates that were not seen as stale;
        4. pick the best candidate (highest score, then lowest price);
        5. draft a negotiation message for it, unless one already exists or
           the candidate is over budget.

        When there is no open item the plan is set to ``"completed"``.

        Returns:
            A dict with ``status``, ``summary``, ``checked`` and ``drafted``.
        """
        items = [item for item in plan.get("items") or [] if item.get("status") != "acquired"]
        if not items:
            self.store.set_status(plan["id"], "completed")
            summary = f"{plan['title']}: all checklist items are marked acquired."
            return {"status": "completed", "summary": summary, "drafted": 0, "checked": 0}

        checked = 0
        drafted = 0
        best_lines: list[str] = []
        constraints = plan.get("constraints") or {}
        excluded_sellers = {str(value).casefold() for value in constraints.get("excluded_sellers") or []}
        preferred_locations = [str(value).casefold() for value in constraints.get("preferred_locations") or []]

        for item in items:
            item_id = int(item["id"])
            response = await self.tools.search_marketplace_listings(
                query=item["item_name"],
                operation="sell",
                type="item",
                limit=25,
            )
            listings = response.get("listings") or response.get("data") or []
            seen: set[str] = set()
            candidates = []
            for listing in listings:
                if not isinstance(listing, dict):
                    continue
                listing_id = str(listing.get("id") or listing.get("slug") or "")
                if listing_id:
                    # Recorded before the seller filter so that a still-listed
                    # offer from an excluded seller is not reported as stale.
                    seen.add(listing_id)
                if self._listing_seller(listing).casefold() in excluded_sellers:
                    continue
                score = self._candidate_score(listing, item, preferred_locations)
                candidate = self.store.upsert_candidate(plan["id"], item_id, listing, score)
                # The store may key the row differently from the id computed
                # above (it also accepts "listing_id" and falls back to a
                # UUID). Register the persisted key so the row just written is
                # not flagged stale below.
                seen.add(str(candidate.get("listing_id")))
                candidates.append(candidate)
            stale = self.store.mark_stale_candidates(item_id, seen)
            checked += 1

            current_candidates = [candidate for candidate in candidates if candidate.get("status") == "current"]
            # Highest score first; ties go to the cheaper listing, with a
            # missing price sorting last.
            current_candidates.sort(
                key=lambda candidate: (-float(candidate.get("score") or 0), float(candidate.get("price") or 10**18))
            )
            best = current_candidates[0] if current_candidates else None
            if not best:
                best_lines.append(f"{item['item_name']}: no active matching sell listings found.")
                self.store.add_event(plan["id"], "search", f"{item['item_name']}: no active candidates found.", {"stale": stale})
                continue

            best_lines.append(
                f"{item['item_name']}: best candidate is {best.get('title') or best.get('listing_slug')} "
                f"at {self._format_price(best.get('price'), best.get('currency'))} from {best.get('seller') or 'unknown seller'}."
            )
            self.store.add_event(
                plan["id"],
                "search",
                f"{item['item_name']}: found {len(current_candidates)} current candidate(s); {stale} stale candidate(s) marked.",
                {"best_candidate_id": best.get("id")},
            )
            if self.store.has_negotiation_for_candidate(plan["id"], item_id, best) or not self._within_budget(best, item, constraints):
                continue

            draft = await self._draft_buying_message(plan, item, best)
            if "pending_action" in draft:
                drafted += 1
                self.store.mark_candidate_drafted(int(best["id"]))
                self.store.add_negotiation(
                    plan["id"],
                    item_id,
                    int(best["id"]),
                    {
                        "listing_id": best.get("listing_id"),
                        "listing_slug": best.get("listing_slug"),
                        "status": "drafted",
                    },
                )
                self.store.add_event(
                    plan["id"],
                    "draft",
                    f"Drafted negotiation opener for {item['item_name']} candidate {best.get('listing_id')}.",
                    {"pending_action_id": draft["pending_action"].get("id"), "candidate_id": best.get("id")},
                )

        # Only the first four per-item lines go into the summary.
        summary = f"{plan['title']}: checked {checked} item(s). " + " ".join(best_lines[:4])
        if drafted:
            summary += f" Drafted {drafted} negotiation message(s) for approval."
        self.store.add_event(plan["id"], "run", summary, {"checked": checked, "drafted": drafted})
        return {"status": "ok", "summary": summary, "checked": checked, "drafted": drafted}

    async def _draft_buying_message(self, plan: dict[str, Any], item: dict[str, Any], candidate: dict[str, Any]) -> dict[str, Any]:
        """Compose the opener for *candidate* and hand it to the drafting tool.

        Returns whatever ``tools.draft_negotiation_message`` returns; the
        caller treats a ``"pending_action"`` key as a successful draft.
        """
        tone = (plan.get("constraints") or {}).get("message_tone") or "polite and concise"
        # NOTE(review): the tone note is part of the text passed as ``message``
        # -- confirm it is meant to be included in the drafted message.
        message = (
            f"Hi, I am interested in your {candidate.get('title') or item['item_name']} listing "
            f"for {self._format_price(candidate.get('price'), candidate.get('currency'))}. "
            f"Is it still available? I am trying to complete: {plan['objective']}. "
            f"Tone note: {tone}."
        )
        return await self.tools.draft_negotiation_message(
            message=message,
            id_listing=self._int_or_none(candidate.get("listing_id")),
            plan_id=plan["id"],
            plan_item_id=int(item["id"]),
            candidate_id=int(candidate["id"]),
            listing_slug=candidate.get("listing_slug"),
        )

    @staticmethod
    def _listing_seller(listing: dict[str, Any]) -> str:
        """Return the listing's seller name, or ``""`` when none is present.

        Uses the same key precedence as ``ContinualPlanStore.upsert_candidate``
        so that exclusion filtering and the stored seller agree.
        """
        return str(listing.get("advertiser") or listing.get("user_username") or listing.get("seller") or "")

    @staticmethod
    def _candidate_score(listing: dict[str, Any], item: dict[str, Any], preferred_locations: list[str]) -> float:
        """Score a listing for a checklist item; higher is better.

        The score is the sum of:
            * a price term, ``max(0, 50 - price / 10_000_000)``;
            * stock, capped at 20 (missing stock counts as 1);
            * 40 when the item has a ``max_unit_price`` and the price is within it;
            * 8 when the listing location contains any preferred location.

        A missing or zero price is treated as ``10**12``, which zeroes the
        price term and fails the budget check.
        """
        price = float(listing.get("price") or 10**12)
        max_price = item.get("max_unit_price")
        budget_bonus = 40.0 if max_price and price <= float(max_price) else 0.0
        stock = float(listing.get("in_stock") or listing.get("stock") or 1)
        location = str(listing.get("location") or "").casefold()
        location_bonus = 8.0 if preferred_locations and any(place in location for place in preferred_locations) else 0.0
        return round(max(0.0, 50.0 - (price / 10_000_000.0)) + min(stock, 20.0) + budget_bonus + location_bonus, 4)

    @staticmethod
    def _within_budget(candidate: dict[str, Any], item: dict[str, Any], constraints: dict[str, Any]) -> bool:
        """Tell whether the candidate's price respects the applicable limit.

        The item's ``max_unit_price`` wins over the plan-level constraint of
        the same name. With no limit every priced candidate is in budget; a
        candidate without a price never is.
        """
        price = candidate.get("price")
        if price is None:
            return False
        max_price = item.get("max_unit_price") or constraints.get("max_unit_price")
        return max_price is None or float(price) <= float(max_price)

    @staticmethod
    def _format_price(price: Any, currency: Any) -> str:
        """Render a price for messages, defaulting the currency to ``UEC``.

        Only ``int``/``float`` prices are formatted (thousands separators, no
        decimals); anything else renders as ``"unknown price <currency>"``.
        """
        if isinstance(price, (int, float)):
            return f"{price:,.0f} {currency or 'UEC'}"
        return f"unknown price {currency or 'UEC'}"

    @staticmethod
    def _int_or_none(value: Any) -> int | None:
        """Return ``int(value)``, or ``None`` when the conversion fails."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _agent_plan_prompt(plan: dict[str, Any]) -> str:
        """Build the prompt handed to the agent for a non-buying plan.

        The prompt embeds an ASCII-escaped JSON payload with the plan's core
        fields, its items and the first eight entries of ``plan["events"]``.
        """
        recent_events = [
            {
                "kind": event.get("kind"),
                "message": event.get("message"),
                "created_at": event.get("created_at"),
            }
            for event in (plan.get("events") or [])[:8]
        ]
        payload = {
            "plan_id": plan.get("id"),
            "title": plan.get("title"),
            "kind": plan.get("kind"),
            "objective": plan.get("objective"),
            "constraints": plan.get("constraints") or {},
            "items": plan.get("items") or [],
            "recent_events": recent_events,
        }
        return (
            "Continual plan wake run. Continue this durable plan and write an Inbox-ready summary. "
            "Use tools as needed. For any account-affecting marketplace write, only draft a pending action for approval. "
            "Do not claim a message, offer, listing, or negotiation was sent unless an approved action result says it was sent. "
            f"Plan JSON: {json.dumps(payload, ensure_ascii=True)}"
        )