Files
2026-06-09 11:24:15 -04:00

1927 lines
87 KiB
Python

from __future__ import annotations
import asyncio
import json
import re
import shutil
import subprocess
import tempfile
import uuid
from collections.abc import AsyncIterator
from contextlib import nullcontext
from pathlib import Path
from typing import Any
import httpx
from tzlocal import get_localzone
from traderai.memory import DEFAULT_THREAD_ID, MemoryStore, iso_now, iso_now_in_zone, time_since
from traderai.tools import ToolRegistry
from traderai.version import __version__
# System prompt text for the TraderAI assistant. It sets the persona and the
# rules for choosing tools (UEX, SCMDB, Star Citizen Wiki, Wikelo, Cornerstone),
# for approval-gated marketplace writes, continual plans, and wake-job output.
# The text is sent to the model verbatim, so wording changes alter behavior.
SYSTEM_PROMPT = """You are TraderAI, a sharp Star Citizen marketplace copilot for UEX work.
Sound like a competent player who knows the game and the market. Be natural, direct, and helpful. Avoid corporate filler, robotic phrasing, and meta notes.
Use tools when the user asks about UEX data, open/current listings, active negotiations, unread notifications, messages, offers, or posting ads.
Prefer locally synced negotiation tools before live UEX negotiation reads when local context is available.
Use continual plan tools when the user asks for multi-day or recurring marketplace work, such as finding several parts, watching for deals, tracking candidates, or coordinating negotiations over time.
UEX credentials are configured server-side when available. Never ask the user to provide UEX_SECRET_KEY or UEX_BEARER_TOKEN in chat; call the authenticated UEX tool and only mention credential configuration if the tool returns an authentication error.
Use the specific UEX tool for the needed endpoint, such as get_uex_commodities_prices or get_uex_vehicles. Use fields, limit, and summary mode so tool results stay compact.
When the user asks for history, trends, changes over time, or past prices, prefer the summarize_uex_*_history tools when available; use search_uex_api_index(history_only=true) if you need to discover history endpoints.
When you need missing Star Citizen knowledge to answer accurately, use Star Citizen Wiki tools during your reasoning instead of guessing.
Use SCMDB tools when the user asks about Star Citizen missions/contracts, mission rewards, payouts, reputation gains, item rewards, blueprint rewards, or hauling mission cargo. Prefer SCMDB live data unless the user asks for PTU or a specific game version.
Use Star Citizen Wiki tools for general game knowledge, ships and vehicles, store availability, purchase locations, ship prices, manufacturers, locations, and page summaries from starcitizen.tools.
Use Wikelo ship project tools when the user asks for Wikelo ship requirements, Wikelo build materials, or what items are needed for a Wikelo ship project.
Use Cornerstone tools when the user asks where an item is sold, which shops carry an item, item store locations, in-game item base prices, or Universal Item Finder data.
When drafting UEX marketplace item posts that need images, use Cornerstone media tools or draft_marketplace_listing_with_cornerstone_image so the pending listing can include UEX image_data sourced from Cornerstone.
Prefer open and current UEX marketplace information. Do not use historical sale data, completed sale records, or sale/average-history information unless the user explicitly asks for historical sales.
Treat UEX marketplace prices as in-game aUEC/UEC credits, never real-world dollars, unless the user explicitly says otherwise.
For marketplace writes, draft the exact pending action and tell the user what will be sent; never claim it was sent until approval succeeds.
When drafting negotiation messages or marketplace replies, write like a real player would. Keep messages human, concise, and purposeful. Never include internal notes like "Tone note".
The user can manually send their own negotiation messages directly from the negotiations workspace, but you must still use approval-gated draft actions for AI-authored replies and deal-close submissions.
For continual plans, never invent an unknown parts checklist. If the required items cannot be derived from provided details or tools, create the plan in a needs-input state and say what item list is missing.
When a scheduled wake job fires, always write a concise Inbox-ready result that says what you checked, the key findings, and the suggested next action.
Keep prices, listing ids, slugs, users, and UEX status codes precise. If data is missing, say what you need next."""
class OllamaAgent:
def __init__(
self,
base_url: str,
model: str,
tools: ToolRegistry,
memory: MemoryStore | None = None,
user_name: str | None = None,
num_ctx: int | None = None,
provider: str = "ollama",
api_key: str | None = None,
reasoning_effort: str = "medium",
) -> None:
self.base_url = base_url.rstrip("/")
self.model = model
self.tools = tools
self.memory = memory
self.user_name = user_name
self.num_ctx = num_ctx
self.provider = provider.strip().casefold() or "ollama"
self.api_key = api_key
self.reasoning_effort = reasoning_effort.strip().casefold() or "medium"
self.thread_messages: dict[str, list[dict[str, Any]]] = {}
async def health(self) -> dict[str, Any]:
    """Report whether the configured backend is reachable and serves the model.

    OpenAI-compatible and Codex providers are delegated to their own health
    checks; every other provider value is probed as an Ollama server through
    ``GET {base_url}/api/tags``.

    Returns:
        A dict with ``online``, ``model``, ``base_url`` and ``message``. When
        the server answered it also carries ``model_available`` and
        ``models``; when it did not, ``detail`` holds the error text.
    """
    if self._is_openai_compatible_provider():
        return await self._openai_health()
    if self.provider == "codex":
        return await self._codex_health()
    try:
        async with httpx.AsyncClient(timeout=3) as client:
            response = await client.get(f"{self.base_url}/api/tags")
            response.raise_for_status()
            body = response.json()
    except (httpx.HTTPError, ValueError) as exc:
        return {
            "online": False,
            "model": self.model,
            "base_url": self.base_url,
            "message": f"Ollama is offline or unreachable at {self.base_url}. Open the Ollama tab and use the recommended action.",
            "detail": str(exc),
        }
    # A reachable server may still answer with a non-object payload, a null
    # "models" field, or non-dict entries; treat those as "no models listed"
    # instead of crashing the health check with AttributeError/TypeError.
    listed = body.get("models") if isinstance(body, dict) else None
    if not isinstance(listed, list):
        listed = []
    models = [entry.get("name") or entry.get("model") for entry in listed if isinstance(entry, dict)]
    return {
        "online": True,
        "model": self.model,
        "base_url": self.base_url,
        "model_available": self.model in models,
        "models": models,
        "message": "Ollama is online.",
    }
async def ensure_available(self) -> None:
    """Raise unless the backend is online and offers the configured model.

    Raises:
        OllamaUnavailable: The health check reported the backend offline, or
            reported it online with ``model_available`` set to ``False``.
    """
    health = await self.health()
    if not health["online"]:
        raise OllamaUnavailable(health["message"])
    if health.get("model_available") is False:
        # In this state the health message only says the backend is online,
        # so raising it unchanged would hide the actual problem: the model.
        raise OllamaUnavailable(
            f"{health['message']} However, the configured model '{self.model}' is not available there."
        )
