diff --git a/tests/test_plans.py b/tests/test_plans.py new file mode 100644 index 0000000..98a9524 --- /dev/null +++ b/tests/test_plans.py @@ -0,0 +1,217 @@ +import pytest +from datetime import timedelta + +from traderai.memory import MemoryStore, utc_now +from traderai.plans import ContinualPlanRunner, ContinualPlanStore +from traderai.scheduler import WakeScheduler +from traderai.tools import ToolRegistry + + +class BuyingUEX: + def __init__(self): + self.posts = [] + + async def get(self, path, params=None, authenticated=False): + if path == "marketplace_listings": + return { + "data": [ + { + "id": 501, + "slug": "wikelo-panel-good", + "title": "Wikelo Idris panel", + "operation": "sell", + "type": "item", + "price": 450_000, + "currency": "UEC", + "in_stock": 2, + "location": "Orison", + "user_username": "seller_a", + }, + { + "id": 502, + "slug": "wikelo-panel-expensive", + "title": "Wikelo Idris panel premium", + "operation": "sell", + "type": "item", + "price": 900_000, + "currency": "UEC", + "in_stock": 1, + "location": "Area18", + "user_username": "seller_b", + }, + ], + } + return {"data": []} + + async def post(self, path, payload, authenticated=True): + self.posts.append({"path": path, "payload": payload, "authenticated": authenticated}) + return {"status": "ok", "posted": self.posts[-1]} + + async def delete(self, path, params=None, authenticated=True): + return {"status": "ok"} + + +class FakePlanAgent: + def __init__(self): + self.prompts = [] + + async def generate_wake_response(self, wake_message): + self.prompts.append(wake_message) + return "Custom plan checked notifications and found no blockers." + + +def plan_stack(tmp_path): + memory = MemoryStore(str(tmp_path / "memory.sqlite3")) + store = ContinualPlanStore(memory) + scheduler = WakeScheduler(memory) + tools = ToolRegistry(BuyingUEX(), memory=memory, scheduler=scheduler, plan_store=store) + runner = ContinualPlanRunner(store, tools, memory) + tools.plan_runner = runner + scheduler.bind_plan_runner(runner) + return memory, store, tools, runner, scheduler + + +def test_continual_plan_store_creates_needs_input_plan(tmp_path): + _, store, _, _, _ = plan_stack(tmp_path) + + plan = store.create_plan("Wikelo Idris", objective="Get all parts", items=[]) + + assert plan["status"] == "needs_input" + assert plan["items"] == [] + assert plan["events"][0]["kind"] == "needs_input" + + +def test_custom_plan_without_items_is_active(tmp_path): + _, store, _, _, _ = plan_stack(tmp_path) + + plan = store.create_plan("Watch negotiations", kind="custom", objective="Check replies and summarize next steps", items=[]) + + assert plan["status"] == "active" + assert plan["items"] == [] + + +def test_continual_plan_store_creates_buying_checklist(tmp_path): + _, store, _, _, _ = plan_stack(tmp_path) + + plan = store.create_plan( + "Wikelo Idris", + objective="Get all listed parts", + items=[{"item_name": "Wikelo Idris panel", "desired_quantity": 2, "max_unit_price": 500_000}], + ) + + assert plan["status"] == "active" + assert plan["items"][0]["item_name"] == "Wikelo Idris panel" + assert plan["items"][0]["desired_quantity"] == 2 + + +@pytest.mark.asyncio +async def test_buying_runner_tracks_candidates_and_drafts_only(tmp_path): + memory, store, tools, runner, _ = plan_stack(tmp_path) + plan = store.create_plan( + "Wikelo Idris", + objective="Get all listed parts", + items=[{"item_name": "Wikelo Idris panel", "desired_quantity": 1, "max_unit_price": 500_000}], + ) + + result = await runner.run_plan(plan["id"]) + snapshot = store.get_plan(plan["id"]) + + assert result["drafted"] == 1 + assert any(candidate["listing_id"] == "501" and candidate["status"] == "drafted" for candidate in snapshot["candidates"]) + assert snapshot["negotiations"][0]["status"] == "drafted" + assert len(tools.pending_actions) == 1 + assert not tools.uex.posts + assert "Drafted 1 negotiation" in memory.list_outbox()[0]["content"] + + +@pytest.mark.asyncio +async def test_plan_approval_logs_back_to_plan(tmp_path): + _, store, tools, runner, _ = plan_stack(tmp_path) + plan = store.create_plan( + "Wikelo Idris", + objective="Get all listed parts", + items=[{"item_name": "Wikelo Idris panel", "max_unit_price": 500_000}], + ) + await runner.run_plan(plan["id"]) + action_id = next(iter(tools.pending_actions)) + + approved = await tools.approve(action_id) + snapshot = store.get_plan(plan["id"]) + + assert approved["posted"]["path"] == "marketplace_negotiations_messages" + assert any(event["kind"] == "approved" for event in snapshot["events"]) + assert any(negotiation["status"] == "approved" for negotiation in snapshot["negotiations"]) + + +@pytest.mark.asyncio +async def test_custom_runner_continues_plan_through_agent(tmp_path): + memory, store, tools, runner, _ = plan_stack(tmp_path) + agent = FakePlanAgent() + runner.bind_agent(agent) + plan = store.create_plan( + "Watch open negotiations", + kind="custom", + objective="Check UEX replies and recommend next action", + constraints={"instructions": "Pay attention to stale buyer replies."}, + items=[], + ) + + result = await runner.run_plan(plan["id"]) + snapshot = store.get_plan(plan["id"]) + + assert result["status"] == "ok" + assert "Custom plan checked notifications" in result["summary"] + assert plan["id"] in agent.prompts[0] + assert any(event["kind"] == "run" for event in snapshot["events"]) + assert "Custom plan checked notifications" in memory.list_outbox()[0]["content"] + + +@pytest.mark.asyncio +async def test_scheduler_plan_run_survives_runner_error(tmp_path): + memory = MemoryStore(str(tmp_path / "memory.sqlite3")) + store = ContinualPlanStore(memory) + plan = store.create_plan( + "Broken plan", + objective="Test failure handling", + items=[{"item_name": "Wikelo Idris panel"}], + ) + + class FailingRunner: + def __init__(self, store): + self.store = store + + async def run_plan(self, plan_id): + self.store.add_event(plan_id, "error", "boom") + memory.add_outbox("Broken plan: boom") + return {"error": "boom", "plan": self.store.get_plan(plan_id)} + + scheduler = WakeScheduler(memory) + scheduler.bind_plan_runner(FailingRunner(store)) + + await scheduler._run_plan(plan["id"]) + + snapshot = store.get_plan(plan["id"]) + assert snapshot["status"] == "active" + assert snapshot["events"][0]["kind"] == "error" + assert "boom" in memory.list_outbox()[0]["content"] + + +@pytest.mark.asyncio +async def test_scheduler_schedules_overdue_plan_catchup_on_start(tmp_path): + memory, store, _, runner, scheduler = plan_stack(tmp_path) + plan = store.create_plan( + "Overdue plan", + objective="Check after restart", + items=[{"item_name": "Wikelo Idris panel"}], + ) + store.update_schedule(plan["id"], (utc_now() - timedelta(minutes=5)).isoformat()) + + scheduler.start() + try: + catchup = scheduler.scheduler.get_job(scheduler._plan_catchup_job_id(plan["id"])) + snapshot = store.get_plan(plan["id"]) + finally: + scheduler.shutdown() + + assert catchup is not None + assert any(event["kind"] == "catchup_scheduled" for event in snapshot["events"]) diff --git a/traderai/agent.py b/traderai/agent.py index 9b0c21e..576f3f2 100644 --- a/traderai/agent.py +++ b/traderai/agent.py @@ -14,6 +14,7 @@ from traderai.tools import ToolRegistry SYSTEM_PROMPT = """You are TraderAI, a local assistant for UEX marketplace work. Use tools when the user asks about UEX data, open/current listings, active negotiations, unread notifications, messages, offers, or posting ads. +Use continual plan tools when the user asks for multi-day or recurring marketplace work, such as finding several parts, watching for deals, tracking candidates, or coordinating negotiations over time. UEX credentials are configured server-side when available. Never ask the user to provide UEX_SECRET_KEY or UEX_BEARER_TOKEN in chat; call the authenticated UEX tool and only mention credential configuration if the tool returns an authentication error. Use the specific UEX tool for the needed endpoint, such as get_uex_commodities_prices or get_uex_vehicles. Use fields, limit, and summary mode so tool results stay compact. When the user asks for history, trends, changes over time, or past prices, prefer the summarize_uex_*_history tools when available; use search_uex_api_index(history_only=true) if you need to discover history endpoints. @@ -22,6 +23,7 @@ Use Cornerstone tools when the user asks where an item is sold, which shops carr Prefer open and current UEX marketplace information. Do not use historical sale data, completed sale records, or sale/average-history information unless the user explicitly asks for historical sales. Treat UEX marketplace prices as in-game aUEC/UEC credits, never real-world dollars, unless the user explicitly says otherwise. For marketplace writes, draft the exact pending action and tell the user what will be sent; never claim it was sent until approval succeeds. +For continual plans, never invent an unknown parts checklist. If the required items cannot be derived from provided details or tools, create the plan in a needs-input state and say what item list is missing. When a scheduled wake job fires, always write a concise Inbox-ready result that says what you checked, the key findings, and the suggested next action. Keep prices, listing ids, slugs, users, and UEX status codes precise. If data is missing, say what you need next.""" @@ -473,6 +475,7 @@ class OllamaAgent: "method": action.method, "endpoint": action.endpoint, "payload": action.payload, + "metadata": action.metadata or {}, } for action in self.tools.pending_actions.values() ] diff --git a/traderai/plans.py b/traderai/plans.py new file mode 100644 index 0000000..80ddf9a --- /dev/null +++ b/traderai/plans.py @@ -0,0 +1,590 @@ +from __future__ import annotations + +import json +import uuid +from typing import Any + +from traderai.memory import MemoryStore, iso_now + + +DEFAULT_PLAN_CADENCE = "0 */6 * * *" + + +class ContinualPlanStore: + def __init__(self, memory: MemoryStore) -> None: + self.memory = memory + self._init_db() + + def _init_db(self) -> None: + with self.memory._connect() as db: + db.executescript( + """ + CREATE TABLE IF NOT EXISTS continual_plans ( + id TEXT PRIMARY KEY, + title TEXT NOT NULL, + kind TEXT NOT NULL, + status TEXT NOT NULL, + objective TEXT NOT NULL, + constraints TEXT NOT NULL DEFAULT '{}', + cadence TEXT NOT NULL, + next_run_at TEXT, + last_run_at TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS continual_plan_items ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + plan_id TEXT NOT NULL, + item_name TEXT NOT NULL, + desired_quantity INTEGER NOT NULL DEFAULT 1, + max_unit_price REAL, + status TEXT NOT NULL DEFAULT 'active', + acquired_quantity INTEGER NOT NULL DEFAULT 0, + metadata TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS continual_plan_candidates ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + plan_id TEXT NOT NULL, + plan_item_id INTEGER NOT NULL, + listing_id TEXT, + listing_slug TEXT, + title TEXT, + seller TEXT, + price REAL, + currency TEXT, + stock INTEGER, + location TEXT, + score REAL, + first_seen_at TEXT NOT NULL, + last_seen_at TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'current', + metadata TEXT NOT NULL DEFAULT '{}', + UNIQUE(plan_item_id, listing_id) + ); + + CREATE TABLE IF NOT EXISTS continual_plan_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + plan_id TEXT NOT NULL, + kind TEXT NOT NULL, + message TEXT NOT NULL, + metadata TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS continual_plan_negotiations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + plan_id TEXT NOT NULL, + plan_item_id INTEGER, + candidate_id INTEGER, + listing_id TEXT, + listing_slug TEXT, + negotiation_id TEXT, + negotiation_hash TEXT, + status TEXT NOT NULL DEFAULT 'drafted', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + """ + ) + + def create_plan( + self, + title: str, + kind: str = "buying", + objective: str = "", + items: list[dict[str, Any]] | None = None, + constraints: dict[str, Any] | None = None, + cadence: str | None = None, + status: str | None = None, + ) -> dict[str, Any]: + clean_items = [item for item in (items or []) if str(item.get("item_name") or item.get("name") or "").strip()] + plan_id = f"plan-{uuid.uuid4()}" + now = iso_now() + clean_kind = (kind.strip() or "buying").casefold() + resolved_status = status or ("needs_input" if clean_kind == "buying" and not clean_items else "active") + with self.memory._connect() as db: + db.execute( + """ + INSERT INTO continual_plans(id, title, kind, status, objective, constraints, cadence, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + plan_id, + title.strip() or "Continual plan", + clean_kind, + resolved_status, + objective.strip() or title.strip(), + json.dumps(constraints or {}), + (cadence or DEFAULT_PLAN_CADENCE).strip() or DEFAULT_PLAN_CADENCE, + now, + now, + ), + ) + for item in clean_items: + db.execute( + """ + INSERT INTO continual_plan_items( + plan_id, item_name, desired_quantity, max_unit_price, status, + acquired_quantity, metadata, created_at, updated_at + ) + VALUES (?, ?, ?, ?, 'active', ?, ?, ?, ?) + """, + ( + plan_id, + str(item.get("item_name") or item.get("name")).strip(), + max(1, int(item.get("desired_quantity") or item.get("quantity") or 1)), + item.get("max_unit_price"), + max(0, int(item.get("acquired_quantity") or 0)), + json.dumps(item.get("metadata") or {}), + now, + now, + ), + ) + if clean_kind == "buying" and not clean_items: + self.add_event(plan_id, "needs_input", "Created plan, but no item checklist was provided. Add the required parts before it can run.") + elif clean_items: + self.add_event(plan_id, "created", f"Created continual {clean_kind} plan with {len(clean_items)} checklist item(s).") + else: + self.add_event(plan_id, "created", f"Created continual {clean_kind} plan.") + return self.get_plan(plan_id) or {} + + def list_plans(self, include_inactive: bool = True) -> list[dict[str, Any]]: + where = "" if include_inactive else "WHERE status = 'active'" + with self.memory._connect() as db: + rows = db.execute( + f""" + SELECT * + FROM continual_plans + {where} + ORDER BY + CASE status WHEN 'active' THEN 0 WHEN 'needs_input' THEN 1 WHEN 'paused' THEN 2 ELSE 3 END, + updated_at DESC + """ + ).fetchall() + return [self._plan_row(row) for row in rows] + + def get_plan(self, plan_id: str) -> dict[str, Any] | None: + with self.memory._connect() as db: + plan = db.execute("SELECT * FROM continual_plans WHERE id = ?", (plan_id,)).fetchone() + if not plan: + return None + data = self._plan_row(plan) + data["items"] = self.list_items(plan_id) + data["candidates"] = self.list_candidates(plan_id) + data["negotiations"] = self.list_negotiations(plan_id) + data["events"] = self.list_events(plan_id) + return data + + def list_items(self, plan_id: str) -> list[dict[str, Any]]: + with self.memory._connect() as db: + rows = db.execute( + "SELECT * FROM continual_plan_items WHERE plan_id = ? ORDER BY id", + (plan_id,), + ).fetchall() + return [self._json_row(row, "metadata") for row in rows] + + def list_candidates(self, plan_id: str, limit: int = 100) -> list[dict[str, Any]]: + with self.memory._connect() as db: + rows = db.execute( + """ + SELECT * + FROM continual_plan_candidates + WHERE plan_id = ? + ORDER BY status = 'current' DESC, score DESC, last_seen_at DESC + LIMIT ? + """, + (plan_id, limit), + ).fetchall() + return [self._json_row(row, "metadata") for row in rows] + + def list_events(self, plan_id: str, limit: int = 50) -> list[dict[str, Any]]: + with self.memory._connect() as db: + rows = db.execute( + """ + SELECT * + FROM continual_plan_events + WHERE plan_id = ? + ORDER BY id DESC + LIMIT ? + """, + (plan_id, limit), + ).fetchall() + return [self._json_row(row, "metadata") for row in rows] + + def list_negotiations(self, plan_id: str) -> list[dict[str, Any]]: + with self.memory._connect() as db: + rows = db.execute( + "SELECT * FROM continual_plan_negotiations WHERE plan_id = ? ORDER BY updated_at DESC", + (plan_id,), + ).fetchall() + return [dict(row) for row in rows] + + def set_status(self, plan_id: str, status: str) -> dict[str, Any] | None: + with self.memory._connect() as db: + db.execute( + "UPDATE continual_plans SET status = ?, updated_at = ? WHERE id = ?", + (status, iso_now(), plan_id), + ) + self.add_event(plan_id, status, f"Plan status changed to {status}.") + return self.get_plan(plan_id) + + def add_event(self, plan_id: str, kind: str, message: str, metadata: dict[str, Any] | None = None) -> dict[str, Any]: + now = iso_now() + with self.memory._connect() as db: + cursor = db.execute( + """ + INSERT INTO continual_plan_events(plan_id, kind, message, metadata, created_at) + VALUES (?, ?, ?, ?, ?) + """, + (plan_id, kind, message, json.dumps(metadata or {}), now), + ) + return {"id": cursor.lastrowid, "plan_id": plan_id, "kind": kind, "message": message, "created_at": now} + + def update_schedule(self, plan_id: str, next_run_at: str | None = None, last_run_at: str | None = None) -> None: + fields = ["next_run_at = ?", "updated_at = ?"] + values: list[Any] = [next_run_at, iso_now()] + if last_run_at is not None: + fields.insert(1, "last_run_at = ?") + values.insert(1, last_run_at) + values.append(plan_id) + with self.memory._connect() as db: + db.execute(f"UPDATE continual_plans SET {', '.join(fields)} WHERE id = ?", values) + + def upsert_candidate(self, plan_id: str, plan_item_id: int, listing: dict[str, Any], score: float) -> dict[str, Any]: + now = iso_now() + listing_id = str(listing.get("id") or listing.get("listing_id") or listing.get("slug") or uuid.uuid4()) + metadata = dict(listing) + with self.memory._connect() as db: + db.execute( + """ + INSERT INTO continual_plan_candidates( + plan_id, plan_item_id, listing_id, listing_slug, title, seller, price, currency, + stock, location, score, first_seen_at, last_seen_at, status, metadata + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'current', ?) + ON CONFLICT(plan_item_id, listing_id) DO UPDATE SET + listing_slug=excluded.listing_slug, + title=excluded.title, + seller=excluded.seller, + price=excluded.price, + currency=excluded.currency, + stock=excluded.stock, + location=excluded.location, + score=excluded.score, + last_seen_at=excluded.last_seen_at, + status='current', + metadata=excluded.metadata + """, + ( + plan_id, + plan_item_id, + listing_id, + listing.get("slug"), + listing.get("title"), + listing.get("advertiser") or listing.get("user_username") or listing.get("seller"), + listing.get("price"), + listing.get("currency"), + listing.get("in_stock") or listing.get("stock"), + listing.get("location"), + score, + now, + now, + json.dumps(metadata), + ), + ) + row = db.execute( + "SELECT * FROM continual_plan_candidates WHERE plan_item_id = ? AND listing_id = ?", + (plan_item_id, listing_id), + ).fetchone() + return self._json_row(row, "metadata") + + def mark_stale_candidates(self, plan_item_id: int, seen_listing_ids: set[str]) -> int: + with self.memory._connect() as db: + rows = db.execute( + "SELECT id, listing_id FROM continual_plan_candidates WHERE plan_item_id = ? AND status = 'current'", + (plan_item_id,), + ).fetchall() + stale_ids = [row["id"] for row in rows if str(row["listing_id"]) not in seen_listing_ids] + if stale_ids: + placeholders = ",".join("?" for _ in stale_ids) + db.execute( + f"UPDATE continual_plan_candidates SET status = 'stale', last_seen_at = ? WHERE id IN ({placeholders})", + (iso_now(), *stale_ids), + ) + return len(stale_ids) + + def mark_candidate_drafted(self, candidate_id: int) -> None: + with self.memory._connect() as db: + db.execute("UPDATE continual_plan_candidates SET status = 'drafted', last_seen_at = ? WHERE id = ?", (iso_now(), candidate_id)) + + def add_negotiation(self, plan_id: str, plan_item_id: int | None, candidate_id: int | None, metadata: dict[str, Any]) -> dict[str, Any]: + now = iso_now() + with self.memory._connect() as db: + cursor = db.execute( + """ + INSERT INTO continual_plan_negotiations( + plan_id, plan_item_id, candidate_id, listing_id, listing_slug, + negotiation_id, negotiation_hash, status, created_at, updated_at + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + plan_id, + plan_item_id, + candidate_id, + metadata.get("listing_id"), + metadata.get("listing_slug"), + metadata.get("id_negotiation"), + metadata.get("hash"), + metadata.get("status") or "drafted", + now, + now, + ), + ) + row = db.execute("SELECT * FROM continual_plan_negotiations WHERE id = ?", (cursor.lastrowid,)).fetchone() + return dict(row) + + def has_negotiation_for_candidate(self, plan_id: str, plan_item_id: int, candidate: dict[str, Any]) -> bool: + with self.memory._connect() as db: + row = db.execute( + """ + SELECT id + FROM continual_plan_negotiations + WHERE plan_id = ? + AND plan_item_id = ? + AND ( + candidate_id = ? + OR (listing_id IS NOT NULL AND listing_id = ?) + OR (listing_slug IS NOT NULL AND listing_slug = ?) + ) + LIMIT 1 + """, + ( + plan_id, + plan_item_id, + candidate.get("id"), + candidate.get("listing_id"), + candidate.get("listing_slug"), + ), + ).fetchone() + return row is not None + + @staticmethod + def _json_row(row: Any, *json_fields: str) -> dict[str, Any]: + data = dict(row) + for field in json_fields: + try: + data[field] = json.loads(data.get(field) or "{}") + except (TypeError, json.JSONDecodeError): + data[field] = {} + return data + + @classmethod + def _plan_row(cls, row: Any) -> dict[str, Any]: + return cls._json_row(row, "constraints") + + +class ContinualPlanRunner: + def __init__(self, store: ContinualPlanStore, tools: Any, memory: MemoryStore, agent: Any | None = None) -> None: + self.store = store + self.tools = tools + self.memory = memory + self.agent = agent + + def bind_agent(self, agent: Any) -> None: + self.agent = agent + + async def run_plan(self, plan_id: str) -> dict[str, Any]: + plan = self.store.get_plan(plan_id) + if not plan: + return {"error": f"Plan not found: {plan_id}"} + if plan["status"] != "active": + message = f"Skipped {plan['title']} because status is {plan['status']}." + self.store.add_event(plan_id, "skipped", message) + return {"status": "skipped", "summary": message, "plan": self.store.get_plan(plan_id)} + try: + if plan["kind"] == "buying": + result = await self._run_buying_plan(plan) + else: + result = await self._run_agent_plan(plan) + self.store.update_schedule(plan_id, plan.get("next_run_at"), last_run_at=iso_now()) + self.memory.add_outbox(result["summary"]) + return {**result, "plan": self.store.get_plan(plan_id)} + except Exception as exc: + message = f"Continual plan failed: {exc}" + self.store.add_event(plan_id, "error", message) + self.memory.add_outbox(f"{plan['title']}: {message}") + self.store.update_schedule(plan_id, plan.get("next_run_at"), last_run_at=iso_now()) + return {"error": str(exc), "summary": message, "plan": self.store.get_plan(plan_id)} + + async def _run_agent_plan(self, plan: dict[str, Any]) -> dict[str, Any]: + if self.agent is None: + raise RuntimeError("No agent is bound to run generic continual plans.") + prompt = self._agent_plan_prompt(plan) + response = await self.agent.generate_wake_response(prompt) + summary = f"{plan['title']}: {response}" + self.store.add_event(plan["id"], "run", "Ran generic continual plan through the agent.", {"response": response}) + return {"status": "ok", "summary": summary, "checked": 0, "drafted": 0} + + async def _run_buying_plan(self, plan: dict[str, Any]) -> dict[str, Any]: + items = [item for item in plan.get("items") or [] if item.get("status") != "acquired"] + if not items: + self.store.set_status(plan["id"], "completed") + summary = f"{plan['title']}: all checklist items are marked acquired." + return {"status": "completed", "summary": summary, "drafted": 0, "checked": 0} + + checked = 0 + drafted = 0 + best_lines = [] + constraints = plan.get("constraints") or {} + excluded_sellers = {str(value).casefold() for value in constraints.get("excluded_sellers") or []} + preferred_locations = [str(value).casefold() for value in constraints.get("preferred_locations") or []] + + for item in items: + response = await self.tools.search_marketplace_listings( + query=item["item_name"], + operation="sell", + type="item", + limit=25, + ) + listings = response.get("listings") or response.get("data") or [] + seen: set[str] = set() + candidates = [] + for listing in listings: + if not isinstance(listing, dict): + continue + listing_id = str(listing.get("id") or listing.get("slug") or "") + if listing_id: + seen.add(listing_id) + if str(listing.get("advertiser") or listing.get("seller") or "").casefold() in excluded_sellers: + continue + score = self._candidate_score(listing, item, preferred_locations) + candidate = self.store.upsert_candidate(plan["id"], int(item["id"]), listing, score) + candidates.append(candidate) + stale = self.store.mark_stale_candidates(int(item["id"]), seen) + checked += 1 + current_candidates = [candidate for candidate in candidates if candidate.get("status") == "current"] + current_candidates.sort(key=lambda candidate: (-float(candidate.get("score") or 0), float(candidate.get("price") or 10**18))) + best = current_candidates[0] if current_candidates else None + if not best: + best_lines.append(f"{item['item_name']}: no active matching sell listings found.") + self.store.add_event(plan["id"], "search", f"{item['item_name']}: no active candidates found.", {"stale": stale}) + continue + + best_lines.append( + f"{item['item_name']}: best candidate is {best.get('title') or best.get('listing_slug')} " + f"at {self._format_price(best.get('price'), best.get('currency'))} from {best.get('seller') or 'unknown seller'}." + ) + self.store.add_event( + plan["id"], + "search", + f"{item['item_name']}: found {len(current_candidates)} current candidate(s); {stale} stale candidate(s) marked.", + {"best_candidate_id": best.get("id")}, + ) + + if self.store.has_negotiation_for_candidate(plan["id"], int(item["id"]), best) or not self._within_budget(best, item, constraints): + continue + draft = await self._draft_buying_message(plan, item, best) + if "pending_action" in draft: + drafted += 1 + self.store.mark_candidate_drafted(int(best["id"])) + self.store.add_negotiation( + plan["id"], + int(item["id"]), + int(best["id"]), + { + "listing_id": best.get("listing_id"), + "listing_slug": best.get("listing_slug"), + "status": "drafted", + }, + ) + self.store.add_event( + plan["id"], + "draft", + f"Drafted negotiation opener for {item['item_name']} candidate {best.get('listing_id')}.", + {"pending_action_id": draft["pending_action"].get("id"), "candidate_id": best.get("id")}, + ) + + summary = f"{plan['title']}: checked {checked} item(s). " + " ".join(best_lines[:4]) + if drafted: + summary += f" Drafted {drafted} negotiation message(s) for approval." + self.store.add_event(plan["id"], "run", summary, {"checked": checked, "drafted": drafted}) + return {"status": "ok", "summary": summary, "checked": checked, "drafted": drafted} + + async def _draft_buying_message(self, plan: dict[str, Any], item: dict[str, Any], candidate: dict[str, Any]) -> dict[str, Any]: + tone = (plan.get("constraints") or {}).get("message_tone") or "polite and concise" + message = ( + f"Hi, I am interested in your {candidate.get('title') or item['item_name']} listing " + f"for {self._format_price(candidate.get('price'), candidate.get('currency'))}. " + f"Is it still available? I am trying to complete: {plan['objective']}. " + f"Tone note: {tone}." + ) + return await self.tools.draft_negotiation_message( + message=message, + id_listing=self._int_or_none(candidate.get("listing_id")), + plan_id=plan["id"], + plan_item_id=int(item["id"]), + candidate_id=int(candidate["id"]), + listing_slug=candidate.get("listing_slug"), + ) + + @staticmethod + def _candidate_score(listing: dict[str, Any], item: dict[str, Any], preferred_locations: list[str]) -> float: + price = float(listing.get("price") or 10**12) + max_price = item.get("max_unit_price") + budget_bonus = 40.0 if max_price and price <= float(max_price) else 0.0 + stock = float(listing.get("in_stock") or listing.get("stock") or 1) + location = str(listing.get("location") or "").casefold() + location_bonus = 8.0 if preferred_locations and any(place in location for place in preferred_locations) else 0.0 + return round(max(0.0, 50.0 - (price / 10_000_000.0)) + min(stock, 20.0) + budget_bonus + location_bonus, 4) + + @staticmethod + def _within_budget(candidate: dict[str, Any], item: dict[str, Any], constraints: dict[str, Any]) -> bool: + price = candidate.get("price") + if price is None: + return False + max_price = item.get("max_unit_price") or constraints.get("max_unit_price") + return max_price is None or float(price) <= float(max_price) + + @staticmethod + def _format_price(price: Any, currency: Any) -> str: + if isinstance(price, (int, float)): + return f"{price:,.0f} {currency or 'UEC'}" + return f"unknown price {currency or 'UEC'}" + + @staticmethod + def _int_or_none(value: Any) -> int | None: + try: + return int(value) + except (TypeError, ValueError): + return None + + @staticmethod + def _agent_plan_prompt(plan: dict[str, Any]) -> str: + recent_events = [ + { + "kind": event.get("kind"), + "message": event.get("message"), + "created_at": event.get("created_at"), + } + for event in (plan.get("events") or [])[:8] + ] + payload = { + "plan_id": plan.get("id"), + "title": plan.get("title"), + "kind": plan.get("kind"), + "objective": plan.get("objective"), + "constraints": plan.get("constraints") or {}, + "items": plan.get("items") or [], + "recent_events": recent_events, + } + return ( + "Continual plan wake run. Continue this durable plan and write an Inbox-ready summary. " + "Use tools as needed. For any account-affecting marketplace write, only draft a pending action for approval. " + "Do not claim a message, offer, listing, or negotiation was sent unless an approved action result says it was sent. " + f"Plan JSON: {json.dumps(payload, ensure_ascii=True)}" + ) diff --git a/traderai/scheduler.py b/traderai/scheduler.py index dee4c31..af3cd00 100644 --- a/traderai/scheduler.py +++ b/traderai/scheduler.py @@ -1,6 +1,6 @@ from __future__ import annotations -from datetime import datetime +from datetime import datetime, timedelta from typing import Any from uuid import uuid4 @@ -10,7 +10,7 @@ from apscheduler.triggers.date import DateTrigger from apscheduler.triggers.interval import IntervalTrigger from tzlocal import get_localzone -from traderai.memory import MemoryStore, iso_now, time_since +from traderai.memory import MemoryStore, iso_now, parse_iso, time_since, utc_now UEX_NOTIFICATION_JOB_ID = "uex-notification-poll" @@ -22,11 +22,15 @@ class WakeScheduler: self.scheduler = AsyncIOScheduler(timezone=get_localzone()) self.agent = None self.uex = None + self.plan_runner = None self.notification_poll_seconds = 60 def bind_agent(self, agent: Any) -> None: self.agent = agent + def bind_plan_runner(self, plan_runner: Any) -> None: + self.plan_runner = plan_runner + def bind_uex_notifications(self, uex: Any, poll_seconds: int = 60) -> None: self.uex = uex self.notification_poll_seconds = max(15, poll_seconds) @@ -37,6 +41,9 @@ class WakeScheduler: self._schedule_notification_poll() for job in self.memory.list_jobs(): self._schedule_existing(job) + if self.plan_runner is not None: + for plan in self.plan_runner.store.list_plans(include_inactive=False): + self.schedule_plan(plan) def shutdown(self) -> None: if self.scheduler.running: @@ -59,6 +66,70 @@ class WakeScheduler: def list_jobs(self) -> list[dict[str, Any]]: return self.memory.list_jobs() + def schedule_plan(self, plan: dict[str, Any]) -> dict[str, Any]: + if self.plan_runner is None or plan.get("status") != "active": + return plan + job_id = self._plan_job_id(plan["id"]) + previous_next_run = plan.get("next_run_at") + trigger = CronTrigger.from_crontab(plan.get("cadence") or "0 */6 * * *") + self.scheduler.add_job(self._run_plan, trigger=trigger, id=job_id, args=[plan["id"]], replace_existing=True) + job = self.scheduler.get_job(job_id) + next_run = job.next_run_time if job else None + self.plan_runner.store.update_schedule(plan["id"], next_run.isoformat() if next_run else None) + if self._plan_is_overdue(previous_next_run): + catchup_id = self._plan_catchup_job_id(plan["id"]) + self.scheduler.add_job( + self._run_plan, + trigger=DateTrigger(run_date=datetime.now() + timedelta(seconds=5)), + id=catchup_id, + args=[plan["id"]], + replace_existing=True, + ) + self.plan_runner.store.add_event( + plan["id"], + "catchup_scheduled", + "Plan was overdue while the app was closed, so a one-time catch-up run was scheduled after startup.", + {"previous_next_run_at": previous_next_run}, + ) + return self.plan_runner.store.get_plan(plan["id"]) or plan + + def unschedule_plan(self, plan_id: str) -> None: + job_id = self._plan_job_id(plan_id) + if self.scheduler.get_job(job_id): + self.scheduler.remove_job(job_id) + catchup_id = self._plan_catchup_job_id(plan_id) + if self.scheduler.get_job(catchup_id): + self.scheduler.remove_job(catchup_id) + if self.plan_runner is not None: + self.plan_runner.store.update_schedule(plan_id, None) + + async def _run_plan(self, plan_id: str) -> None: + if self.plan_runner is None: + return + result = await self.plan_runner.run_plan(plan_id) + plan = result.get("plan") or self.plan_runner.store.get_plan(plan_id) + if plan and plan.get("status") == "active": + job = self.scheduler.get_job(self._plan_job_id(plan_id)) + next_run = job.next_run_time if job else None + self.plan_runner.store.update_schedule(plan_id, next_run.isoformat() if next_run else None) + + @staticmethod + def _plan_job_id(plan_id: str) -> str: + return f"continual-{plan_id}" + + @staticmethod + def _plan_catchup_job_id(plan_id: str) -> str: + return f"continual-catchup-{plan_id}" + + @staticmethod + def _plan_is_overdue(next_run_at: str | None) -> bool: + if not next_run_at: + return False + try: + return parse_iso(next_run_at) <= utc_now() + except ValueError: + return False + def _schedule_existing(self, job: dict[str, Any]) -> None: if job["trigger_type"] == "cron": trigger = CronTrigger.from_crontab(job["trigger_value"]) diff --git a/traderai/server.py b/traderai/server.py index 8bf622a..383c7be 100644 --- a/traderai/server.py +++ b/traderai/server.py @@ -23,6 +23,7 @@ from traderai.config import save_settings, settings_payload from traderai.config import get_settings from traderai.cornerstone_client import CornerstoneClient from traderai.memory import DEFAULT_THREAD_ID, MemoryStore +from traderai.plans import ContinualPlanRunner, ContinualPlanStore from traderai.scheduler import WakeScheduler from traderai.scmdb_client import SCMDBClient from traderai.tools import ToolRegistry @@ -60,6 +61,27 @@ class ClearMemoryRequest(BaseModel): include_outbox: bool = True +class ContinualPlanItemRequest(BaseModel): + item_name: str + desired_quantity: int = 1 + max_unit_price: float | None = None + + +class ContinualPlanCreateRequest(BaseModel): + title: str + objective: str + kind: str = "buying" + cadence: str | None = None + constraints: dict[str, Any] = {} + items: list[ContinualPlanItemRequest] = [] + + +class ContinualPlanEventRequest(BaseModel): + kind: str = "note" + message: str + metadata: dict[str, Any] = {} + + class ConfigUpdateRequest(BaseModel): values: dict @@ -75,6 +97,7 @@ UPDATE_ASSET_NAME = "TraderAI.exe" def create_app() -> FastAPI: settings = get_settings() memory = MemoryStore(settings.traderai_memory_path) + plan_store = ContinualPlanStore(memory) scheduler = WakeScheduler(memory) uex = UEXClient(settings.uex_base_url, settings.uex_secret_key, settings.uex_bearer_token) scmdb = SCMDBClient(settings.scmdb_base_url) @@ -86,7 +109,10 @@ def create_app() -> FastAPI: scheduler=scheduler, scmdb=scmdb, cornerstone=cornerstone, + plan_store=plan_store, ) + plan_runner = ContinualPlanRunner(plan_store, tools, memory) + tools.plan_runner = plan_runner agent = OllamaAgent( settings.ollama_base_url, settings.ollama_model, @@ -95,7 +121,9 @@ def create_app() -> FastAPI: user_name=settings.traderai_user_name, num_ctx=settings.ollama_num_ctx, ) + plan_runner.bind_agent(agent) scheduler.bind_agent(agent) + scheduler.bind_plan_runner(plan_runner) scheduler.bind_uex_notifications(uex, settings.uex_notification_poll_seconds) app = FastAPI(title="TraderAI") @@ -348,6 +376,66 @@ def create_app() -> FastAPI: async def wake_jobs() -> dict: return {"scheduled_jobs": scheduler.list_jobs()} + @app.get("/api/plans") + async def continual_plans(include_inactive: bool = True) -> dict: + return {"plans": plan_store.list_plans(include_inactive=include_inactive)} + + @app.post("/api/plans") + async def create_continual_plan(request: ContinualPlanCreateRequest) -> dict: + result = await tools.create_continual_plan( + title=request.title, + objective=request.objective, + kind=request.kind, + items=[item.model_dump() for item in request.items], + constraints=request.constraints, + cadence=request.cadence, + ) + if result.get("error"): + raise HTTPException(status_code=400, detail=result["error"]) + return result + + @app.get("/api/plans/{plan_id}") + async def continual_plan(plan_id: str) -> dict: + plan = plan_store.get_plan(plan_id) + if not plan: + raise HTTPException(status_code=404, detail="Plan not found.") + return {"plan": plan} + + @app.post("/api/plans/{plan_id}/pause") + async def pause_continual_plan(plan_id: str) -> dict: + result = await tools.pause_continual_plan(plan_id) + if result.get("error"): + raise HTTPException(status_code=404, detail=result["error"]) + return result + + @app.post("/api/plans/{plan_id}/resume") + async def resume_continual_plan(plan_id: str) -> dict: + result = await tools.resume_continual_plan(plan_id) + if result.get("error"): + raise HTTPException(status_code=404, detail=result["error"]) + return result + + @app.post("/api/plans/{plan_id}/cancel") + async def cancel_continual_plan(plan_id: str) -> dict: + result = await tools.cancel_continual_plan(plan_id) + if result.get("error"): + raise HTTPException(status_code=404, detail=result["error"]) + return result + + @app.post("/api/plans/{plan_id}/run") + async def run_continual_plan(plan_id: str) -> dict: + result = await tools.run_continual_plan_now(plan_id) + if result.get("error"): + raise HTTPException(status_code=400, detail=result["error"]) + return result + + @app.post("/api/plans/{plan_id}/events") + async def add_continual_plan_event(plan_id: str, request: ContinualPlanEventRequest) -> dict: + if not plan_store.get_plan(plan_id): + raise HTTPException(status_code=404, detail="Plan not found.") + event = plan_store.add_event(plan_id, request.kind, request.message, request.metadata) + return {"event": event, "plan": plan_store.get_plan(plan_id)} + @app.get("/api/memory") async def inspect_memory(limit: int = 50) -> dict: return memory.inspect(max(1, min(limit, 200))) diff --git a/traderai/tools.py b/traderai/tools.py index 519be99..1f9afb4 100644 --- a/traderai/tools.py +++ b/traderai/tools.py @@ -148,6 +148,7 @@ class PendingAction: endpoint: str payload: dict[str, Any] method: str = "POST" + metadata: dict[str, Any] | None = None class ToolRegistry: @@ -159,6 +160,8 @@ class ToolRegistry: scheduler: WakeScheduler | None = None, scmdb: SCMDBClient | None = None, cornerstone: CornerstoneClient | None = None, + plan_store: Any | None = None, + plan_runner: Any | None = None, ) -> None: self.uex = uex self.scmdb = scmdb or SCMDBClient() @@ -166,6 +169,8 @@ class ToolRegistry: self.require_write_approval = require_write_approval self.memory = memory self.scheduler = scheduler + self.plan_store = plan_store + self.plan_runner = plan_runner self.pending_actions: dict[str, PendingAction] = {} self.handlers: dict[str, ToolHandler] = { "search_marketplace_listings": self.search_marketplace_listings, @@ -178,6 +183,13 @@ class ToolRegistry: "recall_memory": self.recall_memory, "schedule_wake_job": self.schedule_wake_job, "list_wake_jobs": self.list_wake_jobs, + "create_continual_plan": self.create_continual_plan, + "list_continual_plans": self.list_continual_plans, + "get_continual_plan": self.get_continual_plan, + "pause_continual_plan": self.pause_continual_plan, + "resume_continual_plan": self.resume_continual_plan, + "cancel_continual_plan": self.cancel_continual_plan, + "run_continual_plan_now": self.run_continual_plan_now, "check_uex_notifications": self.check_uex_notifications, "list_scmdb_versions": self.list_scmdb_versions, "search_scmdb_missions": self.search_scmdb_missions, @@ -285,6 +297,11 @@ class ToolRegistry: "message": {"type": "string"}, "hash": {"type": "string"}, "id_negotiation": {"type": "integer"}, + "id_listing": {"type": "integer"}, + "plan_id": {"type": "string"}, + "plan_item_id": {"type": "integer"}, + "candidate_id": {"type": "integer"}, + "listing_slug": {"type": "string"}, "is_production": {"type": "integer", "enum": [0, 1], "default": 1}, }, }, @@ -376,6 +393,83 @@ class ToolRegistry: "parameters": {"type": "object", "properties": {}}, }, }, + { + "type": "function", + "function": { + "name": "create_continual_plan", + "description": "Create a durable multi-run plan. Use this for long-running marketplace work over days. kind=buying uses structured listing/candidate tracking; kind=custom continues through an agent wake prompt. All UEX writes are draft-only for approval.", + "parameters": { + "type": "object", + "required": ["title", "objective"], + "properties": { + "title": {"type": "string"}, + "objective": {"type": "string"}, + "kind": {"type": "string", "enum": ["buying", "custom"], "default": "buying"}, + "cadence": {"type": "string", "description": "Five-field cron expression, default every six hours."}, + "constraints": {"type": "object", "description": "Plan-specific options such as message_tone, excluded_sellers, preferred_locations, max_unit_price, or custom instructions."}, + "items": { + "type": "array", + "items": { + "type": "object", + "properties": { + "item_name": {"type": "string"}, + "desired_quantity": {"type": "integer", "minimum": 1}, + "max_unit_price": {"type": "number"}, + }, + }, + }, + }, + }, + }, + }, + { + "type": "function", + "function": { + "name": "list_continual_plans", + "description": "List durable continual plans and their statuses.", + "parameters": {"type": "object", "properties": {"include_inactive": {"type": "boolean", "default": True}}}, + }, + }, + { + "type": "function", + "function": { + "name": "get_continual_plan", + "description": "Get one continual plan with checklist items, candidates, negotiations, and event history.", + "parameters": {"type": "object", "required": ["plan_id"], "properties": {"plan_id": {"type": "string"}}}, + }, + }, + { + "type": "function", + "function": { + "name": "pause_continual_plan", + "description": "Pause a continual plan so scheduled runs stop.", + "parameters": {"type": "object", "required": ["plan_id"], "properties": {"plan_id": {"type": "string"}}}, + }, + }, + { + "type": "function", + "function": { + "name": "resume_continual_plan", + "description": "Resume a paused or needs-input continual plan. It only becomes active when it has checklist items.", + "parameters": {"type": "object", "required": ["plan_id"], "properties": {"plan_id": {"type": "string"}}}, + }, + }, + { + "type": "function", + "function": { + "name": "cancel_continual_plan", + "description": "Cancel a continual plan.", + "parameters": {"type": "object", "required": ["plan_id"], "properties": {"plan_id": {"type": "string"}}}, + }, + }, + { + "type": "function", + "function": { + "name": "run_continual_plan_now", + "description": "Run one continual plan immediately and put the result in the Inbox.", + "parameters": {"type": "object", "required": ["plan_id"], "properties": {"plan_id": {"type": "string"}}}, + }, + }, { "type": "function", "function": { @@ -400,13 +494,17 @@ class ToolRegistry: if not action: return {"error": f"Pending action not found: {action_id}"} if action.method == "DELETE": - return await self.uex.delete(action.endpoint, action.payload, authenticated=True) - return await self.uex.post(action.endpoint, self._production_payload(action.endpoint, action.payload), authenticated=True) + result = await self.uex.delete(action.endpoint, action.payload, authenticated=True) + else: + result = await self.uex.post(action.endpoint, self._production_payload(action.endpoint, action.payload), authenticated=True) + self._record_pending_action_result(action, "approved", result) + return result async def decline(self, action_id: str) -> dict[str, Any]: action = self.pending_actions.pop(action_id, None) if not action: return {"error": f"Pending action not found: {action_id}"} + self._record_pending_action_result(action, "declined", {}) return { "declined": True, "pending_action": { @@ -415,6 +513,7 @@ class ToolRegistry: "method": action.method, "endpoint": action.endpoint, "payload": action.payload, + "metadata": action.metadata or {}, }, } @@ -1046,10 +1145,24 @@ class ToolRegistry: message: str, hash: str | None = None, id_negotiation: int | None = None, + id_listing: int | None = None, + plan_id: str | None = None, + plan_item_id: int | None = None, + candidate_id: int | None = None, + listing_slug: str | None = None, is_production: int = 1, ) -> dict[str, Any]: - payload = {"message": message, "hash": hash, "id_negotiation": id_negotiation, "is_production": is_production} - return self._pending("Send negotiation message", "marketplace_negotiations_messages", payload) + payload = {"message": message, "hash": hash, "id_negotiation": id_negotiation, "id_listing": id_listing, "is_production": is_production} + metadata = { + "plan_id": plan_id, + "plan_item_id": plan_item_id, + "candidate_id": candidate_id, + "listing_id": id_listing, + "listing_slug": listing_slug, + "hash": hash, + "id_negotiation": id_negotiation, + } + return self._pending("Send negotiation message", "marketplace_negotiations_messages", payload, metadata=metadata) async def draft_marketplace_listing(self, **payload: Any) -> dict[str, Any]: return self._pending("Post marketplace listing", "marketplace_advertise", payload) @@ -1083,6 +1196,68 @@ class ToolRegistry: return {"error": "Scheduler is not configured."} return {"scheduled_jobs": self.scheduler.list_jobs()} + async def create_continual_plan( + self, + title: str, + objective: str, + kind: str = "buying", + items: list[dict[str, Any]] | None = None, + constraints: dict[str, Any] | None = None, + cadence: str | None = None, + ) -> dict[str, Any]: + if self.plan_store is None: + return {"error": "Continual plan store is not configured."} + plan = self.plan_store.create_plan(title, kind=kind, objective=objective, items=items or [], constraints=constraints or {}, cadence=cadence) + if self.scheduler is not None and plan.get("status") == "active": + self.scheduler.schedule_plan(plan) + plan = self.plan_store.get_plan(plan["id"]) or plan + return {"plan": plan} + + async def list_continual_plans(self, include_inactive: bool = True) -> dict[str, Any]: + if self.plan_store is None: + return {"error": "Continual plan store is not configured."} + return {"plans": self.plan_store.list_plans(include_inactive=include_inactive)} + + async def get_continual_plan(self, plan_id: str) -> dict[str, Any]: + if self.plan_store is None: + return {"error": "Continual plan store is not configured."} + plan = self.plan_store.get_plan(plan_id) + if not plan: + return {"error": f"Plan not found: {plan_id}"} + return {"plan": plan} + + async def pause_continual_plan(self, plan_id: str) -> dict[str, Any]: + if self.plan_store is None: + return {"error": "Continual plan store is not configured."} + if self.scheduler is not None: + self.scheduler.unschedule_plan(plan_id) + return {"plan": self.plan_store.set_status(plan_id, "paused")} + + async def resume_continual_plan(self, plan_id: str) -> dict[str, Any]: + if self.plan_store is None: + return {"error": "Continual plan store is not configured."} + plan = self.plan_store.get_plan(plan_id) + if not plan: + return {"error": f"Plan not found: {plan_id}"} + next_status = "active" if plan.get("items") else "needs_input" + plan = self.plan_store.set_status(plan_id, next_status) + if self.scheduler is not None and plan and plan.get("status") == "active": + self.scheduler.schedule_plan(plan) + plan = self.plan_store.get_plan(plan_id) + return {"plan": plan} + + async def cancel_continual_plan(self, plan_id: str) -> dict[str, Any]: + if self.plan_store is None: + return {"error": "Continual plan store is not configured."} + if self.scheduler is not None: + self.scheduler.unschedule_plan(plan_id) + return {"plan": self.plan_store.set_status(plan_id, "canceled")} + + async def run_continual_plan_now(self, plan_id: str) -> dict[str, Any]: + if self.plan_runner is None: + return {"error": "Continual plan runner is not configured."} + return await self.plan_runner.run_plan(plan_id) + async def check_uex_notifications(self) -> dict[str, Any]: response = await self.uex.get_user_notifications() notifications = response.get("notifications") or [] @@ -1280,11 +1455,19 @@ class ToolRegistry: "locations": locations[:limit], } - def _pending(self, label: str, endpoint: str, payload: dict[str, Any], method: str = "POST") -> dict[str, Any]: + def _pending( + self, + label: str, + endpoint: str, + payload: dict[str, Any], + method: str = "POST", + metadata: dict[str, Any] | None = None, + ) -> dict[str, Any]: action_id = str(uuid.uuid4()) payload = {key: value for key, value in payload.items() if value is not None} + metadata = {key: value for key, value in (metadata or {}).items() if value is not None} payload = self._production_payload(endpoint, payload) - self.pending_actions[action_id] = PendingAction(action_id, label, endpoint, payload, method) + self.pending_actions[action_id] = PendingAction(action_id, label, endpoint, payload, method, metadata) return { "pending_action": { "id": action_id, @@ -1292,10 +1475,27 @@ class ToolRegistry: "method": method, "endpoint": endpoint, "payload": payload, + "metadata": metadata, "approval_required": self.require_write_approval, } } + def _record_pending_action_result(self, action: PendingAction, result_kind: str, result: dict[str, Any]) -> None: + metadata = action.metadata or {} + plan_id = metadata.get("plan_id") + if not plan_id or self.plan_store is None: + return + message = f"{action.label} {result_kind} for continual plan." + event_metadata = {"action_id": action.id, "endpoint": action.endpoint, "payload": action.payload, "result": result, **metadata} + self.plan_store.add_event(plan_id, result_kind, message, event_metadata) + if result_kind == "approved" and action.endpoint == "marketplace_negotiations_messages": + self.plan_store.add_negotiation( + plan_id, + metadata.get("plan_item_id"), + metadata.get("candidate_id"), + {**metadata, "status": "approved"}, + ) + @staticmethod def _production_payload(endpoint: str, payload: dict[str, Any]) -> dict[str, Any]: if endpoint not in UEX_PRODUCTION_WRITE_RESOURCES: diff --git a/web/app.js b/web/app.js index 9ab49ee..82fd420 100644 --- a/web/app.js +++ b/web/app.js @@ -13,9 +13,11 @@ const configStatusEl = document.getElementById("config-status"); const configPathsEl = document.getElementById("config-paths"); const settingsToggle = document.getElementById("settings-toggle"); const memoryToggle = document.getElementById("memory-toggle"); +const plansToggle = document.getElementById("plans-toggle"); const ollamaToggle = document.getElementById("ollama-toggle"); const settingsPanel = document.getElementById("settings-panel"); const memoryPanel = document.getElementById("memory-panel"); +const plansPanel = document.getElementById("plans-panel"); const ollamaPanel = document.getElementById("ollama-panel"); const ollamaForm = document.getElementById("ollama-config-form"); const ollamaRefreshButton = document.getElementById("ollama-refresh"); @@ -47,6 +49,10 @@ const updateModalCopy = document.getElementById("update-modal-copy"); const updateModalClose = document.getElementById("update-modal-close"); const updateModalInstall = document.getElementById("update-modal-install"); const updateModalReleases = document.getElementById("update-modal-releases"); +const plansRefreshButton = document.getElementById("plans-refresh"); +const planForm = document.getElementById("plan-form"); +const plansStatusEl = document.getElementById("plans-status"); +const plansDashboardEl = document.getElementById("plans-dashboard"); let ollamaOnline = true; let latestUpdate = null; @@ -736,6 +742,7 @@ function toggleSidebarPanel(panelName) { const panels = { settings: { panel: settingsPanel, button: settingsToggle }, memory: { panel: memoryPanel, button: memoryToggle }, + plans: { panel: plansPanel, button: plansToggle }, ollama: { panel: ollamaPanel, button: ollamaToggle }, }; const target = panels[panelName]; @@ -756,6 +763,7 @@ function toggleSidebarPanel(panelName) { checkForUpdate(); } if (panelName === "memory") refreshMemory(); + if (panelName === "plans") refreshPlans(); if (panelName === "ollama") { refreshConfig(); refreshOllamaStatus(); @@ -1002,6 +1010,178 @@ async function submitNegotiationMessage(event) { } } +function parsePlanItems(text) { + return text + .split(/\r?\n/) + .map((line) => line.trim()) + .filter(Boolean) + .map((line) => { + const [name, quantity, maxPrice] = line.split("|").map((part) => part.trim()); + const item = { item_name: name }; + if (quantity) item.desired_quantity = Math.max(1, Number.parseInt(quantity, 10) || 1); + if (maxPrice) item.max_unit_price = Number(maxPrice.replace(/,/g, "")); + return item; + }); +} + +async function createPlan(event) { + event.preventDefault(); + const title = document.getElementById("plan-title").value.trim(); + const objective = document.getElementById("plan-objective").value.trim(); + if (!title || !objective) return; + const tone = document.getElementById("plan-tone").value.trim(); + const instructions = document.getElementById("plan-instructions").value.trim(); + const constraints = {}; + if (tone) constraints.message_tone = tone; + if (instructions) constraints.instructions = instructions; + const payload = { + title, + objective, + kind: document.getElementById("plan-kind").value || "buying", + cadence: document.getElementById("plan-cadence").value.trim() || null, + constraints, + items: parsePlanItems(document.getElementById("plan-items").value || ""), + }; + plansStatusEl.textContent = "Creating plan"; + try { + const response = await fetch("/api/plans", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(payload), + }); + const result = await response.json(); + if (!response.ok) throw new Error(result.detail || `HTTP ${response.status}`); + planForm.reset(); + plansStatusEl.textContent = result.plan?.status === "needs_input" + ? "Plan created, but it needs an item checklist." + : "Plan created"; + await refreshPlans(result.plan?.id); + } catch (error) { + plansStatusEl.textContent = `Plan create failed: ${fetchErrorMessage(error)}`; + } +} + +async function refreshPlans(openPlanId = null) { + if (!plansDashboardEl) return; + try { + const response = await fetch("/api/plans"); + const result = await response.json(); + await renderPlans(result.plans || [], openPlanId); + } catch (error) { + plansDashboardEl.textContent = `Plans failed: ${fetchErrorMessage(error)}`; + } +} + +async function renderPlans(plans, openPlanId = null) { + plansDashboardEl.innerHTML = ""; + if (!plans.length) { + plansDashboardEl.innerHTML = '
No continual plans
'; + return; + } + for (const plan of plans) { + const card = document.createElement("article"); + card.className = `plan-card${plan.status === "active" ? " active" : ""}`; + const title = document.createElement("h3"); + title.textContent = plan.title || "Untitled plan"; + const meta = document.createElement("div"); + meta.className = "plan-meta"; + meta.textContent = plan.objective || ""; + const pills = document.createElement("div"); + pills.className = "plan-pill-row"; + for (const value of [plan.status, plan.kind, plan.next_run_at ? `next ${formatShortDate(plan.next_run_at)}` : "not scheduled"]) { + const pill = document.createElement("span"); + pill.className = "plan-pill"; + pill.textContent = value; + pills.appendChild(pill); + } + const controls = document.createElement("div"); + controls.className = "plan-controls"; + controls.append( + planButton("Details", () => loadPlanDetail(plan.id, card)), + planButton("Run", () => postPlanAction(plan.id, "run")), + planButton(plan.status === "active" ? "Pause" : "Resume", () => postPlanAction(plan.id, plan.status === "active" ? "pause" : "resume")), + planButton("Cancel", () => postPlanAction(plan.id, "cancel"), "secondary small-button") + ); + card.append(title, meta, pills, controls); + plansDashboardEl.appendChild(card); + if (openPlanId && plan.id === openPlanId) await loadPlanDetail(plan.id, card); + } +} + +function planButton(label, onClick, className = "small-button") { + const button = document.createElement("button"); + button.type = "button"; + button.className = className; + button.textContent = label; + button.addEventListener("click", onClick); + return button; +} + +async function loadPlanDetail(planId, card) { + const existing = card.querySelector(".plan-detail"); + if (existing) { + existing.remove(); + return; + } + const response = await fetch(`/api/plans/${encodeURIComponent(planId)}`); + const result = await response.json(); + const plan = result.plan; + const detail = document.createElement("div"); + detail.className = "plan-detail"; + detail.append( + planSection("Checklist", (plan.items || []).map((item) => `${item.item_name}: ${item.acquired_quantity || 0}/${item.desired_quantity || 1}${item.max_unit_price ? `, max ${Number(item.max_unit_price).toLocaleString()} UEC` : ""} (${item.status})`)), + planSection("Best Candidates", bestCandidateLines(plan)), + planSection("Recent Events", (plan.events || []).slice(0, 5).map((event) => `${formatShortDate(event.created_at)} ${event.kind}: ${event.message}`)) + ); + card.appendChild(detail); +} + +function planSection(title, lines) { + const wrapper = document.createElement("section"); + const heading = document.createElement("h4"); + heading.textContent = title; + const list = document.createElement("ul"); + list.className = "plan-list"; + const items = lines.length ? lines : ["Empty"]; + for (const line of items) { + const item = document.createElement("li"); + item.textContent = line; + list.appendChild(item); + } + wrapper.append(heading, list); + return wrapper; +} + +function bestCandidateLines(plan) { + const byItem = new Map((plan.items || []).map((item) => [item.id, item.item_name])); + return (plan.candidates || []) + .filter((candidate) => candidate.status === "current" || candidate.status === "drafted") + .slice(0, 6) + .map((candidate) => `${byItem.get(candidate.plan_item_id) || "Item"}: ${candidate.title || candidate.listing_slug || candidate.listing_id} at ${Number(candidate.price || 0).toLocaleString()} ${candidate.currency || "UEC"} from ${candidate.seller || "unknown"} (${candidate.status})`); +} + +async function postPlanAction(planId, action) { + plansStatusEl.textContent = `${action} requested`; + try { + const response = await fetch(`/api/plans/${encodeURIComponent(planId)}/${action}`, { method: "POST" }); + const result = await response.json(); + if (!response.ok) throw new Error(result.detail || `HTTP ${response.status}`); + plansStatusEl.textContent = result.summary || `Plan ${action} complete`; + await refreshPlans(planId); + await refreshPending(); + await refreshInbox(); + } catch (error) { + plansStatusEl.textContent = `Plan ${action} failed: ${fetchErrorMessage(error)}`; + } +} + +function formatShortDate(value) { + if (!value) return ""; + const date = new Date(value); + if (Number.isNaN(date.getTime())) return value; + return date.toLocaleString([], { month: "short", day: "numeric", hour: "numeric", minute: "2-digit" }); +} + async function checkHealth() { try { const response = await fetch("/api/health"); @@ -1205,7 +1385,10 @@ configRefreshButton?.addEventListener("click", refreshConfig); configForm?.addEventListener("submit", saveConfig); settingsToggle?.addEventListener("click", () => toggleSidebarPanel("settings")); memoryToggle?.addEventListener("click", () => toggleSidebarPanel("memory")); +plansToggle?.addEventListener("click", () => toggleSidebarPanel("plans")); ollamaToggle?.addEventListener("click", () => toggleSidebarPanel("ollama")); +plansRefreshButton?.addEventListener("click", () => refreshPlans()); +planForm?.addEventListener("submit", createPlan); ollamaForm?.addEventListener("submit", saveOllamaConfig); ollamaRefreshButton?.addEventListener("click", refreshOllamaStatus); ollamaDownloadButton?.addEventListener("click", () => { @@ -1320,6 +1503,7 @@ async function sendMessage() { refreshPending(); refreshMemory(); +refreshPlans(); refreshConfig(); refreshOllamaStatus(); refreshChats().then(() => loadChatMessages(currentThreadId)); diff --git a/web/index.html b/web/index.html index 27f60f4..5ced0b9 100644 --- a/web/index.html +++ b/web/index.html @@ -69,6 +69,10 @@ Memory +
+