638 lines
23 KiB
Python
638 lines
23 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
import json
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
import webbrowser
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import httpx
|
|
from fastapi import FastAPI
|
|
from fastapi import HTTPException
|
|
from fastapi.responses import FileResponse, StreamingResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from pydantic import BaseModel
|
|
|
|
from traderai.agent import OllamaAgent, OllamaUnavailable
|
|
from traderai.config import save_settings, settings_payload
|
|
from traderai.config import get_settings
|
|
from traderai.memory import DEFAULT_THREAD_ID, MemoryStore
|
|
from traderai.scheduler import WakeScheduler
|
|
from traderai.scmdb_client import SCMDBClient
|
|
from traderai.tools import ToolRegistry
|
|
from traderai.uex_client import UEXClient
|
|
from traderai.version import RELEASES_API_URL, RELEASES_URL, __version__
|
|
|
|
|
|
def resource_path(*parts: str) -> Path:
    """Build a path to a bundled resource.

    The base directory is ``sys._MEIPASS`` when that attribute exists;
    otherwise it is the directory two levels above this file. ``parts`` are
    joined onto that base.
    """
    if hasattr(sys, "_MEIPASS"):
        root = Path(sys._MEIPASS)
    else:
        root = Path(__file__).resolve().parent.parent
    return root.joinpath(*parts)
|
|
|
|
|
|
class ChatRequest(BaseModel):
    # Request body for POST /api/chat and POST /api/chat/stream.
    # The text handed to the agent as the chat message.
    message: str
    # Thread the message belongs to; DEFAULT_THREAD_ID is used when omitted.
    thread_id: str | None = DEFAULT_THREAD_ID
|
|
|
|
|
|
class ChatThreadRequest(BaseModel):
    # Request body for POST /api/chats.
    # Optional title passed to memory.create_thread().
    title: str | None = None
|
|
|
|
|
|
class RenameChatThreadRequest(BaseModel):
    # Request body for PATCH /api/chats/{thread_id}.
    # New title passed to memory.rename_thread(); the endpoint answers 400
    # when that call returns a falsy value.
    title: str
|
|
|
|
|
|
class DirectNegotiationMessageRequest(BaseModel):
    # Request body for POST /api/negotiations/{identifier}/messages.
    # Message text included in the payload posted through the UEX client.
    message: str
|
|
|
|
|
|
class ClearMemoryRequest(BaseModel):
    # Request body for POST /api/memory/clear. Each flag is forwarded to
    # memory.clear() under the same keyword name.
    include_memories: bool = True
    include_conversations: bool = True
    include_profile: bool = False
    # When true the endpoint also stops the scheduler before clearing and
    # starts it again afterwards.
    include_jobs: bool = False
    include_outbox: bool = True
|
|
|
|
|
|
class ConfigUpdateRequest(BaseModel):
    # Request body for POST /api/config.
    # Mapping handed unchanged to save_settings().
    values: dict
|
|
|
|
|
|
class OllamaModelRequest(BaseModel):
    # Request body for POST /api/ollama/pull.
    # Model to pull; the endpoint falls back to the configured
    # settings.ollama_model when this is missing or empty.
    model: str | None = None