async def chat(
    self,
    content: str,
    thread_id: str | None = DEFAULT_THREAD_ID,
    images: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """Run one non-streaming chat turn, executing any tools the model requests.

    Args:
        content: The user's message text.
        thread_id: Conversation thread to continue.
        images: Optional image attachments for this message.

    Returns:
        A dict with the reply under ``message``, the current
        ``pending_actions`` and the resolved ``thread_id``.

    Raises:
        Exception: Whatever the model call raised, when it fails before any
            tool result exists. Availability errors from
            ``ensure_available`` also propagate.
    """
    await self.ensure_available()
    active_thread = self._thread_id(thread_id)
    history = self._messages_for_thread(active_thread)
    prior = self.memory.last_interaction(active_thread) if self.memory else None
    attachments = self._normalize_images(images)
    prompt = self._prompt_text(content, len(attachments))
    stored_text = self._conversation_content(content, len(attachments))
    if self.memory:
        self.memory.add_conversation("user", stored_text, active_thread)
        await self._title_first_message(active_thread, prompt, prior)
    history.append(self._user_message(prompt, attachments))
    tool_outputs: list[dict[str, Any]] = []

    def finish(reply: str) -> dict[str, Any]:
        # Shared ending: record the reply in history and memory, then build the result.
        history.append({"role": "assistant", "content": reply})
        if self.memory:
            self.memory.add_conversation("assistant", reply, active_thread)
        return {"message": reply, "pending_actions": self._pending_payloads(), "thread_id": active_thread}

    if hasattr(self.tools, "chat_image_scope"):
        scope = self.tools.chat_image_scope(attachments)
    else:
        scope = nullcontext()
    with scope:
        for _ in self._tool_rounds():
            try:
                response = await self._chat_once(
                    prompt,
                    history,
                    previous_interaction=prior,
                    thread_id=active_thread,
                )
            except Exception as exc:
                if not tool_outputs:
                    raise
                # Tool results already exist, so answer from them instead of failing.
                return finish(
                    self._tool_result_fallback(
                        tool_outputs,
                        f"The {self._provider_label()} stopped after the tool call: {exc}",
                    )
                )
            reply_message = response.get("message") or {}
            requested = reply_message.get("tool_calls") or []
            if not requested:
                text = reply_message.get("content", "")
                if not text.strip():
                    text = self._empty_response_fallback(tool_outputs)
                return finish(text)
            history.append(reply_message)
            for call in requested:
                name, arguments = self._extract_call(call)
                outcome = await self.tools.execute(name, arguments)
                tool_outputs.append({"tool": name, "result": outcome})
                history.append(
                    {
                        "role": "tool",
                        "tool_name": name,
                        "tool_call_id": call.get("id"),
                        "content": json.dumps(outcome),
                    }
                )
    # The round budget ran out while the model was still asking for tools.
    return finish(self._tool_round_limit_message())
async def chat_events(
    self,
    content: str,
    thread_id: str | None = DEFAULT_THREAD_ID,
    images: list[dict[str, Any]] | None = None,
) -> AsyncIterator[dict[str, Any]]:
    """Stream one chat turn as a sequence of event dicts.

    Each event has a ``type``: ``status``, ``reasoning``, ``token``,
    ``metrics``, ``warning``, or a closing ``done`` that carries the pending
    actions. Tool calls requested by the model are executed between rounds.

    Args:
        content: The user's message text.
        thread_id: Conversation thread to continue.
        images: Optional image attachments for this message.
    """
    health = await self.health()
    if not health["online"]:
        yield {"type": "warning", "message": health["message"]}
        yield {"type": "done", "pending_actions": self._pending_payloads()}
        return
    resolved_thread_id = self._thread_id(thread_id)
    messages = self._messages_for_thread(resolved_thread_id)
    previous_interaction = self.memory.last_interaction(resolved_thread_id) if self.memory else None
    normalized_images = self._normalize_images(images)
    prompt_text = self._prompt_text(content, len(normalized_images))
    memory_content = self._conversation_content(content, len(normalized_images))
    if self.memory:
        self.memory.add_conversation("user", memory_content, resolved_thread_id)
        await self._title_first_message(resolved_thread_id, prompt_text, previous_interaction)
    messages.append(self._user_message(prompt_text, normalized_images))
    yield {"type": "status", "message": "Thinking"}
    last_tool_results: list[dict[str, Any]] = []
    image_scope = self.tools.chat_image_scope(normalized_images) if hasattr(self.tools, "chat_image_scope") else nullcontext()
    with image_scope:
        for _ in self._tool_rounds():
            assistant_message: dict[str, Any] = {"role": "assistant", "content": ""}
            tool_calls: list[dict[str, Any]] = []
            try:
                async for event in self._chat_stream_once(
                    prompt_text,
                    messages,
                    previous_interaction=previous_interaction,
                    thread_id=resolved_thread_id,
                ):
                    event_type = event.get("type")
                    if event_type == "reasoning":
                        reasoning_chunk = event.get("content") or ""
                        if reasoning_chunk:
                            assistant_message["reasoning_content"] = assistant_message.get("reasoning_content", "") + reasoning_chunk
                            yield {"type": "reasoning", "content": reasoning_chunk}
                        continue
                    if event_type == "metrics":
                        # The OpenAI-compatible stream reports token usage as
                        # standalone metrics events. They carry no "message",
                        # so without this branch they were silently dropped.
                        yield event
                        continue
                    message = event.get("message") or {}
                    if message.get("reasoning_content"):
                        assistant_message["reasoning_content"] = message.get("reasoning_content")
                    chunk = message.get("content") or ""
                    if chunk:
                        assistant_message["content"] += chunk
                        yield {"type": "token", "content": chunk}
                    if message.get("tool_calls"):
                        tool_calls.extend(message["tool_calls"])
                    if event.get("done"):
                        metrics = self._stream_metrics(event)
                        if metrics:
                            yield {"type": "metrics", **metrics}
            except Exception as exc:
                if not last_tool_results:
                    yield {"type": "warning", "message": f"Chat failed before any tool result was available: {exc}"}
                    yield {"type": "done", "pending_actions": self._pending_payloads(), "thread_id": resolved_thread_id}
                    return
                # Tool results already exist, so answer from them instead of failing.
                fallback = self._tool_result_fallback(
                    last_tool_results,
                    f"The {self._provider_label()} stopped after the tool call: {exc}",
                )
                assistant_message["content"] = fallback
                messages.append(assistant_message)
                if self.memory:
                    self.memory.add_conversation("assistant", fallback, resolved_thread_id)
                yield {"type": "token", "content": fallback}
                yield {"type": "done", "pending_actions": self._pending_payloads(), "thread_id": resolved_thread_id}
                return
            if not tool_calls:
                if not assistant_message.get("content", "").strip():
                    fallback = self._empty_response_fallback(last_tool_results)
                    assistant_message["content"] = fallback
                    yield {"type": "token", "content": fallback}
                messages.append(assistant_message)
                if self.memory:
                    self.memory.add_conversation("assistant", assistant_message.get("content", ""), resolved_thread_id)
                yield {"type": "done", "pending_actions": self._pending_payloads(), "thread_id": resolved_thread_id}
                return
            assistant_message["tool_calls"] = tool_calls
            messages.append(assistant_message)
            for call in tool_calls:
                name, arguments = self._extract_call(call)
                yield {"type": "status", "message": self._tool_status(name)}
                result = await self.tools.execute(name, arguments)
                last_tool_results.append({"tool": name, "result": result})
                messages.append({"role": "tool", "tool_name": name, "tool_call_id": call.get("id"), "content": json.dumps(result)})
            yield {"type": "status", "message": "Writing response"}
    # The round budget ran out while the model was still asking for tools.
    fallback = self._tool_round_limit_message()
    messages.append({"role": "assistant", "content": fallback})
    if self.memory:
        self.memory.add_conversation("assistant", fallback, resolved_thread_id)
    yield {"type": "token", "content": fallback}
    yield {"type": "done", "pending_actions": self._pending_payloads(), "thread_id": resolved_thread_id}
async def generate_wake_response(self, wake_message: str) -> str:
await self.ensure_available()
messages = self._messages_for_thread("wake")
previous_interaction = self.memory.last_interaction("wake") if self.memory else None
messages.append({"role": "user", "content": wake_message})
last_tool_results: list[dict[str, Any]] = []
for _ in self._tool_rounds():
try:
response = await self._chat_once(
wake_message,
messages,
previous_interaction=previous_interaction,
thread_id="wake",
)
except Exception as exc:
if not last_tool_results:
raise
content = self._tool_result_fallback(
last_tool_results,
f"The {self._provider_label()} stopped after the wake-job tool call: {exc}",
)
messages.append({"role": "assistant", "content": content})
if self.memory:
self.memory.add_conversation("system", wake_message, "wake")
self.memory.add_conversation("assistant", content, "wake")
return content
message = response.get("message") or {}
tool_calls = message.get("tool_calls") or []
if not tool_calls:
content = message.get("content", "")
if not content.strip():
content = self._empty_response_fallback(last_tool_results)
messages.append({"role": "assistant", "content": content})
if self.memory:
self.memory.add_conversation("system", wake_message, "wake")
self.memory.add_conversation("assistant", content, "wake")
return content
messages.append(message)
for call in tool_calls:
name, arguments = self._extract_call(call)
result = await self.tools.execute(name, arguments)
last_tool_results.append({"tool": name, "result": result})
messages.append({"role": "tool", "tool_name": name, "tool_call_id": call.get("id"), "content": json.dumps(result)})
content = self._wake_tool_round_limit_message()
messages.append({"role": "assistant", "content": content})
if self.memory:
self.memory.add_conversation("system", wake_message, "wake")
self.memory.add_conversation("assistant", content, "wake")
return content
async def _chat_once(
    self,
    query: str = "",
    messages: list[dict[str, Any]] | None = None,
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
) -> dict[str, Any]:
    """Make one non-streaming model call through the backend for ``self.provider``.

    OpenAI-compatible providers and "codex" have dedicated backends; any
    other provider value goes to the Ollama backend. All arguments are
    forwarded unchanged and the backend's response dict is returned.
    """
    if self._is_openai_compatible_provider():
        backend = self._openai_chat
    elif self.provider == "codex":
        backend = self._codex_chat
    else:
        backend = self._ollama_chat
    return await backend(
        query,
        messages,
        previous_interaction=previous_interaction,
        thread_id=thread_id,
    )
async def _chat_stream_once(
    self,
    query: str = "",
    messages: list[dict[str, Any]] | None = None,
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
) -> AsyncIterator[dict[str, Any]]:
    """Stream one model call through the backend for ``self.provider``.

    OpenAI-compatible providers and "codex" have dedicated streaming
    backends; any other provider value goes to the Ollama stream. Every
    event the backend yields is re-yielded unchanged.
    """
    if self._is_openai_compatible_provider():
        open_stream = self._openai_chat_stream
    elif self.provider == "codex":
        open_stream = self._codex_chat_stream
    else:
        open_stream = self._ollama_chat_stream
    async for event in open_stream(
        query,
        messages,
        previous_interaction=previous_interaction,
        thread_id=thread_id,
    ):
        yield event
async def _ollama_chat(
    self,
    query: str = "",
    messages: list[dict[str, Any]] | None = None,
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
) -> dict[str, Any]:
    """POST one non-streaming chat request to ``{base_url}/api/chat``.

    When ``messages`` is empty or ``None`` the stored messages of
    ``thread_id`` are used. Runtime context is merged in before sending.

    Returns:
        The decoded JSON body of the response.

    Raises:
        httpx.HTTPStatusError: The server answered with an error status.
    """
    async with httpx.AsyncClient(timeout=120) as client:
        history = messages or self._messages_for_thread(thread_id)
        request_body = {
            "model": self.model,
            "messages": self._messages_with_context(
                query,
                history,
                previous_interaction=previous_interaction,
                thread_id=thread_id,
            ),
            "tools": self.tools.schemas,
            "options": self._ollama_options(),
            "stream": False,
        }
        response = await client.post(f"{self.base_url}/api/chat", json=request_body)
        response.raise_for_status()
        return response.json()
async def _ollama_chat_stream(
    self,
    query: str = "",
    messages: list[dict[str, Any]] | None = None,
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
) -> AsyncIterator[dict[str, Any]]:
    """Stream a chat request to ``{base_url}/api/chat``, one JSON object per line.

    When ``messages`` is empty or ``None`` the stored messages of
    ``thread_id`` are used. Blank lines in the response are skipped; every
    other line is JSON-decoded and yielded as-is.

    Raises:
        httpx.HTTPStatusError: The server answered with an error status.
    """
    async with httpx.AsyncClient(timeout=120) as client:
        history = messages or self._messages_for_thread(thread_id)
        request_body = {
            "model": self.model,
            "messages": self._messages_with_context(
                query,
                history,
                previous_interaction=previous_interaction,
                thread_id=thread_id,
            ),
            "tools": self.tools.schemas,
            "options": self._ollama_options(),
            "stream": True,
        }
        async with client.stream("POST", f"{self.base_url}/api/chat", json=request_body) as response:
            response.raise_for_status()
            async for raw_line in response.aiter_lines():
                if not raw_line:
                    continue
                yield json.loads(raw_line)
async def _openai_chat(
    self,
    query: str = "",
    messages: list[dict[str, Any]] | None = None,
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
) -> dict[str, Any]:
    """POST one non-streaming request to ``{base_url}/chat/completions``.

    When ``messages`` is empty or ``None`` the stored messages of
    ``thread_id`` are used.

    Returns:
        ``{"message": {...}}`` built from the first choice, with ``role``,
        ``reasoning_content``, ``content`` and ``tool_calls`` always present
        (empty values when the provider omitted them).

    Raises:
        httpx.HTTPStatusError: The server answered with an error status.
    """
    async with httpx.AsyncClient(timeout=120) as client:
        history = messages or self._messages_for_thread(thread_id)
        request_body = {
            "model": self.model,
            "messages": self._openai_messages(
                query,
                history,
                previous_interaction=previous_interaction,
                thread_id=thread_id,
            ),
            "tools": self.tools.schemas,
            "stream": False,
        }
        # Provider-specific options are applied last so they win on key clashes.
        request_body.update(self._openai_request_options(stream=False))
        response = await client.post(
            f"{self.base_url}/chat/completions",
            headers=self._openai_headers(),
            json=request_body,
        )
        response.raise_for_status()
        body = response.json()
    choices = body.get("choices") or [{}]
    reply = choices[0].get("message") or {}
    normalized = {
        "role": reply.get("role", "assistant"),
        "reasoning_content": reply.get("reasoning_content") or "",
        "content": reply.get("content") or "",
        "tool_calls": reply.get("tool_calls") or [],
    }
    return {"message": normalized}
async def _openai_chat_stream(
    self,
    query: str = "",
    messages: list[dict[str, Any]] | None = None,
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
) -> AsyncIterator[dict[str, Any]]:
    """Stream a request to ``{base_url}/chat/completions`` and translate its chunks.

    Yields, in arrival order:
        * ``{"type": "metrics", ...}`` for chunks that carry ``usage``;
        * ``{"type": "reasoning", "content": ...}`` for reasoning deltas;
        * ``{"message": {...}}`` for content deltas;
        * one closing ``{"message": {...}, "done": True}`` holding the tool
          calls assembled from the streamed fragments.

    Raises:
        httpx.HTTPStatusError: The server answered with an error status.
    """
    tool_calls: dict[int, dict[str, Any]] = {}
    final_reasoning = ""
    finished = False
    request_options = self._openai_request_options(stream=True)
    # With stream_options.include_usage the usage chunk is sent AFTER the
    # chunk that carries finish_reason. Stopping at finish_reason therefore
    # lost the usage metrics, so keep reading until usage (or [DONE]) arrives.
    awaiting_usage = bool((request_options.get("stream_options") or {}).get("include_usage"))
    async with httpx.AsyncClient(timeout=120) as client:
        async with client.stream(
            "POST",
            f"{self.base_url}/chat/completions",
            headers=self._openai_headers(),
            json={
                "model": self.model,
                "messages": self._openai_messages(
                    query,
                    messages or self._messages_for_thread(thread_id),
                    previous_interaction=previous_interaction,
                    thread_id=thread_id,
                ),
                "tools": self.tools.schemas,
                "stream": True,
                **request_options,
            },
        ) as response:
            response.raise_for_status()
            async for line in response.aiter_lines():
                if not line or not line.startswith("data:"):
                    continue
                payload = line.removeprefix("data:").strip()
                if not payload:
                    continue
                if payload == "[DONE]":
                    break
                event = json.loads(payload)
                if event.get("usage"):
                    awaiting_usage = False
                    metrics = self._cloud_usage_metrics(event["usage"])
                    if metrics:
                        yield {"type": "metrics", **metrics}
                choice = (event.get("choices") or [{}])[0]
                delta = choice.get("delta") or {}
                reasoning_content = delta.get("reasoning_content") or ""
                if reasoning_content:
                    yield {"type": "reasoning", "content": reasoning_content}
                content = delta.get("content") or ""
                if content:
                    yield {"message": {"role": "assistant", "content": content}}
                for tool_call in delta.get("tool_calls") or []:
                    self._merge_openai_tool_call(tool_calls, tool_call)
                if choice.get("finish_reason"):
                    finished = True
                    final_reasoning = (choice.get("message") or {}).get("reasoning_content") or final_reasoning
                if finished and not awaiting_usage:
                    # Nothing else is expected, so stop as soon as the turn ended.
                    break
    yield {
        "message": {
            "role": "assistant",
            "reasoning_content": final_reasoning,
            "content": "",
            "tool_calls": self._ordered_tool_calls(tool_calls),
        },
        "done": True,
    }
async def _codex_chat(
    self,
    query: str = "",
    messages: list[dict[str, Any]] | None = None,
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
) -> dict[str, Any]:
    """Run one Codex turn and return it in the shared chat-response shape.

    When ``messages`` is empty or ``None`` the stored messages of
    ``thread_id`` are used.
    """
    history = messages if messages else self._messages_for_thread(thread_id)
    turn_result = await self._codex_cli_turn(
        query,
        history,
        previous_interaction=previous_interaction,
        thread_id=thread_id,
    )
    return self._codex_structured_response(turn_result)
async def _codex_chat_stream(
    self,
    query: str = "",
    messages: list[dict[str, Any]] | None = None,
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
) -> AsyncIterator[dict[str, Any]]:
    """Run one Codex turn and replay it as stream events.

    The turn completes before anything is yielded. Non-empty reply text is
    emitted as a single content event, followed by one closing ``done``
    event that carries the tool calls (an empty list when there are none).
    """
    history = messages if messages else self._messages_for_thread(thread_id)
    turn_result = await self._codex_cli_turn(
        query,
        history,
        previous_interaction=previous_interaction,
        thread_id=thread_id,
    )
    reply = self._codex_structured_response(turn_result)["message"]
    text = reply.get("content")
    if text:
        yield {"message": {"role": "assistant", "content": reply["content"]}}
    closing = {
        "role": "assistant",
        "content": "",
        "tool_calls": reply.get("tool_calls") or [],
    }
    yield {"message": closing, "done": True}
def _messages_with_context(
    self,
    query: str,
    messages: list[dict[str, Any]],
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
) -> list[dict[str, Any]]:
    """Return ``messages`` with runtime context inserted after the first entry.

    The stable and volatile context parts are each joined with newlines and,
    when non-empty, added as separate "system" messages directly after the
    first message. The image count passed to the context builder comes from
    the most recent "user" message.

    Returns:
        ``messages`` itself when there is no context to add, otherwise a new
        list. The input list is never mutated.
    """
    attached_image_count = 0
    for message in reversed(messages):
        if message.get("role") == "user":
            attached_image_count = len(message.get("images") or [])
            break
    stable_context, volatile_context = self._runtime_context_parts(
        query,
        previous_interaction=previous_interaction,
        thread_id=thread_id,
        attached_image_count=attached_image_count,
    )
    contexts = [part for part in ("\n".join(stable_context), "\n".join(volatile_context)) if part]
    if not contexts:
        return messages
    context_messages = [{"role": "system", "content": context} for context in contexts]
    # Slice instead of indexing messages[0]: an empty history then yields just
    # the context messages rather than raising IndexError.
    return [*messages[:1], *context_messages, *messages[1:]]
async def _openai_health(self) -> dict[str, Any]:
return await self._cloud_health(self.provider if self._is_openai_compatible_provider() else "openai")
async def _codex_health(self) -> dict[str, Any]:
command = self._codex_command()
if not command:
return {
"online": False,
"model": self.model,
"base_url": self.base_url,
"provider": "codex",
"model_available": False,
"models": [],
"message": "Codex CLI was not found on PATH.",
"detail": "",
}
try:
account, models = await self._codex_app_server_status()
except Exception as exc:
return {
"online": False,
"model": self.model,
"base_url": command,
"provider": "codex",
"model_available": False,
"models": [],
"message": "Codex App Server is installed, but TraderAI could not connect to it.",
"detail": str(exc),
}
logged_in = bool(account)
detail = f"Logged in as {account.get('email')}" if isinstance(account, dict) and account.get("email") else ""
return {
"online": logged_in,
"model": self.model,
"base_url": command,
"provider": "codex",
"model_available": self.model in models if models else bool(self.model),
"models": models,
"message": "Codex App Server is online." if logged_in else "Codex CLI is installed, but not logged in with ChatGPT.",
"detail": detail,
}
async def _cloud_health(self, provider: str) -> dict[str, Any]:
if not self.api_key:
return {
"online": False,
"model": self.model,
"base_url": self.base_url,
"provider": provider,
"model_available": False,
"models": [],
"message": f"{self._provider_label()} is selected, but no API key is configured.",
"detail": "",
}
try:
async with httpx.AsyncClient(timeout=10) as client:
response = await client.get(f"{self.base_url}/models", headers=self._openai_headers())
response.raise_for_status()
body = response.json()
except (httpx.HTTPError, ValueError) as exc:
return {
"online": False,
"model": self.model,
"base_url": self.base_url,
"provider": provider,
"model_available": False,
"models": [],
"message": f"{self._provider_label()} is unreachable at {self.base_url} or rejected the API key.",
"detail": str(exc),
}
models = sorted(item.get("id") for item in body.get("data", []) if item.get("id"))
return {
"online": True,
"model": self.model,
"base_url": self.base_url,
"provider": provider,
"model_available": self.model in models,
"models": models,
"message": f"{self._provider_label()} is online.",
}
def _openai_headers(self) -> dict[str, str]:
return {
"Authorization": f"Bearer {self.api_key or ''}",
"Content-Type": "application/json",
}
def _openai_request_options(self, stream: bool) -> dict[str, Any]:
if self.provider == "deepseek":
options: dict[str, Any] = {}
if self.reasoning_effort in {"none", "minimal"}:
options["thinking"] = {"type": "disabled"}
else:
options["thinking"] = {"type": "enabled"}
options["reasoning_effort"] = "max" if self.reasoning_effort in {"xhigh", "max"} else "high"
if stream:
options["stream_options"] = {"include_usage": True}
return options
return {"reasoning_effort": self.reasoning_effort}
def _openai_messages(
    self,
    query: str,
    messages: list[dict[str, Any]],
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
) -> list[dict[str, Any]]:
    """Convert the context-enriched history into chat-completions messages.

    Messages whose role is not system/user/assistant/tool are dropped. User
    messages with images become a list of text and ``image_url`` parts using
    base64 data URLs. Assistant messages keep their tool calls and reasoning
    content. Tool messages always receive a ``tool_call_id``.
    """
    accepted_roles = {"system", "user", "assistant", "tool"}
    prepared: list[dict[str, Any]] = []
    enriched = self._messages_with_context(
        query,
        messages,
        previous_interaction=previous_interaction,
        thread_id=thread_id,
    )
    for source in enriched:
        role = source.get("role")
        if role not in accepted_roles:
            continue
        entry: dict[str, Any] = {"role": role, "content": source.get("content", "")}
        if role == "user" and source.get("images"):
            text = source.get("content", "")
            mime_types = list(source.get("image_content_types") or [])
            parts: list[dict[str, Any]] = [{"type": "text", "text": text}] if text else []
            for position, encoded in enumerate(source.get("images") or []):
                # Images without a recorded content type are sent as PNG.
                mime = mime_types[position] if position < len(mime_types) else "image/png"
                parts.append(
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:{mime};base64,{encoded}"},
                    }
                )
            entry["content"] = parts
        elif role == "assistant":
            if source.get("tool_calls"):
                entry["tool_calls"] = source["tool_calls"]
            if source.get("reasoning_content"):
                entry["reasoning_content"] = source["reasoning_content"]
        elif role == "tool":
            # Fall back to the tool name, then a constant, when no call id was stored.
            entry["tool_call_id"] = source.get("tool_call_id") or source.get("tool_name") or "tool"
        prepared.append(entry)
    return prepared
