from __future__ import annotations

import logging
from datetime import datetime
from typing import Any
from uuid import uuid4

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from tzlocal import get_localzone

from traderai.memory import MemoryStore, iso_now, time_since

logger = logging.getLogger(__name__)


class WakeScheduler:
    """Schedule "wake" jobs that prompt the agent at a date or on a cron.

    Jobs are registered with an ``AsyncIOScheduler`` and also recorded in the
    ``MemoryStore`` so that ``start()`` can re-register them later.
    """

    def __init__(self, memory: MemoryStore) -> None:
        """Create a scheduler bound to *memory*, using the local timezone."""
        self.memory = memory
        self.scheduler = AsyncIOScheduler(timezone=get_localzone())
        # Set via bind_agent(); while None, fired jobs only write to the outbox.
        self.agent: Any = None

    def bind_agent(self, agent: Any) -> None:
        """Attach the agent whose ``generate_wake_response`` handles wake jobs."""
        self.agent = agent

    def start(self) -> None:
        """Start the scheduler (if needed) and re-register persisted jobs.

        A persisted job whose trigger value cannot be parsed is logged and
        skipped so that one bad record does not prevent the remaining jobs
        from being scheduled.
        """
        if not self.scheduler.running:
            self.scheduler.start()
        for job in self.memory.list_jobs():
            try:
                self._schedule_existing(job)
            except (ValueError, TypeError):
                logger.exception(
                    "Skipping persisted wake job %r: invalid trigger",
                    job.get("id"),
                )

    def shutdown(self) -> None:
        """Stop the scheduler without waiting for running jobs to finish."""
        if self.scheduler.running:
            self.scheduler.shutdown(wait=False)

    def schedule_date(
        self, run_at: str, prompt: str, job_id: str | None = None
    ) -> dict[str, Any]:
        """Schedule a one-shot wake job.

        Args:
            run_at: ISO-8601 timestamp accepted by ``datetime.fromisoformat``.
            prompt: Instruction passed to the agent when the job fires.
            job_id: Optional id; an existing job with this id is replaced.
                Defaults to a fresh ``wake-<uuid4>`` id.

        Returns:
            Whatever ``MemoryStore.add_job`` returns for the stored job.

        Raises:
            ValueError: If *run_at* is not a valid ISO-8601 string.
        """
        parsed = datetime.fromisoformat(run_at)
        job_id = job_id or f"wake-{uuid4()}"
        trigger = DateTrigger(run_date=parsed)
        self.scheduler.add_job(
            self._run_job,
            trigger=trigger,
            id=job_id,
            args=[job_id, prompt],
            replace_existing=True,
        )
        return self.memory.add_job(
            job_id, prompt, "date", run_at, parsed.isoformat()
        )

    def schedule_cron(
        self, cron: str, prompt: str, job_id: str | None = None
    ) -> dict[str, Any]:
        """Schedule a recurring wake job from a crontab expression.

        Works whether or not the scheduler has been started yet.

        Args:
            cron: Crontab expression parsed by ``CronTrigger.from_crontab``.
            prompt: Instruction passed to the agent each time the job fires.
            job_id: Optional id; an existing job with this id is replaced.
                Defaults to a fresh ``wake-<uuid4>`` id.

        Returns:
            Whatever ``MemoryStore.add_job`` returns for the stored job.

        Raises:
            ValueError: If *cron* is not a valid crontab expression.
        """
        job_id = job_id or f"wake-{uuid4()}"
        trigger = CronTrigger.from_crontab(cron)
        self.scheduler.add_job(
            self._run_job,
            trigger=trigger,
            id=job_id,
            args=[job_id, prompt],
            replace_existing=True,
        )
        next_run = self._next_run_time(job_id, trigger)
        return self.memory.add_job(
            job_id,
            prompt,
            "cron",
            cron,
            next_run.isoformat() if next_run else None,
        )

    def list_jobs(self) -> list[dict[str, Any]]:
        """Return the jobs recorded in the memory store."""
        return self.memory.list_jobs()

    def _next_run_time(self, job_id: str, trigger: CronTrigger) -> datetime | None:
        """Return the next fire time for *job_id*, or None if it never fires.

        APScheduler 3.x only assigns ``next_run_time`` once the scheduler has
        started; on a still-pending job the attribute is missing and plain
        attribute access raises ``AttributeError``. In that case the value is
        computed from the trigger instead.
        """
        job = self.scheduler.get_job(job_id)
        next_run = getattr(job, "next_run_time", None)
        if next_run is None:
            next_run = trigger.get_next_fire_time(
                None, datetime.now(trigger.timezone)
            )
        return next_run

    def _schedule_existing(self, job: dict[str, Any]) -> None:
        """Re-register a persisted job; unknown trigger types are ignored.

        Reads the ``id``, ``prompt``, ``trigger_type`` and ``trigger_value``
        keys of *job*.

        Raises:
            ValueError: If the stored trigger value cannot be parsed.
        """
        if job["trigger_type"] == "cron":
            trigger = CronTrigger.from_crontab(job["trigger_value"])
        elif job["trigger_type"] == "date":
            trigger = DateTrigger(
                run_date=datetime.fromisoformat(job["trigger_value"])
            )
        else:
            return
        self.scheduler.add_job(
            self._run_job,
            trigger=trigger,
            id=job["id"],
            args=[job["id"], job["prompt"]],
            replace_existing=True,
        )

    async def _run_job(self, job_id: str, prompt: str) -> None:
        """Job callback: build the wake message and deliver it.

        With an agent bound, the agent's response is written to the outbox
        and the job is marked as run. Without one, the raw wake message is
        written to the outbox instead.
        """
        last = self.memory.last_interaction()
        last_text = (
            f"{last['created_at']} ({time_since(last['created_at'])})"
            if last
            else "never"
        )
        wake_message = (
            f"Scheduled wake job fired. Current time is {iso_now()}. "
            f"The last chat interaction was {last_text}. Job instruction: {prompt}"
        )
        if self.agent is None:
            # NOTE(review): this path returns without mark_job_run(), so the
            # job is not recorded as run when no agent is bound — confirm
            # that this is intended.
            self.memory.add_outbox(wake_message)
            return
        text = await self.agent.generate_wake_response(wake_message)
        self.memory.add_outbox(text)
        self.memory.mark_job_run(job_id)