"""Ollama-backed chat agent that answers TraderAI requests with UEX tools."""

from __future__ import annotations

import json
import re
from collections.abc import AsyncIterator
from typing import Any

import httpx
from tzlocal import get_localzone

from traderai.memory import DEFAULT_THREAD_ID, MemoryStore, iso_now, iso_now_in_zone, time_since
from traderai.tools import ToolRegistry

SYSTEM_PROMPT = (
    "You are TraderAI, a local assistant for UEX marketplace work. "
    "Use tools when the user asks about UEX data, open/current listings, active negotiations, "
    "unread notifications, messages, offers, or posting ads. "
    "UEX credentials are configured server-side when available. "
    "Never ask the user to provide UEX_SECRET_KEY or UEX_BEARER_TOKEN in chat; "
    "call the authenticated UEX tool and only mention credential configuration "
    "if the tool returns an authentication error. "
    "Use the specific UEX tool for the needed endpoint, such as get_uex_commodities_prices or get_uex_vehicles. "
    "Use fields, limit, and summary mode so tool results stay compact. "
    "When the user asks for history, trends, changes over time, or past prices, "
    "prefer the summarize_uex_*_history tools when available; "
    "use search_uex_api_index(history_only=true) if you need to discover history endpoints. "
    "Prefer open and current UEX marketplace information. "
    "Do not use historical sale data, completed sale records, or sale/average-history information "
    "unless the user explicitly asks for historical sales. "
    "Treat UEX marketplace prices as in-game aUEC/UEC credits, never real-world dollars, "
    "unless the user explicitly says otherwise. "
    "For marketplace writes, draft the exact pending action and tell the user what will be sent; "
    "never claim it was sent until approval succeeds. "
    "When a scheduled wake job fires, always write a concise Inbox-ready result that says "
    "what you checked, the key findings, and the suggested next action. "
    "Keep prices, listing ids, slugs, users, and UEX status codes precise. "
    "If data is missing, say what you need next."
)