def _codex_tool_catalog(self) -> list[dict[str, Any]]:
tools: list[dict[str, Any]] = []
for schema in self.tools.schemas:
if schema.get("type") != "function":
continue
function = schema.get("function") or {}
tools.append(
{
"name": function.get("name", ""),
"description": function.get("description", ""),
"parameters": function.get("parameters") or {"type": "object", "properties": {}},
}
)
return tools
def _provider_label(self) -> str:
if self.provider == "openai":
return "OpenAI model"
if self.provider == "deepseek":
return "DeepSeek model"
if self.provider == "codex":
return "Codex model"
return "local model"
def _is_openai_compatible_provider(self) -> bool:
return self.provider in {"openai", "deepseek"}
def _tool_rounds(self):
if self.provider == "deepseek":
while True:
yield None
return
for _ in range(10):
yield None
def _tool_round_limit_message(self) -> str:
return "I hit the tool-call limit while working on that. Try narrowing the request or approve any pending action first."
def _wake_tool_round_limit_message(self) -> str:
return "I hit the tool-call limit while running this scheduled wake job. Check the job prompt or pending approvals."
@staticmethod
def _merge_openai_tool_call(target: dict[int, dict[str, Any]], delta: dict[str, Any]) -> None:
index = int(delta.get("index") or 0)
current = target.setdefault(index, {"id": delta.get("id"), "type": "function", "function": {"name": "", "arguments": ""}})
if delta.get("id"):
current["id"] = delta["id"]
function = delta.get("function") or {}
current_function = current.setdefault("function", {"name": "", "arguments": ""})
if function.get("name"):
current_function["name"] += function["name"]
if function.get("arguments"):
current_function["arguments"] += function["arguments"]
@staticmethod
def _ordered_tool_calls(tool_calls: dict[int, dict[str, Any]]) -> list[dict[str, Any]]:
return [tool_calls[index] for index in sorted(tool_calls)]
async def _codex_cli_turn(
    self,
    query: str,
    messages: list[dict[str, Any]],
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
) -> dict[str, Any]:
    """Run one Codex turn; this simply forwards to the App Server implementation."""
    turn_result = await self._codex_app_server_turn(
        query,
        messages,
        previous_interaction=previous_interaction,
        thread_id=thread_id,
    )
    return turn_result
