feat: plans - longrunning tasks
This commit is contained in:
+73
-2
@@ -1,6 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any
|
||||
from uuid import uuid4
|
||||
|
||||
@@ -10,7 +10,7 @@ from apscheduler.triggers.date import DateTrigger
|
||||
from apscheduler.triggers.interval import IntervalTrigger
|
||||
from tzlocal import get_localzone
|
||||
|
||||
from traderai.memory import MemoryStore, iso_now, time_since
|
||||
from traderai.memory import MemoryStore, iso_now, parse_iso, time_since, utc_now
|
||||
|
||||
|
||||
UEX_NOTIFICATION_JOB_ID = "uex-notification-poll"
|
||||
@@ -22,11 +22,15 @@ class WakeScheduler:
|
||||
self.scheduler = AsyncIOScheduler(timezone=get_localzone())
|
||||
self.agent = None
|
||||
self.uex = None
|
||||
self.plan_runner = None
|
||||
self.notification_poll_seconds = 60
|
||||
|
||||
def bind_agent(self, agent: Any) -> None:
|
||||
self.agent = agent
|
||||
|
||||
def bind_plan_runner(self, plan_runner: Any) -> None:
|
||||
self.plan_runner = plan_runner
|
||||
|
||||
def bind_uex_notifications(self, uex: Any, poll_seconds: int = 60) -> None:
|
||||
self.uex = uex
|
||||
self.notification_poll_seconds = max(15, poll_seconds)
|
||||
@@ -37,6 +41,9 @@ class WakeScheduler:
|
||||
self._schedule_notification_poll()
|
||||
for job in self.memory.list_jobs():
|
||||
self._schedule_existing(job)
|
||||
if self.plan_runner is not None:
|
||||
for plan in self.plan_runner.store.list_plans(include_inactive=False):
|
||||
self.schedule_plan(plan)
|
||||
|
||||
def shutdown(self) -> None:
|
||||
if self.scheduler.running:
|
||||
@@ -59,6 +66,70 @@ class WakeScheduler:
|
||||
def list_jobs(self) -> list[dict[str, Any]]:
|
||||
return self.memory.list_jobs()
|
||||
|
||||
def schedule_plan(self, plan: dict[str, Any]) -> dict[str, Any]:
|
||||
if self.plan_runner is None or plan.get("status") != "active":
|
||||
return plan
|
||||
job_id = self._plan_job_id(plan["id"])
|
||||
previous_next_run = plan.get("next_run_at")
|
||||
trigger = CronTrigger.from_crontab(plan.get("cadence") or "0 */6 * * *")
|
||||
self.scheduler.add_job(self._run_plan, trigger=trigger, id=job_id, args=[plan["id"]], replace_existing=True)
|
||||
job = self.scheduler.get_job(job_id)
|
||||
next_run = job.next_run_time if job else None
|
||||
self.plan_runner.store.update_schedule(plan["id"], next_run.isoformat() if next_run else None)
|
||||
if self._plan_is_overdue(previous_next_run):
|
||||
catchup_id = self._plan_catchup_job_id(plan["id"])
|
||||
self.scheduler.add_job(
|
||||
self._run_plan,
|
||||
trigger=DateTrigger(run_date=datetime.now() + timedelta(seconds=5)),
|
||||
id=catchup_id,
|
||||
args=[plan["id"]],
|
||||
replace_existing=True,
|
||||
)
|
||||
self.plan_runner.store.add_event(
|
||||
plan["id"],
|
||||
"catchup_scheduled",
|
||||
"Plan was overdue while the app was closed, so a one-time catch-up run was scheduled after startup.",
|
||||
{"previous_next_run_at": previous_next_run},
|
||||
)
|
||||
return self.plan_runner.store.get_plan(plan["id"]) or plan
|
||||
|
||||
def unschedule_plan(self, plan_id: str) -> None:
|
||||
job_id = self._plan_job_id(plan_id)
|
||||
if self.scheduler.get_job(job_id):
|
||||
self.scheduler.remove_job(job_id)
|
||||
catchup_id = self._plan_catchup_job_id(plan_id)
|
||||
if self.scheduler.get_job(catchup_id):
|
||||
self.scheduler.remove_job(catchup_id)
|
||||
if self.plan_runner is not None:
|
||||
self.plan_runner.store.update_schedule(plan_id, None)
|
||||
|
||||
async def _run_plan(self, plan_id: str) -> None:
|
||||
if self.plan_runner is None:
|
||||
return
|
||||
result = await self.plan_runner.run_plan(plan_id)
|
||||
plan = result.get("plan") or self.plan_runner.store.get_plan(plan_id)
|
||||
if plan and plan.get("status") == "active":
|
||||
job = self.scheduler.get_job(self._plan_job_id(plan_id))
|
||||
next_run = job.next_run_time if job else None
|
||||
self.plan_runner.store.update_schedule(plan_id, next_run.isoformat() if next_run else None)
|
||||
|
||||
@staticmethod
|
||||
def _plan_job_id(plan_id: str) -> str:
|
||||
return f"continual-{plan_id}"
|
||||
|
||||
@staticmethod
|
||||
def _plan_catchup_job_id(plan_id: str) -> str:
|
||||
return f"continual-catchup-{plan_id}"
|
||||
|
||||
@staticmethod
|
||||
def _plan_is_overdue(next_run_at: str | None) -> bool:
|
||||
if not next_run_at:
|
||||
return False
|
||||
try:
|
||||
return parse_iso(next_run_at) <= utc_now()
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
def _schedule_existing(self, job: dict[str, Any]) -> None:
|
||||
if job["trigger_type"] == "cron":
|
||||
trigger = CronTrigger.from_crontab(job["trigger_value"])
|
||||
|
||||
Reference in New Issue
Block a user