|
|
|
|
|
|
# Page opened in the browser by /api/ollama/download and reported as
# "download_url" in the Ollama status/install payloads.
OLLAMA_DOWNLOAD_URL = "https://ollama.com/download/windows"
# Name of the release asset that the update check looks for.
UPDATE_ASSET_NAME = "TraderAI.exe"
|
|
|
|
|
|
def create_app() -> FastAPI:
    """Build the TraderAI FastAPI application.

    Wires together the settings, memory store, scheduler, UEX/SCMDB clients,
    tool registry and Ollama agent, mounts the static web directory under
    ``/static`` and registers every HTTP route as a closure over those
    objects.

    Returns:
        The configured ``FastAPI`` instance.
    """
    settings = get_settings()
    memory = MemoryStore(settings.traderai_memory_path)
    scheduler = WakeScheduler(memory)
    uex = UEXClient(settings.uex_base_url, settings.uex_secret_key, settings.uex_bearer_token)
    scmdb = SCMDBClient(settings.scmdb_base_url)
    tools = ToolRegistry(uex, settings.require_write_approval, memory=memory, scheduler=scheduler, scmdb=scmdb)
    agent = OllamaAgent(
        settings.ollama_base_url,
        settings.ollama_model,
        tools,
        memory=memory,
        user_name=settings.traderai_user_name,
        num_ctx=settings.ollama_num_ctx,
    )
    scheduler.bind_agent(agent)
    scheduler.bind_uex_notifications(uex, settings.uex_notification_poll_seconds)

    app = FastAPI(title="TraderAI")
    static_dir = resource_path("web")
    app.mount("/static", StaticFiles(directory=static_dir), name="static")

    @app.on_event("startup")
    async def startup() -> None:
        await refresh_user_profile()
        scheduler.start()

    @app.on_event("shutdown")
    async def shutdown() -> None:
        scheduler.shutdown()

    async def refresh_user_profile() -> None:
        # Seed the profile from the configured name before asking UEX.
        if settings.traderai_user_name:
            memory.set_profile("configured_name", settings.traderai_user_name)
            agent.user_name = agent.user_name or settings.traderai_user_name

        try:
            response = await uex.get_user(authenticated=True)
        except Exception as exc:
            # Best effort: record the failure, then fall back to a lookup by
            # the configured user name when one is available.
            memory.set_profile("uex_user_error", str(exc))
            if settings.traderai_user_name:
                try:
                    response = await uex.get_user(username=settings.traderai_user_name)
                except Exception:
                    return
            else:
                return

        data = response.get("user")
        if data:
            memory.set_profile("uex_user", data)
            username = data.get("username") or data.get("user_username") or data.get("name")
            if username:
                agent.user_name = username

    @app.get("/")
    async def index() -> FileResponse:
        return FileResponse(static_dir / "index.html")

    @app.get("/api/health")
    async def health() -> dict:
        return {
            "ollama": await agent.health(),
            "user": memory.get_profile(),
            "jobs": scheduler.list_jobs(),
            "app_data_dir": settings_payload()["app_data_dir"],
            "version": __version__,
        }

    @app.get("/api/config")
    async def inspect_config() -> dict:
        return settings_payload()

    @app.post("/api/config")
    async def update_config(request: ConfigUpdateRequest) -> dict:
        updated = save_settings(request.values)
        updated["restart_required"] = True
        updated["message"] = "Configuration saved. Restart TraderAI for all settings to take effect."
        return updated

    @app.get("/api/ollama/status")
    async def ollama_status() -> dict:
        return await inspect_ollama()

    @app.post("/api/ollama/launch")
    async def launch_ollama() -> dict:
        command = ollama_launch_command()
        if not command:
            raise HTTPException(status_code=404, detail="Ollama is not installed or was not found on PATH.")
        try:
            popen_hidden(command)
        except OSError as exc:
            raise HTTPException(status_code=500, detail=f"Could not launch Ollama: {exc}") from exc
        status = await inspect_ollama()
        status["message"] = "Ollama launch requested."
        return status

    @app.post("/api/ollama/pull")
    async def pull_ollama_model(request: OllamaModelRequest) -> dict:
        # Settings are re-read here rather than taken from the closure.
        settings_now = get_settings()
        model = (request.model or settings_now.ollama_model).strip()
        if not model:
            raise HTTPException(status_code=400, detail="No Ollama model is configured.")
        cli = find_ollama_cli()
        if not cli:
            raise HTTPException(status_code=404, detail="Ollama CLI was not found.")
        try:
            popen_hidden([str(cli), "pull", model])
        except OSError as exc:
            raise HTTPException(status_code=500, detail=f"Could not start model install: {exc}") from exc
        status = await inspect_ollama()
        status["message"] = f"Started installing model {model}."
        return status

    @app.post("/api/ollama/install")
    async def install_ollama() -> dict:
        winget = shutil.which("winget")
        if not winget:
            return {
                "started": False,
                "message": "winget is not available on this system. Open the download page instead.",
                "download_url": OLLAMA_DOWNLOAD_URL,
            }
        try:
            popen_hidden(
                [
                    winget,
                    "install",
                    "-e",
                    "--id",
                    "Ollama.Ollama",
                    "--accept-package-agreements",
                    "--accept-source-agreements",
                ]
            )
        except OSError as exc:
            raise HTTPException(status_code=500, detail=f"Could not start Ollama install: {exc}") from exc
        return {"started": True, "message": "Started Ollama install with winget.", "download_url": OLLAMA_DOWNLOAD_URL}

    @app.post("/api/ollama/download")
    async def download_ollama() -> dict:
        webbrowser.open(OLLAMA_DOWNLOAD_URL)
        return {"opened": True, "download_url": OLLAMA_DOWNLOAD_URL, "message": "Opened the Ollama download page."}

    @app.get("/api/update/check")
    async def check_update() -> dict:
        return await inspect_update()

    @app.post("/api/update/install")
    async def install_update() -> dict:
        update = await inspect_update()
        if not update["available"]:
            return {**update, "message": "TraderAI is already up to date."}
        if not getattr(sys, "frozen", False):
            return {
                **update,
                "started": False,
                "message": "Update download is available, but self-update only runs from the packaged exe.",
            }

        asset_url = update.get("asset_download_url")
        if not asset_url:
            raise HTTPException(status_code=404, detail="The latest release does not include TraderAI.exe.")

        target = Path(sys.executable)
        # Report download/write failures as an HTTP error with a readable
        # message, matching the other endpoints that spawn external work.
        try:
            downloaded = await download_update_asset(asset_url, update["latest_version"])
            script = write_update_script(downloaded, target)
        except (httpx.HTTPError, OSError) as exc:
            raise HTTPException(status_code=502, detail=f"Could not download the update: {exc}") from exc

        updater_command = [
            "powershell",
            "-NoProfile",
            "-ExecutionPolicy",
            "Bypass",
            "-File",
            str(script),
            "-ProcessId",
            str(os.getpid()),
            "-Source",
            str(downloaded),
            "-Target",
            str(target),
        ]
        # popen_hidden applies the same stdout/stderr/creationflags settings
        # this endpoint previously built by hand.
        try:
            popen_hidden(updater_command)
        except OSError as exc:
            raise HTTPException(status_code=500, detail=f"Could not start the updater: {exc}") from exc
        # Exit from a background thread so this response can be returned first.
        threading.Thread(target=exit_after_update_response, daemon=True).start()
        return {**update, "started": True, "message": "Update downloaded. TraderAI will restart into the new version."}

    @app.post("/api/chat")
    async def chat(request: ChatRequest) -> dict:
        try:
            return await agent.chat(request.message, thread_id=request.thread_id)
        except OllamaUnavailable as exc:
            raise HTTPException(status_code=503, detail=str(exc)) from exc

    @app.post("/api/chat/stream")
    async def chat_stream(request: ChatRequest) -> StreamingResponse:
        async def events():
            # Each agent event is emitted as one server-sent "data:" frame.
            async for event in agent.chat_events(request.message, thread_id=request.thread_id):
                yield f"data: {json.dumps(event)}\n\n"

        return StreamingResponse(events(), media_type="text/event-stream")

    @app.get("/api/chats")
    async def chats() -> dict:
        return {"chats": memory.list_threads()}

    @app.post("/api/chats")
    async def create_chat(request: ChatThreadRequest) -> dict:
        return {"chat": memory.create_thread(request.title)}

    @app.get("/api/chats/{thread_id}/messages")
    async def chat_messages(thread_id: str) -> dict:
        memory.ensure_thread(thread_id)
        return {"thread_id": thread_id, "messages": memory.recent_conversation(limit=200, thread_id=thread_id)}

    @app.delete("/api/chats/{thread_id}")
    async def delete_chat(thread_id: str) -> dict:
        deleted = memory.delete_thread(thread_id)
        return {"deleted": deleted, "chats": memory.list_threads()}

    @app.patch("/api/chats/{thread_id}")
    async def rename_chat(thread_id: str, request: RenameChatThreadRequest) -> dict:
        chat = memory.rename_thread(thread_id, request.title)
        if not chat:
            raise HTTPException(status_code=400, detail="A non-empty chat title is required.")
        return {"chat": chat, "chats": memory.list_threads()}

    @app.get("/api/pending-actions")
    async def pending_actions() -> dict:
        return {"pending_actions": agent._pending_payloads()}

    @app.get("/api/notifications")
    async def notifications() -> dict:
        return {"notifications": memory.undelivered_outbox()}

    @app.get("/api/inbox")
    async def inbox() -> dict:
        return {"inbox": memory.list_outbox()}

    @app.post("/api/inbox/{inbox_id}/continue")
    async def continue_inbox(inbox_id: int) -> dict:
        item = memory.get_outbox(inbox_id)
        if not item:
            raise HTTPException(status_code=404, detail="Inbox item not found.")
        # Start a new thread whose first message is the inbox item's content.
        thread = memory.create_thread("Inbox follow-up")
        memory.add_conversation("assistant", item["content"], thread["id"])
        return {"chat": thread, "message": item}

    @app.delete("/api/inbox/{inbox_id}")
    async def delete_inbox(inbox_id: int) -> dict:
        deleted = memory.delete_outbox(inbox_id)
        return {"deleted": deleted, "inbox": memory.list_outbox()}

    @app.get("/api/negotiations/{identifier}/messages")
    async def negotiation_messages(identifier: str) -> dict:
        params = negotiation_identifier_params(identifier)
        return await uex.get("marketplace_negotiations_messages", params, authenticated=True)

    @app.post("/api/negotiations/{identifier}/messages")
    async def send_negotiation_message(identifier: str, request: DirectNegotiationMessageRequest) -> dict:
        params = negotiation_identifier_params(identifier)
        payload = {**params, "message": request.message, "is_production": 1}
        return await uex.post("marketplace_negotiations_messages", payload, authenticated=True)

    @app.get("/api/wake-jobs")
    async def wake_jobs() -> dict:
        return {"scheduled_jobs": scheduler.list_jobs()}

    @app.get("/api/memory")
    async def inspect_memory(limit: int = 50) -> dict:
        # Clamp the requested limit to the range 1..200.
        return memory.inspect(max(1, min(limit, 200)))

    @app.post("/api/memory/clear")
    async def clear_memory(request: ClearMemoryRequest) -> dict:
        if request.include_jobs:
            scheduler.shutdown()
        try:
            deleted = memory.clear(
                include_memories=request.include_memories,
                include_conversations=request.include_conversations,
                include_profile=request.include_profile,
                include_jobs=request.include_jobs,
                include_outbox=request.include_outbox,
            )
        finally:
            # Restart the scheduler even when clearing fails, so an error
            # here does not leave it stopped for the rest of the process.
            if request.include_jobs:
                scheduler.start()
        return {"deleted": deleted, "memory": memory.inspect(50)}

    @app.post("/api/approve/{action_id}")
    async def approve(action_id: str) -> dict:
        return await tools.approve(action_id)

    @app.post("/api/decline/{action_id}")
    async def decline(action_id: str) -> dict:
        return await tools.decline(action_id)

    return app