async def _codex_app_server_turn(
    self,
    query: str,
    messages: list[dict[str, Any]],
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
) -> dict[str, Any]:
    """Run one prompt through a freshly started Codex App Server process.

    Sequence: initialize, start an ephemeral read-only thread, start a turn
    with the rendered prompt and the output schema, then read notifications
    until that turn completes. The server process is always stopped, even
    when an error is raised.

    Returns:
        The parsed form of the agent's final message text.

    Raises:
        RuntimeError: A request returned an error, no thread or turn id was
            returned, the server sent an "error" notification, or the turn
            ended with a status other than "completed".
    """
    prompt = self._codex_cli_prompt(
        query,
        messages,
        previous_interaction=previous_interaction,
        thread_id=thread_id,
    )
    final_text = ""
    process = await self._start_codex_app_server()
    next_id = 1

    async def call(method: str, params: dict[str, Any] | None = None, timeout: int = 120) -> dict[str, Any]:
        # Send one JSON-RPC request and wait for the reply with the same id.
        nonlocal next_id
        call_id = next_id
        next_id = call_id + 1
        envelope: dict[str, Any] = {"jsonrpc": "2.0", "id": call_id, "method": method}
        if params is not None:
            envelope["params"] = params
        await self._codex_app_server_write(process, envelope)
        while True:
            incoming = await self._codex_app_server_read(process, timeout=timeout)
            if incoming.get("id") != call_id:
                # Anything else that arrives meanwhile goes to the generic handler.
                await self._handle_codex_app_server_message(process, incoming)
                continue
            if incoming.get("error"):
                failure = incoming["error"]
                raise RuntimeError(failure.get("message") or f"Codex App Server request failed: {failure}")
            return incoming.get("result") or {}

    try:
        await call(
            "initialize",
            {
                "clientInfo": {"name": "TraderAI", "version": __version__},
                "capabilities": {"experimentalApi": True},
            },
            timeout=30,
        )
        await self._codex_app_server_write(process, {"jsonrpc": "2.0", "method": "initialized", "params": {}})
        thread = await call(
            "thread/start",
            {
                "model": self.model,
                "modelProvider": None,
                "cwd": str(Path.cwd()),
                "approvalPolicy": "never",
                "sandbox": "read-only",
                "baseInstructions": "You are TraderAI running through the local Codex App Server using ChatGPT OAuth.",
                "developerInstructions": (
                    "Do not run shell commands, inspect files, or modify the workspace. "
                    "Answer only with JSON matching the requested output schema."
                ),
                "ephemeral": True,
                "experimentalRawEvents": False,
                "persistExtendedHistory": False,
            },
            timeout=30,
        )
        server_thread_id = ((thread.get("thread") or {}).get("id") or thread.get("threadId") or "").strip()
        if not server_thread_id:
            raise RuntimeError(f"Codex App Server did not return a thread id: {thread!r}")
        turn = await call(
            "turn/start",
            {
                "threadId": server_thread_id,
                "input": [{"type": "text", "text": prompt, "text_elements": []}],
                "cwd": str(Path.cwd()),
                "approvalPolicy": "never",
                "sandboxPolicy": {"type": "readOnly", "access": {"type": "fullAccess"}},
                "model": self.model,
                "effort": self.reasoning_effort,
                "summary": "none",
                "outputSchema": self._codex_output_schema(),
            },
            timeout=60,
        )
        turn_id = ((turn.get("turn") or {}).get("id") or "").strip()
        if not turn_id:
            raise RuntimeError(f"Codex App Server did not return a turn id: {turn!r}")
        while True:
            incoming = await self._codex_app_server_read(process, timeout=240)
            method = incoming.get("method")
            params = incoming.get("params") or {}
            if method == "item/agentMessage/delta" and params.get("turnId") == turn_id:
                final_text += params.get("delta") or ""
                continue
            if method == "item/completed" and params.get("turnId") == turn_id:
                item = params.get("item") or {}
                if item.get("type") == "agentMessage":
                    # A completed message replaces the accumulated deltas when it has text.
                    final_text = item.get("text") or final_text
                continue
            if method == "turn/completed" and (params.get("turn") or {}).get("id") == turn_id:
                finished_turn = params.get("turn") or {}
                turn_status = finished_turn.get("status")
                if turn_status != "completed":
                    failure = finished_turn.get("error") or {}
                    raise RuntimeError(failure.get("message") or f"Codex App Server turn ended with status {turn_status}.")
                break
            if method == "error":
                failure = params.get("message") or params.get("error") or params
                raise RuntimeError(f"Codex App Server error: {failure}")
            await self._handle_codex_app_server_message(process, incoming)
    finally:
        await self._stop_codex_app_server(process)
    return self._parse_codex_app_server_text(final_text)
