1920 lines
86 KiB
Python
1920 lines
86 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
import uuid
|
|
from collections.abc import AsyncIterator
|
|
from contextlib import nullcontext
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import httpx
|
|
from tzlocal import get_localzone
|
|
|
|
from traderai.memory import DEFAULT_THREAD_ID, MemoryStore, iso_now, iso_now_in_zone, time_since
|
|
from traderai.tools import ToolRegistry
|
|
from traderai.version import __version__
|
|
|
|
|
|
# Instruction text for the TraderAI assistant, one rule per line.
# NOTE: the stray "|" separator lines that had been pasted between the rules were
# removed; inside the triple-quoted literal they were part of the prompt text itself.
SYSTEM_PROMPT = """You are TraderAI, a sharp Star Citizen marketplace copilot for UEX work.
Sound like a competent player who knows the game and the market. Be natural, direct, and helpful. Avoid corporate filler, robotic phrasing, and meta notes.
Use tools when the user asks about UEX data, open/current listings, active negotiations, unread notifications, messages, offers, or posting ads.
Use continual plan tools when the user asks for multi-day or recurring marketplace work, such as finding several parts, watching for deals, tracking candidates, or coordinating negotiations over time.
UEX credentials are configured server-side when available. Never ask the user to provide UEX_SECRET_KEY or UEX_BEARER_TOKEN in chat; call the authenticated UEX tool and only mention credential configuration if the tool returns an authentication error.
Use the specific UEX tool for the needed endpoint, such as get_uex_commodities_prices or get_uex_vehicles. Use fields, limit, and summary mode so tool results stay compact.
When the user asks for history, trends, changes over time, or past prices, prefer the summarize_uex_*_history tools when available; use search_uex_api_index(history_only=true) if you need to discover history endpoints.
When you need missing Star Citizen knowledge to answer accurately, use Star Citizen Wiki tools during your reasoning instead of guessing.
Use SCMDB tools when the user asks about Star Citizen missions/contracts, mission rewards, payouts, reputation gains, item rewards, blueprint rewards, or hauling mission cargo. Prefer SCMDB live data unless the user asks for PTU or a specific game version.
Use Star Citizen Wiki tools for general game knowledge, ships and vehicles, store availability, purchase locations, ship prices, manufacturers, locations, and page summaries from starcitizen.tools.
Use Wikelo ship project tools when the user asks for Wikelo ship requirements, Wikelo build materials, or what items are needed for a Wikelo ship project.
Use Cornerstone tools when the user asks where an item is sold, which shops carry an item, item store locations, in-game item base prices, or Universal Item Finder data.
When drafting UEX marketplace item posts that need images, use Cornerstone media tools or draft_marketplace_listing_with_cornerstone_image so the pending listing can include UEX image_data sourced from Cornerstone.
Prefer open and current UEX marketplace information. Do not use historical sale data, completed sale records, or sale/average-history information unless the user explicitly asks for historical sales.
Treat UEX marketplace prices as in-game aUEC/UEC credits, never real-world dollars, unless the user explicitly says otherwise.
For marketplace writes, draft the exact pending action and tell the user what will be sent; never claim it was sent until approval succeeds.
When drafting negotiation messages or marketplace replies, write like a real player would. Keep messages human, concise, and purposeful. Never include internal notes like "Tone note".
For continual plans, never invent an unknown parts checklist. If the required items cannot be derived from provided details or tools, create the plan in a needs-input state and say what item list is missing.
When a scheduled wake job fires, always write a concise Inbox-ready result that says what you checked, the key findings, and the suggested next action.
Keep prices, listing ids, slugs, users, and UEX status codes precise. If data is missing, say what you need next."""
|
|
|
|
|
|
class OllamaAgent:
|
|
def __init__(
|
|
self,
|
|
base_url: str,
|
|
model: str,
|
|
tools: ToolRegistry,
|
|
memory: MemoryStore | None = None,
|
|
user_name: str | None = None,
|
|
num_ctx: int | None = None,
|
|
provider: str = "ollama",
|
|
api_key: str | None = None,
|
|
reasoning_effort: str = "medium",
|
|
) -> None:
|
|
self.base_url = base_url.rstrip("/")
|
|
self.model = model
|
|
self.tools = tools
|
|
self.memory = memory
|
|
self.user_name = user_name
|
|
self.num_ctx = num_ctx
|
|
self.provider = provider.strip().casefold() or "ollama"
|
|
self.api_key = api_key
|
|
self.reasoning_effort = reasoning_effort.strip().casefold() or "medium"
|
|
self.thread_messages: dict[str, list[dict[str, Any]]] = {}
|
|
|
|
async def health(self) -> dict[str, Any]:
    """Report whether the configured provider is reachable.

    OpenAI-compatible providers and "codex" are delegated to their own health
    checks. Otherwise the Ollama ``/api/tags`` endpoint is queried with a
    3 second timeout.

    Returns:
        A status dict. On failure it carries ``online=False`` plus ``message``
        and ``detail``; on success it lists the model names returned by the
        server and whether ``self.model`` is among them.
    """
    if self._is_openai_compatible_provider():
        return await self._openai_health()
    if self.provider == "codex":
        return await self._codex_health()

    identity = {"model": self.model, "base_url": self.base_url}
    tags_url = f"{self.base_url}/api/tags"
    try:
        async with httpx.AsyncClient(timeout=3) as client:
            response = await client.get(tags_url)
            response.raise_for_status()
            body = response.json()
    except (httpx.HTTPError, ValueError) as exc:
        # Transport errors, HTTP error statuses and undecodable JSON all count as offline.
        return {
            "online": False,
            **identity,
            "message": f"Ollama is offline or unreachable at {self.base_url}. Open the Ollama tab and use the recommended action.",
            "detail": str(exc),
        }

    # Each entry is identified by "name", falling back to "model".
    models = []
    for entry in body.get("models", []):
        models.append(entry.get("name") or entry.get("model"))
    return {
        "online": True,
        **identity,
        "model_available": self.model in models,
        "models": models,
        "message": "Ollama is online.",
    }
|
|
|
|
async def ensure_available(self) -> None:
    """Raise ``OllamaUnavailable`` unless the provider is online and the model is usable.

    Raises:
        OllamaUnavailable: when the health check reports ``online`` as falsy, or
            when it explicitly reports ``model_available`` as ``False``. A missing
            ``model_available`` key is treated as "no objection".
    """
    health = await self.health()
    if not health["online"]:
        raise OllamaUnavailable(health["message"])
    if health.get("model_available") is False:
        # In this state the health message reads "... is online.", which on its own is a
        # confusing error text; keep it as a prefix and say what is actually wrong.
        raise OllamaUnavailable(
            f"{health['message']} The configured model {self.model!r} is not available there; "
            "choose an available model or install it first."
        )