|
|
|
|
|
|
def negotiation_identifier_params(identifier: str) -> dict[str, Any]:
    """Translate a negotiation identifier into UEX query parameters.

    Args:
        identifier: Either a numeric negotiation id or a negotiation hash;
            surrounding whitespace is ignored.

    Returns:
        ``{"id_negotiation": <int>}`` when the value consists only of decimal
        digits, otherwise ``{"hash": <value>}``.

    Raises:
        HTTPException: 400 when the identifier is empty after stripping.
    """
    value = identifier.strip()
    if not value:
        raise HTTPException(status_code=400, detail="Negotiation id or hash is required.")
    # isdecimal() rather than isdigit(): isdigit() is also true for characters
    # such as superscript digits, which int() rejects with ValueError.
    if value.isdecimal():
        return {"id_negotiation": int(value)}
    return {"hash": value}
|
|
|
|
|
|
async def inspect_ollama() -> dict[str, Any]:
    """Collect the local Ollama installation and server status.

    Looks for the Ollama executable and CLI on disk, then queries
    ``<ollama_base_url>/api/tags`` (3 second timeout) to find out whether the
    server answers and which models it lists.

    Returns:
        A dict with the keys ``installed``, ``running``, ``online``,
        ``model_available``, ``configured_model``, ``base_url``, ``num_ctx``,
        ``models``, ``executable``, ``cli``, ``can_auto_install``,
        ``download_url``, ``message`` and ``detail``. ``detail`` carries the
        error text when the status request failed and is empty otherwise.
    """
    settings = get_settings()
    executable = find_ollama_executable()
    cli = find_ollama_cli()
    models: list[str] = []
    online = False
    detail = ""

    try:
        async with httpx.AsyncClient(timeout=3) as client:
            response = await client.get(f"{settings.ollama_base_url.rstrip('/')}/api/tags")
            response.raise_for_status()
            body = response.json()
    except (httpx.HTTPError, ValueError) as exc:
        detail = str(exc)
    else:
        online = True
        # Tolerate unexpected payload shapes (non-dict body, missing or null
        # "models", non-dict entries) instead of failing the status request.
        entries = body.get("models") if isinstance(body, dict) else None
        if isinstance(entries, list):
            names = (item.get("name") or item.get("model") for item in entries if isinstance(item, dict))
            models = [name for name in names if name]

    installed = bool(executable or cli)
    model_available = settings.ollama_model in models
    return {
        "installed": installed,
        "running": online,
        "online": online,
        "model_available": model_available,
        "configured_model": settings.ollama_model,
        "base_url": settings.ollama_base_url,
        "num_ctx": settings.ollama_num_ctx,
        "models": models,
        "executable": str(executable) if executable else None,
        "cli": str(cli) if cli else None,
        "can_auto_install": bool(shutil.which("winget")),
        "download_url": OLLAMA_DOWNLOAD_URL,
        "message": ollama_status_message(installed, online, model_available, settings.ollama_model),
        "detail": detail,
    }
