Files
TraderAI/traderai/server.py
T
HRiggs 454bb57484
Build Release EXE / build-windows-exe (release) Successful in 1m2s
feat: deepseek
2026-06-08 23:41:46 -04:00

1337 lines
50 KiB
Python

from __future__ import annotations
import os
import asyncio
import json
import shutil
import subprocess
import sys
import threading
import time
import webbrowser
from pathlib import Path
from typing import Any
import httpx
from fastapi import FastAPI
from fastapi import HTTPException
from fastapi.responses import FileResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from traderai.agent import OllamaAgent, OllamaUnavailable
from traderai.config import save_settings, settings_payload
from traderai.config import get_settings
from traderai.cornerstone_client import CornerstoneClient
from traderai.memory import DEFAULT_THREAD_ID, MemoryStore
from traderai.plans import ContinualPlanRunner, ContinualPlanStore
from traderai.scheduler import WakeScheduler
from traderai.scmdb_client import SCMDBClient
from traderai.starcitizen_wiki_client import StarCitizenWikiClient
from traderai.tools import ToolRegistry
from traderai.uex_client import UEXClient
from traderai.version import RELEASES_API_URL, RELEASES_URL, __version__
from traderai.wikelo_projects_client import WikeloProjectsClient
def resource_path(*parts: str) -> Path:
    """Return the path to a bundled resource.

    The base directory is ``sys._MEIPASS`` when that attribute exists;
    otherwise it is the directory two levels above this file.

    Args:
        *parts: Path segments appended to the base directory.

    Returns:
        The base directory joined with ``parts``.
    """
    source_root = Path(__file__).resolve().parent.parent
    if hasattr(sys, "_MEIPASS"):
        root = Path(sys._MEIPASS)
    else:
        root = Path(source_root)
    return root.joinpath(*parts)
class ChatRequest(BaseModel):
    """Request body for the ``/api/chat`` and ``/api/chat/stream`` endpoints."""

    # User message text passed to the agent.
    message: str
    # Conversation thread to use; defaults to DEFAULT_THREAD_ID.
    thread_id: str | None = DEFAULT_THREAD_ID
    # Optional image attachments; each is dumped to a dict before reaching the agent.
    images: list["ChatImageRequest"] = []
class ChatImageRequest(BaseModel):
    """One image attached to a chat request."""

    # File name reported for the image.
    name: str = "pasted-image.png"
    # MIME type of the image.
    content_type: str = "image/png"
    # Image payload as a string -- presumably base64 or a data URL; TODO confirm against the agent.
    image_data: str
class ChatThreadRequest(BaseModel):
    """Request body for creating a chat thread (``POST /api/chats``)."""

    # Optional title passed to ``memory.create_thread``.
    title: str | None = None
class RenameChatThreadRequest(BaseModel):
    """Request body for renaming a chat thread (``PATCH /api/chats/{thread_id}``)."""

    # New title passed to ``memory.rename_thread``.
    title: str
class DirectNegotiationMessageRequest(BaseModel):
    """Request body for ``POST /api/negotiations/{identifier}/messages``."""

    # Message text included in the payload posted to UEX.
    message: str
class ClearMemoryRequest(BaseModel):
    """Request body for ``POST /api/memory/clear``.

    Each flag is forwarded to ``memory.clear`` under the same keyword name.
    """

    include_memories: bool = True
    include_conversations: bool = True
    include_profile: bool = False
    # When true, the endpoint also stops and restarts the wake scheduler around the clear.
    include_jobs: bool = False
    include_outbox: bool = True
class ContinualPlanItemRequest(BaseModel):
    """One item entry inside a continual-plan create or draft request."""

    # Name of the item.
    item_name: str
    # Quantity wanted; defaults to one.
    desired_quantity: int = 1
    # Optional per-unit price ceiling; None means no limit was supplied.
    max_unit_price: float | None = None
class ContinualPlanCreateRequest(BaseModel):
    """Request body for creating a continual plan (``POST /api/plans``).

    All fields are forwarded to ``tools.create_continual_plan``.
    """

    title: str
    objective: str
    # Plan kind; defaults to "buying".
    kind: str = "buying"
    # Optional cadence; format is defined by the tool layer (not visible here).
    cadence: str | None = None
    # Free-form constraints mapping.
    constraints: dict[str, Any] = {}
    # Items the plan covers.
    items: list[ContinualPlanItemRequest] = []
class ContinualPlanDraftRequest(BaseModel):
    """Request body for drafting a continual plan (``POST /api/plans/draft``).

    All fields are optional and are forwarded to ``agent.generate_plan_draft``.
    """

    title: str = ""
    objective: str = ""
    # Plan kind; defaults to "buying".
    kind: str = "buying"
    # Free-form constraints mapping.
    constraints: dict[str, Any] = {}
    # Items the draft should cover.
    items: list[ContinualPlanItemRequest] = []
class ContinualPlanEventRequest(BaseModel):
    """Request body for ``POST /api/plans/{plan_id}/events``.

    The fields are passed positionally to ``plan_store.add_event``.
    """

    # Event kind; defaults to "note".
    kind: str = "note"
    # Event text.
    message: str
    # Arbitrary extra data stored with the event.
    metadata: dict[str, Any] = {}
class ConfigUpdateRequest(BaseModel):
    """Request body for updating configuration (``POST /api/config``)."""

    # Setting names mapped to new values; passed as-is to ``save_settings``.
    values: dict
class OllamaModelRequest(BaseModel):
    """Request body for ``POST /api/ollama/pull``."""

    # Model to pull; when omitted the endpoint uses the configured ``ollama_model``.
    model: str | None = None