def _codex_cli_prompt(
    self,
    query: str,
    messages: list[dict[str, Any]],
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
) -> str:
    """Render the Codex prompt: fixed instructions, tool catalog, and transcript.

    The transcript holds one ``role[annotation]: content`` line per message
    returned by ``self._messages_with_context``.
    """

    def annotation_for(entry: dict[str, Any], role: Any) -> str:
        # A role matches at most one of these cases, so the order is irrelevant.
        if role == "user" and entry.get("images"):
            return f" [attached images: {len(entry.get('images') or [])}]"
        if role == "tool":
            return f" [tool {entry.get('tool_name') or ''}]"
        if role == "assistant" and entry.get("tool_calls"):
            return f" [tool calls: {json.dumps(entry.get('tool_calls'), ensure_ascii=True)}]"
        return ""

    context_messages = self._messages_with_context(
        query,
        messages,
        previous_interaction=previous_interaction,
        thread_id=thread_id,
    )
    transcript_lines: list[str] = []
    for entry in context_messages:
        speaker = entry.get("role", "unknown")
        body = entry.get("content", "")
        transcript_lines.append(f"{speaker}{annotation_for(entry, speaker)}: {body}")

    catalog_json = json.dumps(self._codex_tool_catalog(), ensure_ascii=True, indent=2)
    preamble = (
        "You are TraderAI running through the local Codex App Server using ChatGPT OAuth.\n"
        "Do not run shell commands, inspect files, or modify the workspace.\n"
        "Your only job is to decide whether to answer directly or request exactly one TraderAI tool.\n\n"
        "Return JSON that matches the provided schema.\n"
        "- If you can answer now, set kind to final, put the user-facing reply in message, set tool_name to an empty string, and set arguments_json to '{}'.\n"
        "- If you need a tool, set kind to tool_call, set tool_name to the exact tool name, set message to an empty string, and set arguments_json to a valid JSON object string.\n"
        "- Never return more than one tool call at a time.\n"
        "- Prefer the TraderAI tools over guessing.\n\n"
    )
    return "".join(
        [
            preamble,
            f"Available tools:\n{catalog_json}\n\n",
            "Conversation transcript:\n",
            "\n".join(transcript_lines),
        ]
    )