|
|
|
|
|
|
def ollama_status_message(installed: bool, running: bool, model_available: bool, model: str) -> str:
    """Return a one-line description of the Ollama state.

    The checks are ordered: not installed, not running, model missing. The
    first one that fails decides the message; if none fails Ollama is ready.
    """
    checks = (
        (installed, "Ollama is not installed."),
        (running, "Ollama is installed but not running."),
        (model_available, f'Ollama is running, but model "{model}" is not installed.'),
    )
    for satisfied, problem in checks:
        if not satisfied:
            return problem
    return "Ollama is ready."
|
|
|
|
|
|
def find_ollama_executable() -> Path | None:
|
|
candidates = [
|
|
shutil.which("ollama"),
|
|
os.environ.get("OLLAMA_EXE"),
|
|
os.path.join(os.environ.get("LOCALAPPDATA", ""), "Programs", "Ollama", "Ollama.exe"),
|
|
os.path.join(os.environ.get("LOCALAPPDATA", ""), "Programs", "Ollama", "ollama.exe"),
|
|
os.path.join(os.environ.get("ProgramFiles", ""), "Ollama", "Ollama.exe"),
|
|
os.path.join(os.environ.get("ProgramFiles", ""), "Ollama", "ollama.exe"),
|
|
os.path.join(os.environ.get("ProgramFiles(x86)", ""), "Ollama", "Ollama.exe"),
|
|
os.path.join(os.environ.get("ProgramFiles(x86)", ""), "Ollama", "ollama.exe"),
|
|
]
|
|
for candidate in candidates:
|
|
if not candidate:
|
|
continue
|
|
path = Path(candidate)
|
|
if path.exists():
|
|
return path
|
|
return None
|
|
|
|
|
|
def find_ollama_cli() -> Path | None:
|
|
candidates = [
|
|
shutil.which("ollama"),
|
|
os.path.join(os.environ.get("LOCALAPPDATA", ""), "Programs", "Ollama", "ollama.exe"),
|
|
os.path.join(os.environ.get("ProgramFiles", ""), "Ollama", "ollama.exe"),
|
|
os.path.join(os.environ.get("ProgramFiles(x86)", ""), "Ollama", "ollama.exe"),
|
|
]
|
|
for candidate in candidates:
|
|
if not candidate:
|
|
continue
|
|
path = Path(candidate)
|
|
if path.exists():
|
|
return path
|
|
return None
|
|
|
|
|
|
def ollama_launch_command() -> list[str] | None:
    """Build the command line used to start Ollama.

    Returns ``None`` when no executable is found. A file named exactly
    ``Ollama.exe`` is started without arguments; any other executable is
    started with the ``serve`` argument.
    """
    executable = find_ollama_executable()
    if executable is None:
        return None
    command = [str(executable)]
    if executable.name != "Ollama.exe":
        command.append("serve")
    return command