# Download page returned by the Ollama install/download endpoints and opened in the browser.
OLLAMA_DOWNLOAD_URL = "https://ollama.com/download/windows"
# Name of the executable asset -- presumably matched against release assets by the
# update helpers defined elsewhere in this file; TODO confirm.
UPDATE_ASSET_NAME = "TraderAI.exe"
def create_app() -> FastAPI:
    """Build the TraderAI FastAPI application.

    Creates the memory store, plan store and wake scheduler once, builds the
    settings-dependent runtime objects (clients, tools, plan runner, agent),
    mounts the static web assets and registers every HTTP route.

    Returns:
        The configured ``FastAPI`` instance.
    """
    settings = get_settings()
    memory = MemoryStore(settings.traderai_memory_path)
    plan_store = ContinualPlanStore(memory)
    scheduler = WakeScheduler(memory)
    # Holder for the objects that are rebuilt whenever the configuration changes;
    # handlers look them up at request time so they always see the latest ones.
    runtime: dict[str, Any] = {}

    def configure_runtime(current_settings: Any) -> None:
        """Rebuild clients, tools, plan runner and agent from ``current_settings``."""
        uex = UEXClient(current_settings.uex_base_url, current_settings.uex_secret_key, current_settings.uex_bearer_token)
        scmdb = SCMDBClient(current_settings.scmdb_base_url)
        cornerstone = CornerstoneClient(current_settings.cornerstone_base_url)
        scwiki = StarCitizenWikiClient(current_settings.scwiki_base_url, current_settings.scwiki_api_base_url)
        wikelo = WikeloProjectsClient()
        tools = ToolRegistry(
            uex,
            current_settings.require_write_approval,
            memory=memory,
            scheduler=scheduler,
            scmdb=scmdb,
            cornerstone=cornerstone,
            scwiki=scwiki,
            wikelo=wikelo,
            plan_store=plan_store,
        )
        plan_runner = ContinualPlanRunner(plan_store, tools, memory)
        tools.plan_runner = plan_runner
        provider_base_url, provider_model, provider_api_key = provider_settings(current_settings)
        agent = OllamaAgent(
            provider_base_url,
            provider_model,
            tools,
            memory=memory,
            user_name=current_settings.traderai_user_name,
            num_ctx=current_settings.ollama_num_ctx,
            provider=current_settings.model_provider,
            api_key=provider_api_key,
            reasoning_effort=current_settings.model_reasoning_effort,
        )
        plan_runner.bind_agent(agent)
        # The scheduler outlives reconfiguration, so it is re-pointed at the new objects.
        scheduler.bind_agent(agent)
        scheduler.bind_plan_runner(plan_runner)
        scheduler.bind_uex_notifications(uex, current_settings.uex_notification_poll_seconds)
        runtime.update(
            {
                "settings": current_settings,
                "uex": uex,
                "tools": tools,
                "plan_runner": plan_runner,
                "agent": agent,
            }
        )

    configure_runtime(settings)
    app = FastAPI(title="TraderAI")
    static_dir = resource_path("web")
    app.mount("/static", StaticFiles(directory=static_dir), name="static")

    @app.on_event("startup")
    async def startup() -> None:
        """Refresh the user profile, then start the wake scheduler."""
        await refresh_user_profile()
        scheduler.start()

    @app.on_event("shutdown")
    async def shutdown() -> None:
        """Stop the wake scheduler."""
        scheduler.shutdown()

    async def refresh_user_profile() -> None:
        """Store the configured name and the UEX user record, updating the agent's user name.

        Lookup failures are recorded under the ``uex_user_error`` profile key
        and never propagate, so a UEX outage cannot block startup.
        """
        current_settings = get_settings()
        agent = runtime["agent"]
        uex = runtime["uex"]
        if current_settings.traderai_user_name:
            memory.set_profile("configured_name", current_settings.traderai_user_name)
            agent.user_name = agent.user_name or current_settings.traderai_user_name
        try:
            response = await uex.get_user(authenticated=True)
        except Exception as exc:
            memory.set_profile("uex_user_error", str(exc))
            if not current_settings.traderai_user_name:
                return
            # Fall back to a lookup by the configured user name.
            try:
                response = await uex.get_user(username=current_settings.traderai_user_name)
            except Exception:
                return
        # Guard against an unexpected payload shape instead of raising during startup.
        data = response.get("user") if isinstance(response, dict) else None
        if not data:
            return
        memory.set_profile("uex_user", data)
        if not isinstance(data, dict):
            return
        username = data.get("username") or data.get("user_username") or data.get("name")
        if username:
            agent.user_name = username

    @app.get("/")
    async def index() -> FileResponse:
        """Serve the web UI entry page."""
        return FileResponse(static_dir / "index.html")

    @app.get("/api/health")
    async def health() -> dict:
        """Report inference health, provider, user profile, jobs, data directory and version."""
        agent = runtime["agent"]
        current_settings = get_settings()
        inference = await agent.health()
        return {
            "inference": inference,
            # Same value under a second key.
            "ollama": inference,
            "model_provider": current_settings.model_provider,
            "user": memory.get_profile(),
            "jobs": scheduler.list_jobs(),
            "app_data_dir": settings_payload()["app_data_dir"],
            "version": __version__,
        }

    @app.get("/api/config")
    async def inspect_config() -> dict:
        """Return the current settings payload."""
        return settings_payload()

    @app.post("/api/config")
    async def update_config(request: ConfigUpdateRequest) -> dict:
        """Save new settings, rebuild the runtime and report whether a restart is needed."""
        previous_settings = get_settings()
        updated = save_settings(request.values)
        current_settings = get_settings()
        configure_runtime(current_settings)
        await refresh_user_profile()
        # The memory store is created once per app, so a new path needs a restart.
        restart_required = (
            "traderai_memory_path" in request.values
            and str(request.values.get("traderai_memory_path") or "").strip() != str(previous_settings.traderai_memory_path)
        )
        updated["restart_required"] = restart_required
        updated["message"] = (
            "Configuration saved. Restart TraderAI to switch memory databases."
            if restart_required
            else "Configuration saved and applied."
        )
        return updated

    @app.get("/api/ollama/status")
    async def ollama_status() -> dict:
        """Return the status of the configured model provider."""
        return await inspect_model_provider()

    @app.get("/api/provider/models")
    async def provider_models(provider: str | None = None) -> dict:
        """Return the model list and reasoning-effort options for ``provider``."""
        status = await inspect_provider_models(provider)
        return {
            "provider": status.get("provider", "openai"),
            "configured_model": status.get("configured_model"),
            "models": status.get("models", []),
            "reasoning_efforts": status.get("reasoning_efforts", reasoning_effort_options()),
            "configured_reasoning_effort": status.get("configured_reasoning_effort", get_settings().model_reasoning_effort),
            "message": status.get("message", ""),
            "detail": status.get("detail", ""),
            "online": status.get("online", False),
        }

    @app.post("/api/codex/login")
    async def launch_codex_login() -> dict:
        """Start a Codex App Server browser login and return its login id and URL.

        Raises:
            HTTPException: 404 when the Codex CLI is not found, 500 when the
                login could not be started.
        """
        current_settings = get_settings()
        command = find_codex_cli(current_settings.codex_command)
        if not command:
            raise HTTPException(status_code=404, detail="Codex CLI was not found on PATH.")
        try:
            login = await start_codex_browser_login(command)
        except Exception as exc:
            raise HTTPException(status_code=500, detail=f"Codex App Server login failed: {exception_detail(exc)}") from exc
        return {
            "installed": True,
            "running": False,
            "online": False,
            "provider": "codex",
            "login_id": login.get("loginId"),
            "auth_url": login.get("authUrl"),
            "base_url": str(command),
            "message": "Opened Codex App Server sign-in in your browser. Finish the flow, then TraderAI will detect the new login.",
        }

    @app.post("/api/ollama/launch")
    async def launch_ollama() -> dict:
        """Launch Ollama and return the refreshed provider status.

        Raises:
            HTTPException: 404 when no launch command is available, 500 when
                the process could not be started.
        """
        command = ollama_launch_command()
        if not command:
            raise HTTPException(status_code=404, detail="Ollama is not installed or was not found on PATH.")
        try:
            popen_hidden(command)
        except OSError as exc:
            raise HTTPException(status_code=500, detail=f"Could not launch Ollama: {exc}") from exc
        status = await inspect_model_provider()
        status["message"] = "Ollama launch requested."
        return status

    @app.post("/api/ollama/pull")
    async def pull_ollama_model(request: OllamaModelRequest) -> dict:
        """Start ``ollama pull`` for the requested or configured model.

        Raises:
            HTTPException: 400 when no model name is available, 404 when the
                Ollama CLI is missing, 500 when the process could not start.
        """
        settings_now = get_settings()
        # ``or ""`` keeps an unset configured model on the 400 path instead of
        # failing on ``None.strip()``.
        model = (request.model or settings_now.ollama_model or "").strip()
        if not model:
            raise HTTPException(status_code=400, detail="No Ollama model is configured.")
        cli = find_ollama_cli()
        if not cli:
            raise HTTPException(status_code=404, detail="Ollama CLI was not found.")
        try:
            popen_hidden([str(cli), "pull", model])
        except OSError as exc:
            raise HTTPException(status_code=500, detail=f"Could not start model install: {exc}") from exc
        status = await inspect_model_provider()
        status["message"] = f"Started installing model {model}."
        return status

    @app.post("/api/ollama/install")
    async def install_ollama() -> dict:
        """Start an Ollama install through winget, or point at the download page.

        Raises:
            HTTPException: 500 when winget could not be started.
        """
        winget = shutil.which("winget")
        if not winget:
            return {
                "started": False,
                "message": "winget is not available on this system. Open the download page instead.",
                "download_url": OLLAMA_DOWNLOAD_URL,
            }
        try:
            popen_hidden(
                [
                    winget,
                    "install",
                    "-e",
                    "--id",
                    "Ollama.Ollama",
                    "--accept-package-agreements",
                    "--accept-source-agreements",
                ]
            )
        except OSError as exc:
            raise HTTPException(status_code=500, detail=f"Could not start Ollama install: {exc}") from exc
        return {"started": True, "message": "Started Ollama install with winget.", "download_url": OLLAMA_DOWNLOAD_URL}

    @app.post("/api/ollama/download")
    async def download_ollama() -> dict:
        """Open the Ollama download page in the default browser."""
        # ``webbrowser.open`` returns False when no browser could be launched;
        # report that instead of always claiming success.
        opened = bool(webbrowser.open(OLLAMA_DOWNLOAD_URL))
        message = (
            "Opened the Ollama download page."
            if opened
            else "Could not open a browser. Use the download link instead."
        )
        return {"opened": opened, "download_url": OLLAMA_DOWNLOAD_URL, "message": message}

    @app.get("/api/update/check")
    async def check_update() -> dict:
        """Return the result of the update check."""
        return await inspect_update()

    @app.post("/api/update/install")
    async def install_update() -> dict:
        """Download the latest release and launch the PowerShell updater.

        Self-update runs only from the packaged executable (``sys.frozen``).

        Raises:
            HTTPException: 404 when the release has no downloadable asset,
                500 when the updater process could not be started.
        """
        update = await inspect_update()
        if not update["available"]:
            return {**update, "message": "TraderAI is already up to date."}
        if not getattr(sys, "frozen", False):
            return {
                **update,
                "started": False,
                "message": "Update download is available, but self-update only runs from the packaged exe.",
            }
        asset_url = update.get("asset_download_url")
        if not asset_url:
            raise HTTPException(status_code=404, detail="The latest release does not include TraderAI.exe.")
        downloaded = await download_update_asset(asset_url, update["latest_version"])
        script = write_update_script(downloaded, Path(sys.executable))
        updater_command = [
            "powershell",
            "-NoProfile",
            "-ExecutionPolicy",
            "Bypass",
            "-File",
            str(script),
            "-ProcessId",
            str(os.getpid()),
            "-Source",
            str(downloaded),
            "-Target",
            str(Path(sys.executable)),
        ]
        updater_kwargs: dict[str, Any] = {"stdout": subprocess.DEVNULL, "stderr": subprocess.DEVNULL}
        if sys.platform == "win32":
            updater_kwargs["creationflags"] = subprocess.CREATE_NO_WINDOW
        # Same error handling as the other process-launching endpoints; the app
        # must not exit if the updater never started.
        try:
            subprocess.Popen(updater_command, **updater_kwargs)
        except OSError as exc:
            raise HTTPException(status_code=500, detail=f"Could not start the updater: {exc}") from exc
        # Exit from a background thread so this response can still be sent.
        threading.Thread(target=exit_after_update_response, daemon=True).start()
        return {**update, "started": True, "message": "Update downloaded. TraderAI will restart into the new version."}

    @app.post("/api/chat")
    async def chat(request: ChatRequest) -> dict:
        """Run one chat turn and return the agent's response.

        Raises:
            HTTPException: 503 when the agent raises ``OllamaUnavailable``.
        """
        agent = runtime["agent"]
        try:
            return await agent.chat(
                request.message,
                thread_id=request.thread_id,
                images=[image.model_dump() for image in request.images],
            )
        except OllamaUnavailable as exc:
            raise HTTPException(status_code=503, detail=str(exc)) from exc

    @app.post("/api/chat/stream")
    async def chat_stream(request: ChatRequest) -> StreamingResponse:
        """Stream the agent's chat events as server-sent events."""
        agent = runtime["agent"]

        async def events():
            # NOTE(review): unlike /api/chat, OllamaUnavailable is not handled
            # here; an error mid-stream simply ends the response.
            async for event in agent.chat_events(
                request.message,
                thread_id=request.thread_id,
                images=[image.model_dump() for image in request.images],
            ):
                yield f"data: {json.dumps(event)}\n\n"

        return StreamingResponse(events(), media_type="text/event-stream")

    @app.get("/api/chats")
    async def chats() -> dict:
        """List chat threads."""
        return {"chats": memory.list_threads()}

    @app.post("/api/chats")
    async def create_chat(request: ChatThreadRequest) -> dict:
        """Create a chat thread with an optional title."""
        return {"chat": memory.create_thread(request.title)}

    @app.get("/api/chats/{thread_id}/messages")
    async def chat_messages(thread_id: str) -> dict:
        """Return up to 200 recent messages of a thread, after ``memory.ensure_thread``."""
        memory.ensure_thread(thread_id)
        return {"thread_id": thread_id, "messages": memory.recent_conversation(limit=200, thread_id=thread_id)}

    @app.delete("/api/chats/{thread_id}")
    async def delete_chat(thread_id: str) -> dict:
        """Delete a thread and return the remaining thread list."""
        deleted = memory.delete_thread(thread_id)
        return {"deleted": deleted, "chats": memory.list_threads()}

    @app.patch("/api/chats/{thread_id}")
    async def rename_chat(thread_id: str, request: RenameChatThreadRequest) -> dict:
        """Rename a thread.

        Raises:
            HTTPException: 400 when ``memory.rename_thread`` returns a falsy result.
        """
        chat = memory.rename_thread(thread_id, request.title)
        if not chat:
            raise HTTPException(status_code=400, detail="A non-empty chat title is required.")
        return {"chat": chat, "chats": memory.list_threads()}

    @app.get("/api/pending-actions")
    async def pending_actions() -> dict:
        """Return the agent's pending action payloads."""
        agent = runtime["agent"]
        return {"pending_actions": agent._pending_payloads()}

    @app.get("/api/notifications")
    async def notifications() -> dict:
        """Return undelivered outbox entries."""
        return {"notifications": memory.undelivered_outbox()}

    @app.get("/api/inbox")
    async def inbox() -> dict:
        """Return all outbox entries."""
        return {"inbox": memory.list_outbox()}

    @app.post("/api/inbox/{inbox_id}/continue")
    async def continue_inbox(inbox_id: int) -> dict:
        """Create a new chat thread whose first assistant message is the inbox item.

        Raises:
            HTTPException: 404 when the inbox item does not exist.
        """
        item = memory.get_outbox(inbox_id)
        if not item:
            raise HTTPException(status_code=404, detail="Inbox item not found.")
        thread = memory.create_thread("Inbox follow-up")
        memory.add_conversation("assistant", item["content"], thread["id"])
        return {"chat": thread, "message": item}

    @app.delete("/api/inbox/{inbox_id}")
    async def delete_inbox(inbox_id: int) -> dict:
        """Delete an inbox item and return the remaining inbox."""
        deleted = memory.delete_outbox(inbox_id)
        return {"deleted": deleted, "inbox": memory.list_outbox()}

    @app.get("/api/negotiations/{identifier}/messages")
    async def negotiation_messages(identifier: str) -> dict:
        """Fetch the messages of a UEX marketplace negotiation by id or hash."""
        uex = runtime["uex"]
        params = negotiation_identifier_params(identifier)
        return await uex.get("marketplace_negotiations_messages", params, authenticated=True)

    @app.post("/api/negotiations/{identifier}/messages")
    async def send_negotiation_message(identifier: str, request: DirectNegotiationMessageRequest) -> dict:
        """Post a message to a UEX marketplace negotiation by id or hash."""
        uex = runtime["uex"]
        params = negotiation_identifier_params(identifier)
        payload = {**params, "message": request.message, "is_production": 1}
        return await uex.post("marketplace_negotiations_messages", payload, authenticated=True)

    @app.get("/api/wake-jobs")
    async def wake_jobs() -> dict:
        """List the scheduler's jobs."""
        return {"scheduled_jobs": scheduler.list_jobs()}

    @app.get("/api/plans")
    async def continual_plans(include_inactive: bool = True) -> dict:
        """List continual plans, optionally including inactive ones."""
        return {"plans": plan_store.list_plans(include_inactive=include_inactive)}

    @app.post("/api/plans")
    async def create_continual_plan(request: ContinualPlanCreateRequest) -> dict:
        """Create a continual plan through the tool registry.

        Raises:
            HTTPException: 400 when the tool result contains an ``error``.
        """
        tools = runtime["tools"]
        result = await tools.create_continual_plan(
            title=request.title,
            objective=request.objective,
            kind=request.kind,
            items=[item.model_dump() for item in request.items],
            constraints=request.constraints,
            cadence=request.cadence,
        )
        if result.get("error"):
            raise HTTPException(status_code=400, detail=result["error"])
        return result

    @app.post("/api/plans/draft")
    async def draft_continual_plan(request: ContinualPlanDraftRequest) -> dict:
        """Ask the agent for a plan draft based on the partial request."""
        agent = runtime["agent"]
        draft = await agent.generate_plan_draft(
            title=request.title,
            objective=request.objective,
            kind=request.kind,
            constraints=request.constraints,
            items=[item.model_dump() for item in request.items],
        )
        return {"draft": draft}

    @app.get("/api/plans/{plan_id}")
    async def continual_plan(plan_id: str) -> dict:
        """Return one plan.

        Raises:
            HTTPException: 404 when the plan does not exist.
        """
        plan = plan_store.get_plan(plan_id)
        if not plan:
            raise HTTPException(status_code=404, detail="Plan not found.")
        return {"plan": plan}

    @app.post("/api/plans/{plan_id}/pause")
    async def pause_continual_plan(plan_id: str) -> dict:
        """Pause a plan; a tool ``error`` becomes a 404."""
        tools = runtime["tools"]
        result = await tools.pause_continual_plan(plan_id)
        if result.get("error"):
            raise HTTPException(status_code=404, detail=result["error"])
        return result

    @app.post("/api/plans/{plan_id}/resume")
    async def resume_continual_plan(plan_id: str) -> dict:
        """Resume a plan; a tool ``error`` becomes a 404."""
        tools = runtime["tools"]
        result = await tools.resume_continual_plan(plan_id)
        if result.get("error"):
            raise HTTPException(status_code=404, detail=result["error"])
        return result

    @app.post("/api/plans/{plan_id}/cancel")
    async def cancel_continual_plan(plan_id: str) -> dict:
        """Cancel a plan; a tool ``error`` becomes a 404."""
        tools = runtime["tools"]
        result = await tools.cancel_continual_plan(plan_id)
        if result.get("error"):
            raise HTTPException(status_code=404, detail=result["error"])
        return result

    @app.delete("/api/plans/{plan_id}")
    async def delete_continual_plan(plan_id: str) -> dict:
        """Delete a plan; a tool ``error`` becomes a 404."""
        tools = runtime["tools"]
        result = await tools.delete_continual_plan(plan_id)
        if result.get("error"):
            raise HTTPException(status_code=404, detail=result["error"])
        return result

    @app.post("/api/plans/{plan_id}/run")
    async def run_continual_plan(plan_id: str) -> dict:
        """Run a plan immediately; a tool ``error`` becomes a 400."""
        tools = runtime["tools"]
        result = await tools.run_continual_plan_now(plan_id)
        if result.get("error"):
            raise HTTPException(status_code=400, detail=result["error"])
        return result

    @app.post("/api/plans/{plan_id}/events")
    async def add_continual_plan_event(plan_id: str, request: ContinualPlanEventRequest) -> dict:
        """Append an event to a plan and return the event with the refreshed plan.

        Raises:
            HTTPException: 404 when the plan does not exist.
        """
        if not plan_store.get_plan(plan_id):
            raise HTTPException(status_code=404, detail="Plan not found.")
        event = plan_store.add_event(plan_id, request.kind, request.message, request.metadata)
        return {"event": event, "plan": plan_store.get_plan(plan_id)}

    @app.get("/api/memory")
    async def inspect_memory(limit: int = 50) -> dict:
        """Return a memory inspection with ``limit`` clamped to the range 1..200."""
        return memory.inspect(max(1, min(limit, 200)))

    @app.post("/api/memory/clear")
    async def clear_memory(request: ClearMemoryRequest) -> dict:
        """Clear the selected memory categories and return what was deleted."""
        if request.include_jobs:
            scheduler.shutdown()
        try:
            deleted = memory.clear(
                include_memories=request.include_memories,
                include_conversations=request.include_conversations,
                include_profile=request.include_profile,
                include_jobs=request.include_jobs,
                include_outbox=request.include_outbox,
            )
        finally:
            # Restart even if the clear failed, so the app is never left
            # without a running scheduler.
            if request.include_jobs:
                scheduler.start()
        return {"deleted": deleted, "memory": memory.inspect(50)}

    @app.post("/api/approve/{action_id}")
    async def approve(action_id: str) -> dict:
        """Approve a pending action through the tool registry."""
        tools = runtime["tools"]
        return await tools.approve(action_id)

    @app.post("/api/decline/{action_id}")
    async def decline(action_id: str) -> dict:
        """Decline a pending action through the tool registry."""
        tools = runtime["tools"]
        return await tools.decline(action_id)

    return app
