Files
HRiggs 454bb57484
Build Release EXE / build-windows-exe (release) Successful in 1m2s
feat: deepseek
2026-06-08 23:41:46 -04:00

257 lines
9.2 KiB
Python

import pytest
from datetime import timedelta
from traderai.memory import MemoryStore, utc_now
from traderai.plans import ContinualPlanRunner, ContinualPlanStore
from traderai.scheduler import WakeScheduler
from traderai.tools import ToolRegistry
class BuyingUEX:
    """In-memory stand-in for the UEX client handed to ``ToolRegistry``.

    ``get`` answers ``marketplace_listings`` with two canned sell listings
    and everything else with an empty data list. ``post`` records each call
    in ``self.posts`` so tests can check what was (or was not) sent.
    """

    def __init__(self):
        # One entry per post() call, in call order.
        self.posts = []

    async def get(self, path, params=None, authenticated=False):
        """Return canned listings for ``marketplace_listings``, else no data."""
        if path != "marketplace_listings":
            return {"data": []}

        # Fresh dicts are built on every call so callers never share state.
        affordable_panel = {
            "id": 501,
            "slug": "wikelo-panel-good",
            "title": "Wikelo Idris panel",
            "operation": "sell",
            "type": "item",
            "price": 450_000,
            "currency": "UEC",
            "in_stock": 2,
            "location": "Orison",
            "user_username": "seller_a",
        }
        premium_panel = {
            "id": 502,
            "slug": "wikelo-panel-expensive",
            "title": "Wikelo Idris panel premium",
            "operation": "sell",
            "type": "item",
            "price": 900_000,
            "currency": "UEC",
            "in_stock": 1,
            "location": "Area18",
            "user_username": "seller_b",
        }
        return {"data": [affordable_panel, premium_panel]}

    async def post(self, path, payload, authenticated=True):
        """Record the call and echo the stored record under ``posted``."""
        record = {"path": path, "payload": payload, "authenticated": authenticated}
        self.posts.append(record)
        return {"status": "ok", "posted": record}

    async def delete(self, path, params=None, authenticated=True):
        """Acknowledge every delete without recording anything."""
        return {"status": "ok"}
class FakePlanAgent:
    """Agent double that logs every wake message and gives a fixed reply."""

    def __init__(self):
        # Wake messages received so far, oldest first.
        self.prompts = []

    async def generate_wake_response(self, wake_message):
        """Remember ``wake_message`` and return the canned summary text."""
        self.prompts += [wake_message]
        reply = "Custom plan checked notifications and found no blockers."
        return reply
def plan_stack(tmp_path):
    """Wire up the objects the plan tests share.

    Creates a ``MemoryStore`` on ``tmp_path / "memory.sqlite3"`` and builds a
    plan store, scheduler, tool registry (using a ``BuyingUEX`` fake) and plan
    runner on top of it. The runner is attached to both the registry and the
    scheduler.

    Returns:
        The tuple ``(memory, store, tools, runner, scheduler)``.
    """
    db_file = tmp_path / "memory.sqlite3"
    mem = MemoryStore(str(db_file))
    plans = ContinualPlanStore(mem)
    wake_scheduler = WakeScheduler(mem)
    registry = ToolRegistry(
        BuyingUEX(),
        memory=mem,
        scheduler=wake_scheduler,
        plan_store=plans,
    )
    plan_runner = ContinualPlanRunner(plans, registry, mem)

    # The runner can only be handed over after the registry it needs exists.
    registry.plan_runner = plan_runner
    wake_scheduler.bind_plan_runner(plan_runner)
    return mem, plans, registry, plan_runner, wake_scheduler
def test_continual_plan_store_creates_needs_input_plan(tmp_path):
    """A default-kind plan created with no items is marked ``needs_input``."""
    stack = plan_stack(tmp_path)
    store = stack[1]

    created = store.create_plan("Wikelo Idris", objective="Get all parts", items=[])

    assert created["status"] == "needs_input"
    assert created["items"] == []
    first_event = created["events"][0]
    assert first_event["kind"] == "needs_input"
def test_custom_plan_without_items_is_active(tmp_path):
    """A ``custom`` plan with an empty item list is created as ``active``."""
    stack = plan_stack(tmp_path)
    store = stack[1]

    created = store.create_plan(
        "Watch negotiations",
        kind="custom",
        objective="Check replies and summarize next steps",
        items=[],
    )

    assert created["status"] == "active"
    assert created["items"] == []