|
|
|
|
|
|
def popen_hidden(command: list[str]) -> subprocess.Popen:
    """Start ``command`` as a child process with its output discarded.

    stdout and stderr are sent to ``subprocess.DEVNULL``. On ``win32`` the
    process is additionally created with ``CREATE_NO_WINDOW``.
    """
    platform_options: dict[str, Any] = (
        {"creationflags": subprocess.CREATE_NO_WINDOW} if sys.platform == "win32" else {}
    )
    return subprocess.Popen(
        command,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        **platform_options,
    )
|
|
|
|
|
|
async def inspect_update() -> dict[str, Any]:
    """Compare the running version with the newest published release.

    Returns a dict that always contains ``current_version``,
    ``latest_version``, ``available``, ``release_url`` and ``message``. When a
    release was found it also contains ``release_name``, ``asset_name``,
    ``asset_download_url`` and ``packaged``.
    """

    def unavailable(message: str) -> dict[str, Any]:
        # Shared payload for "lookup failed" and "nothing published".
        return {
            "current_version": __version__,
            "latest_version": None,
            "available": False,
            "release_url": RELEASES_URL,
            "message": message,
        }

    try:
        latest = await latest_release()
    except (httpx.HTTPError, ValueError) as exc:
        return unavailable(f"Could not check releases: {exc}")
    if not latest:
        return unavailable("No releases were found.")

    tag = latest.get("tag_name") or latest.get("name") or ""
    latest_version = normalize_version(tag)
    asset = release_asset(latest, UPDATE_ASSET_NAME)
    if latest_version is None:
        available = False
    else:
        available = compare_versions(latest_version, __version__) > 0

    asset_name = asset.get("name") if asset else None
    asset_download_url = asset.get("browser_download_url") if asset else None
    return {
        "current_version": __version__,
        "latest_version": latest_version,
        "available": available,
        "release_name": latest.get("name") or latest.get("tag_name"),
        "release_url": latest.get("html_url") or RELEASES_URL,
        "asset_name": asset_name,
        "asset_download_url": asset_download_url,
        "packaged": bool(getattr(sys, "frozen", False)),
        "message": update_message(available, latest_version, bool(asset)),
    }