def negotiation_identifier_params(identifier: str) -> dict[str, Any]:
    """Translate a negotiation id or hash into UEX query parameters.

    Args:
        identifier: Numeric negotiation id or negotiation hash; surrounding
            whitespace is ignored.

    Returns:
        ``{"id_negotiation": <int>}`` for an all-decimal identifier,
        otherwise ``{"hash": <identifier>}``.

    Raises:
        HTTPException: 400 when the identifier is blank.
    """
    value = identifier.strip()
    if not value:
        raise HTTPException(status_code=400, detail="Negotiation id or hash is required.")
    # ``isdecimal`` matches exactly what ``int()`` can parse. ``isdigit`` also
    # accepts characters such as "²", for which ``int()`` raises ValueError.
    if value.isdecimal():
        return {"id_negotiation": int(value)}
    return {"hash": value}
async def inspect_model_provider() -> dict[str, Any]:
    """Return the status of the configured model provider.

    "openai", "deepseek" and "codex" use their own inspectors; any other
    value is inspected as Ollama.
    """
    active_provider = get_settings().model_provider
    inspectors = (
        ("openai", inspect_openai),
        ("deepseek", inspect_deepseek),
        ("codex", inspect_codex),
    )
    for provider_name, inspector in inspectors:
        if active_provider == provider_name:
            return await inspector()
    return await inspect_ollama()
