Files
TraderAI/traderai/server.py
T
2026-05-08 14:48:51 -04:00

830 lines
30 KiB
Python

from __future__ import annotations
import os
import json
import shutil
import subprocess
import sys
import threading
import time
import webbrowser
from pathlib import Path
from typing import Any
import httpx
from fastapi import FastAPI
from fastapi import HTTPException
from fastapi.responses import FileResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from traderai.agent import OllamaAgent, OllamaUnavailable
from traderai.config import save_settings, settings_payload
from traderai.config import get_settings
from traderai.cornerstone_client import CornerstoneClient
from traderai.memory import DEFAULT_THREAD_ID, MemoryStore
from traderai.plans import ContinualPlanRunner, ContinualPlanStore
from traderai.scheduler import WakeScheduler
from traderai.scmdb_client import SCMDBClient
from traderai.tools import ToolRegistry
from traderai.uex_client import UEXClient
from traderai.version import RELEASES_API_URL, RELEASES_URL, __version__
def resource_path(*parts: str) -> Path:
base = Path(getattr(sys, "_MEIPASS", Path(__file__).resolve().parent.parent))
return base.joinpath(*parts)
class ChatRequest(BaseModel):
message: str
thread_id: str | None = DEFAULT_THREAD_ID
images: list["ChatImageRequest"] = []
class ChatImageRequest(BaseModel):
name: str = "pasted-image.png"
content_type: str = "image/png"
image_data: str
class ChatThreadRequest(BaseModel):
title: str | None = None
class RenameChatThreadRequest(BaseModel):
title: str
class DirectNegotiationMessageRequest(BaseModel):
message: str
class ClearMemoryRequest(BaseModel):
include_memories: bool = True
include_conversations: bool = True
include_profile: bool = False
include_jobs: bool = False
include_outbox: bool = True
class ContinualPlanItemRequest(BaseModel):
item_name: str
desired_quantity: int = 1
max_unit_price: float | None = None
class ContinualPlanCreateRequest(BaseModel):
title: str
objective: str
kind: str = "buying"
cadence: str | None = None
constraints: dict[str, Any] = {}
items: list[ContinualPlanItemRequest] = []
class ContinualPlanEventRequest(BaseModel):
kind: str = "note"
message: str
metadata: dict[str, Any] = {}
class ConfigUpdateRequest(BaseModel):
values: dict
class OllamaModelRequest(BaseModel):
model: str | None = None
OLLAMA_DOWNLOAD_URL = "https://ollama.com/download/windows"
UPDATE_ASSET_NAME = "TraderAI.exe"
def create_app() -> FastAPI:
settings = get_settings()
memory = MemoryStore(settings.traderai_memory_path)
plan_store = ContinualPlanStore(memory)
scheduler = WakeScheduler(memory)
uex = UEXClient(settings.uex_base_url, settings.uex_secret_key, settings.uex_bearer_token)
scmdb = SCMDBClient(settings.scmdb_base_url)
cornerstone = CornerstoneClient(settings.cornerstone_base_url)
tools = ToolRegistry(
uex,
settings.require_write_approval,
memory=memory,
scheduler=scheduler,
scmdb=scmdb,
cornerstone=cornerstone,
plan_store=plan_store,
)
plan_runner = ContinualPlanRunner(plan_store, tools, memory)
tools.plan_runner = plan_runner
agent = OllamaAgent(
settings.openai_base_url if settings.model_provider == "openai" else settings.ollama_base_url,
settings.openai_model if settings.model_provider == "openai" else settings.ollama_model,
tools,
memory=memory,
user_name=settings.traderai_user_name,
num_ctx=settings.ollama_num_ctx,
provider=settings.model_provider,
api_key=settings.openai_api_key,
)
plan_runner.bind_agent(agent)
scheduler.bind_agent(agent)
scheduler.bind_plan_runner(plan_runner)
scheduler.bind_uex_notifications(uex, settings.uex_notification_poll_seconds)
app = FastAPI(title="TraderAI")
static_dir = resource_path("web")
app.mount("/static", StaticFiles(directory=static_dir), name="static")
@app.on_event("startup")
async def startup() -> None:
await refresh_user_profile()
scheduler.start()
@app.on_event("shutdown")
async def shutdown() -> None:
scheduler.shutdown()
async def refresh_user_profile() -> None:
if settings.traderai_user_name:
memory.set_profile("configured_name", settings.traderai_user_name)
agent.user_name = agent.user_name or settings.traderai_user_name
try:
response = await uex.get_user(authenticated=True)
except Exception as exc:
memory.set_profile("uex_user_error", str(exc))
if settings.traderai_user_name:
try:
response = await uex.get_user(username=settings.traderai_user_name)
except Exception:
return
else:
return
data = response.get("user")
if data:
memory.set_profile("uex_user", data)
username = data.get("username") or data.get("user_username") or data.get("name")
if username:
agent.user_name = username
@app.get("/")
async def index() -> FileResponse:
return FileResponse(static_dir / "index.html")
@app.get("/api/health")
async def health() -> dict:
return {
"ollama": await agent.health(),
"model_provider": settings.model_provider,
"user": memory.get_profile(),
"jobs": scheduler.list_jobs(),
"app_data_dir": settings_payload()["app_data_dir"],
"version": __version__,
}
@app.get("/api/config")
async def inspect_config() -> dict:
return settings_payload()
@app.post("/api/config")
async def update_config(request: ConfigUpdateRequest) -> dict:
updated = save_settings(request.values)
updated["restart_required"] = True
updated["message"] = "Configuration saved. Restart TraderAI for all settings to take effect."
return updated
@app.get("/api/ollama/status")
async def ollama_status() -> dict:
return await inspect_model_provider()
@app.get("/api/openai/models")
async def openai_models() -> dict:
status = await inspect_openai()
return {
"provider": "openai",
"configured_model": status.get("configured_model"),
"models": status.get("models", []),
"message": status.get("message", ""),
"detail": status.get("detail", ""),
"online": status.get("online", False),
}
@app.post("/api/ollama/launch")
async def launch_ollama() -> dict:
command = ollama_launch_command()
if not command:
raise HTTPException(status_code=404, detail="Ollama is not installed or was not found on PATH.")
try:
popen_hidden(command)
except OSError as exc:
raise HTTPException(status_code=500, detail=f"Could not launch Ollama: {exc}") from exc
status = await inspect_model_provider()
status["message"] = "Ollama launch requested."
return status
@app.post("/api/ollama/pull")
async def pull_ollama_model(request: OllamaModelRequest) -> dict:
settings_now = get_settings()
model = (request.model or settings_now.ollama_model).strip()
if not model:
raise HTTPException(status_code=400, detail="No Ollama model is configured.")
cli = find_ollama_cli()
if not cli:
raise HTTPException(status_code=404, detail="Ollama CLI was not found.")
try:
popen_hidden([str(cli), "pull", model])
except OSError as exc:
raise HTTPException(status_code=500, detail=f"Could not start model install: {exc}") from exc
status = await inspect_model_provider()
status["message"] = f"Started installing model {model}."
return status
@app.post("/api/ollama/install")
async def install_ollama() -> dict:
winget = shutil.which("winget")
if not winget:
return {
"started": False,
"message": "winget is not available on this system. Open the download page instead.",
"download_url": OLLAMA_DOWNLOAD_URL,
}
try:
popen_hidden(
[
winget,
"install",
"-e",
"--id",
"Ollama.Ollama",
"--accept-package-agreements",
"--accept-source-agreements",
]
)
except OSError as exc:
raise HTTPException(status_code=500, detail=f"Could not start Ollama install: {exc}") from exc
return {"started": True, "message": "Started Ollama install with winget.", "download_url": OLLAMA_DOWNLOAD_URL}
@app.post("/api/ollama/download")
async def download_ollama() -> dict:
webbrowser.open(OLLAMA_DOWNLOAD_URL)
return {"opened": True, "download_url": OLLAMA_DOWNLOAD_URL, "message": "Opened the Ollama download page."}
@app.get("/api/update/check")
async def check_update() -> dict:
return await inspect_update()
@app.post("/api/update/install")
async def install_update() -> dict:
update = await inspect_update()
if not update["available"]:
return {**update, "message": "TraderAI is already up to date."}
if not getattr(sys, "frozen", False):
return {
**update,
"started": False,
"message": "Update download is available, but self-update only runs from the packaged exe.",
}
asset_url = update.get("asset_download_url")
if not asset_url:
raise HTTPException(status_code=404, detail="The latest release does not include TraderAI.exe.")
downloaded = await download_update_asset(asset_url, update["latest_version"])
script = write_update_script(downloaded, Path(sys.executable))
updater_command = [
"powershell",
"-NoProfile",
"-ExecutionPolicy",
"Bypass",
"-File",
str(script),
"-ProcessId",
str(os.getpid()),
"-Source",
str(downloaded),
"-Target",
str(Path(sys.executable)),
]
updater_kwargs: dict[str, Any] = {"stdout": subprocess.DEVNULL, "stderr": subprocess.DEVNULL}
if sys.platform == "win32":
updater_kwargs["creationflags"] = subprocess.CREATE_NO_WINDOW
subprocess.Popen(updater_command, **updater_kwargs)
threading.Thread(target=exit_after_update_response, daemon=True).start()
return {**update, "started": True, "message": "Update downloaded. TraderAI will restart into the new version."}
@app.post("/api/chat")
async def chat(request: ChatRequest) -> dict:
try:
return await agent.chat(
request.message,
thread_id=request.thread_id,
images=[image.model_dump() for image in request.images],
)
except OllamaUnavailable as exc:
raise HTTPException(status_code=503, detail=str(exc)) from exc
@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest) -> StreamingResponse:
async def events():
async for event in agent.chat_events(
request.message,
thread_id=request.thread_id,
images=[image.model_dump() for image in request.images],
):
yield f"data: {json.dumps(event)}\n\n"
return StreamingResponse(events(), media_type="text/event-stream")
@app.get("/api/chats")
async def chats() -> dict:
return {"chats": memory.list_threads()}
@app.post("/api/chats")
async def create_chat(request: ChatThreadRequest) -> dict:
return {"chat": memory.create_thread(request.title)}
@app.get("/api/chats/{thread_id}/messages")
async def chat_messages(thread_id: str) -> dict:
memory.ensure_thread(thread_id)
return {"thread_id": thread_id, "messages": memory.recent_conversation(limit=200, thread_id=thread_id)}
@app.delete("/api/chats/{thread_id}")
async def delete_chat(thread_id: str) -> dict:
deleted = memory.delete_thread(thread_id)
return {"deleted": deleted, "chats": memory.list_threads()}
@app.patch("/api/chats/{thread_id}")
async def rename_chat(thread_id: str, request: RenameChatThreadRequest) -> dict:
chat = memory.rename_thread(thread_id, request.title)
if not chat:
raise HTTPException(status_code=400, detail="A non-empty chat title is required.")
return {"chat": chat, "chats": memory.list_threads()}
@app.get("/api/pending-actions")
async def pending_actions() -> dict:
return {"pending_actions": agent._pending_payloads()}
@app.get("/api/notifications")
async def notifications() -> dict:
return {"notifications": memory.undelivered_outbox()}
@app.get("/api/inbox")
async def inbox() -> dict:
return {"inbox": memory.list_outbox()}
@app.post("/api/inbox/{inbox_id}/continue")
async def continue_inbox(inbox_id: int) -> dict:
item = memory.get_outbox(inbox_id)
if not item:
raise HTTPException(status_code=404, detail="Inbox item not found.")
thread = memory.create_thread("Inbox follow-up")
memory.add_conversation("assistant", item["content"], thread["id"])
return {"chat": thread, "message": item}
@app.delete("/api/inbox/{inbox_id}")
async def delete_inbox(inbox_id: int) -> dict:
deleted = memory.delete_outbox(inbox_id)
return {"deleted": deleted, "inbox": memory.list_outbox()}
@app.get("/api/negotiations/{identifier}/messages")
async def negotiation_messages(identifier: str) -> dict:
params = negotiation_identifier_params(identifier)
return await uex.get("marketplace_negotiations_messages", params, authenticated=True)
@app.post("/api/negotiations/{identifier}/messages")
async def send_negotiation_message(identifier: str, request: DirectNegotiationMessageRequest) -> dict:
params = negotiation_identifier_params(identifier)
payload = {**params, "message": request.message, "is_production": 1}
return await uex.post("marketplace_negotiations_messages", payload, authenticated=True)
@app.get("/api/wake-jobs")
async def wake_jobs() -> dict:
return {"scheduled_jobs": scheduler.list_jobs()}
@app.get("/api/plans")
async def continual_plans(include_inactive: bool = True) -> dict:
return {"plans": plan_store.list_plans(include_inactive=include_inactive)}
@app.post("/api/plans")
async def create_continual_plan(request: ContinualPlanCreateRequest) -> dict:
result = await tools.create_continual_plan(
title=request.title,
objective=request.objective,
kind=request.kind,
items=[item.model_dump() for item in request.items],
constraints=request.constraints,
cadence=request.cadence,
)
if result.get("error"):
raise HTTPException(status_code=400, detail=result["error"])
return result
@app.get("/api/plans/{plan_id}")
async def continual_plan(plan_id: str) -> dict:
plan = plan_store.get_plan(plan_id)
if not plan:
raise HTTPException(status_code=404, detail="Plan not found.")
return {"plan": plan}
@app.post("/api/plans/{plan_id}/pause")
async def pause_continual_plan(plan_id: str) -> dict:
result = await tools.pause_continual_plan(plan_id)
if result.get("error"):
raise HTTPException(status_code=404, detail=result["error"])
return result
@app.post("/api/plans/{plan_id}/resume")
async def resume_continual_plan(plan_id: str) -> dict:
result = await tools.resume_continual_plan(plan_id)
if result.get("error"):
raise HTTPException(status_code=404, detail=result["error"])
return result
@app.post("/api/plans/{plan_id}/cancel")
async def cancel_continual_plan(plan_id: str) -> dict:
result = await tools.cancel_continual_plan(plan_id)
if result.get("error"):
raise HTTPException(status_code=404, detail=result["error"])
return result
@app.post("/api/plans/{plan_id}/run")
async def run_continual_plan(plan_id: str) -> dict:
result = await tools.run_continual_plan_now(plan_id)
if result.get("error"):
raise HTTPException(status_code=400, detail=result["error"])
return result
@app.post("/api/plans/{plan_id}/events")
async def add_continual_plan_event(plan_id: str, request: ContinualPlanEventRequest) -> dict:
if not plan_store.get_plan(plan_id):
raise HTTPException(status_code=404, detail="Plan not found.")
event = plan_store.add_event(plan_id, request.kind, request.message, request.metadata)
return {"event": event, "plan": plan_store.get_plan(plan_id)}
@app.get("/api/memory")
async def inspect_memory(limit: int = 50) -> dict:
return memory.inspect(max(1, min(limit, 200)))
@app.post("/api/memory/clear")
async def clear_memory(request: ClearMemoryRequest) -> dict:
if request.include_jobs:
scheduler.shutdown()
deleted = memory.clear(
include_memories=request.include_memories,
include_conversations=request.include_conversations,
include_profile=request.include_profile,
include_jobs=request.include_jobs,
include_outbox=request.include_outbox,
)
if request.include_jobs:
scheduler.start()
return {"deleted": deleted, "memory": memory.inspect(50)}
@app.post("/api/approve/{action_id}")
async def approve(action_id: str) -> dict:
return await tools.approve(action_id)
@app.post("/api/decline/{action_id}")
async def decline(action_id: str) -> dict:
return await tools.decline(action_id)
return app
def negotiation_identifier_params(identifier: str) -> dict[str, Any]:
value = identifier.strip()
if not value:
raise HTTPException(status_code=400, detail="Negotiation id or hash is required.")
if value.isdigit():
return {"id_negotiation": int(value)}
return {"hash": value}
async def inspect_model_provider() -> dict[str, Any]:
settings = get_settings()
if settings.model_provider == "openai":
return await inspect_openai()
return await inspect_ollama()
async def inspect_openai() -> dict[str, Any]:
settings = get_settings()
models: list[str] = []
online = False
detail = ""
if not settings.openai_api_key:
return {
"installed": True,
"running": False,
"online": False,
"provider": "openai",
"model_available": False,
"configured_model": settings.openai_model,
"base_url": settings.openai_base_url,
"models": [],
"message": "OpenAI is selected, but no API key is configured.",
"detail": "",
}
try:
async with httpx.AsyncClient(timeout=10) as client:
response = await client.get(
f"{settings.openai_base_url.rstrip('/')}/models",
headers={"Authorization": f"Bearer {settings.openai_api_key}"},
)
response.raise_for_status()
body = response.json()
online = True
models = sorted(item.get("id") for item in body.get("data", []) if item.get("id"))
except (httpx.HTTPError, ValueError) as exc:
detail = str(exc)
model_available = settings.openai_model in models
return {
"installed": True,
"running": online,
"online": online,
"provider": "openai",
"model_available": model_available,
"configured_model": settings.openai_model,
"base_url": settings.openai_base_url,
"models": models,
"message": openai_status_message(online, bool(settings.openai_api_key), model_available, settings.openai_model),
"detail": detail,
}
async def inspect_ollama() -> dict[str, Any]:
settings = get_settings()
executable = find_ollama_executable()
cli = find_ollama_cli()
models: list[str] = []
online = False
detail = ""
try:
async with httpx.AsyncClient(timeout=3) as client:
response = await client.get(f"{settings.ollama_base_url.rstrip('/')}/api/tags")
response.raise_for_status()
body = response.json()
online = True
models = [item.get("name") or item.get("model") for item in body.get("models", [])]
models = [model for model in models if model]
except (httpx.HTTPError, ValueError) as exc:
detail = str(exc)
installed = bool(executable or cli)
model_available = settings.ollama_model in models
return {
"installed": installed,
"running": online,
"online": online,
"provider": "ollama",
"model_available": model_available,
"configured_model": settings.ollama_model,
"base_url": settings.ollama_base_url,
"num_ctx": settings.ollama_num_ctx,
"models": models,
"executable": str(executable) if executable else None,
"cli": str(cli) if cli else None,
"can_auto_install": bool(shutil.which("winget")),
"download_url": OLLAMA_DOWNLOAD_URL,
"message": ollama_status_message(installed, online, model_available, settings.ollama_model),
"detail": detail,
}
def openai_status_message(running: bool, configured: bool, model_available: bool, model: str) -> str:
if not configured:
return "OpenAI API key is not configured."
if not running:
return "OpenAI is not reachable with the configured key."
if not model_available:
return f'OpenAI is reachable, but model "{model}" was not returned by the API.'
return "OpenAI is ready."
def ollama_status_message(installed: bool, running: bool, model_available: bool, model: str) -> str:
if not installed:
return "Ollama is not installed."
if not running:
return "Ollama is installed but not running."
if not model_available:
return f'Ollama is running, but model "{model}" is not installed.'
return "Ollama is ready."
def find_ollama_executable() -> Path | None:
candidates = [
shutil.which("ollama"),
os.environ.get("OLLAMA_EXE"),
os.path.join(os.environ.get("LOCALAPPDATA", ""), "Programs", "Ollama", "Ollama.exe"),
os.path.join(os.environ.get("LOCALAPPDATA", ""), "Programs", "Ollama", "ollama.exe"),
os.path.join(os.environ.get("ProgramFiles", ""), "Ollama", "Ollama.exe"),
os.path.join(os.environ.get("ProgramFiles", ""), "Ollama", "ollama.exe"),
os.path.join(os.environ.get("ProgramFiles(x86)", ""), "Ollama", "Ollama.exe"),
os.path.join(os.environ.get("ProgramFiles(x86)", ""), "Ollama", "ollama.exe"),
]
for candidate in candidates:
if not candidate:
continue
path = Path(candidate)
if path.exists():
return path
return None
def find_ollama_cli() -> Path | None:
candidates = [
shutil.which("ollama"),
os.path.join(os.environ.get("LOCALAPPDATA", ""), "Programs", "Ollama", "ollama.exe"),
os.path.join(os.environ.get("ProgramFiles", ""), "Ollama", "ollama.exe"),
os.path.join(os.environ.get("ProgramFiles(x86)", ""), "Ollama", "ollama.exe"),
]
for candidate in candidates:
if not candidate:
continue
path = Path(candidate)
if path.exists():
return path
return None
def ollama_launch_command() -> list[str] | None:
executable = find_ollama_executable()
if not executable:
return None
if executable.name == "Ollama.exe":
return [str(executable)]
return [str(executable), "serve"]
def popen_hidden(command: list[str]) -> subprocess.Popen:
kwargs: dict[str, Any] = {"stdout": subprocess.DEVNULL, "stderr": subprocess.DEVNULL}
if sys.platform == "win32":
kwargs["creationflags"] = subprocess.CREATE_NO_WINDOW
return subprocess.Popen(command, **kwargs)
async def inspect_update() -> dict[str, Any]:
try:
latest = await latest_release()
except (httpx.HTTPError, ValueError) as exc:
return {
"current_version": __version__,
"latest_version": None,
"available": False,
"release_url": RELEASES_URL,
"message": f"Could not check releases: {exc}",
}
if not latest:
return {
"current_version": __version__,
"latest_version": None,
"available": False,
"release_url": RELEASES_URL,
"message": "No releases were found.",
}
latest_version = normalize_version(latest.get("tag_name") or latest.get("name") or "")
asset = release_asset(latest, UPDATE_ASSET_NAME)
available = latest_version is not None and compare_versions(latest_version, __version__) > 0
return {
"current_version": __version__,
"latest_version": latest_version,
"available": available,
"release_name": latest.get("name") or latest.get("tag_name"),
"release_url": latest.get("html_url") or RELEASES_URL,
"asset_name": asset.get("name") if asset else None,
"asset_download_url": asset.get("browser_download_url") if asset else None,
"packaged": bool(getattr(sys, "frozen", False)),
"message": update_message(available, latest_version, bool(asset)),
}
async def latest_release() -> dict[str, Any] | None:
async with httpx.AsyncClient(timeout=10, follow_redirects=True) as client:
response = await client.get(RELEASES_API_URL)
response.raise_for_status()
releases = response.json()
if not isinstance(releases, list):
return None
candidates = [
release
for release in releases
if isinstance(release, dict) and not release.get("draft") and not release.get("prerelease")
]
if not candidates:
candidates = [release for release in releases if isinstance(release, dict) and not release.get("draft")]
if not candidates:
return None
return max(candidates, key=lambda release: version_parts(str(release.get("tag_name") or release.get("name") or "0")))
def release_asset(release: dict[str, Any], name: str) -> dict[str, Any] | None:
assets = release.get("assets") or []
if not isinstance(assets, list):
return None
for asset in assets:
if isinstance(asset, dict) and str(asset.get("name", "")).casefold() == name.casefold():
return asset
for asset in assets:
download_url = str(asset.get("browser_download_url", "")) if isinstance(asset, dict) else ""
if download_url.casefold().endswith(f"/{name.casefold()}"):
return asset
return None
async def download_update_asset(url: str, version: str) -> Path:
updates_dir = Path(settings_payload()["app_data_dir"]) / "updates"
updates_dir.mkdir(parents=True, exist_ok=True)
path = updates_dir / f"TraderAI-{version}.exe"
async with httpx.AsyncClient(timeout=120, follow_redirects=True) as client:
async with client.stream("GET", url) as response:
response.raise_for_status()
with path.open("wb") as file:
async for chunk in response.aiter_bytes():
file.write(chunk)
return path
def write_update_script(source: Path, target: Path) -> Path:
updates_dir = Path(settings_payload()["app_data_dir"]) / "updates"
updates_dir.mkdir(parents=True, exist_ok=True)
script = updates_dir / "apply-update.ps1"
script.write_text(
"\n".join(
[
"param(",
" [Parameter(Mandatory=$true)][int]$ProcessId,",
" [Parameter(Mandatory=$true)][string]$Source,",
" [Parameter(Mandatory=$true)][string]$Target",
")",
"$ErrorActionPreference = 'Stop'",
"try { Wait-Process -Id $ProcessId -Timeout 60 -ErrorAction SilentlyContinue } catch {}",
"Start-Sleep -Seconds 1",
"Copy-Item -LiteralPath $Source -Destination $Target -Force",
"Start-Process -FilePath $Target",
]
)
+ "\n",
encoding="utf-8",
)
return script
def exit_after_update_response() -> None:
time.sleep(1.5)
os._exit(0)
def update_message(available: bool, latest_version: str | None, has_asset: bool) -> str:
if not latest_version:
return "Could not determine the latest release version."
if not available:
return f"TraderAI {__version__} is up to date."
if not has_asset:
return f"TraderAI {latest_version} is available, but the release has no {UPDATE_ASSET_NAME} asset."
return f"TraderAI {latest_version} is available."
def normalize_version(value: str) -> str | None:
text = value.strip()
if text.startswith("v"):
text = text[1:]
parts = text.split(".")
if len(parts) < 2:
return None
return text
def compare_versions(left: str, right: str) -> int:
left_parts = version_parts(left)
right_parts = version_parts(right)
max_len = max(len(left_parts), len(right_parts))
left_parts.extend([0] * (max_len - len(left_parts)))
right_parts.extend([0] * (max_len - len(right_parts)))
return (left_parts > right_parts) - (left_parts < right_parts)
def version_parts(version: str) -> list[int]:
text = normalize_version(version) or "0"
core = text.replace("-", ".").replace("+", ".").split(".")
parts: list[int] = []
for item in core:
digits = ""
for char in item:
if not char.isdigit():
break
digits += char
parts.append(int(digits or 0))
return parts
app = create_app()