def test_continual_plan_store_creates_buying_checklist(tmp_path):
    """A plan created with one item is active and keeps that item's fields."""
    stack = plan_stack(tmp_path)
    store = stack[1]
    wanted = {"item_name": "Wikelo Idris panel", "desired_quantity": 2, "max_unit_price": 500_000}

    created = store.create_plan(
        "Wikelo Idris",
        objective="Get all listed parts",
        items=[wanted],
    )

    assert created["status"] == "active"
    first_item = created["items"][0]
    assert first_item["item_name"] == "Wikelo Idris panel"
    assert created["items"][0]["desired_quantity"] == 2
def test_continual_plan_store_deletes_plan_and_related_records(tmp_path):
    """Deleting a plan also clears its items, candidates, negotiations, events."""
    stack = plan_stack(tmp_path)
    store = stack[1]
    plan = store.create_plan(
        "Delete me",
        objective="Remove everything",
        items=[{"item_name": "Wikelo Idris panel", "desired_quantity": 1}],
    )

    # Attach one candidate and one negotiation so the plan has rows of each kind.
    item_id = int(plan["items"][0]["id"])
    listing = {"id": "listing-1", "title": "Panel", "price": 10}
    candidate = store.upsert_candidate(plan["id"], item_id, listing, 0.9)
    negotiation = {
        "listing_id": "listing-1",
        "listing_slug": "panel",
        "id_negotiation": "neg-1",
        "hash": "hash-1",
    }
    store.add_negotiation(plan["id"], item_id, int(candidate["id"]), negotiation)

    assert store.delete_plan(plan["id"]) is True

    assert store.get_plan(plan["id"]) is None
    assert store.list_items(plan["id"]) == []
    assert store.list_candidates(plan["id"]) == []
    assert store.list_negotiations(plan["id"]) == []
    assert store.list_events(plan["id"]) == []
@pytest.mark.asyncio
async def test_buying_runner_tracks_candidates_and_drafts_only(tmp_path):
    """Running a buying plan drafts one negotiation and posts nothing to UEX.

    Only listing 501 from the ``BuyingUEX`` fake is expected to end up
    drafted; the draft must be queued as a pending action rather than sent.
    """
    memory, store, tools, runner, _ = plan_stack(tmp_path)
    wanted = {"item_name": "Wikelo Idris panel", "desired_quantity": 1, "max_unit_price": 500_000}
    plan = store.create_plan(
        "Wikelo Idris",
        objective="Get all listed parts",
        items=[wanted],
    )

    result = await runner.run_plan(plan["id"])
    snapshot = store.get_plan(plan["id"])

    assert result["drafted"] == 1
    assert any(
        row["listing_id"] == "501" and row["status"] == "drafted"
        for row in snapshot["candidates"]
    )
    assert snapshot["negotiations"][0]["status"] == "drafted"

    # Exactly one action waits for approval and nothing reached the fake UEX.
    assert len(tools.pending_actions) == 1
    assert not tools.uex.posts
    assert "Drafted 1 negotiation" in memory.list_outbox()[0]["content"]

    pending = next(iter(tools.pending_actions.values()))
    assert "Tone note" not in pending.payload["message"]
    # "Polaris build" may only show up as part of the full phrase below.
    if "Polaris build" in pending.payload["message"]:
        assert "putting together parts for a Polaris build" in pending.payload["message"]
@pytest.mark.asyncio
async def test_plan_approval_logs_back_to_plan(tmp_path):
    """Approving a drafted action posts it and marks the plan as approved."""
    stack = plan_stack(tmp_path)
    store, tools, runner = stack[1], stack[2], stack[3]
    plan = store.create_plan(
        "Wikelo Idris",
        objective="Get all listed parts",
        items=[{"item_name": "Wikelo Idris panel", "max_unit_price": 500_000}],
    )
    await runner.run_plan(plan["id"])

    # Approve the first pending action the run produced.
    action_id = next(iter(tools.pending_actions))
    approved = await tools.approve(action_id)
    snapshot = store.get_plan(plan["id"])

    assert approved["posted"]["path"] == "marketplace_negotiations_messages"
    assert any(evt["kind"] == "approved" for evt in snapshot["events"])
    assert any(neg["status"] == "approved" for neg in snapshot["negotiations"])