def _codex_structured_response(self, result: dict[str, Any]) -> dict[str, Any]:
if result.get("kind") == "tool_call":
tool_name = str(result.get("tool_name") or "").strip()
arguments_json = str(result.get("arguments_json") or "{}").strip() or "{}"
return {
"message": {
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": f"codex-{uuid.uuid4()}",
"type": "function",
"function": {
"name": tool_name,
"arguments": arguments_json,
},
}
],
}
}
return {
"message": {
"role": "assistant",
"content": str(result.get("message") or ""),
"tool_calls": [],
}
}
def _write_codex_schema(self) -> str:
schema = self._codex_output_schema()
with tempfile.NamedTemporaryFile("w", suffix="-traderai-codex-schema.json", delete=False, encoding="utf-8") as handle:
json.dump(schema, handle, ensure_ascii=True)
return handle.name
@staticmethod
def _codex_output_schema() -> dict[str, Any]:
return {
"type": "object",
"properties": {
"kind": {"type": "string", "enum": ["final", "tool_call"]},
"message": {"type": "string"},
"tool_name": {"type": "string"},
"arguments_json": {"type": "string"},
},
"required": ["kind", "message", "tool_name", "arguments_json"],
"additionalProperties": False,
}
def _parse_codex_app_server_text(self, final_text: str) -> dict[str, Any]:
if not final_text.strip():
raise RuntimeError("Codex App Server returned an empty response.")
try:
parsed = json.loads(final_text)
except ValueError as exc:
raise RuntimeError(f"Codex App Server returned non-JSON output: {final_text}") from exc
if parsed.get("kind") not in {"final", "tool_call"}:
raise RuntimeError(f"Codex App Server returned an invalid result kind: {parsed!r}")
return parsed
def _parse_codex_exec_output(self, output: dict[str, Any]) -> dict[str, Any]:
events = output.get("events") or []
final_text = ""
error_text = ""
for event in events:
if event.get("type") == "item.completed":
item = event.get("item") or {}
if item.get("type") == "agent_message":
final_text = item.get("text") or final_text
elif event.get("type") == "error":
error_text = event.get("message") or error_text
elif event.get("type") == "turn.failed":
details = event.get("error") or {}
error_text = details.get("message") or error_text
if output.get("returncode") != 0 and not final_text:
raise RuntimeError(error_text or output.get("stderr") or "Codex CLI failed.")
try:
parsed = json.loads(final_text)
except ValueError as exc:
raise RuntimeError(f"Codex CLI returned non-JSON output: {final_text}") from exc
if parsed.get("kind") not in {"final", "tool_call"}:
raise RuntimeError(f"Codex CLI returned an invalid result kind: {parsed!r}")
return parsed
def _codex_command(self, required: bool = False) -> str | None:
configured = self.base_url.strip() if self.base_url else "codex"
resolved = shutil.which(configured) or configured
if required and not Path(resolved).exists() and shutil.which(resolved) is None:
raise RuntimeError("Codex CLI was not found on PATH.")
return resolved
async def _codex_app_server_status(self) -> tuple[dict[str, Any] | None, list[str]]:
    """Start a Codex App Server, read the account, and list its model ids.

    Returns ``(account, models)`` where ``models`` is sorted and de-duplicated.
    The server process is always stopped before returning or raising.
    """
    server = await self._start_codex_app_server()
    next_id = 1

    async def call(method: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
        """Send one JSON-RPC request and wait for the response with the same id."""
        nonlocal next_id
        call_id, next_id = next_id, next_id + 1
        request: dict[str, Any] = {"jsonrpc": "2.0", "id": call_id, "method": method}
        if params is not None:
            request["params"] = params
        await self._codex_app_server_write(server, request)
        while True:
            incoming = await self._codex_app_server_read(server, timeout=30)
            if incoming.get("id") != call_id:
                # Not our response: hand it to the generic message handler.
                await self._handle_codex_app_server_message(server, incoming)
                continue
            failure = incoming.get("error")
            if failure:
                raise RuntimeError(failure.get("message") or f"Codex App Server request failed: {failure}")
            return incoming.get("result") or {}

    try:
        await call(
            "initialize",
            {
                "clientInfo": {"name": "TraderAI", "version": __version__},
                "capabilities": {"experimentalApi": True},
            },
        )
        await self._codex_app_server_write(server, {"jsonrpc": "2.0", "method": "initialized", "params": {}})
        account_result = await call("account/read", {"refreshToken": False})

        found: list[str] = []
        cursor: str | None = None
        # Pagination is capped at 20 pages of 50 entries.
        for _ in range(20):
            page_params: dict[str, Any] = {"limit": 50, "includeHidden": False}
            if cursor:
                page_params["cursor"] = cursor
            page = await call("model/list", page_params)
            for entry in page.get("data") or []:
                model_id = entry.get("id") or entry.get("model")
                if model_id:
                    found.append(model_id)
            cursor = page.get("nextCursor")
            if not cursor:
                break
        return account_result.get("account"), sorted(set(found))
    finally:
        await self._stop_codex_app_server(server)
async def _start_codex_app_server(self) -> asyncio.subprocess.Process:
    """Spawn ``<codex> app-server`` with stdin, stdout, and stderr all piped."""
    pipe = asyncio.subprocess.PIPE
    executable = self._codex_command(required=True)
    return await asyncio.create_subprocess_exec(
        executable,
        "app-server",
        stdin=pipe,
        stdout=pipe,
        stderr=pipe,
    )
async def _codex_app_server_write(self, process: asyncio.subprocess.Process, payload: dict[str, Any]) -> None:
if process.stdin is None:
raise RuntimeError("Codex App Server stdin is unavailable.")
process.stdin.write((json.dumps(payload, ensure_ascii=True) + "\n").encode("utf-8"))
await process.stdin.drain()
async def _codex_app_server_read(self, process: asyncio.subprocess.Process, timeout: int) -> dict[str, Any]:
if process.stdout is None:
raise RuntimeError("Codex App Server stdout is unavailable.")
try:
line = await asyncio.wait_for(process.stdout.readline(), timeout=timeout)
except TimeoutError as exc:
raise RuntimeError("Codex App Server timed out.") from exc
if not line:
stderr = ""
if process.stderr is not None:
try:
stderr = (await asyncio.wait_for(process.stderr.read(), timeout=1)).decode("utf-8", errors="replace").strip()
except TimeoutError:
stderr = ""
raise RuntimeError(stderr or "Codex App Server exited without a response.")
try:
return json.loads(line.decode("utf-8", errors="replace"))
except ValueError as exc:
raise RuntimeError(f"Codex App Server returned invalid JSON-RPC: {line!r}") from exc
async def _handle_codex_app_server_message(self, process: asyncio.subprocess.Process, message: dict[str, Any]) -> None:
if "id" not in message or "method" not in message:
return
method = message.get("method")
if method in {
"item/commandExecution/requestApproval",
"item/fileChange/requestApproval",
"applyPatchApproval",
"execCommandApproval",
}:
await self._codex_app_server_write(
process,
{
"jsonrpc": "2.0",
"id": message["id"],
"result": {
"decision": "deny",
"message": "TraderAI does not allow Codex to run commands or change files.",
},
},
)
return
await self._codex_app_server_write(
process,
{
"jsonrpc": "2.0",
"id": message["id"],
"error": {"code": -32601, "message": f"TraderAI does not handle Codex App Server request {method}."},
},
)
async def _stop_codex_app_server(self, process: asyncio.subprocess.Process) -> None:
if process.returncode is not None:
return
process.terminate()
try:
await asyncio.wait_for(process.wait(), timeout=3)
except TimeoutError:
process.kill()
await process.wait()
async def _run_command(self, command: list[str], timeout: int = 120, stdin_text: str | None = None) -> dict[str, Any]:
process = await asyncio.create_subprocess_exec(
*command,
stdin=asyncio.subprocess.PIPE if stdin_text is not None else None,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
payload = stdin_text.encode("utf-8") if stdin_text is not None else None
stdout, stderr = await asyncio.wait_for(process.communicate(payload), timeout=timeout)
except TimeoutError:
process.kill()
await process.communicate()
raise RuntimeError(f"Command timed out: {' '.join(command[:3])}")
stdout_text = stdout.decode("utf-8", errors="replace")
stderr_text = stderr.decode("utf-8", errors="replace")
events = []
for line in stdout_text.splitlines():
line = line.strip()
if not line:
continue
try:
events.append(json.loads(line))
except ValueError:
events.append({"type": "stdout", "text": line})
return {
"returncode": process.returncode,
"stdout": stdout_text,
"stderr": stderr_text,
"events": events,
}
def _codex_model_cache(self) -> list[str]:
cache_path = Path.home() / ".codex" / "models_cache.json"
if not cache_path.exists():
return []
try:
body = json.loads(cache_path.read_text(encoding="utf-8"))
except (OSError, ValueError):
return []
models = []
for item in body.get("models", []):
slug = item.get("slug")
if slug:
models.append(slug)
return sorted(set(models))
def _runtime_context_parts(
    self,
    query: str,
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
    attached_image_count: int = 0,
) -> tuple[list[str], list[str]]:
    """Collect runtime context lines for the prompt as ``(stable, volatile)``.

    Stable lines cover UEX auth status, the user name, and the stored profile.
    Volatile lines cover the current time, attached images, the previous
    interaction, and memories recalled for ``query``.
    """
    zone = get_localzone()
    stable: list[str] = []
    volatile: list[str] = [
        f"Current local date/time: {iso_now()} UTC; {iso_now_in_zone(zone)} {zone}.",
    ]

    if attached_image_count:
        noun = "images" if attached_image_count != 1 else "image"
        volatile.append(
            f"Current user message includes {attached_image_count} pasted {noun}. "
            "You can inspect them visually. If the user wants one reused in a marketplace listing draft, "
            "call draft_marketplace_listing or draft_marketplace_listing_with_cornerstone_image with "
            "use_attached_image=true and attached_image_index when needed."
        )

    uex = getattr(self.tools, "uex", None)
    if uex:
        credentials = (("secret key", uex.secret_key), ("bearer token", uex.bearer_token))
        configured = [label for label, secret in credentials if secret]
        if configured:
            stable.append(
                "UEX API authentication is configured server-side with "
                + " and ".join(configured)
                + "; use authenticated UEX tools directly and do not ask for tokens."
            )
        else:
            stable.append("UEX API authentication is not configured server-side.")

    if self.user_name:
        stable.append(f"Known user name/handle: {self.user_name}.")

    # Everything below needs the memory store.
    if self.memory is None:
        return stable, volatile

    profile = self.memory.get_profile()
    if profile:
        identity = self._profile_identity(profile)
        if identity:
            stable.append(identity)
        stable.append(f"Known user profile JSON: {json.dumps(self._profile_for_prompt(profile), ensure_ascii=True)}.")

    if previous_interaction is not None:
        last = previous_interaction
    else:
        last = self.memory.last_interaction(thread_id)
    if last:
        volatile.append(
            f"Previous interaction before this message: {last['created_at']} "
            f"({time_since(last['created_at'])}, role {last['role']})."
        )
    else:
        volatile.append("Previous interaction before this message: none recorded.")

    recalled = self.memory.recall(query, limit=6)
    if recalled:
        bullet_lines = [
            f"- [{item['kind']}, importance {item['importance']}] {item['content']}"
            for item in recalled
        ]
        volatile.append("Relevant long-term memories:\n" + "\n".join(bullet_lines))
    return stable, volatile
def _runtime_context(
    self,
    query: str,
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
    attached_image_count: int = 0,
) -> str:
    """Return the runtime context as one string: stable lines, then volatile lines."""
    stable, volatile = self._runtime_context_parts(
        query,
        previous_interaction=previous_interaction,
        thread_id=thread_id,
        attached_image_count=attached_image_count,
    )
    sections = ["\n".join(stable), "\n".join(volatile)]
    # Empty sections are dropped so no stray blank line appears.
    return "\n".join(section for section in sections if section)
def _messages_for_thread(self, thread_id: str | None) -> list[dict[str, Any]]:
resolved_thread_id = self._thread_id(thread_id)
if resolved_thread_id not in self.thread_messages:
messages: list[dict[str, Any]] = [{"role": "system", "content": SYSTEM_PROMPT}]
if self.memory:
self.memory.ensure_thread(resolved_thread_id)
for item in self.memory.recent_conversation(limit=30, thread_id=resolved_thread_id):
role = item.get("role")
if role in {"user", "assistant"} and item.get("content"):
messages.append({"role": role, "content": item["content"]})
self.thread_messages[resolved_thread_id] = messages
return self.thread_messages[resolved_thread_id]
async def _title_first_message(
self,
thread_id: str,
first_message: str,
previous_interaction: dict[str, Any] | None,
) -> None:
if self.memory is None or previous_interaction is not None:
return
thread = self.memory.get_thread(thread_id)
if not thread or thread.get("title") != "New chat":
return
title = await self._generate_chat_title(first_message)
self.memory.rename_thread(thread_id, title or MemoryStore._thread_title(first_message))
async def _generate_chat_title(self, first_message: str) -> str:
prompt = (
"Create a concise chat title for this first user message. "
"Use 2 to 6 words. No quotes, no punctuation at the end, no preamble.\n\n"
f"Message: {first_message[:800]}"
)
try:
if self._is_openai_compatible_provider():
async with httpx.AsyncClient(timeout=20) as client:
response = await client.post(
f"{self.base_url}/chat/completions",
headers=self._openai_headers(),
json={
"model": self.model,
"messages": [
{"role": "system", "content": "You write short chat titles."},
{"role": "user", "content": prompt},
],
"stream": False,
**self._openai_request_options(stream=False),
},
)
response.raise_for_status()
choice = (response.json().get("choices") or [{}])[0]
message = choice.get("message") or {}
return self._clean_generated_title(message.get("content", ""))
if self.provider == "codex":
result = await self._codex_app_server_turn(
prompt,
[
{"role": "system", "content": "You write short chat titles."},
{"role": "user", "content": prompt},
],
thread_id="title",
)
return self._clean_generated_title(result.get("message", ""))
async with httpx.AsyncClient(timeout=20) as client:
response = await client.post(
f"{self.base_url}/api/chat",
json={
"model": self.model,
"messages": [
{"role": "system", "content": "You write short chat titles."},
{"role": "user", "content": prompt},
],
"options": self._ollama_options(),
"stream": False,
},
)
response.raise_for_status()
message = response.json().get("message") or {}
return self._clean_generated_title(message.get("content", ""))
except Exception:
return ""
async def generate_plan_draft(
self,
title: str = "",
objective: str = "",
kind: str = "buying",
constraints: dict[str, Any] | None = None,
items: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
clean_title = str(title or "").strip()
clean_objective = str(objective or "").strip()
clean_kind = str(kind or "buying").strip().casefold() or "buying"
clean_constraints = dict(constraints or {})
clean_items = self._normalize_plan_items(items or [])
seed = {
"title": clean_title,
"objective": clean_objective,
"kind": clean_kind,
"constraints": clean_constraints,
"items": clean_items,
}
prompt = self._plan_draft_prompt(seed)
fallback = self._heuristic_plan_draft(seed)
try:
payload = await self._generate_plain_text(prompt, system_prompt="You draft structured continual plan JSON for TraderAI.")
return self._normalize_plan_draft(payload, seed, fallback)
except Exception:
return fallback
@staticmethod
def _thread_id(thread_id: str | None) -> str:
return (thread_id or DEFAULT_THREAD_ID).strip() or DEFAULT_THREAD_ID
@staticmethod
def _clean_generated_title(title: str) -> str:
text = re.sub(r"[\r\n]+", " ", title).strip().strip('"').strip("'")
text = re.sub(r"^(title|chat title)\s*:\s*", "", text, flags=re.IGNORECASE).strip()
text = text.rstrip(".!?;:-").strip()
if not text:
return ""
words = text.split()
if len(words) > 8:
text = " ".join(words[:8])
return text[:64]
async def _generate_plain_text(self, prompt: str, system_prompt: str) -> str:
if self._is_openai_compatible_provider():
async with httpx.AsyncClient(timeout=30) as client:
response = await client.post(
f"{self.base_url}/chat/completions",
headers=self._openai_headers(),
json={
"model": self.model,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt},
],
"stream": False,
**self._openai_request_options(stream=False),
},
)
response.raise_for_status()
choice = (response.json().get("choices") or [{}])[0]
message = choice.get("message") or {}
return str(message.get("content") or "")
if self.provider == "codex":
result = await self._codex_app_server_turn(
prompt,
[
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt},
],
thread_id="plan-draft",
)
return str(result.get("message") or "")
async with httpx.AsyncClient(timeout=30) as client:
response = await client.post(
f"{self.base_url}/api/chat",
json={
"model": self.model,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt},
],
"options": self._ollama_options(),
"stream": False,
},
)
response.raise_for_status()
message = response.json().get("message") or {}
return str(message.get("content") or "")
@classmethod
def _normalize_plan_draft(
cls,
raw_text: str,
seed: dict[str, Any],
fallback: dict[str, Any] | None = None,
) -> dict[str, Any]:
base = dict(fallback or cls._heuristic_plan_draft(seed))
payload = cls._parse_json_object(raw_text)
if not isinstance(payload, dict):
return base
title = str(payload.get("title") or seed.get("title") or base.get("title") or "").strip()
objective = str(payload.get("objective") or seed.get("objective") or base.get("objective") or "").strip()
kind = str(payload.get("kind") or seed.get("kind") or base.get("kind") or "buying").strip().casefold() or "buying"
cadence = cls._normalize_plan_cadence(payload.get("cadence")) or base.get("cadence")
constraints = cls._normalize_plan_constraints(payload.get("constraints"), seed.get("constraints") or {}, base.get("constraints") or {})
items = cls._normalize_plan_items(payload.get("items") or seed.get("items") or base.get("items") or [])
if kind == "buying" and not items:
items = list(base.get("items") or [])
if not constraints.get("instructions"):
constraints["instructions"] = (base.get("constraints") or {}).get("instructions") or cls._default_plan_instructions(kind)
if not constraints.get("message_tone"):
constraints["message_tone"] = (base.get("constraints") or {}).get("message_tone") or "friendly and direct"
return {
"title": title or base.get("title") or "Continual plan",
"objective": objective or base.get("objective") or title or "Continue this plan",
"kind": kind,
"cadence": cadence,
"constraints": constraints,
"items": items,
}
@classmethod
def _heuristic_plan_draft(cls, seed: dict[str, Any]) -> dict[str, Any]:
title = str(seed.get("title") or "").strip()
objective = str(seed.get("objective") or "").strip()
kind = str(seed.get("kind") or "buying").strip().casefold() or "buying"
constraints = cls._normalize_plan_constraints(seed.get("constraints"), {}, {})
items = cls._normalize_plan_items(seed.get("items") or [])
if not items and kind == "buying":
inferred_names = cls._infer_item_names(f"{title}\n{objective}")
items = [{"item_name": name, "desired_quantity": 1, "max_unit_price": None} for name in inferred_names[:8]]
if not constraints.get("message_tone"):
constraints["message_tone"] = "friendly and direct"
if not constraints.get("instructions"):
constraints["instructions"] = cls._default_plan_instructions(kind)
return {
"title": title or "Continual plan",
"objective": objective or title or "Continue this plan",
"kind": kind,
"cadence": cls._normalize_plan_cadence(seed.get("cadence")) or ("0 */6 * * *" if kind == "buying" else "0 */4 * * *"),
"constraints": constraints,
"items": items,
}
@staticmethod
def _default_plan_instructions(kind: str) -> str:
if kind == "custom":
return "Check for meaningful updates, summarize what changed, and suggest the next move."
return "Track the best active listings, avoid bad prices, and draft messages for approval when a strong candidate appears."
@staticmethod
def _plan_draft_prompt(seed: dict[str, Any]) -> str:
return (
"Draft a continual TraderAI plan as strict JSON.\n"
"Return one JSON object only with keys: title, objective, kind, cadence, constraints, items.\n"
"constraints may include message_tone, instructions, preferred_locations, excluded_sellers, max_unit_price.\n"
"items must be an array of objects with item_name, desired_quantity, max_unit_price.\n"
"If the request is vague, still fill cadence, message_tone, and instructions.\n"
"Only include checklist items when they can be reasonably inferred from the request or existing draft.\n"
"Do not wrap the JSON in markdown.\n\n"
f"Current draft seed: {json.dumps(seed, ensure_ascii=True)}"
)
@staticmethod
def _parse_json_object(raw_text: str) -> dict[str, Any] | None:
text = str(raw_text or "").strip()
if not text:
return None
try:
parsed = json.loads(text)
return parsed if isinstance(parsed, dict) else None
except ValueError:
pass
start = text.find("{")
end = text.rfind("}")
if start == -1 or end <= start:
return None
try:
parsed = json.loads(text[start : end + 1])
return parsed if isinstance(parsed, dict) else None
except ValueError:
return None
@classmethod
def _normalize_plan_constraints(cls, value: Any, seed: dict[str, Any], fallback: dict[str, Any]) -> dict[str, Any]:
merged: dict[str, Any] = {}
for source in (fallback, seed, value if isinstance(value, dict) else {}):
if not isinstance(source, dict):
continue
for key, item in source.items():
if item in (None, "", [], {}):
continue
if key in {"preferred_locations", "excluded_sellers"}:
if isinstance(item, list):
merged[key] = [str(entry).strip() for entry in item if str(entry).strip()]
elif key == "max_unit_price":
try:
merged[key] = float(item)
except (TypeError, ValueError):
continue
else:
merged[key] = str(item).strip()
return merged
@staticmethod
def _normalize_plan_cadence(value: Any) -> str | None:
text = str(value or "").strip()
if not text:
return None
parts = text.split()
return text if len(parts) == 5 else None
@classmethod
def _normalize_plan_items(cls, items: Any) -> list[dict[str, Any]]:
if not isinstance(items, list):
return []
normalized: list[dict[str, Any]] = []
for item in items:
if not isinstance(item, dict):
continue
name = str(item.get("item_name") or item.get("name") or "").strip()
if not name:
continue
normalized_item: dict[str, Any] = {"item_name": name}
try:
normalized_item["desired_quantity"] = max(1, int(item.get("desired_quantity") or item.get("quantity") or 1))
except (TypeError, ValueError):
normalized_item["desired_quantity"] = 1
try:
if item.get("max_unit_price") not in (None, ""):
normalized_item["max_unit_price"] = float(item.get("max_unit_price"))
elif item.get("max_price") not in (None, ""):
normalized_item["max_unit_price"] = float(item.get("max_price"))
else:
normalized_item["max_unit_price"] = None
except (TypeError, ValueError):
normalized_item["max_unit_price"] = None
normalized.append(normalized_item)
return normalized
@staticmethod
def _infer_item_names(text: str) -> list[str]:
source = str(text or "")
quoted = [match.strip() for match in re.findall(r'"([^"\n]{2,80})"|\'([^\'\n]{2,80})\'', source)]
names = [next((part for part in group if part), "") for group in quoted]
if names:
return [name for name in names if name]
lines = []
for raw_line in source.splitlines():
line = raw_line.strip(" -*\t")
if not line:
continue
if any(token in line for token in [",", ";", "/"]):
parts = re.split(r"[,;/]+", line)
lines.extend(part.strip() for part in parts if part.strip())
else:
lines.append(line)
stopwords = {
"need", "needs", "want", "wants", "find", "draft", "deal", "deals", "parts", "part", "items",
"watch", "track", "check", "buy", "buying", "for", "the", "and", "with", "from", "best", "cheapest",
}
inferred = []
for line in lines:
clean = re.sub(r"\s+", " ", line).strip().strip(".")
if len(clean) < 3:
continue
lowered = clean.casefold()
if lowered in stopwords:
continue
if any(phrase in lowered for phrase in ["find and draft", "check for", "continue this plan"]):
continue
inferred.append(clean[:120])
deduped = []
seen = set()
for item in inferred:
key = item.casefold()
if key in seen:
continue
seen.add(key)
deduped.append(item)
return deduped
def _pending_payloads(self) -> list[dict[str, Any]]:
return [
{
"id": action.id,
"label": action.label,
"method": action.method,
"endpoint": action.endpoint,
"payload": self.tools._display_payload(action.payload) if hasattr(self.tools, "_display_payload") else action.payload,
"metadata": action.metadata or {},
}
for action in self.tools.pending_actions.values()
]
def _ollama_options(self) -> dict[str, Any]:
if not self.num_ctx:
return {}
return {"num_ctx": self.num_ctx}
@staticmethod
def _empty_response_fallback(tool_results: list[dict[str, Any]]) -> str:
if not tool_results:
return "I did not get a usable response from the model. Please try again, or narrow the request a bit."
return OllamaAgent._tool_result_fallback(
tool_results,
"I completed the tool call, but the model did not write a final answer.",
)
@staticmethod
def _tool_result_fallback(tool_results: list[dict[str, Any]], reason: str) -> str:
last = tool_results[-1]
text = json.dumps(last, indent=2, ensure_ascii=True)
if len(text) > 1800:
text = text[:1800] + "\n..."
return (
f"{reason} "
"Here is the last tool result so you are not left staring at a blank response:\n\n"
f"```json\n{text}\n```"
)
@staticmethod
def _tool_status(name: str) -> str:
if name.startswith("get_uex_"):
return f"Fetching UEX {name.removeprefix('get_uex_')}"
if name.startswith("draft_uex_"):
return f"Drafting UEX {name.removeprefix('draft_uex_')} for approval"
if name.startswith("delete_uex_"):
return f"Drafting UEX {name.removeprefix('delete_uex_')} delete for approval"
labels = {
"search_uex_api_index": "Searching UEX API index",
"summarize_uex_commodity_price_history": "Summarizing commodity price history",
"summarize_uex_marketplace_price_history": "Summarizing marketplace price history",
"summarize_uex_currency_index_history": "Summarizing currency index history",
"list_scmdb_versions": "Checking SCMDB versions",
"search_scmdb_missions": "Searching SCMDB missions",
"get_scmdb_mission_rewards": "Fetching SCMDB mission rewards",
"search_scwiki_pages": "Searching Star Citizen Wiki",
"get_scwiki_page": "Reading Star Citizen Wiki page",
"search_scwiki_vehicles": "Searching Star Citizen Wiki vehicles",
"get_scwiki_vehicle": "Fetching Star Citizen Wiki vehicle",
"search_wikelo_ship_projects": "Searching Wikelo ship projects",
"get_wikelo_ship_project": "Fetching Wikelo ship requirements",
"search_cornerstone_items": "Searching Cornerstone items",
"get_cornerstone_item_locations": "Fetching Cornerstone item locations",
"get_cornerstone_item_media": "Fetching Cornerstone item media",
"uex_api_catalog": "Checking UEX API catalog",
"uex_get": "Fetching UEX data",
"uex_draft_post": "Drafting UEX write for approval",
"uex_draft_delete": "Drafting UEX delete for approval",
"search_marketplace_listings": "Searching UEX listings",
"get_marketplace_listing": "Fetching listing details",
"list_marketplace_negotiations": "Checking negotiations",
"list_local_negotiations": "Checking local negotiations",
"get_local_negotiation": "Reading local negotiation",
"search_local_negotiation_messages": "Searching local negotiation history",
"get_negotiation_messages": "Reading negotiation messages",
"draft_negotiation_message": "Drafting message for approval",
"draft_negotiation_close": "Drafting negotiation close for approval",
"draft_negotiation_rating": "Drafting negotiation rating for approval",
"draft_marketplace_listing": "Drafting listing for approval",
"draft_marketplace_listing_with_cornerstone_image": "Drafting listing with Cornerstone image",
"check_uex_notifications": "Checking UEX notifications",
}
return labels.get(name, f"Running {name}")
@staticmethod
def _stream_metrics(event: dict[str, Any]) -> dict[str, Any]:
prompt_tokens = int(event.get("prompt_eval_count") or 0)
prompt_duration = int(event.get("prompt_eval_duration") or 0)
output_tokens = int(event.get("eval_count") or 0)
output_duration = int(event.get("eval_duration") or 0)
def rate(tokens: int, duration_ns: int) -> float | None:
if not tokens or not duration_ns:
return None
return tokens / (duration_ns / 1_000_000_000)
return {
"reading_tokens": prompt_tokens,
"reading_tokens_per_second": rate(prompt_tokens, prompt_duration),
"writing_tokens": output_tokens,
"writing_tokens_per_second": rate(output_tokens, output_duration),
}
@staticmethod
def _cloud_usage_metrics(usage: dict[str, Any]) -> dict[str, Any]:
prompt_tokens = int(usage.get("prompt_tokens") or 0)
completion_tokens = int(usage.get("completion_tokens") or 0)
cache_hit_tokens = int(usage.get("prompt_cache_hit_tokens") or 0)
cache_miss_tokens = int(usage.get("prompt_cache_miss_tokens") or 0)
metrics = {
"reading_tokens": prompt_tokens,
"writing_tokens": completion_tokens,
}
if cache_hit_tokens or cache_miss_tokens:
metrics["cache_hit_tokens"] = cache_hit_tokens
metrics["cache_miss_tokens"] = cache_miss_tokens
return metrics
@staticmethod
def _profile_identity(profile: dict[str, Any]) -> str:
user = profile.get("uex_user")
if not isinstance(user, dict):
configured = profile.get("configured_name")
return f"You are speaking with {configured}." if configured else ""
username = user.get("username") or user.get("user_username")
name = user.get("name")
fields = []
if username and name and username != name:
fields.append(f"You are speaking with UEX user {username} ({name}).")
elif username or name:
fields.append(f"You are speaking with UEX user {username or name}.")
details = []
for key, label in [
("timezone", "timezone"),
("language", "preferred language"),
("specializations", "specializations"),
("languages", "languages"),
("archetypes", "archetypes"),
]:
value = user.get(key)
if value:
details.append(f"{label}: {value}")
if details:
fields.append("UEX profile details: " + "; ".join(details) + ".")
return " ".join(fields)
@staticmethod
def _profile_for_prompt(profile: dict[str, Any]) -> dict[str, Any]:
user = profile.get("uex_user")
if not isinstance(user, dict):
return profile
useful_user_fields = [
"id",
"name",
"username",
"avatar",
"bio",
"website_url",
"timezone",
"language",
"day_availability",
"time_availability",
"specializations",
"languages",
"archetypes",
"is_datarunner",
"is_staff",
"is_away_game",
"date_rsi_verified",
"date_twitch_verified",
]
prompt_profile = dict(profile)
prompt_profile["uex_user"] = {
key: user[key]
for key in useful_user_fields
if key in user and user[key] not in (None, "")
}
return prompt_profile
@staticmethod
def _extract_call(call: dict[str, Any]) -> tuple[str, dict[str, Any]]:
function = call.get("function") or {}
name = function.get("name") or call.get("name")
arguments = function.get("arguments") or call.get("arguments") or {}
if isinstance(arguments, str):
arguments = json.loads(arguments or "{}")
return name, arguments
@staticmethod
def _normalize_images(images: list[dict[str, Any]] | None) -> list[dict[str, Any]]:
normalized: list[dict[str, Any]] = []
for image in images or []:
if not isinstance(image, dict):
continue
image_data = str(image.get("image_data") or "").strip()
if not image_data:
continue
normalized.append(
{
"name": str(image.get("name") or "").strip() or "pasted-image.png",
"content_type": str(image.get("content_type") or "image/png").strip() or "image/png",
"image_data": image_data,
}
)
return normalized
@staticmethod
def _prompt_text(content: str, image_count: int) -> str:
text = content.strip()
if text:
return text
return "Please analyze the attached image." if image_count == 1 else "Please analyze the attached images."
@staticmethod
def _conversation_content(content: str, image_count: int) -> str:
text = content.strip()
if not image_count:
return text
note = f"[Attached {image_count} pasted image{'s' if image_count != 1 else ''}]"
return f"{text}\n\n{note}" if text else note
@staticmethod
def _user_message(content: str, images: list[dict[str, Any]]) -> dict[str, Any]:
message: dict[str, Any] = {"role": "user", "content": content}
if images:
message["images"] = [image["image_data"] for image in images]
message["image_content_types"] = [image["content_type"] for image in images]
return message
class OllamaUnavailable(RuntimeError):
    """Runtime error type named for an unavailable Ollama backend; adds no behavior."""