|
|
|
|
async def chat(
    self,
    content: str,
    thread_id: str | None = DEFAULT_THREAD_ID,
    images: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """Run one non-streaming chat turn, executing tool calls until the model answers.

    Args:
        content: The user's text.
        thread_id: Conversation thread; resolved through ``self._thread_id``.
        images: Optional image attachments; normalised through ``self._normalize_images``.

    Returns:
        ``{"message", "pending_actions", "thread_id"}`` for the final assistant text.

    Raises:
        Whatever ``ensure_available`` raises, and any model-call error that occurs
        before at least one tool result exists (later errors are converted into a
        fallback answer built from the tool results).
    """
    await self.ensure_available()
    resolved_thread_id = self._thread_id(thread_id)
    messages = self._messages_for_thread(resolved_thread_id)
    previous_interaction = self.memory.last_interaction(resolved_thread_id) if self.memory else None
    normalized_images = self._normalize_images(images)
    prompt_text = self._prompt_text(content, len(normalized_images))
    memory_content = self._conversation_content(content, len(normalized_images))
    if self.memory:
        self.memory.add_conversation("user", memory_content, resolved_thread_id)
        # NOTE(review): titling is assumed to belong to the memory-enabled path — confirm.
        await self._title_first_message(resolved_thread_id, prompt_text, previous_interaction)
    messages.append(self._user_message(prompt_text, normalized_images))
    last_tool_results: list[dict[str, Any]] = []

    def finish(answer: str) -> dict[str, Any]:
        # Single exit path: record the assistant text in the thread, persist it, build the reply.
        messages.append({"role": "assistant", "content": answer})
        if self.memory:
            self.memory.add_conversation("assistant", answer, resolved_thread_id)
        return {"message": answer, "pending_actions": self._pending_payloads(), "thread_id": resolved_thread_id}

    image_scope = self.tools.chat_image_scope(normalized_images) if hasattr(self.tools, "chat_image_scope") else nullcontext()
    with image_scope:
        for _ in self._tool_rounds():
            try:
                response = await self._chat_once(
                    prompt_text,
                    messages,
                    previous_interaction=previous_interaction,
                    thread_id=resolved_thread_id,
                )
            except Exception as exc:
                # Without any tool output there is nothing useful to salvage.
                if not last_tool_results:
                    raise
                return finish(
                    self._tool_result_fallback(
                        last_tool_results,
                        f"The {self._provider_label()} stopped after the tool call: {exc}",
                    )
                )
            message = response.get("message") or {}
            tool_calls = message.get("tool_calls") or []
            if not tool_calls:
                # "content" may be present but null; treat that like an empty answer
                # instead of crashing on None.strip().
                answer = message.get("content") or ""
                if not answer.strip():
                    answer = self._empty_response_fallback(last_tool_results)
                return finish(answer)

            messages.append(message)
            for call in tool_calls:
                name, arguments = self._extract_call(call)
                result = await self.tools.execute(name, arguments)
                last_tool_results.append({"tool": name, "result": result})
                messages.append({"role": "tool", "tool_name": name, "tool_call_id": call.get("id"), "content": json.dumps(result)})
    # The round budget ran out while the model was still requesting tools.
    return finish(self._tool_round_limit_message())
|
|
|
|
async def chat_events(
    self,
    content: str,
    thread_id: str | None = DEFAULT_THREAD_ID,
    images: list[dict[str, Any]] | None = None,
) -> AsyncIterator[dict[str, Any]]:
    """Run one streaming chat turn and yield UI events as they happen.

    Yielded event types: "warning", "status", "reasoning", "token", "metrics"
    and a final "done" carrying ``pending_actions`` (and ``thread_id`` once the
    thread has been resolved).

    Args:
        content: The user's text.
        thread_id: Conversation thread; resolved through ``self._thread_id``.
        images: Optional image attachments; normalised through ``self._normalize_images``.
    """
    health = await self.health()
    if not health["online"]:
        yield {"type": "warning", "message": health["message"]}
        yield {"type": "done", "pending_actions": self._pending_payloads()}
        return

    resolved_thread_id = self._thread_id(thread_id)
    messages = self._messages_for_thread(resolved_thread_id)
    previous_interaction = self.memory.last_interaction(resolved_thread_id) if self.memory else None
    normalized_images = self._normalize_images(images)
    prompt_text = self._prompt_text(content, len(normalized_images))
    memory_content = self._conversation_content(content, len(normalized_images))
    if self.memory:
        self.memory.add_conversation("user", memory_content, resolved_thread_id)
        # NOTE(review): titling is assumed to belong to the memory-enabled path — confirm.
        await self._title_first_message(resolved_thread_id, prompt_text, previous_interaction)
    messages.append(self._user_message(prompt_text, normalized_images))
    yield {"type": "status", "message": "Thinking"}
    last_tool_results: list[dict[str, Any]] = []
    image_scope = self.tools.chat_image_scope(normalized_images) if hasattr(self.tools, "chat_image_scope") else nullcontext()
    with image_scope:
        for _ in self._tool_rounds():
            assistant_message: dict[str, Any] = {"role": "assistant", "content": ""}
            tool_calls: list[dict[str, Any]] = []

            try:
                async for event in self._chat_stream_once(
                    prompt_text,
                    messages,
                    previous_interaction=previous_interaction,
                    thread_id=resolved_thread_id,
                ):
                    event_type = event.get("type")
                    if event_type == "reasoning":
                        reasoning_chunk = event.get("content") or ""
                        if reasoning_chunk:
                            assistant_message["reasoning_content"] = assistant_message.get("reasoning_content", "") + reasoning_chunk
                            yield {"type": "reasoning", "content": reasoning_chunk}
                        continue
                    if event_type == "metrics":
                        # Usage metrics emitted mid-stream by a backend carry no "message"
                        # or "done" key, so they used to fall through every branch below
                        # and vanish. Pass them on unchanged.
                        yield event
                        continue
                    message = event.get("message") or {}
                    if message.get("reasoning_content"):
                        assistant_message["reasoning_content"] = message.get("reasoning_content")
                    chunk = message.get("content") or ""
                    if chunk:
                        assistant_message["content"] += chunk
                        yield {"type": "token", "content": chunk}
                    if message.get("tool_calls"):
                        tool_calls.extend(message["tool_calls"])
                    if event.get("done"):
                        metrics = self._stream_metrics(event)
                        if metrics:
                            yield {"type": "metrics", **metrics}
            except Exception as exc:
                if not last_tool_results:
                    yield {"type": "warning", "message": f"Chat failed before any tool result was available: {exc}"}
                    yield {"type": "done", "pending_actions": self._pending_payloads(), "thread_id": resolved_thread_id}
                    return
                # Tool output exists: replace whatever was streamed with a summary of it.
                fallback = self._tool_result_fallback(
                    last_tool_results,
                    f"The {self._provider_label()} stopped after the tool call: {exc}",
                )
                assistant_message["content"] = fallback
                messages.append(assistant_message)
                if self.memory:
                    self.memory.add_conversation("assistant", fallback, resolved_thread_id)
                yield {"type": "token", "content": fallback}
                yield {"type": "done", "pending_actions": self._pending_payloads(), "thread_id": resolved_thread_id}
                return

            if not tool_calls:
                if not assistant_message.get("content", "").strip():
                    fallback = self._empty_response_fallback(last_tool_results)
                    assistant_message["content"] = fallback
                    yield {"type": "token", "content": fallback}
                messages.append(assistant_message)
                if self.memory:
                    self.memory.add_conversation("assistant", assistant_message.get("content", ""), resolved_thread_id)
                yield {"type": "done", "pending_actions": self._pending_payloads(), "thread_id": resolved_thread_id}
                return

            assistant_message["tool_calls"] = tool_calls
            messages.append(assistant_message)
            for call in tool_calls:
                name, arguments = self._extract_call(call)
                yield {"type": "status", "message": self._tool_status(name)}
                result = await self.tools.execute(name, arguments)
                last_tool_results.append({"tool": name, "result": result})
                messages.append({"role": "tool", "tool_name": name, "tool_call_id": call.get("id"), "content": json.dumps(result)})

            yield {"type": "status", "message": "Writing response"}
    # The round budget ran out while the model was still requesting tools.
    fallback = self._tool_round_limit_message()
    messages.append({"role": "assistant", "content": fallback})
    if self.memory:
        self.memory.add_conversation("assistant", fallback, resolved_thread_id)
    yield {"type": "token", "content": fallback}
    yield {"type": "done", "pending_actions": self._pending_payloads(), "thread_id": resolved_thread_id}
|
|
|
|
async def generate_wake_response(self, wake_message: str) -> str:
    """Run a scheduled wake job on the fixed "wake" thread and return the result text.

    The wake message is sent as a user turn. Once a final text exists, the wake
    message is persisted with role "system", followed by the assistant text.

    Args:
        wake_message: The prompt of the wake job.

    Returns:
        The assistant's final text, a tool-result fallback, or the round-limit notice.

    Raises:
        Whatever ``ensure_available`` raises, and any model-call error that occurs
        before at least one tool result exists.
    """
    await self.ensure_available()
    messages = self._messages_for_thread("wake")
    previous_interaction = self.memory.last_interaction("wake") if self.memory else None
    messages.append({"role": "user", "content": wake_message})
    last_tool_results: list[dict[str, Any]] = []

    def finish(content: str) -> str:
        # Single exit path: record the text in the thread and persist both sides of the turn.
        messages.append({"role": "assistant", "content": content})
        if self.memory:
            self.memory.add_conversation("system", wake_message, "wake")
            self.memory.add_conversation("assistant", content, "wake")
        return content

    for _ in self._tool_rounds():
        try:
            response = await self._chat_once(
                wake_message,
                messages,
                previous_interaction=previous_interaction,
                thread_id="wake",
            )
        except Exception as exc:
            # Without any tool output there is nothing useful to salvage.
            if not last_tool_results:
                raise
            return finish(
                self._tool_result_fallback(
                    last_tool_results,
                    f"The {self._provider_label()} stopped after the wake-job tool call: {exc}",
                )
            )
        message = response.get("message") or {}
        tool_calls = message.get("tool_calls") or []
        if not tool_calls:
            # "content" may be present but null; treat that like an empty answer
            # instead of crashing on None.strip().
            content = message.get("content") or ""
            if not content.strip():
                content = self._empty_response_fallback(last_tool_results)
            return finish(content)

        messages.append(message)
        for call in tool_calls:
            name, arguments = self._extract_call(call)
            result = await self.tools.execute(name, arguments)
            last_tool_results.append({"tool": name, "result": result})
            messages.append({"role": "tool", "tool_name": name, "tool_call_id": call.get("id"), "content": json.dumps(result)})
    # The round budget ran out while the model was still requesting tools.
    return finish(self._wake_tool_round_limit_message())
|
|
|
|
async def _chat_once(
    self,
    query: str = "",
    messages: list[dict[str, Any]] | None = None,
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
) -> dict[str, Any]:
    """Send one non-streaming model request through the backend for ``self.provider``.

    OpenAI-compatible providers use ``_openai_chat``, "codex" uses ``_codex_chat``,
    everything else uses ``_ollama_chat``. Arguments are passed through unchanged.
    """
    if self._is_openai_compatible_provider():
        backend = self._openai_chat
    elif self.provider == "codex":
        backend = self._codex_chat
    else:
        backend = self._ollama_chat
    return await backend(
        query,
        messages,
        previous_interaction=previous_interaction,
        thread_id=thread_id,
    )
|
|
|
|
async def _chat_stream_once(
    self,
    query: str = "",
    messages: list[dict[str, Any]] | None = None,
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
) -> AsyncIterator[dict[str, Any]]:
    """Stream one model request through the backend for ``self.provider``.

    OpenAI-compatible providers use ``_openai_chat_stream``, "codex" uses
    ``_codex_chat_stream``, everything else uses ``_ollama_chat_stream``.
    Every event from the chosen backend is re-yielded unchanged.
    """
    if self._is_openai_compatible_provider():
        backend_stream = self._openai_chat_stream
    elif self.provider == "codex":
        backend_stream = self._codex_chat_stream
    else:
        backend_stream = self._ollama_chat_stream
    async for event in backend_stream(
        query,
        messages,
        previous_interaction=previous_interaction,
        thread_id=thread_id,
    ):
        yield event
|
|
|
|
async def _ollama_chat(
    self,
    query: str = "",
    messages: list[dict[str, Any]] | None = None,
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
) -> dict[str, Any]:
    """POST one non-streaming request to ``{base_url}/api/chat`` and return the decoded JSON.

    When ``messages`` is empty or ``None`` the thread's stored messages are used.
    The request uses a 120 second timeout; HTTP error statuses raise via
    ``raise_for_status``.
    """
    endpoint = f"{self.base_url}/api/chat"
    async with httpx.AsyncClient(timeout=120) as client:
        history = messages or self._messages_for_thread(thread_id)
        request_body = {
            "model": self.model,
            "messages": self._messages_with_context(
                query,
                history,
                previous_interaction=previous_interaction,
                thread_id=thread_id,
            ),
            "tools": self.tools.schemas,
            "options": self._ollama_options(),
            "stream": False,
        }
        response = await client.post(endpoint, json=request_body)
        response.raise_for_status()
        return response.json()
|
|
|
|
async def _ollama_chat_stream(
    self,
    query: str = "",
    messages: list[dict[str, Any]] | None = None,
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
) -> AsyncIterator[dict[str, Any]]:
    """Stream a request to ``{base_url}/api/chat`` and yield each JSON line as a dict.

    When ``messages`` is empty or ``None`` the thread's stored messages are used.
    Blank lines in the response are skipped; every other line is decoded with
    ``json.loads``.
    """
    endpoint = f"{self.base_url}/api/chat"
    async with httpx.AsyncClient(timeout=120) as client:
        history = messages or self._messages_for_thread(thread_id)
        request_body = {
            "model": self.model,
            "messages": self._messages_with_context(
                query,
                history,
                previous_interaction=previous_interaction,
                thread_id=thread_id,
            ),
            "tools": self.tools.schemas,
            "options": self._ollama_options(),
            "stream": True,
        }
        async with client.stream("POST", endpoint, json=request_body) as response:
            response.raise_for_status()
            async for raw_line in response.aiter_lines():
                if not raw_line:
                    continue
                yield json.loads(raw_line)
|
|
|
|
async def _openai_chat(
    self,
    query: str = "",
    messages: list[dict[str, Any]] | None = None,
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
) -> dict[str, Any]:
    """POST one non-streaming request to ``{base_url}/chat/completions``.

    Returns:
        ``{"message": {...}}`` built from the first choice, with ``role``
        defaulting to "assistant", ``reasoning_content``/``content`` defaulting
        to "" and ``tool_calls`` defaulting to ``[]``.
    """
    endpoint = f"{self.base_url}/chat/completions"
    async with httpx.AsyncClient(timeout=120) as client:
        request_headers = self._openai_headers()
        history = messages or self._messages_for_thread(thread_id)
        request_body = {
            "model": self.model,
            "messages": self._openai_messages(
                query,
                history,
                previous_interaction=previous_interaction,
                thread_id=thread_id,
            ),
            "tools": self.tools.schemas,
            "stream": False,
            # Provider-specific options are merged last, as in the request literal.
            **self._openai_request_options(stream=False),
        }
        response = await client.post(endpoint, headers=request_headers, json=request_body)
        response.raise_for_status()
        body = response.json()

    # A missing or empty "choices" list is treated as a single empty choice.
    choices = body.get("choices") or [{}]
    reply = choices[0].get("message") or {}
    normalized_reply = {
        "role": reply.get("role", "assistant"),
        "reasoning_content": reply.get("reasoning_content") or "",
        "content": reply.get("content") or "",
        "tool_calls": reply.get("tool_calls") or [],
    }
    return {"message": normalized_reply}
|
|
|
|
async def _openai_chat_stream(
    self,
    query: str = "",
    messages: list[dict[str, Any]] | None = None,
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
) -> AsyncIterator[dict[str, Any]]:
    """Stream ``{base_url}/chat/completions`` and translate SSE chunks into agent events.

    Yields, in arrival order:
        * ``{"type": "metrics", ...}`` for chunks carrying ``usage``,
        * ``{"type": "reasoning", "content": ...}`` for reasoning deltas,
        * ``{"message": {"role": "assistant", "content": ...}}`` for text deltas,
        * one final ``{"message": {...}, "done": True}`` with the merged tool calls.

    The stream is read until ``[DONE]`` (or end of stream) even after a chunk
    reports ``finish_reason``: with ``stream_options.include_usage`` the usage
    chunk arrives after that chunk, so stopping early would lose the metrics.
    """
    tool_calls: dict[int, dict[str, Any]] = {}
    finish_seen = False
    final_reasoning = ""
    async with httpx.AsyncClient(timeout=120) as client:
        async with client.stream(
            "POST",
            f"{self.base_url}/chat/completions",
            headers=self._openai_headers(),
            json={
                "model": self.model,
                "messages": self._openai_messages(
                    query,
                    messages or self._messages_for_thread(thread_id),
                    previous_interaction=previous_interaction,
                    thread_id=thread_id,
                ),
                "tools": self.tools.schemas,
                "stream": True,
                **self._openai_request_options(stream=True),
            },
        ) as response:
            response.raise_for_status()
            async for line in response.aiter_lines():
                # Only "data:" lines carry payloads; anything else is ignored.
                if not line or not line.startswith("data:"):
                    continue
                payload = line.removeprefix("data:").strip()
                if not payload:
                    continue
                if payload == "[DONE]":
                    break
                event = json.loads(payload)
                if event.get("usage"):
                    metrics = self._cloud_usage_metrics(event["usage"])
                    if metrics:
                        yield {"type": "metrics", **metrics}
                # A usage-only chunk has no choices; treat it as one empty choice.
                choice = (event.get("choices") or [{}])[0]
                delta = choice.get("delta") or {}
                reasoning_content = delta.get("reasoning_content") or ""
                if reasoning_content:
                    yield {"type": "reasoning", "content": reasoning_content}
                content = delta.get("content") or ""
                if content:
                    yield {"message": {"role": "assistant", "content": content}}
                for tool_call in delta.get("tool_calls") or []:
                    self._merge_openai_tool_call(tool_calls, tool_call)
                if choice.get("finish_reason") and not finish_seen:
                    # Remember the finishing chunk but keep reading for trailing usage data.
                    finish_seen = True
                    final_reasoning = (choice.get("message") or {}).get("reasoning_content") or ""

    # "reasoning_content" is only included when a finishing chunk was actually seen.
    final_message: dict[str, Any] = {"role": "assistant"}
    if finish_seen:
        final_message["reasoning_content"] = final_reasoning
    final_message["content"] = ""
    final_message["tool_calls"] = self._ordered_tool_calls(tool_calls)
    yield {"message": final_message, "done": True}
|
|
|
|
async def _codex_chat(
    self,
    query: str = "",
    messages: list[dict[str, Any]] | None = None,
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
) -> dict[str, Any]:
    """Run one Codex turn and return it converted by ``_codex_structured_response``.

    When ``messages`` is empty or ``None`` the thread's stored messages are used.
    """
    history = messages or self._messages_for_thread(thread_id)
    turn_result = await self._codex_cli_turn(
        query,
        history,
        previous_interaction=previous_interaction,
        thread_id=thread_id,
    )
    return self._codex_structured_response(turn_result)
|
|
|
|
async def _codex_chat_stream(
    self,
    query: str = "",
    messages: list[dict[str, Any]] | None = None,
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
) -> AsyncIterator[dict[str, Any]]:
    """Run one Codex turn and replay its result as stream-shaped events.

    The turn is awaited in full first. A non-empty text is yielded as one
    content event, followed by a closing ``done`` event carrying the tool calls.
    """
    history = messages or self._messages_for_thread(thread_id)
    turn_result = await self._codex_cli_turn(
        query,
        history,
        previous_interaction=previous_interaction,
        thread_id=thread_id,
    )
    reply = self._codex_structured_response(turn_result)["message"]
    text = reply.get("content")
    if text:
        yield {"message": {"role": "assistant", "content": text}}
    closing_message = {
        "role": "assistant",
        "content": "",
        "tool_calls": reply.get("tool_calls") or [],
    }
    yield {"message": closing_message, "done": True}
|
|
|
|
def _messages_with_context(
    self,
    query: str,
    messages: list[dict[str, Any]],
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
) -> list[dict[str, Any]]:
    """Return ``messages`` with runtime context inserted right after the first message.

    The image count passed to ``_runtime_context_parts`` is taken from the most
    recent "user" message (0 when there is none). The stable and volatile
    context parts are each joined with newlines; every non-empty result becomes
    its own "system" message. With no context the input list is returned as is.
    """
    latest_user = next((entry for entry in reversed(messages) if entry.get("role") == "user"), None)
    attached_image_count = 0 if latest_user is None else len(latest_user.get("images") or [])

    stable_context, volatile_context = self._runtime_context_parts(
        query,
        previous_interaction=previous_interaction,
        thread_id=thread_id,
        attached_image_count=attached_image_count,
    )
    context_messages = []
    for block in ("\n".join(stable_context), "\n".join(volatile_context)):
        if block:
            context_messages.append({"role": "system", "content": block})
    if not context_messages:
        return messages

    head = messages[0]
    tail = messages[1:]
    return [head, *context_messages, *tail]
|
|
|
|
async def _openai_health(self) -> dict[str, Any]:
|
|
return await self._cloud_health(self.provider if self._is_openai_compatible_provider() else "openai")
|
|
|
|
async def _codex_health(self) -> dict[str, Any]:
|
|
command = self._codex_command()
|
|
if not command:
|
|
return {
|
|
"online": False,
|
|
"model": self.model,
|
|
"base_url": self.base_url,
|
|
"provider": "codex",
|
|
"model_available": False,
|
|
"models": [],
|
|
"message": "Codex CLI was not found on PATH.",
|
|
"detail": "",
|
|
}
|
|
try:
|
|
account, models = await self._codex_app_server_status()
|
|
except Exception as exc:
|
|
return {
|
|
"online": False,
|
|
"model": self.model,
|
|
"base_url": command,
|
|
"provider": "codex",
|
|
"model_available": False,
|
|
"models": [],
|
|
"message": "Codex App Server is installed, but TraderAI could not connect to it.",
|
|
"detail": str(exc),
|
|
}
|
|
logged_in = bool(account)
|
|
detail = f"Logged in as {account.get('email')}" if isinstance(account, dict) and account.get("email") else ""
|
|
return {
|
|
"online": logged_in,
|
|
"model": self.model,
|
|
"base_url": command,
|
|
"provider": "codex",
|
|
"model_available": self.model in models if models else bool(self.model),
|
|
"models": models,
|
|
"message": "Codex App Server is online." if logged_in else "Codex CLI is installed, but not logged in with ChatGPT.",
|
|
"detail": detail,
|
|
}
|
|
|
|
async def _cloud_health(self, provider: str) -> dict[str, Any]:
|
|
if not self.api_key:
|
|
return {
|
|
"online": False,
|
|
"model": self.model,
|
|
"base_url": self.base_url,
|
|
"provider": provider,
|
|
"model_available": False,
|
|
"models": [],
|
|
"message": f"{self._provider_label()} is selected, but no API key is configured.",
|
|
"detail": "",
|
|
}
|
|
try:
|
|
async with httpx.AsyncClient(timeout=10) as client:
|
|
response = await client.get(f"{self.base_url}/models", headers=self._openai_headers())
|
|
response.raise_for_status()
|
|
body = response.json()
|
|
except (httpx.HTTPError, ValueError) as exc:
|
|
return {
|
|
"online": False,
|
|
"model": self.model,
|
|
"base_url": self.base_url,
|
|
"provider": provider,
|
|
"model_available": False,
|
|
"models": [],
|
|
"message": f"{self._provider_label()} is unreachable at {self.base_url} or rejected the API key.",
|
|
"detail": str(exc),
|
|
}
|
|
models = sorted(item.get("id") for item in body.get("data", []) if item.get("id"))
|
|
return {
|
|
"online": True,
|
|
"model": self.model,
|
|
"base_url": self.base_url,
|
|
"provider": provider,
|
|
"model_available": self.model in models,
|
|
"models": models,
|
|
"message": f"{self._provider_label()} is online.",
|
|
}
|
|
|
|
def _openai_headers(self) -> dict[str, str]:
|
|
return {
|
|
"Authorization": f"Bearer {self.api_key or ''}",
|
|
"Content-Type": "application/json",
|
|
}
|
|
|
|
def _openai_request_options(self, stream: bool) -> dict[str, Any]:
|
|
if self.provider == "deepseek":
|
|
options: dict[str, Any] = {}
|
|
if self.reasoning_effort in {"none", "minimal"}:
|
|
options["thinking"] = {"type": "disabled"}
|
|
else:
|
|
options["thinking"] = {"type": "enabled"}
|
|
options["reasoning_effort"] = "max" if self.reasoning_effort in {"xhigh", "max"} else "high"
|
|
if stream:
|
|
options["stream_options"] = {"include_usage": True}
|
|
return options
|
|
return {"reasoning_effort": self.reasoning_effort}
|
|
|
|
def _openai_messages(
    self,
    query: str,
    messages: list[dict[str, Any]],
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
) -> list[dict[str, Any]]:
    """Convert the context-augmented history into chat-completions message dicts.

    Messages whose role is not system/user/assistant/tool are dropped. User
    messages with images become a list of text and ``image_url`` parts (base64
    data URLs). Assistant messages keep their tool calls and reasoning content.
    Tool messages get a ``tool_call_id``.
    """
    allowed_roles = {"system", "user", "assistant", "tool"}
    contextual = self._messages_with_context(
        query,
        messages,
        previous_interaction=previous_interaction,
        thread_id=thread_id,
    )
    converted: list[dict[str, Any]] = []
    for source in contextual:
        role = source.get("role")
        if role not in allowed_roles:
            continue
        item: dict[str, Any] = {"role": role, "content": source.get("content", "")}

        if role == "user":
            if source.get("images"):
                caption = source.get("content", "")
                declared_types = list(source.get("image_content_types") or [])
                parts: list[dict[str, Any]] = []
                if caption:
                    parts.append({"type": "text", "text": caption})
                for position, image_data in enumerate(source.get("images") or []):
                    # Images without a declared content type are sent as PNG.
                    content_type = declared_types[position] if position < len(declared_types) else "image/png"
                    data_url = f"data:{content_type};base64,{image_data}"
                    parts.append({"type": "image_url", "image_url": {"url": data_url}})
                item["content"] = parts
        elif role == "assistant":
            if source.get("tool_calls"):
                item["tool_calls"] = source["tool_calls"]
            if source.get("reasoning_content"):
                item["reasoning_content"] = source["reasoning_content"]
        elif role == "tool":
            # Fall back to the tool name, then to a fixed placeholder, when no call id was kept.
            item["tool_call_id"] = source.get("tool_call_id") or source.get("tool_name") or "tool"

        converted.append(item)
    return converted
|
|
|
|
def _codex_tool_catalog(self) -> list[dict[str, Any]]:
|
|
tools: list[dict[str, Any]] = []
|
|
for schema in self.tools.schemas:
|
|
if schema.get("type") != "function":
|
|
continue
|
|
function = schema.get("function") or {}
|
|
tools.append(
|
|
{
|
|
"name": function.get("name", ""),
|
|
"description": function.get("description", ""),
|
|
"parameters": function.get("parameters") or {"type": "object", "properties": {}},
|
|
}
|
|
)
|
|
return tools
|
|
|
|
def _provider_label(self) -> str:
|
|
if self.provider == "openai":
|
|
return "OpenAI model"
|
|
if self.provider == "deepseek":
|
|
return "DeepSeek model"
|
|
if self.provider == "codex":
|
|
return "Codex model"
|
|
return "local model"
|
|
|
|
def _is_openai_compatible_provider(self) -> bool:
|
|
return self.provider in {"openai", "deepseek"}
|
|
|
|
def _tool_rounds(self):
|
|
if self.provider == "deepseek":
|
|
while True:
|
|
yield None
|
|
return
|
|
for _ in range(10):
|
|
yield None
|
|
|
|
def _tool_round_limit_message(self) -> str:
|
|
return "I hit the tool-call limit while working on that. Try narrowing the request or approve any pending action first."
|
|
|
|
def _wake_tool_round_limit_message(self) -> str:
|
|
return "I hit the tool-call limit while running this scheduled wake job. Check the job prompt or pending approvals."
|
|
|
|
@staticmethod
|
|
def _merge_openai_tool_call(target: dict[int, dict[str, Any]], delta: dict[str, Any]) -> None:
|
|
index = int(delta.get("index") or 0)
|
|
current = target.setdefault(index, {"id": delta.get("id"), "type": "function", "function": {"name": "", "arguments": ""}})
|
|
if delta.get("id"):
|
|
current["id"] = delta["id"]
|
|
function = delta.get("function") or {}
|
|
current_function = current.setdefault("function", {"name": "", "arguments": ""})
|
|
if function.get("name"):
|
|
current_function["name"] += function["name"]
|
|
if function.get("arguments"):
|
|
current_function["arguments"] += function["arguments"]
|
|
|
|
@staticmethod
|
|
def _ordered_tool_calls(tool_calls: dict[int, dict[str, Any]]) -> list[dict[str, Any]]:
|
|
return [tool_calls[index] for index in sorted(tool_calls)]
|
|
|
|
async def _codex_cli_turn(
    self,
    query: str,
    messages: list[dict[str, Any]],
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
) -> dict[str, Any]:
    """Run one Codex turn by delegating to ``_codex_app_server_turn`` with the same arguments."""
    turn_result = await self._codex_app_server_turn(
        query,
        messages,
        previous_interaction=previous_interaction,
        thread_id=thread_id,
    )
    return turn_result
|
|
|
|
async def _codex_app_server_turn(
    self,
    query: str,
    messages: list[dict[str, Any]],
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
) -> dict[str, Any]:
    """Run one turn through a freshly spawned Codex App Server process.

    The conversation is rendered into a single prompt, a JSON-RPC session is
    driven over the child's stdio (``initialize`` -> ``thread/start`` ->
    ``turn/start``), and incoming messages are consumed until the turn
    completes. The server process is always stopped afterwards.

    Returns:
        The structured result parsed from the agent's final message text.

    Raises:
        RuntimeError: if a request returns an error, no thread or turn id is
            returned, the turn ends with a status other than "completed", or
            the server sends an ``error`` notification.
    """
    prompt = self._codex_cli_prompt(
        query,
        messages,
        previous_interaction=previous_interaction,
        thread_id=thread_id,
    )
    final_text = ""

    process = await self._start_codex_app_server()
    request_id = 1

    async def send_request(method: str, params: dict[str, Any] | None = None, timeout: int = 120) -> dict[str, Any]:
        """Send one JSON-RPC request and wait for the response carrying its id."""
        nonlocal request_id
        current_id = request_id
        request_id += 1
        payload: dict[str, Any] = {"jsonrpc": "2.0", "id": current_id, "method": method}
        if params is not None:
            payload["params"] = params
        await self._codex_app_server_write(process, payload)
        while True:
            message = await self._codex_app_server_read(process, timeout=timeout)
            # A response has our id and no "method". Server-initiated requests
            # number their ids independently, so an id match alone is not
            # enough: a colliding server request must still be answered below.
            if message.get("id") == current_id and "method" not in message:
                if message.get("error"):
                    error = message["error"]
                    raise RuntimeError(error.get("message") or f"Codex App Server request failed: {error}")
                return message.get("result") or {}
            await self._handle_codex_app_server_message(process, message)

    try:
        await send_request(
            "initialize",
            {
                "clientInfo": {"name": "TraderAI", "version": __version__},
                "capabilities": {"experimentalApi": True},
            },
            timeout=30,
        )
        await self._codex_app_server_write(process, {"jsonrpc": "2.0", "method": "initialized", "params": {}})
        thread = await send_request(
            "thread/start",
            {
                "model": self.model,
                "modelProvider": None,
                "cwd": str(Path.cwd()),
                "approvalPolicy": "never",
                "sandbox": "read-only",
                "baseInstructions": "You are TraderAI running through the local Codex App Server using ChatGPT OAuth.",
                "developerInstructions": (
                    "Do not run shell commands, inspect files, or modify the workspace. "
                    "Answer only with JSON matching the requested output schema."
                ),
                "ephemeral": True,
                "experimentalRawEvents": False,
                "persistExtendedHistory": False,
            },
            timeout=30,
        )
        thread_id_value = ((thread.get("thread") or {}).get("id") or thread.get("threadId") or "").strip()
        if not thread_id_value:
            raise RuntimeError(f"Codex App Server did not return a thread id: {thread!r}")
        turn = await send_request(
            "turn/start",
            {
                "threadId": thread_id_value,
                "input": [{"type": "text", "text": prompt, "text_elements": []}],
                "cwd": str(Path.cwd()),
                "approvalPolicy": "never",
                "sandboxPolicy": {"type": "readOnly", "access": {"type": "fullAccess"}},
                "model": self.model,
                "effort": self.reasoning_effort,
                "summary": "none",
                "outputSchema": self._codex_output_schema(),
            },
            timeout=60,
        )
        turn_id = ((turn.get("turn") or {}).get("id") or "").strip()
        if not turn_id:
            raise RuntimeError(f"Codex App Server did not return a turn id: {turn!r}")
        while True:
            message = await self._codex_app_server_read(process, timeout=240)
            method = message.get("method")
            params = message.get("params") or {}
            if method == "item/agentMessage/delta" and params.get("turnId") == turn_id:
                # Streamed fragments are accumulated until a completed item replaces them.
                final_text += params.get("delta") or ""
            elif method == "item/completed" and params.get("turnId") == turn_id:
                item = params.get("item") or {}
                if item.get("type") == "agentMessage":
                    final_text = item.get("text") or final_text
            elif method == "turn/completed" and (params.get("turn") or {}).get("id") == turn_id:
                turn_status = (params.get("turn") or {}).get("status")
                if turn_status != "completed":
                    error = (params.get("turn") or {}).get("error") or {}
                    raise RuntimeError(error.get("message") or f"Codex App Server turn ended with status {turn_status}.")
                break
            elif method == "error":
                error = params.get("message") or params.get("error") or params
                raise RuntimeError(f"Codex App Server error: {error}")
            else:
                await self._handle_codex_app_server_message(process, message)
    finally:
        await self._stop_codex_app_server(process)
    return self._parse_codex_app_server_text(final_text)
|
|
|
|
def _codex_cli_prompt(
    self,
    query: str,
    messages: list[dict[str, Any]],
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
) -> str:
    """Render the instructions, tool catalog and conversation into one prompt string.

    Each message from ``_messages_with_context`` becomes a ``role: content``
    line; user messages with images, tool messages, and assistant messages
    with tool calls get a bracketed annotation after the role.
    """
    transcript: list[str] = []
    history = self._messages_with_context(
        query,
        messages,
        previous_interaction=previous_interaction,
        thread_id=thread_id,
    )
    for entry in history:
        speaker = entry.get("role", "unknown")
        body = entry.get("content", "")
        note = ""
        if speaker == "user" and entry.get("images"):
            note = f" [attached images: {len(entry.get('images') or [])}]"
        elif speaker == "tool":
            note = f" [tool {entry.get('tool_name') or ''}]"
        elif speaker == "assistant" and entry.get("tool_calls"):
            note = f" [tool calls: {json.dumps(entry.get('tool_calls'), ensure_ascii=True)}]"
        transcript.append(f"{speaker}{note}: {body}")

    catalog = json.dumps(self._codex_tool_catalog(), ensure_ascii=True, indent=2)
    sections = [
        "You are TraderAI running through the local Codex App Server using ChatGPT OAuth.\n",
        "Do not run shell commands, inspect files, or modify the workspace.\n",
        "Your only job is to decide whether to answer directly or request exactly one TraderAI tool.\n\n",
        "Return JSON that matches the provided schema.\n",
        "- If you can answer now, set kind to final, put the user-facing reply in message, set tool_name to an empty string, and set arguments_json to '{}'.\n",
        "- If you need a tool, set kind to tool_call, set tool_name to the exact tool name, set message to an empty string, and set arguments_json to a valid JSON object string.\n",
        "- Never return more than one tool call at a time.\n",
        "- Prefer the TraderAI tools over guessing.\n\n",
        f"Available tools:\n{catalog}\n\n",
        "Conversation transcript:\n",
        "\n".join(transcript),
    ]
    return "".join(sections)
|
|
|
|
def _codex_structured_response(self, result: dict[str, Any]) -> dict[str, Any]:
|
|
if result.get("kind") == "tool_call":
|
|
tool_name = str(result.get("tool_name") or "").strip()
|
|
arguments_json = str(result.get("arguments_json") or "{}").strip() or "{}"
|
|
return {
|
|
"message": {
|
|
"role": "assistant",
|
|
"content": "",
|
|
"tool_calls": [
|
|
{
|
|
"id": f"codex-{uuid.uuid4()}",
|
|
"type": "function",
|
|
"function": {
|
|
"name": tool_name,
|
|
"arguments": arguments_json,
|
|
},
|
|
}
|
|
],
|
|
}
|
|
}
|
|
return {
|
|
"message": {
|
|
"role": "assistant",
|
|
"content": str(result.get("message") or ""),
|
|
"tool_calls": [],
|
|
}
|
|
}
|
|
|
|
def _write_codex_schema(self) -> str:
|
|
schema = self._codex_output_schema()
|
|
with tempfile.NamedTemporaryFile("w", suffix="-traderai-codex-schema.json", delete=False, encoding="utf-8") as handle:
|
|
json.dump(schema, handle, ensure_ascii=True)
|
|
return handle.name
|
|
|
|
@staticmethod
|
|
def _codex_output_schema() -> dict[str, Any]:
|
|
return {
|
|
"type": "object",
|
|
"properties": {
|
|
"kind": {"type": "string", "enum": ["final", "tool_call"]},
|
|
"message": {"type": "string"},
|
|
"tool_name": {"type": "string"},
|
|
"arguments_json": {"type": "string"},
|
|
},
|
|
"required": ["kind", "message", "tool_name", "arguments_json"],
|
|
"additionalProperties": False,
|
|
}
|
|
|
|
def _parse_codex_app_server_text(self, final_text: str) -> dict[str, Any]:
|
|
if not final_text.strip():
|
|
raise RuntimeError("Codex App Server returned an empty response.")
|
|
try:
|
|
parsed = json.loads(final_text)
|
|
except ValueError as exc:
|
|
raise RuntimeError(f"Codex App Server returned non-JSON output: {final_text}") from exc
|
|
if parsed.get("kind") not in {"final", "tool_call"}:
|
|
raise RuntimeError(f"Codex App Server returned an invalid result kind: {parsed!r}")
|
|
return parsed
|
|
|
|
def _parse_codex_exec_output(self, output: dict[str, Any]) -> dict[str, Any]:
|
|
events = output.get("events") or []
|
|
final_text = ""
|
|
error_text = ""
|
|
for event in events:
|
|
if event.get("type") == "item.completed":
|
|
item = event.get("item") or {}
|
|
if item.get("type") == "agent_message":
|
|
final_text = item.get("text") or final_text
|
|
elif event.get("type") == "error":
|
|
error_text = event.get("message") or error_text
|
|
elif event.get("type") == "turn.failed":
|
|
details = event.get("error") or {}
|
|
error_text = details.get("message") or error_text
|
|
if output.get("returncode") != 0 and not final_text:
|
|
raise RuntimeError(error_text or output.get("stderr") or "Codex CLI failed.")
|
|
try:
|
|
parsed = json.loads(final_text)
|
|
except ValueError as exc:
|
|
raise RuntimeError(f"Codex CLI returned non-JSON output: {final_text}") from exc
|
|
if parsed.get("kind") not in {"final", "tool_call"}:
|
|
raise RuntimeError(f"Codex CLI returned an invalid result kind: {parsed!r}")
|
|
return parsed
|
|
|
|
def _codex_command(self, required: bool = False) -> str | None:
|
|
configured = self.base_url.strip() if self.base_url else "codex"
|
|
resolved = shutil.which(configured) or configured
|
|
if required and not Path(resolved).exists() and shutil.which(resolved) is None:
|
|
raise RuntimeError("Codex CLI was not found on PATH.")
|
|
return resolved
|
|
|
|
async def _codex_app_server_status(self) -> tuple[dict[str, Any] | None, list[str]]:
    """Query a temporary Codex App Server for the account and available models.

    Spawns the server, performs the ``initialize`` handshake, reads the
    account with ``account/read``, then pages through ``model/list`` (at
    most 20 pages of 50). The server is always stopped afterwards.

    Returns:
        ``(account, models)`` where ``account`` is the ``account`` field of
        the ``account/read`` result (may be ``None``) and ``models`` is the
        sorted, de-duplicated list of model ids.

    Raises:
        RuntimeError: if any request returns a JSON-RPC error.
    """
    process = await self._start_codex_app_server()
    request_id = 1

    async def send_request(method: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
        """Send one JSON-RPC request and wait for the response carrying its id."""
        nonlocal request_id
        current_id = request_id
        request_id += 1
        payload: dict[str, Any] = {"jsonrpc": "2.0", "id": current_id, "method": method}
        if params is not None:
            payload["params"] = params
        await self._codex_app_server_write(process, payload)
        while True:
            message = await self._codex_app_server_read(process, timeout=30)
            # A response has our id and no "method". Server-initiated requests
            # number their ids independently, so an id match alone is not
            # enough: a colliding server request must still be answered below.
            if message.get("id") == current_id and "method" not in message:
                if message.get("error"):
                    error = message["error"]
                    raise RuntimeError(error.get("message") or f"Codex App Server request failed: {error}")
                return message.get("result") or {}
            await self._handle_codex_app_server_message(process, message)

    try:
        await send_request(
            "initialize",
            {
                "clientInfo": {"name": "TraderAI", "version": __version__},
                "capabilities": {"experimentalApi": True},
            },
        )
        await self._codex_app_server_write(process, {"jsonrpc": "2.0", "method": "initialized", "params": {}})
        account_result = await send_request("account/read", {"refreshToken": False})
        models: list[str] = []
        cursor: str | None = None
        # Bounded paging so a server that keeps returning a cursor cannot loop forever.
        for _ in range(20):
            params: dict[str, Any] = {"limit": 50, "includeHidden": False}
            if cursor:
                params["cursor"] = cursor
            page = await send_request("model/list", params)
            for item in page.get("data") or []:
                model = item.get("id") or item.get("model")
                if model:
                    models.append(model)
            cursor = page.get("nextCursor")
            if not cursor:
                break
        return account_result.get("account"), sorted(set(models))
    finally:
        await self._stop_codex_app_server(process)
|
|
|
|
async def _start_codex_app_server(self) -> asyncio.subprocess.Process:
    """Spawn ``<codex> app-server`` with stdin, stdout and stderr piped.

    Raises:
        RuntimeError: if the Codex CLI cannot be located.
    """
    command = self._codex_command(required=True)
    return await asyncio.create_subprocess_exec(
        command,
        "app-server",
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        # The session is newline-delimited JSON read with readline(); the
        # default 64 KiB stream limit makes readline() fail on a longer line,
        # so allow much larger messages.
        limit=16 * 1024 * 1024,
    )
|
|
|
|
async def _codex_app_server_write(self, process: asyncio.subprocess.Process, payload: dict[str, Any]) -> None:
|
|
if process.stdin is None:
|
|
raise RuntimeError("Codex App Server stdin is unavailable.")
|
|
process.stdin.write((json.dumps(payload, ensure_ascii=True) + "\n").encode("utf-8"))
|
|
await process.stdin.drain()
|
|
|
|
async def _codex_app_server_read(self, process: asyncio.subprocess.Process, timeout: int) -> dict[str, Any]:
|
|
if process.stdout is None:
|
|
raise RuntimeError("Codex App Server stdout is unavailable.")
|
|
try:
|
|
line = await asyncio.wait_for(process.stdout.readline(), timeout=timeout)
|
|
except TimeoutError as exc:
|
|
raise RuntimeError("Codex App Server timed out.") from exc
|
|
if not line:
|
|
stderr = ""
|
|
if process.stderr is not None:
|
|
try:
|
|
stderr = (await asyncio.wait_for(process.stderr.read(), timeout=1)).decode("utf-8", errors="replace").strip()
|
|
except TimeoutError:
|
|
stderr = ""
|
|
raise RuntimeError(stderr or "Codex App Server exited without a response.")
|
|
try:
|
|
return json.loads(line.decode("utf-8", errors="replace"))
|
|
except ValueError as exc:
|
|
raise RuntimeError(f"Codex App Server returned invalid JSON-RPC: {line!r}") from exc
|
|
|
|
async def _handle_codex_app_server_message(self, process: asyncio.subprocess.Process, message: dict[str, Any]) -> None:
|
|
if "id" not in message or "method" not in message:
|
|
return
|
|
method = message.get("method")
|
|
if method in {
|
|
"item/commandExecution/requestApproval",
|
|
"item/fileChange/requestApproval",
|
|
"applyPatchApproval",
|
|
"execCommandApproval",
|
|
}:
|
|
await self._codex_app_server_write(
|
|
process,
|
|
{
|
|
"jsonrpc": "2.0",
|
|
"id": message["id"],
|
|
"result": {
|
|
"decision": "deny",
|
|
"message": "TraderAI does not allow Codex to run commands or change files.",
|
|
},
|
|
},
|
|
)
|
|
return
|
|
await self._codex_app_server_write(
|
|
process,
|
|
{
|
|
"jsonrpc": "2.0",
|
|
"id": message["id"],
|
|
"error": {"code": -32601, "message": f"TraderAI does not handle Codex App Server request {method}."},
|
|
},
|
|
)
|
|
|
|
async def _stop_codex_app_server(self, process: asyncio.subprocess.Process) -> None:
|
|
if process.returncode is not None:
|
|
return
|
|
process.terminate()
|
|
try:
|
|
await asyncio.wait_for(process.wait(), timeout=3)
|
|
except TimeoutError:
|
|
process.kill()
|
|
await process.wait()
|
|
|
|
async def _run_command(self, command: list[str], timeout: int = 120, stdin_text: str | None = None) -> dict[str, Any]:
|
|
process = await asyncio.create_subprocess_exec(
|
|
*command,
|
|
stdin=asyncio.subprocess.PIPE if stdin_text is not None else None,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
)
|
|
try:
|
|
payload = stdin_text.encode("utf-8") if stdin_text is not None else None
|
|
stdout, stderr = await asyncio.wait_for(process.communicate(payload), timeout=timeout)
|
|
except TimeoutError:
|
|
process.kill()
|
|
await process.communicate()
|
|
raise RuntimeError(f"Command timed out: {' '.join(command[:3])}")
|
|
stdout_text = stdout.decode("utf-8", errors="replace")
|
|
stderr_text = stderr.decode("utf-8", errors="replace")
|
|
events = []
|
|
for line in stdout_text.splitlines():
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
try:
|
|
events.append(json.loads(line))
|
|
except ValueError:
|
|
events.append({"type": "stdout", "text": line})
|
|
return {
|
|
"returncode": process.returncode,
|
|
"stdout": stdout_text,
|
|
"stderr": stderr_text,
|
|
"events": events,
|
|
}
|
|
|
|
def _codex_model_cache(self) -> list[str]:
|
|
cache_path = Path.home() / ".codex" / "models_cache.json"
|
|
if not cache_path.exists():
|
|
return []
|
|
try:
|
|
body = json.loads(cache_path.read_text(encoding="utf-8"))
|
|
except (OSError, ValueError):
|
|
return []
|
|
models = []
|
|
for item in body.get("models", []):
|
|
slug = item.get("slug")
|
|
if slug:
|
|
models.append(slug)
|
|
return sorted(set(models))
|
|
|
|
def _runtime_context_parts(
    self,
    query: str,
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
    attached_image_count: int = 0,
) -> tuple[list[str], list[str]]:
    """Collect the context lines injected alongside a user message.

    Returns:
        ``(stable_parts, volatile_parts)``. Stable parts hold the UEX
        authentication status, the known user name and the stored profile;
        volatile parts hold the current time, attached-image notice, previous
        interaction and recalled memories. The memory-backed entries are
        skipped when ``self.memory`` is ``None``.
    """
    zone = get_localzone()
    stable: list[str] = []
    volatile: list[str] = [
        f"Current local date/time: {iso_now()} UTC; {iso_now_in_zone(zone)} {zone}.",
    ]

    if attached_image_count:
        noun = "images" if attached_image_count != 1 else "image"
        volatile.append(
            f"Current user message includes {attached_image_count} pasted {noun}. "
            "You can inspect them visually. If the user wants one reused in a marketplace listing draft, "
            "call draft_marketplace_listing or draft_marketplace_listing_with_cornerstone_image with "
            "use_attached_image=true and attached_image_index when needed."
        )

    uex_client = getattr(self.tools, "uex", None)
    if uex_client:
        credentials = (
            (uex_client.secret_key, "secret key"),
            (uex_client.bearer_token, "bearer token"),
        )
        configured = [label for value, label in credentials if value]
        if configured:
            stable.append(
                "UEX API authentication is configured server-side with "
                + " and ".join(configured)
                + "; use authenticated UEX tools directly and do not ask for tokens."
            )
        else:
            stable.append("UEX API authentication is not configured server-side.")

    if self.user_name:
        stable.append(f"Known user name/handle: {self.user_name}.")

    # Everything below needs the memory store.
    if self.memory is None:
        return stable, volatile

    profile = self.memory.get_profile()
    if profile:
        identity = self._profile_identity(profile)
        if identity:
            stable.append(identity)
        stable.append(f"Known user profile JSON: {json.dumps(self._profile_for_prompt(profile), ensure_ascii=True)}.")

    if previous_interaction is not None:
        last = previous_interaction
    else:
        last = self.memory.last_interaction(thread_id)
    if not last:
        volatile.append("Previous interaction before this message: none recorded.")
    else:
        volatile.append(
            f"Previous interaction before this message: {last['created_at']} "
            f"({time_since(last['created_at'])}, role {last['role']})."
        )

    recalled = self.memory.recall(query, limit=6)
    if recalled:
        bullet_lines = [
            f"- [{entry['kind']}, importance {entry['importance']}] {entry['content']}"
            for entry in recalled
        ]
        memory_text = "\n".join(bullet_lines)
        volatile.append(f"Relevant long-term memories:\n{memory_text}")

    return stable, volatile
|
|
|
|
def _runtime_context(
    self,
    query: str,
    previous_interaction: dict[str, Any] | None = None,
    thread_id: str | None = DEFAULT_THREAD_ID,
    attached_image_count: int = 0,
) -> str:
    """Return the runtime context as one string: stable lines first, then volatile lines."""
    stable, volatile = self._runtime_context_parts(
        query,
        previous_interaction=previous_interaction,
        thread_id=thread_id,
        attached_image_count=attached_image_count,
    )
    sections = ["\n".join(stable), "\n".join(volatile)]
    # An empty section is dropped so it leaves no blank line behind.
    return "\n".join(filter(None, sections))
|
|
|
|
def _messages_for_thread(self, thread_id: str | None) -> list[dict[str, Any]]:
    """Return the message list for a thread, building it on first access.

    A new list starts with the system prompt and, when a memory store is
    configured, the non-empty user/assistant entries among the 30 most
    recent stored conversation records. The list is kept in
    ``self.thread_messages`` for later calls.
    """
    key = self._thread_id(thread_id)
    if key in self.thread_messages:
        return self.thread_messages[key]

    history: list[dict[str, Any]] = [{"role": "system", "content": SYSTEM_PROMPT}]
    if self.memory:
        self.memory.ensure_thread(key)
        for record in self.memory.recent_conversation(limit=30, thread_id=key):
            speaker = record.get("role")
            if speaker not in {"user", "assistant"}:
                continue
            if not record.get("content"):
                continue
            history.append({"role": speaker, "content": record["content"]})
    self.thread_messages[key] = history
    return self.thread_messages[key]
|
|
|
|
async def _title_first_message(
|
|
self,
|
|
thread_id: str,
|
|
first_message: str,
|
|
previous_interaction: dict[str, Any] | None,
|
|
) -> None:
|
|
if self.memory is None or previous_interaction is not None:
|
|
return
|
|
thread = self.memory.get_thread(thread_id)
|
|
if not thread or thread.get("title") != "New chat":
|
|
return
|
|
title = await self._generate_chat_title(first_message)
|
|
self.memory.rename_thread(thread_id, title or MemoryStore._thread_title(first_message))
|
|
|
|
async def _generate_chat_title(self, first_message: str) -> str:
|
|
prompt = (
|
|
"Create a concise chat title for this first user message. "
|
|
"Use 2 to 6 words. No quotes, no punctuation at the end, no preamble.\n\n"
|
|
f"Message: {first_message[:800]}"
|
|
)
|
|
try:
|
|
if self._is_openai_compatible_provider():
|
|
async with httpx.AsyncClient(timeout=20) as client:
|
|
response = await client.post(
|
|
f"{self.base_url}/chat/completions",
|
|
headers=self._openai_headers(),
|
|
json={
|
|
"model": self.model,
|
|
"messages": [
|
|
{"role": "system", "content": "You write short chat titles."},
|
|
{"role": "user", "content": prompt},
|
|
],
|
|
"stream": False,
|
|
**self._openai_request_options(stream=False),
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
choice = (response.json().get("choices") or [{}])[0]
|
|
message = choice.get("message") or {}
|
|
return self._clean_generated_title(message.get("content", ""))
|
|
if self.provider == "codex":
|
|
result = await self._codex_app_server_turn(
|
|
prompt,
|
|
[
|
|
{"role": "system", "content": "You write short chat titles."},
|
|
{"role": "user", "content": prompt},
|
|
],
|
|
thread_id="title",
|
|
)
|
|
return self._clean_generated_title(result.get("message", ""))
|
|
async with httpx.AsyncClient(timeout=20) as client:
|
|
response = await client.post(
|
|
f"{self.base_url}/api/chat",
|
|
json={
|
|
"model": self.model,
|
|
"messages": [
|
|
{"role": "system", "content": "You write short chat titles."},
|
|
{"role": "user", "content": prompt},
|
|
],
|
|
"options": self._ollama_options(),
|
|
"stream": False,
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
message = response.json().get("message") or {}
|
|
return self._clean_generated_title(message.get("content", ""))
|
|
except Exception:
|
|
return ""
|
|
|
|
async def generate_plan_draft(
    self,
    title: str = "",
    objective: str = "",
    kind: str = "buying",
    constraints: dict[str, Any] | None = None,
    items: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """Draft a continual plan from partial user input.

    The inputs are cleaned into a seed, a heuristic draft is computed from
    it, and the model is asked to improve on it. If the model call or the
    normalization of its reply raises, the heuristic draft is returned.
    """
    seed = {
        "title": str(title or "").strip(),
        "objective": str(objective or "").strip(),
        "kind": str(kind or "buying").strip().casefold() or "buying",
        "constraints": dict(constraints or {}),
        "items": self._normalize_plan_items(items or []),
    }
    prompt = self._plan_draft_prompt(seed)
    fallback = self._heuristic_plan_draft(seed)
    try:
        reply = await self._generate_plain_text(
            prompt,
            system_prompt="You draft structured continual plan JSON for TraderAI.",
        )
        draft = self._normalize_plan_draft(reply, seed, fallback)
    except Exception:
        return fallback
    return draft
|
|
|
|
@staticmethod
|
|
def _thread_id(thread_id: str | None) -> str:
|
|
return (thread_id or DEFAULT_THREAD_ID).strip() or DEFAULT_THREAD_ID
|
|
|
|
@staticmethod
|
|
def _clean_generated_title(title: str) -> str:
|
|
text = re.sub(r"[\r\n]+", " ", title).strip().strip('"').strip("'")
|
|
text = re.sub(r"^(title|chat title)\s*:\s*", "", text, flags=re.IGNORECASE).strip()
|
|
text = text.rstrip(".!?;:-").strip()
|
|
if not text:
|
|
return ""
|
|
words = text.split()
|
|
if len(words) > 8:
|
|
text = " ".join(words[:8])
|
|
return text[:64]
|
|
|
|
async def _generate_plain_text(self, prompt: str, system_prompt: str) -> str:
|
|
if self._is_openai_compatible_provider():
|
|
async with httpx.AsyncClient(timeout=30) as client:
|
|
response = await client.post(
|
|
f"{self.base_url}/chat/completions",
|
|
headers=self._openai_headers(),
|
|
json={
|
|
"model": self.model,
|
|
"messages": [
|
|
{"role": "system", "content": system_prompt},
|
|
{"role": "user", "content": prompt},
|
|
],
|
|
"stream": False,
|
|
**self._openai_request_options(stream=False),
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
choice = (response.json().get("choices") or [{}])[0]
|
|
message = choice.get("message") or {}
|
|
return str(message.get("content") or "")
|
|
if self.provider == "codex":
|
|
result = await self._codex_app_server_turn(
|
|
prompt,
|
|
[
|
|
{"role": "system", "content": system_prompt},
|
|
{"role": "user", "content": prompt},
|
|
],
|
|
thread_id="plan-draft",
|
|
)
|
|
return str(result.get("message") or "")
|
|
async with httpx.AsyncClient(timeout=30) as client:
|
|
response = await client.post(
|
|
f"{self.base_url}/api/chat",
|
|
json={
|
|
"model": self.model,
|
|
"messages": [
|
|
{"role": "system", "content": system_prompt},
|
|
{"role": "user", "content": prompt},
|
|
],
|
|
"options": self._ollama_options(),
|
|
"stream": False,
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
message = response.json().get("message") or {}
|
|
return str(message.get("content") or "")
|
|
|
|
@classmethod
|
|
def _normalize_plan_draft(
|
|
cls,
|
|
raw_text: str,
|
|
seed: dict[str, Any],
|
|
fallback: dict[str, Any] | None = None,
|
|
) -> dict[str, Any]:
|
|
base = dict(fallback or cls._heuristic_plan_draft(seed))
|
|
payload = cls._parse_json_object(raw_text)
|
|
if not isinstance(payload, dict):
|
|
return base
|
|
|
|
title = str(payload.get("title") or seed.get("title") or base.get("title") or "").strip()
|
|
objective = str(payload.get("objective") or seed.get("objective") or base.get("objective") or "").strip()
|
|
kind = str(payload.get("kind") or seed.get("kind") or base.get("kind") or "buying").strip().casefold() or "buying"
|
|
cadence = cls._normalize_plan_cadence(payload.get("cadence")) or base.get("cadence")
|
|
constraints = cls._normalize_plan_constraints(payload.get("constraints"), seed.get("constraints") or {}, base.get("constraints") or {})
|
|
items = cls._normalize_plan_items(payload.get("items") or seed.get("items") or base.get("items") or [])
|
|
|
|
if kind == "buying" and not items:
|
|
items = list(base.get("items") or [])
|
|
if not constraints.get("instructions"):
|
|
constraints["instructions"] = (base.get("constraints") or {}).get("instructions") or cls._default_plan_instructions(kind)
|
|
if not constraints.get("message_tone"):
|
|
constraints["message_tone"] = (base.get("constraints") or {}).get("message_tone") or "friendly and direct"
|
|
|
|
return {
|
|
"title": title or base.get("title") or "Continual plan",
|
|
"objective": objective or base.get("objective") or title or "Continue this plan",
|
|
"kind": kind,
|
|
"cadence": cadence,
|
|
"constraints": constraints,
|
|
"items": items,
|
|
}
|
|
|
|
@classmethod
|
|
def _heuristic_plan_draft(cls, seed: dict[str, Any]) -> dict[str, Any]:
|
|
title = str(seed.get("title") or "").strip()
|
|
objective = str(seed.get("objective") or "").strip()
|
|
kind = str(seed.get("kind") or "buying").strip().casefold() or "buying"
|
|
constraints = cls._normalize_plan_constraints(seed.get("constraints"), {}, {})
|
|
items = cls._normalize_plan_items(seed.get("items") or [])
|
|
|
|
if not items and kind == "buying":
|
|
inferred_names = cls._infer_item_names(f"{title}\n{objective}")
|
|
items = [{"item_name": name, "desired_quantity": 1, "max_unit_price": None} for name in inferred_names[:8]]
|
|
|
|
if not constraints.get("message_tone"):
|
|
constraints["message_tone"] = "friendly and direct"
|
|
if not constraints.get("instructions"):
|
|
constraints["instructions"] = cls._default_plan_instructions(kind)
|
|
|
|
return {
|
|
"title": title or "Continual plan",
|
|
"objective": objective or title or "Continue this plan",
|
|
"kind": kind,
|
|
"cadence": cls._normalize_plan_cadence(seed.get("cadence")) or ("0 */6 * * *" if kind == "buying" else "0 */4 * * *"),
|
|
"constraints": constraints,
|
|
"items": items,
|
|
}
|
|
|
|
@staticmethod
|
|
def _default_plan_instructions(kind: str) -> str:
|
|
if kind == "custom":
|
|
return "Check for meaningful updates, summarize what changed, and suggest the next move."
|
|
return "Track the best active listings, avoid bad prices, and draft messages for approval when a strong candidate appears."
|
|
|
|
@staticmethod
|
|
def _plan_draft_prompt(seed: dict[str, Any]) -> str:
|
|
return (
|
|
"Draft a continual TraderAI plan as strict JSON.\n"
|
|
"Return one JSON object only with keys: title, objective, kind, cadence, constraints, items.\n"
|
|
"constraints may include message_tone, instructions, preferred_locations, excluded_sellers, max_unit_price.\n"
|
|
"items must be an array of objects with item_name, desired_quantity, max_unit_price.\n"
|
|
"If the request is vague, still fill cadence, message_tone, and instructions.\n"
|
|
"Only include checklist items when they can be reasonably inferred from the request or existing draft.\n"
|
|
"Do not wrap the JSON in markdown.\n\n"
|
|
f"Current draft seed: {json.dumps(seed, ensure_ascii=True)}"
|
|
)
|
|
|
|
@staticmethod
|
|
def _parse_json_object(raw_text: str) -> dict[str, Any] | None:
|
|
text = str(raw_text or "").strip()
|
|
if not text:
|
|
return None
|
|
try:
|
|
parsed = json.loads(text)
|
|
return parsed if isinstance(parsed, dict) else None
|
|
except ValueError:
|
|
pass
|
|
start = text.find("{")
|
|
end = text.rfind("}")
|
|
if start == -1 or end <= start:
|
|
return None
|
|
try:
|
|
parsed = json.loads(text[start : end + 1])
|
|
return parsed if isinstance(parsed, dict) else None
|
|
except ValueError:
|
|
return None
|
|
|
|
@classmethod
|
|
def _normalize_plan_constraints(cls, value: Any, seed: dict[str, Any], fallback: dict[str, Any]) -> dict[str, Any]:
|
|
merged: dict[str, Any] = {}
|
|
for source in (fallback, seed, value if isinstance(value, dict) else {}):
|
|
if not isinstance(source, dict):
|
|
continue
|
|
for key, item in source.items():
|
|
if item in (None, "", [], {}):
|
|
continue
|
|
if key in {"preferred_locations", "excluded_sellers"}:
|
|
if isinstance(item, list):
|
|
merged[key] = [str(entry).strip() for entry in item if str(entry).strip()]
|
|
elif key == "max_unit_price":
|
|
try:
|
|
merged[key] = float(item)
|
|
except (TypeError, ValueError):
|
|
continue
|
|
else:
|
|
merged[key] = str(item).strip()
|
|
return merged
|
|
|
|
@staticmethod
|
|
def _normalize_plan_cadence(value: Any) -> str | None:
|
|
text = str(value or "").strip()
|
|
if not text:
|
|
return None
|
|
parts = text.split()
|
|
return text if len(parts) == 5 else None
|
|
|
|
@classmethod
|
|
def _normalize_plan_items(cls, items: Any) -> list[dict[str, Any]]:
|
|
if not isinstance(items, list):
|
|
return []
|
|
normalized: list[dict[str, Any]] = []
|
|
for item in items:
|
|
if not isinstance(item, dict):
|
|
continue
|
|
name = str(item.get("item_name") or item.get("name") or "").strip()
|
|
if not name:
|
|
continue
|
|
normalized_item: dict[str, Any] = {"item_name": name}
|
|
try:
|
|
normalized_item["desired_quantity"] = max(1, int(item.get("desired_quantity") or item.get("quantity") or 1))
|
|
except (TypeError, ValueError):
|
|
normalized_item["desired_quantity"] = 1
|
|
try:
|
|
if item.get("max_unit_price") not in (None, ""):
|
|
normalized_item["max_unit_price"] = float(item.get("max_unit_price"))
|
|
elif item.get("max_price") not in (None, ""):
|
|
normalized_item["max_unit_price"] = float(item.get("max_price"))
|
|
else:
|
|
normalized_item["max_unit_price"] = None
|
|
except (TypeError, ValueError):
|
|
normalized_item["max_unit_price"] = None
|
|
normalized.append(normalized_item)
|
|
return normalized
|
|
|
|
@staticmethod
|
|
def _infer_item_names(text: str) -> list[str]:
|
|
source = str(text or "")
|
|
quoted = [match.strip() for match in re.findall(r'"([^"\n]{2,80})"|\'([^\'\n]{2,80})\'', source)]
|
|
names = [next((part for part in group if part), "") for group in quoted]
|
|
if names:
|
|
return [name for name in names if name]
|
|
|
|
lines = []
|
|
for raw_line in source.splitlines():
|
|
line = raw_line.strip(" -*\t")
|
|
if not line:
|
|
continue
|
|
if any(token in line for token in [",", ";", "/"]):
|
|
parts = re.split(r"[,;/]+", line)
|
|
lines.extend(part.strip() for part in parts if part.strip())
|
|
else:
|
|
lines.append(line)
|
|
|
|
stopwords = {
|
|
"need", "needs", "want", "wants", "find", "draft", "deal", "deals", "parts", "part", "items",
|
|
"watch", "track", "check", "buy", "buying", "for", "the", "and", "with", "from", "best", "cheapest",
|
|
}
|
|
inferred = []
|
|
for line in lines:
|
|
clean = re.sub(r"\s+", " ", line).strip().strip(".")
|
|
if len(clean) < 3:
|
|
continue
|
|
lowered = clean.casefold()
|
|
if lowered in stopwords:
|
|
continue
|
|
if any(phrase in lowered for phrase in ["find and draft", "check for", "continue this plan"]):
|
|
continue
|
|
inferred.append(clean[:120])
|
|
deduped = []
|
|
seen = set()
|
|
for item in inferred:
|
|
key = item.casefold()
|
|
if key in seen:
|
|
continue
|
|
seen.add(key)
|
|
deduped.append(item)
|
|
return deduped
|
|
|
|
def _pending_payloads(self) -> list[dict[str, Any]]:
|
|
return [
|
|
{
|
|
"id": action.id,
|
|
"label": action.label,
|
|
"method": action.method,
|
|
"endpoint": action.endpoint,
|
|
"payload": self.tools._display_payload(action.payload) if hasattr(self.tools, "_display_payload") else action.payload,
|
|
"metadata": action.metadata or {},
|
|
}
|
|
for action in self.tools.pending_actions.values()
|
|
]
|
|
|
|
def _ollama_options(self) -> dict[str, Any]:
|
|
if not self.num_ctx:
|
|
return {}
|
|
return {"num_ctx": self.num_ctx}
|
|
|
|
@staticmethod
|
|
def _empty_response_fallback(tool_results: list[dict[str, Any]]) -> str:
|
|
if not tool_results:
|
|
return "I did not get a usable response from the model. Please try again, or narrow the request a bit."
|
|
return OllamaAgent._tool_result_fallback(
|
|
tool_results,
|
|
"I completed the tool call, but the model did not write a final answer.",
|
|
)
|
|
|
|
@staticmethod
|
|
def _tool_result_fallback(tool_results: list[dict[str, Any]], reason: str) -> str:
|
|
last = tool_results[-1]
|
|
text = json.dumps(last, indent=2, ensure_ascii=True)
|
|
if len(text) > 1800:
|
|
text = text[:1800] + "\n..."
|
|
return (
|
|
f"{reason} "
|
|
"Here is the last tool result so you are not left staring at a blank response:\n\n"
|
|
f"```json\n{text}\n```"
|
|
)
|
|
|
|
@staticmethod
|
|
def _tool_status(name: str) -> str:
|
|
if name.startswith("get_uex_"):
|
|
return f"Fetching UEX {name.removeprefix('get_uex_')}"
|
|
if name.startswith("draft_uex_"):
|
|
return f"Drafting UEX {name.removeprefix('draft_uex_')} for approval"
|
|
if name.startswith("delete_uex_"):
|
|
return f"Drafting UEX {name.removeprefix('delete_uex_')} delete for approval"
|
|
labels = {
|
|
"search_uex_api_index": "Searching UEX API index",
|
|
"summarize_uex_commodity_price_history": "Summarizing commodity price history",
|
|
"summarize_uex_marketplace_price_history": "Summarizing marketplace price history",
|
|
"summarize_uex_currency_index_history": "Summarizing currency index history",
|
|
"list_scmdb_versions": "Checking SCMDB versions",
|
|
"search_scmdb_missions": "Searching SCMDB missions",
|
|
"get_scmdb_mission_rewards": "Fetching SCMDB mission rewards",
|
|
"search_scwiki_pages": "Searching Star Citizen Wiki",
|
|
"get_scwiki_page": "Reading Star Citizen Wiki page",
|
|
"search_scwiki_vehicles": "Searching Star Citizen Wiki vehicles",
|
|
"get_scwiki_vehicle": "Fetching Star Citizen Wiki vehicle",
|
|
"search_wikelo_ship_projects": "Searching Wikelo ship projects",
|
|
"get_wikelo_ship_project": "Fetching Wikelo ship requirements",
|
|
"search_cornerstone_items": "Searching Cornerstone items",
|
|
"get_cornerstone_item_locations": "Fetching Cornerstone item locations",
|
|
"get_cornerstone_item_media": "Fetching Cornerstone item media",
|
|
"uex_api_catalog": "Checking UEX API catalog",
|
|
"uex_get": "Fetching UEX data",
|
|
"uex_draft_post": "Drafting UEX write for approval",
|
|
"uex_draft_delete": "Drafting UEX delete for approval",
|
|
"search_marketplace_listings": "Searching UEX listings",
|
|
"get_marketplace_listing": "Fetching listing details",
|
|
"list_marketplace_negotiations": "Checking negotiations",
|
|
"get_negotiation_messages": "Reading negotiation messages",
|
|
"draft_negotiation_message": "Drafting message for approval",
|
|
"draft_marketplace_listing": "Drafting listing for approval",
|
|
"draft_marketplace_listing_with_cornerstone_image": "Drafting listing with Cornerstone image",
|
|
"check_uex_notifications": "Checking UEX notifications",
|
|
}
|
|
return labels.get(name, f"Running {name}")
|
|
|
|
@staticmethod
|
|
def _stream_metrics(event: dict[str, Any]) -> dict[str, Any]:
|
|
prompt_tokens = int(event.get("prompt_eval_count") or 0)
|
|
prompt_duration = int(event.get("prompt_eval_duration") or 0)
|
|
output_tokens = int(event.get("eval_count") or 0)
|
|
output_duration = int(event.get("eval_duration") or 0)
|
|
|
|
def rate(tokens: int, duration_ns: int) -> float | None:
|
|
if not tokens or not duration_ns:
|
|
return None
|
|
return tokens / (duration_ns / 1_000_000_000)
|
|
|
|
return {
|
|
"reading_tokens": prompt_tokens,
|
|
"reading_tokens_per_second": rate(prompt_tokens, prompt_duration),
|
|
"writing_tokens": output_tokens,
|
|
"writing_tokens_per_second": rate(output_tokens, output_duration),
|
|
}
|
|
|
|
@staticmethod
|
|
def _cloud_usage_metrics(usage: dict[str, Any]) -> dict[str, Any]:
|
|
prompt_tokens = int(usage.get("prompt_tokens") or 0)
|
|
completion_tokens = int(usage.get("completion_tokens") or 0)
|
|
cache_hit_tokens = int(usage.get("prompt_cache_hit_tokens") or 0)
|
|
cache_miss_tokens = int(usage.get("prompt_cache_miss_tokens") or 0)
|
|
metrics = {
|
|
"reading_tokens": prompt_tokens,
|
|
"writing_tokens": completion_tokens,
|
|
}
|
|
if cache_hit_tokens or cache_miss_tokens:
|
|
metrics["cache_hit_tokens"] = cache_hit_tokens
|
|
metrics["cache_miss_tokens"] = cache_miss_tokens
|
|
return metrics
|
|
|
|
@staticmethod
|
|
def _profile_identity(profile: dict[str, Any]) -> str:
|
|
user = profile.get("uex_user")
|
|
if not isinstance(user, dict):
|
|
configured = profile.get("configured_name")
|
|
return f"You are speaking with {configured}." if configured else ""
|
|
|
|
username = user.get("username") or user.get("user_username")
|
|
name = user.get("name")
|
|
fields = []
|
|
if username and name and username != name:
|
|
fields.append(f"You are speaking with UEX user {username} ({name}).")
|
|
elif username or name:
|
|
fields.append(f"You are speaking with UEX user {username or name}.")
|
|
|
|
details = []
|
|
for key, label in [
|
|
("timezone", "timezone"),
|
|
("language", "preferred language"),
|
|
("specializations", "specializations"),
|
|
("languages", "languages"),
|
|
("archetypes", "archetypes"),
|
|
]:
|
|
value = user.get(key)
|
|
if value:
|
|
details.append(f"{label}: {value}")
|
|
if details:
|
|
fields.append("UEX profile details: " + "; ".join(details) + ".")
|
|
return " ".join(fields)
|
|
|
|
@staticmethod
|
|
def _profile_for_prompt(profile: dict[str, Any]) -> dict[str, Any]:
|
|
user = profile.get("uex_user")
|
|
if not isinstance(user, dict):
|
|
return profile
|
|
|
|
useful_user_fields = [
|
|
"id",
|
|
"name",
|
|
"username",
|
|
"avatar",
|
|
"bio",
|
|
"website_url",
|
|
"timezone",
|
|
"language",
|
|
"day_availability",
|
|
"time_availability",
|
|
"specializations",
|
|
"languages",
|
|
"archetypes",
|
|
"is_datarunner",
|
|
"is_staff",
|
|
"is_away_game",
|
|
"date_rsi_verified",
|
|
"date_twitch_verified",
|
|
]
|
|
prompt_profile = dict(profile)
|
|
prompt_profile["uex_user"] = {
|
|
key: user[key]
|
|
for key in useful_user_fields
|
|
if key in user and user[key] not in (None, "")
|
|
}
|
|
return prompt_profile
|
|
|
|
@staticmethod
|
|
def _extract_call(call: dict[str, Any]) -> tuple[str, dict[str, Any]]:
|
|
function = call.get("function") or {}
|
|
name = function.get("name") or call.get("name")
|
|
arguments = function.get("arguments") or call.get("arguments") or {}
|
|
if isinstance(arguments, str):
|
|
arguments = json.loads(arguments or "{}")
|
|
return name, arguments
|
|
|
|
@staticmethod
|
|
def _normalize_images(images: list[dict[str, Any]] | None) -> list[dict[str, Any]]:
|
|
normalized: list[dict[str, Any]] = []
|
|
for image in images or []:
|
|
if not isinstance(image, dict):
|
|
continue
|
|
image_data = str(image.get("image_data") or "").strip()
|
|
if not image_data:
|
|
continue
|
|
normalized.append(
|
|
{
|
|
"name": str(image.get("name") or "").strip() or "pasted-image.png",
|
|
"content_type": str(image.get("content_type") or "image/png").strip() or "image/png",
|
|
"image_data": image_data,
|
|
}
|
|
)
|
|
return normalized
|
|
|
|
@staticmethod
|
|
def _prompt_text(content: str, image_count: int) -> str:
|
|
text = content.strip()
|
|
if text:
|
|
return text
|
|
return "Please analyze the attached image." if image_count == 1 else "Please analyze the attached images."
|
|
|
|
@staticmethod
|
|
def _conversation_content(content: str, image_count: int) -> str:
|
|
text = content.strip()
|
|
if not image_count:
|
|
return text
|
|
note = f"[Attached {image_count} pasted image{'s' if image_count != 1 else ''}]"
|
|
return f"{text}\n\n{note}" if text else note
|
|
|
|
@staticmethod
|
|
def _user_message(content: str, images: list[dict[str, Any]]) -> dict[str, Any]:
|
|
message: dict[str, Any] = {"role": "user", "content": content}
|
|
if images:
|
|
message["images"] = [image["image_data"] for image in images]
|
|
message["image_content_types"] = [image["content_type"] for image in images]
|
|
return message
|
|
|
|
|
|
class OllamaUnavailable(RuntimeError):
    """Error type for an unavailable Ollama backend.

    NOTE(review): the raise sites are outside this excerpt; presumably raised
    when the Ollama server cannot be reached. Confirm against callers.
    """
|