@pytest.mark.asyncio
async def test_custom_runner_continues_plan_through_agent(tmp_path):
    """A custom plan run goes through the bound agent and reports its reply.

    The agent's canned text must appear in the run summary and the outbox,
    and the prompt sent to the agent must contain the plan id.
    """
    memory, store, _, runner, _ = plan_stack(tmp_path)
    fake_agent = FakePlanAgent()
    runner.bind_agent(fake_agent)
    plan = store.create_plan(
        "Watch open negotiations",
        kind="custom",
        objective="Check UEX replies and recommend next action",
        constraints={"instructions": "Pay attention to stale buyer replies."},
        items=[],
    )

    result = await runner.run_plan(plan["id"])
    snapshot = store.get_plan(plan["id"])

    assert result["status"] == "ok"
    assert "Custom plan checked notifications" in result["summary"]
    assert plan["id"] in fake_agent.prompts[0]
    assert any(evt["kind"] == "run" for evt in snapshot["events"])
    assert "Custom plan checked notifications" in memory.list_outbox()[0]["content"]
@pytest.mark.asyncio
async def test_scheduler_plan_run_survives_runner_error(tmp_path):
    """``WakeScheduler._run_plan`` copes with a runner that reports an error.

    The stub runner logs an ``error`` event, writes to the outbox and returns
    an error result; afterwards the plan must still be ``active``.
    """
    memory = MemoryStore(str(tmp_path / "memory.sqlite3"))
    store = ContinualPlanStore(memory)
    plan = store.create_plan(
        "Broken plan",
        objective="Test failure handling",
        items=[{"item_name": "Wikelo Idris panel"}],
    )

    class FailingRunner:
        """Runner stub whose every run records and returns a "boom" error."""

        def __init__(self, store):
            self.store = store

        async def run_plan(self, plan_id):
            self.store.add_event(plan_id, "error", "boom")
            # The outbox is reached through the enclosing test's MemoryStore.
            memory.add_outbox("Broken plan: boom")
            failed_plan = self.store.get_plan(plan_id)
            return {"error": "boom", "plan": failed_plan}

    scheduler = WakeScheduler(memory)
    failing_runner = FailingRunner(store)
    scheduler.bind_plan_runner(failing_runner)

    await scheduler._run_plan(plan["id"])

    snapshot = store.get_plan(plan["id"])
    assert snapshot["status"] == "active"
    assert snapshot["events"][0]["kind"] == "error"
    assert "boom" in memory.list_outbox()[0]["content"]
@pytest.mark.asyncio
async def test_scheduler_schedules_overdue_plan_catchup_on_start(tmp_path):
    """Starting the scheduler queues a catch-up job for an overdue plan."""
    stack = plan_stack(tmp_path)
    store, scheduler = stack[1], stack[4]
    plan = store.create_plan(
        "Overdue plan",
        objective="Check after restart",
        items=[{"item_name": "Wikelo Idris panel"}],
    )

    # Move the plan's schedule five minutes into the past.
    overdue_at = utc_now() - timedelta(minutes=5)
    store.update_schedule(plan["id"], overdue_at.isoformat())

    scheduler.start()
    try:
        job_id = scheduler._plan_catchup_job_id(plan["id"])
        catchup = scheduler.scheduler.get_job(job_id)
        snapshot = store.get_plan(plan["id"])
    finally:
        # Shut down even if a lookup above raises.
        scheduler.shutdown()

    assert catchup is not None
    assert any(evt["kind"] == "catchup_scheduled" for evt in snapshot["events"])
@pytest.mark.asyncio
async def test_tools_delete_continual_plan_removes_it(tmp_path):
    """Deleting through ``ToolRegistry`` removes the plan from the store."""
    stack = plan_stack(tmp_path)
    store, tools = stack[1], stack[2]
    plan = store.create_plan(
        "Delete through tools",
        objective="Remove via registry",
        items=[{"item_name": "Wikelo Idris panel"}],
    )

    outcome = await tools.delete_continual_plan(plan["id"])

    assert outcome["deleted"] is True
    assert outcome["plan_id"] == plan["id"]
    assert store.get_plan(plan["id"]) is None