class OllamaAgent:
    """Chat agent that talks to an Ollama server and executes requested tools.

    Each request runs up to ``MAX_TOOL_ROUNDS`` model round-trips. A round
    either produces a final answer or a list of tool calls; tool results are
    appended to the thread's message list and the model is asked again.
    Per-thread message lists are cached in ``thread_messages``; when a
    ``MemoryStore`` is supplied, user and assistant turns are also written to
    it.
    """

    # Upper bound on model round-trips (each may request tools) per request.
    MAX_TOOL_ROUNDS = 5

    _CHAT_TOOL_LIMIT_REPLY = (
        "I hit the tool-call limit while working on that. "
        "Try narrowing the request or approve any pending action first."
    )
    _WAKE_TOOL_LIMIT_REPLY = (
        "I hit the tool-call limit while running this scheduled wake job. "
        "Check the job prompt or pending approvals."
    )

    # Status labels for tools that do not follow the get_/draft_/delete_uex_ prefixes.
    _TOOL_LABELS = {
        "search_uex_api_index": "Searching UEX API index",
        "summarize_uex_commodity_price_history": "Summarizing commodity price history",
        "summarize_uex_marketplace_price_history": "Summarizing marketplace price history",
        "summarize_uex_currency_index_history": "Summarizing currency index history",
        "uex_api_catalog": "Checking UEX API catalog",
        "uex_get": "Fetching UEX data",
        "uex_draft_post": "Drafting UEX write for approval",
        "uex_draft_delete": "Drafting UEX delete for approval",
        "search_marketplace_listings": "Searching UEX listings",
        "get_marketplace_listing": "Fetching listing details",
        "list_marketplace_negotiations": "Checking negotiations",
        "get_negotiation_messages": "Reading negotiation messages",
        "draft_negotiation_message": "Drafting message for approval",
        "draft_marketplace_listing": "Drafting listing for approval",
        "check_uex_notifications": "Checking UEX notifications",
    }

    def __init__(
        self,
        base_url: str,
        model: str,
        tools: ToolRegistry,
        memory: MemoryStore | None = None,
        user_name: str | None = None,
        num_ctx: int | None = None,
    ) -> None:
        """Store configuration; no network access happens here.

        Args:
            base_url: Ollama server URL; a trailing ``/`` is removed.
            model: Model name sent with every chat request.
            tools: Registry providing tool schemas, execution and pending actions.
            memory: Optional store for conversation history and recall.
            user_name: Optional name/handle added to the runtime context.
            num_ctx: Optional ``num_ctx`` option forwarded to Ollama.
        """
        self.base_url = base_url.rstrip("/")
        self.model = model
        self.tools = tools
        self.memory = memory
        self.user_name = user_name
        self.num_ctx = num_ctx
        self.thread_messages: dict[str, list[dict[str, Any]]] = {}

    async def health(self) -> dict[str, Any]:
        """Query ``/api/tags`` and describe whether Ollama is reachable.

        Returns:
            A dict with ``online``, ``model``, ``base_url`` and ``message``.
            When offline it also carries ``detail``; when online it carries
            ``model_available`` and the list of ``models`` reported.
        """
        try:
            async with httpx.AsyncClient(timeout=3) as client:
                response = await client.get(f"{self.base_url}/api/tags")
                response.raise_for_status()
                body = response.json()
        except (httpx.HTTPError, ValueError) as exc:
            return {
                "online": False,
                "model": self.model,
                "base_url": self.base_url,
                "message": (
                    f"Ollama is offline or unreachable at {self.base_url}. "
                    "Open the Ollama tab and use the recommended action."
                ),
                "detail": str(exc),
            }
        # Tolerate a null/missing "models" field or an unexpected body shape.
        entries = body.get("models") if isinstance(body, dict) else None
        models = [
            entry.get("name") or entry.get("model")
            for entry in entries or []
            if isinstance(entry, dict)
        ]
        return {
            "online": True,
            "model": self.model,
            "base_url": self.base_url,
            "model_available": self._model_listed(self.model, models),
            "models": models,
            "message": "Ollama is online.",
        }

    @staticmethod
    def _model_listed(model: str, models: list[Any]) -> bool:
        """Return whether ``model`` appears in ``models``, allowing an implicit tag."""
        if model in models:
            return True
        # Ollama treats an untagged model name as "<name>:latest".
        return ":" not in model and f"{model}:latest" in models

    async def ensure_available(self) -> None:
        """Raise ``OllamaUnavailable`` when ``health()`` reports offline."""
        health = await self.health()
        if not health["online"]:
            raise OllamaUnavailable(health["message"])

    async def chat(self, content: str, thread_id: str | None = DEFAULT_THREAD_ID) -> dict[str, Any]:
        """Answer one user message, executing tools the model requests.

        Args:
            content: The user's message.
            thread_id: Conversation thread; blank/``None`` uses the default.

        Returns:
            A dict with ``message``, ``pending_actions`` and ``thread_id``.

        Raises:
            OllamaUnavailable: If the Ollama server is not reachable.
            Exception: Whatever the model request raised, when it fails
                before any tool result was collected.
        """
        await self.ensure_available()
        resolved_thread_id = self._thread_id(thread_id)
        # Load history before storing the new message so it is not duplicated.
        messages = self._messages_for_thread(resolved_thread_id)
        previous_interaction = self.memory.last_interaction(resolved_thread_id) if self.memory else None
        if self.memory:
            self.memory.add_conversation("user", content, resolved_thread_id)
            await self._title_first_message(resolved_thread_id, content, previous_interaction)
        messages.append({"role": "user", "content": content})

        last_tool_results: list[dict[str, Any]] = []
        for _ in range(self.MAX_TOOL_ROUNDS):
            try:
                response = await self._ollama_chat(
                    content,
                    messages,
                    previous_interaction=previous_interaction,
                    thread_id=resolved_thread_id,
                )
            except Exception as exc:
                if not last_tool_results:
                    raise
                # Tools already ran: surface their output instead of failing.
                answer = self._tool_result_fallback(
                    last_tool_results,
                    f"The local model stopped after the tool call: {exc}",
                )
                return self._finish_chat(messages, answer, resolved_thread_id)

            message = response.get("message") or {}
            tool_calls = message.get("tool_calls") or []
            if not tool_calls:
                # "content" may be present but null, so do not rely on the default.
                answer = message.get("content") or ""
                if not answer.strip():
                    answer = self._empty_response_fallback(last_tool_results)
                return self._finish_chat(messages, answer, resolved_thread_id)

            messages.append(message)
            for call in tool_calls:
                name, arguments = self._extract_call(call)
                await self._run_tool(name, arguments, messages, last_tool_results)

        return self._finish_chat(messages, self._CHAT_TOOL_LIMIT_REPLY, resolved_thread_id)

    async def chat_events(
        self,
        content: str,
        thread_id: str | None = DEFAULT_THREAD_ID,
    ) -> AsyncIterator[dict[str, Any]]:
        """Streaming variant of ``chat`` that yields UI events.

        Yields dicts whose ``type`` is one of ``warning``, ``status``,
        ``token``, ``metrics`` or ``done``. The stream always ends with a
        ``done`` event carrying ``pending_actions`` and ``thread_id``.

        Args:
            content: The user's message.
            thread_id: Conversation thread; blank/``None`` uses the default.
        """
        resolved_thread_id = self._thread_id(thread_id)
        health = await self.health()
        if not health["online"]:
            yield {"type": "warning", "message": health["message"]}
            # Include thread_id here too so every "done" event has the same shape.
            yield self._done_event(resolved_thread_id)
            return

        # Load history before storing the new message so it is not duplicated.
        messages = self._messages_for_thread(resolved_thread_id)
        previous_interaction = self.memory.last_interaction(resolved_thread_id) if self.memory else None
        if self.memory:
            self.memory.add_conversation("user", content, resolved_thread_id)
            await self._title_first_message(resolved_thread_id, content, previous_interaction)
        messages.append({"role": "user", "content": content})
        yield {"type": "status", "message": "Thinking"}

        last_tool_results: list[dict[str, Any]] = []
        for _ in range(self.MAX_TOOL_ROUNDS):
            text = ""
            tool_calls: list[dict[str, Any]] = []
            try:
                async for event in self._ollama_chat_stream(
                    content,
                    messages,
                    previous_interaction=previous_interaction,
                    thread_id=resolved_thread_id,
                ):
                    message = event.get("message") or {}
                    chunk = message.get("content") or ""
                    if chunk:
                        text += chunk
                        yield {"type": "token", "content": chunk}
                    if message.get("tool_calls"):
                        tool_calls.extend(message["tool_calls"])
                    if event.get("done"):
                        metrics = self._stream_metrics(event)
                        if metrics:
                            yield {"type": "metrics", **metrics}
            except Exception as exc:
                if not last_tool_results:
                    yield {
                        "type": "warning",
                        "message": f"Chat failed before any tool result was available: {exc}",
                    }
                    yield self._done_event(resolved_thread_id)
                    return
                # Tools already ran: surface their output instead of failing.
                fallback = self._tool_result_fallback(
                    last_tool_results,
                    f"The local model stopped after the tool call: {exc}",
                )
                self._record_assistant(messages, fallback, resolved_thread_id)
                yield {"type": "token", "content": fallback}
                yield self._done_event(resolved_thread_id)
                return

            if not tool_calls:
                if not text.strip():
                    text = self._empty_response_fallback(last_tool_results)
                    yield {"type": "token", "content": text}
                self._record_assistant(messages, text, resolved_thread_id)
                yield self._done_event(resolved_thread_id)
                return

            messages.append({"role": "assistant", "content": text, "tool_calls": tool_calls})
            for call in tool_calls:
                name, arguments = self._extract_call(call)
                yield {"type": "status", "message": self._tool_status(name)}
                await self._run_tool(name, arguments, messages, last_tool_results)
            yield {"type": "status", "message": "Writing response"}

        self._record_assistant(messages, self._CHAT_TOOL_LIMIT_REPLY, resolved_thread_id)
        yield {"type": "token", "content": self._CHAT_TOOL_LIMIT_REPLY}
        yield self._done_event(resolved_thread_id)

    async def generate_wake_response(self, wake_message: str) -> str:
        """Run a scheduled wake prompt on the ``"wake"`` thread and return the reply.

        The wake prompt is stored in memory with role ``system`` together
        with the reply, only once a reply (or fallback) has been produced.

        Raises:
            OllamaUnavailable: If the Ollama server is not reachable.
            Exception: Whatever the model request raised, when it fails
                before any tool result was collected.
        """
        await self.ensure_available()
        messages = self._messages_for_thread("wake")
        previous_interaction = self.memory.last_interaction("wake") if self.memory else None
        messages.append({"role": "user", "content": wake_message})

        last_tool_results: list[dict[str, Any]] = []
        for _ in range(self.MAX_TOOL_ROUNDS):
            try:
                response = await self._ollama_chat(
                    wake_message,
                    messages,
                    previous_interaction=previous_interaction,
                    thread_id="wake",
                )
            except Exception as exc:
                if not last_tool_results:
                    raise
                content = self._tool_result_fallback(
                    last_tool_results,
                    f"The local model stopped after the wake-job tool call: {exc}",
                )
                return self._finish_wake(messages, wake_message, content)

            message = response.get("message") or {}
            tool_calls = message.get("tool_calls") or []
            if not tool_calls:
                # "content" may be present but null, so do not rely on the default.
                content = message.get("content") or ""
                if not content.strip():
                    content = self._empty_response_fallback(last_tool_results)
                return self._finish_wake(messages, wake_message, content)

            messages.append(message)
            for call in tool_calls:
                name, arguments = self._extract_call(call)
                await self._run_tool(name, arguments, messages, last_tool_results)

        return self._finish_wake(messages, wake_message, self._WAKE_TOOL_LIMIT_REPLY)

    async def _run_tool(
        self,
        name: str,
        arguments: dict[str, Any],
        messages: list[dict[str, Any]],
        tool_results: list[dict[str, Any]],
    ) -> None:
        """Execute one tool and append its result to both running lists."""
        result = await self.tools.execute(name, arguments)
        tool_results.append({"tool": name, "result": result})
        messages.append({"role": "tool", "tool_name": name, "content": json.dumps(result)})

    def _record_assistant(self, messages: list[dict[str, Any]], text: str, thread_id: str) -> None:
        """Append an assistant turn to ``messages`` and, if configured, to memory."""
        messages.append({"role": "assistant", "content": text})
        if self.memory:
            self.memory.add_conversation("assistant", text, thread_id)

    def _finish_chat(self, messages: list[dict[str, Any]], answer: str, thread_id: str) -> dict[str, Any]:
        """Record the final answer and build the ``chat`` return value."""
        self._record_assistant(messages, answer, thread_id)
        return {"message": answer, "pending_actions": self._pending_payloads(), "thread_id": thread_id}

    def _finish_wake(self, messages: list[dict[str, Any]], wake_message: str, content: str) -> str:
        """Record a wake-job exchange (prompt as ``system``, then reply) and return the reply."""
        messages.append({"role": "assistant", "content": content})
        if self.memory:
            self.memory.add_conversation("system", wake_message, "wake")
            self.memory.add_conversation("assistant", content, "wake")
        return content

    def _done_event(self, thread_id: str) -> dict[str, Any]:
        """Build the terminal ``done`` event for ``chat_events``."""
        return {"type": "done", "pending_actions": self._pending_payloads(), "thread_id": thread_id}

    def _chat_request(
        self,
        query: str,
        messages: list[dict[str, Any]] | None,
        previous_interaction: dict[str, Any] | None,
        thread_id: str | None,
        *,
        stream: bool,
    ) -> dict[str, Any]:
        """Build the JSON body shared by streaming and non-streaming ``/api/chat`` calls."""
        return {
            "model": self.model,
            "messages": self._messages_with_context(
                query,
                messages or self._messages_for_thread(thread_id),
                previous_interaction=previous_interaction,
                thread_id=thread_id,
            ),
            "tools": self.tools.schemas,
            "options": self._ollama_options(),
            "stream": stream,
        }

    async def _ollama_chat(
        self,
        query: str = "",
        messages: list[dict[str, Any]] | None = None,
        previous_interaction: dict[str, Any] | None = None,
        thread_id: str | None = DEFAULT_THREAD_ID,
    ) -> dict[str, Any]:
        """POST a non-streaming chat request and return the decoded JSON body.

        Raises:
            httpx.HTTPError: On transport failure or a non-success status.
        """
        async with httpx.AsyncClient(timeout=120) as client:
            response = await client.post(
                f"{self.base_url}/api/chat",
                json=self._chat_request(query, messages, previous_interaction, thread_id, stream=False),
            )
            response.raise_for_status()
            return response.json()

    async def _ollama_chat_stream(
        self,
        query: str = "",
        messages: list[dict[str, Any]] | None = None,
        previous_interaction: dict[str, Any] | None = None,
        thread_id: str | None = DEFAULT_THREAD_ID,
    ) -> AsyncIterator[dict[str, Any]]:
        """POST a streaming chat request and yield one decoded JSON object per line.

        Raises:
            httpx.HTTPError: On transport failure or a non-success status.
            ValueError: If a non-empty response line is not valid JSON.
        """
        async with httpx.AsyncClient(timeout=120) as client:
            async with client.stream(
                "POST",
                f"{self.base_url}/api/chat",
                json=self._chat_request(query, messages, previous_interaction, thread_id, stream=True),
            ) as response:
                response.raise_for_status()
                async for line in response.aiter_lines():
                    if line:
                        yield json.loads(line)

    def _messages_with_context(
        self,
        query: str,
        messages: list[dict[str, Any]],
        previous_interaction: dict[str, Any] | None = None,
        thread_id: str | None = DEFAULT_THREAD_ID,
    ) -> list[dict[str, Any]]:
        """Return ``messages`` with a runtime-context system message after the first entry.

        The input list is not modified, so the context is rebuilt per request
        rather than accumulating in the thread history.
        """
        context = self._runtime_context(query, previous_interaction=previous_interaction, thread_id=thread_id)
        if not context:
            return messages
        context_message = {"role": "system", "content": context}
        if not messages:
            # Nothing to insert after; avoid indexing an empty list.
            return [context_message]
        return [messages[0], context_message, *messages[1:]]

    def _runtime_context(
        self,
        query: str,
        previous_interaction: dict[str, Any] | None = None,
        thread_id: str | None = DEFAULT_THREAD_ID,
    ) -> str:
        """Assemble the per-request context: time, auth state, profile and memory.

        Args:
            query: Text used to recall relevant long-term memories.
            previous_interaction: Last interaction captured before the current
                message was stored; looked up from memory when ``None``.
            thread_id: Thread whose recent conversation is excerpted.

        Returns:
            Newline-joined context lines.
        """
        local_zone = get_localzone()
        parts = [
            f"Current local date/time: {iso_now()} UTC; {iso_now_in_zone(local_zone)} {local_zone}.",
        ]

        uex = getattr(self.tools, "uex", None)
        if uex:
            auth_methods = []
            if uex.secret_key:
                auth_methods.append("secret key")
            if uex.bearer_token:
                auth_methods.append("bearer token")
            if auth_methods:
                parts.append(
                    "UEX API authentication is configured server-side with "
                    + " and ".join(auth_methods)
                    + "; use authenticated UEX tools directly and do not ask for tokens."
                )
            else:
                parts.append("UEX API authentication is not configured server-side.")

        if self.user_name:
            parts.append(f"Known user name/handle: {self.user_name}.")
        if self.memory is None:
            return "\n".join(parts)

        profile = self.memory.get_profile()
        if profile:
            identity = self._profile_identity(profile)
            if identity:
                parts.append(identity)
            parts.append(
                f"Known user profile JSON: {json.dumps(self._profile_for_prompt(profile), ensure_ascii=True)}."
            )

        last = previous_interaction if previous_interaction is not None else self.memory.last_interaction(thread_id)
        if last:
            parts.append(
                f"Previous interaction before this message: {last['created_at']} "
                f"({time_since(last['created_at'])}, role {last['role']})."
            )
        else:
            parts.append("Previous interaction before this message: none recorded.")

        memories = self.memory.recall(query, limit=6)
        if memories:
            memory_text = "\n".join(
                f"- [{item['kind']}, importance {item['importance']}] {item['content']}" for item in memories
            )
            parts.append(f"Relevant long-term memories:\n{memory_text}")

        recent = self.memory.recent_conversation(limit=6, thread_id=thread_id)
        if recent:
            recent_text = "\n".join(
                f"- {item['created_at']} {item['role']}: {item['content'][:500]}" for item in recent
            )
            parts.append(f"Recent conversation excerpts from this chat:\n{recent_text}")
        return "\n".join(parts)

    def _messages_for_thread(self, thread_id: str | None) -> list[dict[str, Any]]:
        """Return the cached message list for a thread, creating it on first use.

        A new list starts with ``SYSTEM_PROMPT`` and, when memory is
        configured, the user/assistant turns from the 30 most recent stored
        conversation items. The returned list is the cached object itself;
        callers append to it in place.
        """
        resolved_thread_id = self._thread_id(thread_id)
        if resolved_thread_id not in self.thread_messages:
            messages: list[dict[str, Any]] = [{"role": "system", "content": SYSTEM_PROMPT}]
            if self.memory:
                self.memory.ensure_thread(resolved_thread_id)
                for item in self.memory.recent_conversation(limit=30, thread_id=resolved_thread_id):
                    role = item.get("role")
                    if role in {"user", "assistant"} and item.get("content"):
                        messages.append({"role": role, "content": item["content"]})
            self.thread_messages[resolved_thread_id] = messages
        return self.thread_messages[resolved_thread_id]

    async def _title_first_message(
        self,
        thread_id: str,
        first_message: str,
        previous_interaction: dict[str, Any] | None,
    ) -> None:
        """Rename a thread still titled ``"New chat"`` based on its first message.

        Does nothing without memory, when the thread already had an
        interaction, or when the thread is missing or already renamed.
        """
        if self.memory is None or previous_interaction is not None:
            return
        thread = self.memory.get_thread(thread_id)
        if not thread or thread.get("title") != "New chat":
            return
        title = await self._generate_chat_title(first_message)
        self.memory.rename_thread(thread_id, title or MemoryStore._thread_title(first_message))

    async def _generate_chat_title(self, first_message: str) -> str:
        """Ask the model for a short title; return ``""`` on any failure."""
        prompt = (
            "Create a concise chat title for this first user message. "
            "Use 2 to 6 words. No quotes, no punctuation at the end, no preamble.\n\n"
            f"Message: {first_message[:800]}"
        )
        try:
            async with httpx.AsyncClient(timeout=20) as client:
                response = await client.post(
                    f"{self.base_url}/api/chat",
                    json={
                        "model": self.model,
                        "messages": [
                            {"role": "system", "content": "You write short chat titles."},
                            {"role": "user", "content": prompt},
                        ],
                        "options": self._ollama_options(),
                        "stream": False,
                    },
                )
                response.raise_for_status()
                message = response.json().get("message") or {}
                return self._clean_generated_title(message.get("content") or "")
        except Exception:
            # Titling is best-effort: the caller falls back to a derived title.
            return ""

    @staticmethod
    def _thread_id(thread_id: str | None) -> str:
        """Normalize a thread id, using ``DEFAULT_THREAD_ID`` for blank or ``None``."""
        return (thread_id or DEFAULT_THREAD_ID).strip() or DEFAULT_THREAD_ID

    @staticmethod
    def _clean_generated_title(title: str) -> str:
        """Normalize a model-written title.

        Collapses line breaks, removes a leading ``Title:``/``Chat title:``
        label, surrounding quotes and trailing punctuation, then limits the
        result to 8 words and 64 characters. Returns ``""`` if nothing is left.
        """
        quotes = "\"'"
        text = re.sub(r"[\r\n]+", " ", title).strip().strip(quotes).strip()
        text = re.sub(r"^(title|chat title)\s*:\s*", "", text, flags=re.IGNORECASE)
        # Strip quotes again: they may only become the outermost characters
        # once the label is gone (e.g. 'Title: "Buying Ore"').
        text = text.strip().strip(quotes).strip()
        text = text.rstrip(".!?;:-").strip()
        text = text.rstrip(quotes).strip()
        if not text:
            return ""
        words = text.split()
        if len(words) > 8:
            text = " ".join(words[:8])
        return text[:64].rstrip()

    def _pending_payloads(self) -> list[dict[str, Any]]:
        """Serialize the registry's pending actions for the client."""
        return [
            {
                "id": action.id,
                "label": action.label,
                "method": action.method,
                "endpoint": action.endpoint,
                "payload": action.payload,
            }
            for action in self.tools.pending_actions.values()
        ]

    def _ollama_options(self) -> dict[str, Any]:
        """Return the Ollama ``options`` dict; empty unless ``num_ctx`` is set."""
        if not self.num_ctx:
            return {}
        return {"num_ctx": self.num_ctx}

    @staticmethod
    def _empty_response_fallback(tool_results: list[dict[str, Any]]) -> str:
        """Return the user-facing text used when the model produced no answer."""
        if not tool_results:
            return (
                "I did not get a usable response from the local model. "
                "Please try again, or narrow the request a bit."
            )
        return OllamaAgent._tool_result_fallback(
            tool_results,
            "I completed the tool call, but the local model did not write a final answer.",
        )

    @staticmethod
    def _tool_result_fallback(tool_results: list[dict[str, Any]], reason: str) -> str:
        """Format ``reason`` followed by the last tool result as a JSON code fence.

        The JSON is truncated to 1800 characters. With no tool results the
        reason is returned on its own.
        """
        if not tool_results:
            return reason
        text = json.dumps(tool_results[-1], indent=2, ensure_ascii=True)
        if len(text) > 1800:
            text = text[:1800] + "\n..."
        return (
            f"{reason} "
            "Here is the last tool result so you are not left staring at a blank response:\n\n"
            f"```json\n{text}\n```"
        )

    @staticmethod
    def _tool_status(name: str) -> str:
        """Map a tool name to the status line shown while it runs."""
        if name.startswith("get_uex_"):
            return f"Fetching UEX {name.removeprefix('get_uex_')}"
        if name.startswith("draft_uex_"):
            return f"Drafting UEX {name.removeprefix('draft_uex_')} for approval"
        if name.startswith("delete_uex_"):
            return f"Drafting UEX {name.removeprefix('delete_uex_')} delete for approval"
        return OllamaAgent._TOOL_LABELS.get(name, f"Running {name}")

    @staticmethod
    def _stream_metrics(event: dict[str, Any]) -> dict[str, Any]:
        """Derive token counts and tokens-per-second from a final stream event.

        Durations are divided by 1e9 (i.e. treated as nanoseconds). A rate is
        ``None`` when its token count or duration is missing or zero.
        """
        prompt_tokens = int(event.get("prompt_eval_count") or 0)
        prompt_duration = int(event.get("prompt_eval_duration") or 0)
        output_tokens = int(event.get("eval_count") or 0)
        output_duration = int(event.get("eval_duration") or 0)

        def rate(tokens: int, duration_ns: int) -> float | None:
            if not tokens or not duration_ns:
                return None
            return tokens / (duration_ns / 1_000_000_000)

        return {
            "reading_tokens": prompt_tokens,
            "reading_tokens_per_second": rate(prompt_tokens, prompt_duration),
            "writing_tokens": output_tokens,
            "writing_tokens_per_second": rate(output_tokens, output_duration),
        }

    @staticmethod
    def _profile_identity(profile: dict[str, Any]) -> str:
        """Build a one-line description of who the assistant is talking to.

        Uses ``profile["uex_user"]`` when it is a dict, otherwise falls back
        to ``profile["configured_name"]``. Returns ``""`` if nothing is known.
        """
        user = profile.get("uex_user")
        if not isinstance(user, dict):
            configured = profile.get("configured_name")
            return f"You are speaking with {configured}." if configured else ""

        username = user.get("username") or user.get("user_username")
        name = user.get("name")
        fields = []
        if username and name and username != name:
            fields.append(f"You are speaking with UEX user {username} ({name}).")
        elif username or name:
            fields.append(f"You are speaking with UEX user {username or name}.")

        details = []
        for key, label in [
            ("timezone", "timezone"),
            ("language", "preferred language"),
            ("specializations", "specializations"),
            ("languages", "languages"),
            ("archetypes", "archetypes"),
        ]:
            value = user.get(key)
            if value:
                details.append(f"{label}: {value}")
        if details:
            fields.append("UEX profile details: " + "; ".join(details) + ".")
        return " ".join(fields)

    @staticmethod
    def _profile_for_prompt(profile: dict[str, Any]) -> dict[str, Any]:
        """Return a copy of ``profile`` whose ``uex_user`` keeps only prompt-relevant fields.

        Fields that are ``None`` or ``""`` are dropped. The profile is
        returned unchanged when ``uex_user`` is not a dict.
        """
        user = profile.get("uex_user")
        if not isinstance(user, dict):
            return profile
        useful_user_fields = [
            "id",
            "name",
            "username",
            "avatar",
            "bio",
            "website_url",
            "timezone",
            "language",
            "day_availability",
            "time_availability",
            "specializations",
            "languages",
            "archetypes",
            "is_datarunner",
            "is_staff",
            "is_away_game",
            "date_rsi_verified",
            "date_twitch_verified",
        ]
        prompt_profile = dict(profile)
        prompt_profile["uex_user"] = {
            key: user[key] for key in useful_user_fields if key in user and user[key] not in (None, "")
        }
        return prompt_profile

    @staticmethod
    def _extract_call(call: dict[str, Any]) -> tuple[str, dict[str, Any]]:
        """Return ``(name, arguments)`` from a tool-call dict.

        Accepts both the nested ``{"function": {...}}`` form and a flat
        ``{"name": ..., "arguments": ...}`` form. String arguments are
        decoded as JSON.

        Raises:
            json.JSONDecodeError: If string arguments are not valid JSON.
        """
        function = call.get("function") or {}
        name = function.get("name") or call.get("name")
        arguments = function.get("arguments") or call.get("arguments") or {}
        if isinstance(arguments, str):
            arguments = json.loads(arguments or "{}")
        return name, arguments


class OllamaUnavailable(RuntimeError):
    """Raised when the Ollama server cannot be reached."""