async def inspect_openai() -> dict[str, Any]:
    """Return the OpenAI provider status using the OpenAI settings."""
    current = get_settings()
    return await inspect_cloud_provider_config(
        provider="openai",
        base_url=current.openai_base_url,
        api_key=current.openai_api_key,
        model=current.openai_model,
    )
async def inspect_deepseek() -> dict[str, Any]:
    """Return the DeepSeek provider status using the DeepSeek settings."""
    current = get_settings()
    return await inspect_cloud_provider_config(
        provider="deepseek",
        base_url=current.deepseek_base_url,
        api_key=current.deepseek_api_key,
        model=current.deepseek_model,
    )
async def inspect_codex() -> dict[str, Any]:
    """Return the status of the Codex provider.

    Locates the Codex CLI and, when found, queries the Codex App Server for
    the logged-in account, the model list and per-model reasoning efforts.
    A failed query is reported in ``detail`` and leaves the provider offline.

    Returns:
        A status dict with the same keys as the other provider inspectors.
    """
    settings = get_settings()
    command = find_codex_cli(settings.codex_command)
    detail = ""
    online = False
    models: list[str] = []
    effort_map: dict[str, list[str]] = {}
    if command:
        try:
            account, models, effort_map = await inspect_codex_app_server(command)
            online = bool(account)
            detail = f"Logged in as {account.get('email')}" if isinstance(account, dict) and account.get("email") else ""
        except (OSError, RuntimeError, ValueError, asyncio.TimeoutError) as exc:
            # ValueError covers json.JSONDecodeError from a malformed server
            # line, so a status check cannot fail on it.
            detail = str(exc)
    configured_model = settings.codex_model
    # With no model list to check against, a configured model counts as available.
    model_available = configured_model in models if models else bool(configured_model)
    return {
        "installed": bool(command),
        "running": online,
        "online": online,
        "provider": "codex",
        "model_available": model_available,
        "configured_model": configured_model,
        "configured_reasoning_effort": settings.model_reasoning_effort,
        "reasoning_efforts": codex_reasoning_efforts(configured_model, effort_map),
        "base_url": str(command) if command else settings.codex_command,
        "models": models,
        "message": codex_status_message(bool(command), online, model_available, configured_model),
        "detail": detail,
    }
async def inspect_cloud_provider() -> dict[str, Any]:
    """Return the status of the configured cloud provider.

    "codex" and "deepseek" use their own inspectors; any other value is
    inspected as OpenAI.
    """
    active_provider = get_settings().model_provider
    if active_provider == "codex":
        inspector = inspect_codex
    elif active_provider == "deepseek":
        inspector = inspect_deepseek
    else:
        inspector = inspect_openai
    return await inspector()
