import pytest from datetime import timedelta from traderai.memory import MemoryStore, utc_now from traderai.plans import ContinualPlanRunner, ContinualPlanStore from traderai.scheduler import WakeScheduler from traderai.tools import ToolRegistry class BuyingUEX: def __init__(self): self.posts = [] async def get(self, path, params=None, authenticated=False): if path == "marketplace_listings": return { "data": [ { "id": 501, "slug": "wikelo-panel-good", "title": "Wikelo Idris panel", "operation": "sell", "type": "item", "price": 450_000, "currency": "UEC", "in_stock": 2, "location": "Orison", "user_username": "seller_a", }, { "id": 502, "slug": "wikelo-panel-expensive", "title": "Wikelo Idris panel premium", "operation": "sell", "type": "item", "price": 900_000, "currency": "UEC", "in_stock": 1, "location": "Area18", "user_username": "seller_b", }, ], } return {"data": []} async def post(self, path, payload, authenticated=True): self.posts.append({"path": path, "payload": payload, "authenticated": authenticated}) return {"status": "ok", "posted": self.posts[-1]} async def delete(self, path, params=None, authenticated=True): return {"status": "ok"} class FakePlanAgent: def __init__(self): self.prompts = [] async def generate_wake_response(self, wake_message): self.prompts.append(wake_message) return "Custom plan checked notifications and found no blockers." def plan_stack(tmp_path): memory = MemoryStore(str(tmp_path / "memory.sqlite3")) store = ContinualPlanStore(memory) scheduler = WakeScheduler(memory) tools = ToolRegistry(BuyingUEX(), memory=memory, scheduler=scheduler, plan_store=store) runner = ContinualPlanRunner(store, tools, memory) tools.plan_runner = runner scheduler.bind_plan_runner(runner) return memory, store, tools, runner, scheduler def test_continual_plan_store_creates_needs_input_plan(tmp_path): _, store, _, _, _ = plan_stack(tmp_path) plan = store.create_plan("Wikelo Idris", objective="Get all parts", items=[]) assert plan["status"] == "needs_input" assert plan["items"] == [] assert plan["events"][0]["kind"] == "needs_input" def test_custom_plan_without_items_is_active(tmp_path): _, store, _, _, _ = plan_stack(tmp_path) plan = store.create_plan("Watch negotiations", kind="custom", objective="Check replies and summarize next steps", items=[]) assert plan["status"] == "active" assert plan["items"] == [] def test_continual_plan_store_creates_buying_checklist(tmp_path): _, store, _, _, _ = plan_stack(tmp_path) plan = store.create_plan( "Wikelo Idris", objective="Get all listed parts", items=[{"item_name": "Wikelo Idris panel", "desired_quantity": 2, "max_unit_price": 500_000}], ) assert plan["status"] == "active" assert plan["items"][0]["item_name"] == "Wikelo Idris panel" assert plan["items"][0]["desired_quantity"] == 2 def test_continual_plan_store_deletes_plan_and_related_records(tmp_path): _, store, _, _, _ = plan_stack(tmp_path) plan = store.create_plan( "Delete me", objective="Remove everything", items=[{"item_name": "Wikelo Idris panel", "desired_quantity": 1}], ) item_id = int(plan["items"][0]["id"]) candidate = store.upsert_candidate(plan["id"], item_id, {"id": "listing-1", "title": "Panel", "price": 10}, 0.9) store.add_negotiation(plan["id"], item_id, int(candidate["id"]), {"listing_id": "listing-1", "listing_slug": "panel", "id_negotiation": "neg-1", "hash": "hash-1"}) assert store.delete_plan(plan["id"]) is True assert store.get_plan(plan["id"]) is None assert store.list_items(plan["id"]) == [] assert store.list_candidates(plan["id"]) == [] assert store.list_negotiations(plan["id"]) == [] assert store.list_events(plan["id"]) == [] @pytest.mark.asyncio async def test_buying_runner_tracks_candidates_and_drafts_only(tmp_path): memory, store, tools, runner, _ = plan_stack(tmp_path) plan = store.create_plan( "Wikelo Idris", objective="Get all listed parts", items=[{"item_name": "Wikelo Idris panel", "desired_quantity": 1, "max_unit_price": 500_000}], ) result = await runner.run_plan(plan["id"]) snapshot = store.get_plan(plan["id"]) assert result["drafted"] == 1 assert any(candidate["listing_id"] == "501" and candidate["status"] == "drafted" for candidate in snapshot["candidates"]) assert snapshot["negotiations"][0]["status"] == "drafted" assert len(tools.pending_actions) == 1 assert not tools.uex.posts assert "Drafted 1 negotiation" in memory.list_outbox()[0]["content"] @pytest.mark.asyncio async def test_plan_approval_logs_back_to_plan(tmp_path): _, store, tools, runner, _ = plan_stack(tmp_path) plan = store.create_plan( "Wikelo Idris", objective="Get all listed parts", items=[{"item_name": "Wikelo Idris panel", "max_unit_price": 500_000}], ) await runner.run_plan(plan["id"]) action_id = next(iter(tools.pending_actions)) approved = await tools.approve(action_id) snapshot = store.get_plan(plan["id"]) assert approved["posted"]["path"] == "marketplace_negotiations_messages" assert any(event["kind"] == "approved" for event in snapshot["events"]) assert any(negotiation["status"] == "approved" for negotiation in snapshot["negotiations"]) @pytest.mark.asyncio async def test_custom_runner_continues_plan_through_agent(tmp_path): memory, store, tools, runner, _ = plan_stack(tmp_path) agent = FakePlanAgent() runner.bind_agent(agent) plan = store.create_plan( "Watch open negotiations", kind="custom", objective="Check UEX replies and recommend next action", constraints={"instructions": "Pay attention to stale buyer replies."}, items=[], ) result = await runner.run_plan(plan["id"]) snapshot = store.get_plan(plan["id"]) assert result["status"] == "ok" assert "Custom plan checked notifications" in result["summary"] assert plan["id"] in agent.prompts[0] assert any(event["kind"] == "run" for event in snapshot["events"]) assert "Custom plan checked notifications" in memory.list_outbox()[0]["content"] @pytest.mark.asyncio async def test_scheduler_plan_run_survives_runner_error(tmp_path): memory = MemoryStore(str(tmp_path / "memory.sqlite3")) store = ContinualPlanStore(memory) plan = store.create_plan( "Broken plan", objective="Test failure handling", items=[{"item_name": "Wikelo Idris panel"}], ) class FailingRunner: def __init__(self, store): self.store = store async def run_plan(self, plan_id): self.store.add_event(plan_id, "error", "boom") memory.add_outbox("Broken plan: boom") return {"error": "boom", "plan": self.store.get_plan(plan_id)} scheduler = WakeScheduler(memory) scheduler.bind_plan_runner(FailingRunner(store)) await scheduler._run_plan(plan["id"]) snapshot = store.get_plan(plan["id"]) assert snapshot["status"] == "active" assert snapshot["events"][0]["kind"] == "error" assert "boom" in memory.list_outbox()[0]["content"] @pytest.mark.asyncio async def test_scheduler_schedules_overdue_plan_catchup_on_start(tmp_path): memory, store, _, runner, scheduler = plan_stack(tmp_path) plan = store.create_plan( "Overdue plan", objective="Check after restart", items=[{"item_name": "Wikelo Idris panel"}], ) store.update_schedule(plan["id"], (utc_now() - timedelta(minutes=5)).isoformat()) scheduler.start() try: catchup = scheduler.scheduler.get_job(scheduler._plan_catchup_job_id(plan["id"])) snapshot = store.get_plan(plan["id"]) finally: scheduler.shutdown() assert catchup is not None assert any(event["kind"] == "catchup_scheduled" for event in snapshot["events"]) @pytest.mark.asyncio async def test_tools_delete_continual_plan_removes_it(tmp_path): _, store, tools, _, _ = plan_stack(tmp_path) plan = store.create_plan( "Delete through tools", objective="Remove via registry", items=[{"item_name": "Wikelo Idris panel"}], ) result = await tools.delete_continual_plan(plan["id"]) assert result["deleted"] is True assert result["plan_id"] == plan["id"] assert store.get_plan(plan["id"]) is None