|
|
|
|
|
|
async def latest_release() -> dict[str, Any] | None:
    """Fetch the release list and return the entry with the highest version.

    Drafts are always ignored. Pre-releases are only considered when no
    non-prerelease entry exists. Returns ``None`` when the response is not a
    list or nothing qualifies. HTTP and JSON decoding errors propagate.
    """
    async with httpx.AsyncClient(timeout=10, follow_redirects=True) as client:
        response = await client.get(RELEASES_API_URL)
        response.raise_for_status()
        releases = response.json()

    if not isinstance(releases, list):
        return None

    published = [entry for entry in releases if isinstance(entry, dict) and not entry.get("draft")]
    stable = [entry for entry in published if not entry.get("prerelease")]
    pool = stable or published
    if not pool:
        return None

    def sort_key(entry: dict[str, Any]) -> list[int]:
        label = entry.get("tag_name") or entry.get("name") or "0"
        return version_parts(str(label))

    return max(pool, key=sort_key)
|
|
|
|
|
|
def release_asset(release: dict[str, Any], name: str) -> dict[str, Any] | None:
|
|
assets = release.get("assets") or []
|
|
if not isinstance(assets, list):
|
|
return None
|
|
for asset in assets:
|
|
if isinstance(asset, dict) and str(asset.get("name", "")).casefold() == name.casefold():
|
|
return asset
|
|
for asset in assets:
|
|
download_url = str(asset.get("browser_download_url", "")) if isinstance(asset, dict) else ""
|
|
if download_url.casefold().endswith(f"/{name.casefold()}"):
|
|
return asset
|
|
return None
|
|
|
|
|
|
async def download_update_asset(url: str, version: str) -> Path:
    """Download a release asset into ``<app_data_dir>/updates``.

    Args:
        url: Download URL of the asset.
        version: Version label used in the file name. Characters other than
            letters, digits and ``._-+`` are replaced with ``_`` so a release
            tag can never introduce path separators or invalid file names.

    Returns:
        Path of the downloaded file, ``TraderAI-<version>.exe``.

    Raises:
        httpx.HTTPError: When the request fails or returns an error status.
        OSError: When the file cannot be written.
    """
    updates_dir = Path(settings_payload()["app_data_dir"]) / "updates"
    updates_dir.mkdir(parents=True, exist_ok=True)
    safe_version = "".join(ch if ch.isalnum() or ch in "._-+" else "_" for ch in str(version))
    path = updates_dir / f"TraderAI-{safe_version}.exe"
    # Stream into a temporary file first so an interrupted download never
    # leaves a truncated executable under the final name.
    partial = path.with_name(path.name + ".part")
    try:
        async with httpx.AsyncClient(timeout=120, follow_redirects=True) as client:
            async with client.stream("GET", url) as response:
                response.raise_for_status()
                with partial.open("wb") as file:
                    async for chunk in response.aiter_bytes():
                        file.write(chunk)
        partial.replace(path)
    finally:
        # After a successful replace() the temporary file no longer exists.
        if partial.exists():
            partial.unlink()
    return path