async def inspect_provider_models(provider: str | None = None) -> dict[str, Any]:
    """Return the status of a named provider.

    Args:
        provider: Provider name, compared case-insensitively after trimming.
            When falsy, the configured ``model_provider`` is used.

    Returns:
        The status dict of the matching inspector; unrecognised names are
        inspected as OpenAI.
    """
    requested = provider or get_settings().model_provider
    lookup_key = str(requested).strip().casefold()
    inspectors = {
        "codex": inspect_codex,
        "ollama": inspect_ollama,
        "deepseek": inspect_deepseek,
    }
    inspector = inspectors.get(lookup_key, inspect_openai)
    return await inspector()
async def inspect_cloud_provider_config(
    provider: str,
    base_url: str,
    api_key: str | None,
    model: str,
) -> dict[str, Any]:
    """Probe a cloud provider's ``/models`` endpoint and build a status dict.

    Args:
        provider: Provider key, e.g. "openai" or "deepseek".
        base_url: API base URL; ``/models`` is appended to it.
        api_key: Bearer token. When falsy, no request is made.
        model: Configured model name, checked against the returned list.

    Returns:
        A status dict. ``online`` is true when the request succeeded and the
        body was valid JSON; ``detail`` carries the error text otherwise.
    """
    settings = get_settings()
    models: list[str] = []
    online = False
    detail = ""
    if api_key:
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                response = await client.get(
                    f"{base_url.rstrip('/')}/models",
                    headers={"Authorization": f"Bearer {api_key}"},
                )
                response.raise_for_status()
                body = response.json()
        except (httpx.HTTPError, ValueError) as exc:
            detail = str(exc)
        else:
            online = True
            # Accept only the expected shape so an odd payload yields an empty
            # model list instead of an unhandled AttributeError/TypeError.
            entries = body.get("data") if isinstance(body, dict) else None
            if isinstance(entries, list):
                models = sorted(
                    entry["id"]
                    for entry in entries
                    if isinstance(entry, dict) and isinstance(entry.get("id"), str) and entry["id"]
                )
    model_available = model in models
    if api_key:
        message = cloud_status_message(provider, online, True, model_available, model)
    else:
        message = f"{provider_display_name(provider)} is selected, but no API key is configured."
    return {
        "installed": True,
        "running": online,
        "online": online,
        "provider": provider,
        "model_available": model_available,
        "configured_model": model,
        "configured_reasoning_effort": canonical_provider_reasoning_effort(provider, settings.model_reasoning_effort),
        "reasoning_efforts": provider_reasoning_efforts(provider, model),
        "base_url": base_url,
        "models": models,
        "message": message,
        "detail": detail,
    }
async def inspect_ollama() -> dict[str, Any]:
    """Probe the local Ollama install and its ``/api/tags`` endpoint.

    Returns:
        A status dict with install, reachability and model information.
        ``online`` is true when the request succeeded and the body was valid
        JSON; ``detail`` carries the error text otherwise.
    """
    settings = get_settings()
    executable = find_ollama_executable()
    cli = find_ollama_cli()
    models: list[str] = []
    online = False
    detail = ""
    try:
        async with httpx.AsyncClient(timeout=3) as client:
            response = await client.get(f"{settings.ollama_base_url.rstrip('/')}/api/tags")
            response.raise_for_status()
            body = response.json()
    except (httpx.HTTPError, ValueError) as exc:
        detail = str(exc)
    else:
        online = True
        # Accept only the expected shape so an odd payload yields an empty
        # model list instead of an unhandled AttributeError/TypeError.
        entries = body.get("models") if isinstance(body, dict) else None
        if isinstance(entries, list):
            names = [entry.get("name") or entry.get("model") for entry in entries if isinstance(entry, dict)]
            models = [name for name in names if name]
    installed = bool(executable or cli)
    model_available = settings.ollama_model in models
    return {
        "installed": installed,
        "running": online,
        "online": online,
        "provider": "ollama",
        "model_available": model_available,
        "configured_model": settings.ollama_model,
        "configured_reasoning_effort": settings.model_reasoning_effort,
        "reasoning_efforts": reasoning_effort_options(),
        "base_url": settings.ollama_base_url,
        "num_ctx": settings.ollama_num_ctx,
        "models": models,
        "executable": str(executable) if executable else None,
        "cli": str(cli) if cli else None,
        "can_auto_install": bool(shutil.which("winget")),
        "download_url": OLLAMA_DOWNLOAD_URL,
        "message": ollama_status_message(installed, online, model_available, settings.ollama_model),
        "detail": detail,
    }
def cloud_status_message(provider: str, running: bool, configured: bool, model_available: bool, model: str) -> str:
    """Describe a cloud provider's readiness in one sentence.

    The first failing condition wins, in this order: API key missing,
    provider unreachable, model not returned by the API.
    """
    label = provider_display_name(provider)
    if not configured:
        summary = f"{label} API key is not configured."
    elif not running:
        summary = f"{label} is not reachable with the configured key."
    elif not model_available:
        summary = f'{label} is reachable, but model "{model}" was not returned by the API.'
    else:
        summary = f"{label} is ready."
    return summary
def ollama_status_message(installed: bool, running: bool, model_available: bool, model: str) -> str:
    """Describe Ollama's readiness in one sentence.

    The first failing condition wins, in this order: not installed,
    not running, model not installed.
    """
    checks = (
        (installed, "Ollama is not installed."),
        (running, "Ollama is installed but not running."),
        (model_available, f'Ollama is running, but model "{model}" is not installed.'),
    )
    for passed, problem in checks:
        if not passed:
            return problem
    return "Ollama is ready."
def codex_status_message(installed: bool, logged_in: bool, model_available: bool, model: str) -> str:
    """Describe the Codex provider's readiness in one sentence.

    The first failing condition wins, in this order: CLI missing,
    not logged in, model missing from the model list.
    """
    checks = (
        (installed, "Codex CLI is not installed."),
        (logged_in, "Codex CLI is installed, but the Codex App Server is not logged in with ChatGPT."),
        (model_available, f'Codex App Server is logged in, but model "{model}" was not returned by the model list.'),
    )
    return next((problem for passed, problem in checks if not passed), "Codex App Server is ready.")
def provider_settings(settings: Any) -> tuple[str, str, str | None]:
if settings.model_provider == "openai":
return settings.openai_base_url, settings.openai_model, settings.openai_api_key
if settings.model_provider == "deepseek":
return settings.deepseek_base_url, settings.deepseek_model, settings.deepseek_api_key
if settings.model_provider == "codex":
return settings.codex_command, settings.codex_model, None
return settings.ollama_base_url, settings.ollama_model, None
def provider_display_name(provider: str) -> str:
    """Return the display name for a provider key, defaulting to "Ollama"."""
    display_names = {
        "openai": "OpenAI",
        "deepseek": "DeepSeek",
        "codex": "Codex",
    }
    if provider in display_names:
        return display_names[provider]
    return "Ollama"