|
|
|
|
|
|
def write_update_script(source: Path, target: Path) -> Path:
    """Write the PowerShell updater script and return its path.

    The script is saved as ``<app_data_dir>/updates/apply-update.ps1``. It
    takes ``-ProcessId``, ``-Source`` and ``-Target`` parameters, waits for
    the process to end, copies source over target and starts target.
    ``source`` and ``target`` are not embedded in the script text.
    """
    script_lines = (
        "param(",
        "  [Parameter(Mandatory=$true)][int]$ProcessId,",
        "  [Parameter(Mandatory=$true)][string]$Source,",
        "  [Parameter(Mandatory=$true)][string]$Target",
        ")",
        "$ErrorActionPreference = 'Stop'",
        "try { Wait-Process -Id $ProcessId -Timeout 60 -ErrorAction SilentlyContinue } catch {}",
        "Start-Sleep -Seconds 1",
        "Copy-Item -LiteralPath $Source -Destination $Target -Force",
        "Start-Process -FilePath $Target",
    )
    folder = Path(settings_payload()["app_data_dir"]) / "updates"
    folder.mkdir(parents=True, exist_ok=True)
    destination = folder / "apply-update.ps1"
    # Every line, including the last, is terminated with a newline.
    destination.write_text("".join(f"{line}\n" for line in script_lines), encoding="utf-8")
    return destination
|
|
|
|
|
|
def exit_after_update_response() -> None:
    """Wait 1.5 seconds, then end the process with ``os._exit(0)``."""
    grace_seconds = 1.5
    time.sleep(grace_seconds)
    os._exit(0)
|
|
|
|
|
|
def update_message(available: bool, latest_version: str | None, has_asset: bool) -> str:
|
|
if not latest_version:
|
|
return "Could not determine the latest release version."
|
|
if not available:
|
|
return f"TraderAI {__version__} is up to date."
|
|
if not has_asset:
|
|
return f"TraderAI {latest_version} is available, but the release has no {UPDATE_ASSET_NAME} asset."
|
|
return f"TraderAI {latest_version} is available."
|
|
|
|
|
|
def normalize_version(value: str) -> str | None:
|
|
text = value.strip()
|
|
if text.startswith("v"):
|
|
text = text[1:]
|
|
parts = text.split(".")
|
|
if len(parts) < 2:
|
|
return None
|
|
return text
|
|
|
|
|
|
def compare_versions(left: str, right: str) -> int:
    """Three-way compare two version strings.

    Both versions are split into numeric parts and the shorter list is
    padded with zeros before comparing.

    Returns:
        ``1`` when ``left`` is newer, ``-1`` when it is older, ``0`` when
        both are equal.
    """
    lhs = version_parts(left)
    rhs = version_parts(right)
    width = max(len(lhs), len(rhs))
    lhs = lhs + [0] * (width - len(lhs))
    rhs = rhs + [0] * (width - len(rhs))
    if lhs == rhs:
        return 0
    return 1 if lhs > rhs else -1
|
|
|
|
|
|
def version_parts(version: str) -> list[int]:
    """Split a version string into a list of integers.

    The normalized version (or ``"0"`` when normalization fails) is split on
    ``.``, ``-`` and ``+``. Each piece contributes its leading run of digits;
    a piece without leading digits contributes ``0``.
    """
    normalized = normalize_version(version) or "0"
    segments = normalized.replace("-", ".").replace("+", ".").split(".")
    numbers: list[int] = []
    for segment in segments:
        # Measure the leading digit run of this segment.
        end = 0
        while end < len(segment) and segment[end].isdigit():
            end += 1
        numbers.append(int(segment[:end] or 0))
    return numbers
|
|
|
|
|
|
# Module-level application instance, built when this module is imported.
# NOTE(review): presumably the object an ASGI server is pointed at — confirm
# against the launcher.
app = create_app()
|