def find_codex_cli(configured_command: str | None = None) -> Path | None:
candidates = [configured_command, shutil.which("codex"), os.path.join(os.environ.get("USERPROFILE", ""), ".codex", ".sandbox-bin", "codex.exe")]
for candidate in candidates:
if not candidate:
continue
resolved = shutil.which(candidate) if Path(candidate).name == candidate else candidate
if not resolved:
continue
path = Path(resolved)
if path.exists():
return path
return None
# Holds a reference to each login-watcher task until it finishes.
# start_codex_browser_login adds the task and its done callback discards it.
_codex_login_tasks: set[asyncio.Task] = set()
async def start_codex_browser_login(command: Path) -> dict[str, Any]:
    """Start a ChatGPT browser login through ``<command> app-server``.

    Spawns the app server, performs the JSON-RPC ``initialize`` handshake over
    stdin/stdout and sends ``account/login/start``. On success the process is
    left running and handed to a background watcher task that stops it later.

    Args:
        command: Path to the Codex CLI executable.

    Returns:
        The ``account/login/start`` result, including ``authUrl``.

    Raises:
        RuntimeError: On timeout, early process exit, a JSON-RPC error, or a
            result without a browser login URL.
    """
    process = await asyncio.create_subprocess_exec(
        str(command),
        "app-server",
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0,
    )
    # Next JSON-RPC request id; incremented by send().
    request_id = 1

    async def write(payload: dict[str, Any]) -> None:
        """Send one JSON message, newline-terminated, to the server's stdin."""
        if process.stdin is None:
            raise RuntimeError("Codex App Server stdin is unavailable.")
        process.stdin.write((json.dumps(payload, ensure_ascii=True) + "\n").encode("utf-8"))
        await process.stdin.drain()

    async def read(timeout: int = 30) -> dict[str, Any]:
        """Read and decode one JSON line from the server's stdout."""
        if process.stdout is None:
            raise RuntimeError("Codex App Server stdout is unavailable.")
        try:
            line = await asyncio.wait_for(process.stdout.readline(), timeout=timeout)
        except asyncio.TimeoutError as exc:
            raise RuntimeError("Codex App Server timed out while starting browser login.") from exc
        if not line:
            # Empty read means stdout closed; report stderr if any arrives within a second.
            stderr = ""
            if process.stderr is not None:
                try:
                    stderr = (await asyncio.wait_for(process.stderr.read(), timeout=1)).decode("utf-8", errors="replace").strip()
                except asyncio.TimeoutError:
                    stderr = ""
            raise RuntimeError(stderr or "Codex App Server exited before login completed.")
        return json.loads(line.decode("utf-8", errors="replace"))

    async def send(method: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
        """Send a request and return the result of the response with the same id."""
        nonlocal request_id
        current_id = request_id
        request_id += 1
        payload: dict[str, Any] = {"jsonrpc": "2.0", "id": current_id, "method": method}
        if params is not None:
            payload["params"] = params
        await write(payload)
        while True:
            message = await read()
            if message.get("id") == current_id:
                if message.get("error"):
                    error = message["error"]
                    raise RuntimeError(error.get("message") or f"Codex App Server request failed: {error}")
                return message.get("result") or {}
            # Any other message is passed on; server-initiated requests get an error reply.
            await answer_codex_login_server_request(write, message)

    try:
        await send(
            "initialize",
            {
                "clientInfo": {"name": "TraderAI", "version": __version__},
                "capabilities": {"experimentalApi": True},
            },
        )
        await write({"jsonrpc": "2.0", "method": "initialized", "params": {}})
        login = await send("account/login/start", {"type": "chatgpt"})
        if login.get("type") != "chatgpt" or not login.get("authUrl"):
            raise RuntimeError(f"Codex App Server did not return a browser login URL: {login!r}")
        # The watcher now owns the process and stops it when it finishes.
        task = asyncio.create_task(watch_codex_browser_login(process, read, write, login.get("loginId")))
        # Keep a reference until the task is done.
        _codex_login_tasks.add(task)
        task.add_done_callback(_codex_login_tasks.discard)
        return login
    except Exception:
        # Nothing else will stop the process on a failed handshake.
        await stop_process(process)
        raise
async def answer_codex_login_server_request(write: Any, message: dict[str, Any]) -> None:
    """Reply to a server-initiated JSON-RPC request with an error.

    A message counts as a request when it has both ``id`` and ``method``;
    anything else is ignored.

    Args:
        write: Async callable that sends one JSON-RPC payload.
        message: Decoded message received from the Codex App Server.
    """
    is_server_request = "id" in message and "method" in message
    if is_server_request:
        rejection = {
            "jsonrpc": "2.0",
            "id": message["id"],
            "error": {"code": -32601, "message": "TraderAI login does not handle server requests."},
        }
        await write(rejection)
async def watch_codex_browser_login(process: asyncio.subprocess.Process, read: Any, write: Any, login_id: str | None) -> None:
    """Wait for the browser login to complete, then stop the app server.

    Reads messages with a 300-second timeout each until an
    ``account/login/completed`` notification arrives for ``login_id`` (or any
    login when ``login_id`` is None). Every other message is passed to
    ``answer_codex_login_server_request``. Any exception ends the wait
    quietly, and the process is stopped in all cases.

    Args:
        process: The running Codex App Server process.
        read: Async callable returning the next decoded message; accepts ``timeout``.
        write: Async callable that sends one JSON-RPC payload.
        login_id: Login to wait for, or None to accept any completion.
    """
    try:
        login_finished = False
        while not login_finished:
            incoming = await read(timeout=300)
            is_completion = incoming.get("method") == "account/login/completed"
            if is_completion and (
                login_id is None or (incoming.get("params") or {}).get("loginId") == login_id
            ):
                login_finished = True
            else:
                await answer_codex_login_server_request(write, incoming)
    except Exception:
        # Best effort: a timeout or a dead server simply ends the watch.
        pass
    finally:
        await stop_process(process)
async def stop_process(process: asyncio.subprocess.Process) -> None:
    """Terminate a subprocess, killing it if it has not exited after 3 seconds.

    Does nothing when the process has already exited.

    Args:
        process: The asyncio subprocess to stop.
    """
    if process.returncode is not None:
        return
    try:
        process.terminate()
    except ProcessLookupError:
        # The process exited between the returncode check and the signal;
        # wait() below still reaps it.
        pass
    try:
        await asyncio.wait_for(process.wait(), timeout=3)
    except asyncio.TimeoutError:
        try:
            process.kill()
        except ProcessLookupError:
            pass
        await process.wait()
async def inspect_codex_app_server(command: Path) -> tuple[dict[str, Any] | None, list[str], dict[str, list[str]]]:
    """Query ``<command> app-server`` for account and model information.

    Spawns the app server, performs the JSON-RPC ``initialize`` handshake,
    reads the account and pages through ``model/list`` (at most 20 pages of
    50). The process is always stopped before returning.

    Args:
        command: Path to the Codex CLI executable.

    Returns:
        ``(account, models, effort_map)``: the ``account`` value from
        ``account/read``, the sorted unique model ids, and a mapping from
        model id to its supported reasoning efforts.

    Raises:
        RuntimeError: On early process exit or a JSON-RPC error response.
        asyncio.TimeoutError: When the server does not answer within 30 seconds.
    """
    process = await asyncio.create_subprocess_exec(
        str(command),
        "app-server",
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0,
    )
    # Next JSON-RPC request id; incremented by send().
    request_id = 1

    async def write(payload: dict[str, Any]) -> None:
        """Send one JSON message, newline-terminated, to the server's stdin."""
        if process.stdin is None:
            raise RuntimeError("Codex App Server stdin is unavailable.")
        process.stdin.write((json.dumps(payload, ensure_ascii=True) + "\n").encode("utf-8"))
        await process.stdin.drain()

    async def read(timeout: int = 30) -> dict[str, Any]:
        """Read and decode one JSON line from the server's stdout."""
        if process.stdout is None:
            raise RuntimeError("Codex App Server stdout is unavailable.")
        line = await asyncio.wait_for(process.stdout.readline(), timeout=timeout)
        if not line:
            # Empty read means stdout closed; report stderr if any arrives within a second.
            stderr = ""
            if process.stderr is not None:
                try:
                    stderr = (await asyncio.wait_for(process.stderr.read(), timeout=1)).decode("utf-8", errors="replace").strip()
                except asyncio.TimeoutError:
                    stderr = ""
            raise RuntimeError(stderr or "Codex App Server exited without a response.")
        return json.loads(line.decode("utf-8", errors="replace"))

    async def send(method: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
        """Send a request and return the result of the response with the same id."""
        nonlocal request_id
        current_id = request_id
        request_id += 1
        payload: dict[str, Any] = {"jsonrpc": "2.0", "id": current_id, "method": method}
        if params is not None:
            payload["params"] = params
        await write(payload)
        while True:
            message = await read()
            if message.get("id") == current_id:
                if message.get("error"):
                    error = message["error"]
                    raise RuntimeError(error.get("message") or f"Codex App Server request failed: {error}")
                return message.get("result") or {}
            # Server-initiated requests get an error reply; other messages are skipped.
            if "id" in message and "method" in message:
                await write(
                    {
                        "jsonrpc": "2.0",
                        "id": message["id"],
                        "error": {"code": -32601, "message": "TraderAI status checks do not handle server requests."},
                    }
                )

    try:
        await send(
            "initialize",
            {
                "clientInfo": {"name": "TraderAI", "version": __version__},
                "capabilities": {"experimentalApi": True},
            },
        )
        await write({"jsonrpc": "2.0", "method": "initialized", "params": {}})
        account_result = await send("account/read", {"refreshToken": False})
        models: list[str] = []
        effort_map: dict[str, list[str]] = {}
        cursor: str | None = None
        # Bounded paging so a server that keeps returning a cursor cannot loop forever.
        for _ in range(20):
            params: dict[str, Any] = {"limit": 50, "includeHidden": False}
            if cursor:
                params["cursor"] = cursor
            page = await send("model/list", params)
            for item in page.get("data") or []:
                model = item.get("id") or item.get("model")
                if not model:
                    continue
                models.append(model)
                efforts = [
                    effort.get("reasoningEffort")
                    for effort in item.get("supportedReasoningEfforts", [])
                    if effort.get("reasoningEffort")
                ]
                if efforts:
                    effort_map[model] = efforts
            cursor = page.get("nextCursor")
            if not cursor:
                break
        return account_result.get("account"), sorted(set(models)), effort_map
    finally:
        # Use the shared helper instead of a second copy of the
        # terminate/wait/kill sequence.
        await stop_process(process)
def codex_models() -> list[str]:
    """Return the sorted, de-duplicated model slugs found in the Codex models cache.

    The cache is read from ``~/.codex/models_cache.json``. A cache that is
    missing, unreadable, not valid JSON, or not shaped as
    ``{"models": [{"slug": "..."}, ...]}`` yields an empty list; individual
    entries that are not objects or lack a non-empty string ``slug`` are
    skipped.

    Returns:
        Model slugs in ascending order without duplicates.
    """
    cache_path = Path.home() / ".codex" / "models_cache.json"
    try:
        # A missing file raises FileNotFoundError, which is an OSError.
        body = json.loads(cache_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        return []
    entries = body.get("models") if isinstance(body, dict) else None
    if not isinstance(entries, list):
        return []
    slugs = {
        entry["slug"]
        for entry in entries
        if isinstance(entry, dict) and isinstance(entry.get("slug"), str) and entry["slug"]
    }
    return sorted(slugs)
def codex_reasoning_efforts(model: str, effort_map: dict[str, list[str]] | None = None) -> list[str]:
if effort_map and effort_map.get(model):
return effort_map[model]
cache_path = Path.home() / ".codex" / "models_cache.json"
if not cache_path.exists():
return reasoning_effort_options()
try:
body = json.loads(cache_path.read_text(encoding="utf-8"))
except (OSError, ValueError):
return reasoning_effort_options()
for item in body.get("models", []):
if item.get("slug") != model:
continue
efforts = [entry.get("effort") for entry in item.get("supported_reasoning_levels", []) if entry.get("effort")]
return efforts or reasoning_effort_options()
return reasoning_effort_options()
def reasoning_effort_options() -> list[str]:
    """Return a new list of the generic reasoning effort names, lowest first."""
    ordered_levels = ("none", "minimal", "low", "medium", "high", "xhigh")
    return list(ordered_levels)
def deepseek_reasoning_efforts(model: str) -> list[str]:
    """Return the reasoning efforts offered for a DeepSeek model.

    Every model gets ``"none"`` and ``"high"``; the models listed below
    additionally get ``"max"``. The match on ``model`` is exact.
    """
    efforts = ["none", "high"]
    if model in {"deepseek-v4-flash", "deepseek-v4-pro", "deepseek-chat", "deepseek-reasoner"}:
        efforts.append("max")
    return efforts
def provider_reasoning_efforts(provider: str, model: str) -> list[str]:
    """Return the reasoning efforts to offer for ``model`` on ``provider``.

    The ``"deepseek"`` provider uses its model-specific list; every other
    provider gets the generic options.
    """
    return deepseek_reasoning_efforts(model) if provider == "deepseek" else reasoning_effort_options()
def canonical_provider_reasoning_effort(provider: str, effort: str) -> str:
    """Normalize an effort name and map it onto what ``provider`` accepts.

    The effort is stringified, stripped, and case-folded; a falsy effort
    becomes ``"medium"``. Providers other than ``"deepseek"`` get that
    normalized value unchanged. For ``"deepseek"`` the value collapses to one
    of ``"none"``, ``"max"``, or ``"high"`` (the fallback).
    """
    requested = str(effort or "medium").strip().casefold()
    if provider == "deepseek":
        # Anything not listed here (e.g. "low", "medium") maps to "high".
        aliases = {"none": "none", "minimal": "none", "xhigh": "max", "max": "max"}
        return aliases.get(requested, "high")
    return requested
def find_ollama_executable() -> Path | None:
candidates = [
shutil.which("ollama"),
os.environ.get("OLLAMA_EXE"),
os.path.join(os.environ.get("LOCALAPPDATA", ""), "Programs", "Ollama", "Ollama.exe"),
os.path.join(os.environ.get("LOCALAPPDATA", ""), "Programs", "Ollama", "ollama.exe"),
os.path.join(os.environ.get("ProgramFiles", ""), "Ollama", "Ollama.exe"),
os.path.join(os.environ.get("ProgramFiles", ""), "Ollama", "ollama.exe"),
os.path.join(os.environ.get("ProgramFiles(x86)", ""), "Ollama", "Ollama.exe"),
os.path.join(os.environ.get("ProgramFiles(x86)", ""), "Ollama", "ollama.exe"),
]
for candidate in candidates:
if not candidate:
continue
path = Path(candidate)
if path.exists():
return path
return None
def find_ollama_cli() -> Path | None:
candidates = [
shutil.which("ollama"),
os.path.join(os.environ.get("LOCALAPPDATA", ""), "Programs", "Ollama", "ollama.exe"),
os.path.join(os.environ.get("ProgramFiles", ""), "Ollama", "ollama.exe"),
os.path.join(os.environ.get("ProgramFiles(x86)", ""), "Ollama", "ollama.exe"),
]
for candidate in candidates:
if not candidate:
continue
path = Path(candidate)
if path.exists():
return path
return None
def ollama_launch_command() -> list[str] | None:
    """Build the argument list used to start Ollama.

    Returns:
        ``None`` when ``find_ollama_executable`` finds nothing. Otherwise the
        executable path alone when its file name is exactly ``Ollama.exe``,
        or the path followed by ``"serve"`` for any other file name.
    """
    located = find_ollama_executable()
    if located is None:
        return None
    argv = [str(located)]
    if located.name != "Ollama.exe":
        argv.append("serve")
    return argv
def popen_hidden(command: list[str]) -> subprocess.Popen:
    """Start ``command`` with its stdout and stderr discarded.

    On Windows the process is additionally created with
    ``CREATE_NO_WINDOW``.

    Args:
        command: Program and arguments to run.

    Returns:
        The started ``subprocess.Popen`` object; it is not waited for.
    """
    platform_options: dict[str, Any] = {}
    if sys.platform == "win32":
        platform_options["creationflags"] = subprocess.CREATE_NO_WINDOW
    return subprocess.Popen(
        command,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        **platform_options,
    )
def exception_detail(exc: BaseException) -> str:
    """Return a non-empty description of ``exc``.

    The stripped ``str(exc)`` is used when it has content; otherwise the
    result is the exception's type name followed by its ``repr``.
    """
    return str(exc).strip() or f"{type(exc).__name__}: {exc!r}"
async def inspect_update() -> dict[str, Any]:
    """Compare the running version with the newest release and describe the result.

    Returns:
        A dictionary that always contains ``current_version``,
        ``latest_version``, ``available``, ``release_url`` and ``message``.
        When a release was found it additionally contains ``release_name``,
        ``asset_name``, ``asset_download_url`` and ``packaged`` (whether
        ``sys.frozen`` is set). Failing to fetch the release list, or finding
        no release, is reported through ``message`` with ``available`` set to
        ``False`` instead of raising.
    """
    try:
        latest = await latest_release()
    except (httpx.HTTPError, ValueError) as exc:
        return {
            "current_version": __version__,
            "latest_version": None,
            "available": False,
            "release_url": RELEASES_URL,
            # exception_detail() falls back to the exception type when the
            # message is empty, so the text never ends in a bare colon.
            "message": f"Could not check releases: {exception_detail(exc)}",
        }
    if not latest:
        return {
            "current_version": __version__,
            "latest_version": None,
            "available": False,
            "release_url": RELEASES_URL,
            "message": "No releases were found.",
        }
    latest_version = normalize_version(latest.get("tag_name") or latest.get("name") or "")
    asset = release_asset(latest, UPDATE_ASSET_NAME)
    available = latest_version is not None and compare_versions(latest_version, __version__) > 0
    return {
        "current_version": __version__,
        "latest_version": latest_version,
        "available": available,
        "release_name": latest.get("name") or latest.get("tag_name"),
        "release_url": latest.get("html_url") or RELEASES_URL,
        "asset_name": asset.get("name") if asset else None,
        "asset_download_url": asset.get("browser_download_url") if asset else None,
        "packaged": bool(getattr(sys, "frozen", False)),
        "message": update_message(available, latest_version, bool(asset)),
    }
async def latest_release() -> dict[str, Any] | None:
    """Fetch the release list and return the entry with the highest version.

    Drafts are never considered. Pre-releases are considered only when no
    other non-draft release exists. Releases are ranked by ``version_parts``
    of their ``tag_name`` (falling back to ``name``, then ``"0"``).

    Returns:
        The chosen release object, or ``None`` when the response is not a
        list or holds no usable release.

    Raises:
        httpx.HTTPError: If the request fails or returns an error status.
        ValueError: If the response body is not valid JSON.
    """
    async with httpx.AsyncClient(timeout=10, follow_redirects=True) as client:
        response = await client.get(RELEASES_API_URL)
        response.raise_for_status()
        payload = response.json()
    if not isinstance(payload, list):
        return None

    published = [entry for entry in payload if isinstance(entry, dict) and not entry.get("draft")]
    stable = [entry for entry in published if not entry.get("prerelease")]
    pool = stable or published
    if not pool:
        return None

    def rank(entry: dict[str, Any]) -> list[int]:
        label = entry.get("tag_name") or entry.get("name") or "0"
        return version_parts(str(label))

    return max(pool, key=rank)
def release_asset(release: dict[str, Any], name: str) -> dict[str, Any] | None:
assets = release.get("assets") or []
if not isinstance(assets, list):
return None
for asset in assets:
if isinstance(asset, dict) and str(asset.get("name", "")).casefold() == name.casefold():
return asset
for asset in assets:
download_url = str(asset.get("browser_download_url", "")) if isinstance(asset, dict) else ""
if download_url.casefold().endswith(f"/{name.casefold()}"):
return asset
return None
async def download_update_asset(url: str, version: str) -> Path:
    """Download a release asset into the application's ``updates`` directory.

    The body is streamed into ``TraderAI-<version>.exe.part`` and renamed to
    ``TraderAI-<version>.exe`` only after the whole response has been
    written, so the final path never holds a truncated executable. The
    partial file is removed if the download fails or is cancelled.

    Args:
        url: Download URL of the asset.
        version: Version label used in the file name.

    Returns:
        Path of the downloaded file.

    Raises:
        httpx.HTTPError: If the request fails or returns an error status.
        OSError: If the file cannot be written or renamed.
    """
    updates_dir = Path(settings_payload()["app_data_dir"]) / "updates"
    updates_dir.mkdir(parents=True, exist_ok=True)
    # Keep only the final path component so a version label containing path
    # separators cannot place the file outside the updates directory.
    path = updates_dir / Path(f"TraderAI-{version}.exe").name
    partial = path.with_name(path.name + ".part")
    try:
        async with httpx.AsyncClient(timeout=120, follow_redirects=True) as client:
            async with client.stream("GET", url) as response:
                response.raise_for_status()
                with partial.open("wb") as file:
                    async for chunk in response.aiter_bytes():
                        file.write(chunk)
        partial.replace(path)
    except BaseException:
        # BaseException so task cancellation also cleans up; always re-raised.
        try:
            partial.unlink(missing_ok=True)
        except OSError:
            pass
        raise
    return path
def write_update_script(source: Path, target: Path) -> Path:
    """Write ``apply-update.ps1`` into the ``updates`` directory and return its path.

    The generated PowerShell script takes ``-ProcessId``, ``-Source`` and
    ``-Target`` parameters. It waits up to 60 seconds for the given process
    to exit, sleeps one second, copies ``$Source`` over ``$Target`` and then
    starts ``$Target``.

    Args:
        source: Not used when writing the script; the script receives the
            path through its own ``-Source`` parameter.
        target: Not used when writing the script; the script receives the
            path through its own ``-Target`` parameter.

    Returns:
        Path of the written script.
    """
    script_lines = (
        "param(",
        "  [Parameter(Mandatory=$true)][int]$ProcessId,",
        "  [Parameter(Mandatory=$true)][string]$Source,",
        "  [Parameter(Mandatory=$true)][string]$Target",
        ")",
        "$ErrorActionPreference = 'Stop'",
        "try { Wait-Process -Id $ProcessId -Timeout 60 -ErrorAction SilentlyContinue } catch {}",
        "Start-Sleep -Seconds 1",
        "Copy-Item -LiteralPath $Source -Destination $Target -Force",
        "Start-Process -FilePath $Target",
    )
    destination_dir = Path(settings_payload()["app_data_dir"]) / "updates"
    destination_dir.mkdir(parents=True, exist_ok=True)
    script_path = destination_dir / "apply-update.ps1"
    # Every line, including the last, ends with a newline.
    script_path.write_text("".join(f"{line}\n" for line in script_lines), encoding="utf-8")
    return script_path
def exit_after_update_response() -> None:
    """Sleep for 1.5 seconds, then end the process with exit status 0.

    ``os._exit`` terminates immediately without running cleanup handlers.
    The delay presumably lets the pending HTTP response reach the client
    first (inferred from the function name; confirm against the caller).
    """
    delay_seconds = 1.5
    exit_status = 0
    time.sleep(delay_seconds)
    os._exit(exit_status)
def update_message(available: bool, latest_version: str | None, has_asset: bool) -> str:
if not latest_version:
return "Could not determine the latest release version."
if not available:
return f"TraderAI {__version__} is up to date."
if not has_asset:
return f"TraderAI {latest_version} is available, but the release has no {UPDATE_ASSET_NAME} asset."
return f"TraderAI {latest_version} is available."
def normalize_version(value: str) -> str | None:
text = value.strip()
if text.startswith("v"):
text = text[1:]
parts = text.split(".")
if len(parts) < 2:
return None
return text
def compare_versions(left: str, right: str) -> int:
    """Compare two version labels numerically.

    Both labels are split with ``version_parts``; the shorter list is padded
    with zeros so that, for example, ``1.2`` and ``1.2.0`` compare equal.

    Returns:
        ``1`` when ``left`` is newer, ``-1`` when it is older, ``0`` when
        both are equal.
    """
    lhs = version_parts(left)
    rhs = version_parts(right)
    width = max(len(lhs), len(rhs))
    lhs = lhs + [0] * (width - len(lhs))
    rhs = rhs + [0] * (width - len(rhs))
    if lhs == rhs:
        return 0
    return 1 if lhs > rhs else -1
def version_parts(version: str) -> list[int]:
    """Split a version label into a list of integers for comparison.

    The label is normalized with ``normalize_version`` (falling back to
    ``"0"`` when that returns ``None``), then split on ``.``, ``-`` and
    ``+``. Each component contributes the integer formed by its leading
    decimal digits, or ``0`` when it does not start with one.

    Args:
        version: Version label such as ``"v1.2.3-beta4"``.

    Returns:
        One integer per component, e.g. ``[1, 2, 3, 0]`` for the label above.
    """
    text = normalize_version(version) or "0"
    parts: list[int] = []
    for item in text.replace("-", ".").replace("+", ".").split("."):
        digits = ""
        for char in item:
            # isdecimal(), not isdigit(): characters such as superscript
            # digits satisfy isdigit() but make int() raise ValueError.
            if not char.isdecimal():
                break
            digits += char
        parts.append(int(digits or 0))
    return parts
# Build the application object at import time and expose it as the
# module-level name `app` (create_app is defined outside this section).
app = create_app()