versioning: 0.0.6, ux: move buttons, feat: add cloud providers, feat: increese tool call limit
Build Release EXE / build-windows-exe (release) Successful in 51s

This commit is contained in:
2026-05-08 14:48:51 -04:00
parent a5a718b3e4
commit 6bd1e81a51
14 changed files with 1103 additions and 198 deletions
+458 -109
View File
@@ -3,6 +3,7 @@ from __future__ import annotations
import json
import re
from collections.abc import AsyncIterator
from contextlib import nullcontext
from typing import Any
import httpx
@@ -38,6 +39,8 @@ class OllamaAgent:
memory: MemoryStore | None = None,
user_name: str | None = None,
num_ctx: int | None = None,
provider: str = "ollama",
api_key: str | None = None,
) -> None:
self.base_url = base_url.rstrip("/")
self.model = model
@@ -45,9 +48,13 @@ class OllamaAgent:
self.memory = memory
self.user_name = user_name
self.num_ctx = num_ctx
self.provider = provider.strip().casefold() or "ollama"
self.api_key = api_key
self.thread_messages: dict[str, list[dict[str, Any]]] = {}
async def health(self) -> dict[str, Any]:
if self.provider == "openai":
return await self._openai_health()
try:
async with httpx.AsyncClient(timeout=3) as client:
response = await client.get(f"{self.base_url}/api/tags")
@@ -77,60 +84,74 @@ class OllamaAgent:
if not health["online"]:
raise OllamaUnavailable(health["message"])
async def chat(self, content: str, thread_id: str | None = DEFAULT_THREAD_ID) -> dict[str, Any]:
async def chat(
self,
content: str,
thread_id: str | None = DEFAULT_THREAD_ID,
images: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
await self.ensure_available()
resolved_thread_id = self._thread_id(thread_id)
messages = self._messages_for_thread(resolved_thread_id)
previous_interaction = self.memory.last_interaction(resolved_thread_id) if self.memory else None
normalized_images = self._normalize_images(images)
prompt_text = self._prompt_text(content, len(normalized_images))
memory_content = self._conversation_content(content, len(normalized_images))
if self.memory:
self.memory.add_conversation("user", content, resolved_thread_id)
await self._title_first_message(resolved_thread_id, content, previous_interaction)
messages.append({"role": "user", "content": content})
self.memory.add_conversation("user", memory_content, resolved_thread_id)
await self._title_first_message(resolved_thread_id, prompt_text, previous_interaction)
messages.append(self._user_message(prompt_text, normalized_images))
last_tool_results: list[dict[str, Any]] = []
for _ in range(5):
try:
response = await self._ollama_chat(
content,
messages,
previous_interaction=previous_interaction,
thread_id=resolved_thread_id,
)
except Exception as exc:
if not last_tool_results:
raise
answer = self._tool_result_fallback(
last_tool_results,
f"The local model stopped after the tool call: {exc}",
)
messages.append({"role": "assistant", "content": answer})
if self.memory:
self.memory.add_conversation("assistant", answer, resolved_thread_id)
return {"message": answer, "pending_actions": self._pending_payloads(), "thread_id": resolved_thread_id}
message = response.get("message") or {}
tool_calls = message.get("tool_calls") or []
if not tool_calls:
answer = message.get("content", "")
if not answer.strip():
answer = self._empty_response_fallback(last_tool_results)
messages.append({"role": "assistant", "content": answer})
if self.memory:
self.memory.add_conversation("assistant", answer, resolved_thread_id)
return {"message": answer, "pending_actions": self._pending_payloads(), "thread_id": resolved_thread_id}
messages.append(message)
for call in tool_calls:
name, arguments = self._extract_call(call)
result = await self.tools.execute(name, arguments)
last_tool_results.append({"tool": name, "result": result})
messages.append({"role": "tool", "tool_name": name, "content": json.dumps(result)})
image_scope = self.tools.chat_image_scope(normalized_images) if hasattr(self.tools, "chat_image_scope") else nullcontext()
with image_scope:
for _ in range(10):
try:
response = await self._chat_once(
prompt_text,
messages,
previous_interaction=previous_interaction,
thread_id=resolved_thread_id,
)
except Exception as exc:
if not last_tool_results:
raise
answer = self._tool_result_fallback(
last_tool_results,
f"The {self._provider_label()} stopped after the tool call: {exc}",
)
messages.append({"role": "assistant", "content": answer})
if self.memory:
self.memory.add_conversation("assistant", answer, resolved_thread_id)
return {"message": answer, "pending_actions": self._pending_payloads(), "thread_id": resolved_thread_id}
message = response.get("message") or {}
tool_calls = message.get("tool_calls") or []
if not tool_calls:
answer = message.get("content", "")
if not answer.strip():
answer = self._empty_response_fallback(last_tool_results)
messages.append({"role": "assistant", "content": answer})
if self.memory:
self.memory.add_conversation("assistant", answer, resolved_thread_id)
return {"message": answer, "pending_actions": self._pending_payloads(), "thread_id": resolved_thread_id}
messages.append(message)
for call in tool_calls:
name, arguments = self._extract_call(call)
result = await self.tools.execute(name, arguments)
last_tool_results.append({"tool": name, "result": result})
messages.append({"role": "tool", "tool_name": name, "tool_call_id": call.get("id"), "content": json.dumps(result)})
fallback = "I hit the tool-call limit while working on that. Try narrowing the request or approve any pending action first."
messages.append({"role": "assistant", "content": fallback})
if self.memory:
self.memory.add_conversation("assistant", fallback, resolved_thread_id)
return {"message": fallback, "pending_actions": self._pending_payloads(), "thread_id": resolved_thread_id}
async def chat_events(self, content: str, thread_id: str | None = DEFAULT_THREAD_ID) -> AsyncIterator[dict[str, Any]]:
async def chat_events(
self,
content: str,
thread_id: str | None = DEFAULT_THREAD_ID,
images: list[dict[str, Any]] | None = None,
) -> AsyncIterator[dict[str, Any]]:
health = await self.health()
if not health["online"]:
yield {"type": "warning", "message": health["message"]}
@@ -140,74 +161,77 @@ class OllamaAgent:
resolved_thread_id = self._thread_id(thread_id)
messages = self._messages_for_thread(resolved_thread_id)
previous_interaction = self.memory.last_interaction(resolved_thread_id) if self.memory else None
normalized_images = self._normalize_images(images)
prompt_text = self._prompt_text(content, len(normalized_images))
memory_content = self._conversation_content(content, len(normalized_images))
if self.memory:
self.memory.add_conversation("user", content, resolved_thread_id)
await self._title_first_message(resolved_thread_id, content, previous_interaction)
messages.append({"role": "user", "content": content})
self.memory.add_conversation("user", memory_content, resolved_thread_id)
await self._title_first_message(resolved_thread_id, prompt_text, previous_interaction)
messages.append(self._user_message(prompt_text, normalized_images))
yield {"type": "status", "message": "Thinking"}
last_tool_results: list[dict[str, Any]] = []
image_scope = self.tools.chat_image_scope(normalized_images) if hasattr(self.tools, "chat_image_scope") else nullcontext()
with image_scope:
for _ in range(10):
assistant_message: dict[str, Any] = {"role": "assistant", "content": ""}
tool_calls: list[dict[str, Any]] = []
for _ in range(5):
assistant_message: dict[str, Any] = {"role": "assistant", "content": ""}
tool_calls: list[dict[str, Any]] = []
try:
async for event in self._ollama_chat_stream(
content,
messages,
previous_interaction=previous_interaction,
thread_id=resolved_thread_id,
):
message = event.get("message") or {}
chunk = message.get("content") or ""
if chunk:
assistant_message["content"] += chunk
yield {"type": "token", "content": chunk}
if message.get("tool_calls"):
tool_calls.extend(message["tool_calls"])
if event.get("done"):
metrics = self._stream_metrics(event)
if metrics:
yield {"type": "metrics", **metrics}
except Exception as exc:
if not last_tool_results:
yield {"type": "warning", "message": f"Chat failed before any tool result was available: {exc}"}
try:
async for event in self._chat_stream_once(
prompt_text,
messages,
previous_interaction=previous_interaction,
thread_id=resolved_thread_id,
):
message = event.get("message") or {}
chunk = message.get("content") or ""
if chunk:
assistant_message["content"] += chunk
yield {"type": "token", "content": chunk}
if message.get("tool_calls"):
tool_calls.extend(message["tool_calls"])
if event.get("done"):
metrics = self._stream_metrics(event)
if metrics:
yield {"type": "metrics", **metrics}
except Exception as exc:
if not last_tool_results:
yield {"type": "warning", "message": f"Chat failed before any tool result was available: {exc}"}
yield {"type": "done", "pending_actions": self._pending_payloads(), "thread_id": resolved_thread_id}
return
fallback = self._tool_result_fallback(
last_tool_results,
f"The {self._provider_label()} stopped after the tool call: {exc}",
)
assistant_message["content"] = fallback
messages.append(assistant_message)
if self.memory:
self.memory.add_conversation("assistant", fallback, resolved_thread_id)
yield {"type": "token", "content": fallback}
yield {"type": "done", "pending_actions": self._pending_payloads(), "thread_id": resolved_thread_id}
return
fallback = self._tool_result_fallback(
last_tool_results,
f"The local model stopped after the tool call: {exc}",
)
assistant_message["content"] = fallback
if not tool_calls:
if not assistant_message.get("content", "").strip():
fallback = self._empty_response_fallback(last_tool_results)
assistant_message["content"] = fallback
yield {"type": "token", "content": fallback}
messages.append(assistant_message)
if self.memory:
self.memory.add_conversation("assistant", assistant_message.get("content", ""), resolved_thread_id)
yield {"type": "done", "pending_actions": self._pending_payloads(), "thread_id": resolved_thread_id}
return
assistant_message["tool_calls"] = tool_calls
messages.append(assistant_message)
if self.memory:
self.memory.add_conversation("assistant", fallback, resolved_thread_id)
yield {"type": "token", "content": fallback}
yield {"type": "done", "pending_actions": self._pending_payloads(), "thread_id": resolved_thread_id}
return
if not tool_calls:
if not assistant_message.get("content", "").strip():
fallback = self._empty_response_fallback(last_tool_results)
assistant_message["content"] = fallback
yield {"type": "token", "content": fallback}
messages.append(assistant_message)
if self.memory:
self.memory.add_conversation("assistant", assistant_message.get("content", ""), resolved_thread_id)
yield {"type": "done", "pending_actions": self._pending_payloads(), "thread_id": resolved_thread_id}
return
assistant_message["tool_calls"] = tool_calls
messages.append(assistant_message)
for call in tool_calls:
name, arguments = self._extract_call(call)
yield {"type": "status", "message": self._tool_status(name)}
result = await self.tools.execute(name, arguments)
last_tool_results.append({"tool": name, "result": result})
messages.append({"role": "tool", "tool_name": name, "content": json.dumps(result)})
yield {"type": "status", "message": "Writing response"}
for call in tool_calls:
name, arguments = self._extract_call(call)
yield {"type": "status", "message": self._tool_status(name)}
result = await self.tools.execute(name, arguments)
last_tool_results.append({"tool": name, "result": result})
messages.append({"role": "tool", "tool_name": name, "tool_call_id": call.get("id"), "content": json.dumps(result)})
yield {"type": "status", "message": "Writing response"}
fallback = "I hit the tool-call limit while working on that. Try narrowing the request or approve any pending action first."
messages.append({"role": "assistant", "content": fallback})
if self.memory:
@@ -221,9 +245,9 @@ class OllamaAgent:
previous_interaction = self.memory.last_interaction("wake") if self.memory else None
messages.append({"role": "user", "content": wake_message})
last_tool_results: list[dict[str, Any]] = []
for _ in range(5):
for _ in range(10):
try:
response = await self._ollama_chat(
response = await self._chat_once(
wake_message,
messages,
previous_interaction=previous_interaction,
@@ -234,7 +258,7 @@ class OllamaAgent:
raise
content = self._tool_result_fallback(
last_tool_results,
f"The local model stopped after the wake-job tool call: {exc}",
f"The {self._provider_label()} stopped after the wake-job tool call: {exc}",
)
messages.append({"role": "assistant", "content": content})
if self.memory:
@@ -258,8 +282,7 @@ class OllamaAgent:
name, arguments = self._extract_call(call)
result = await self.tools.execute(name, arguments)
last_tool_results.append({"tool": name, "result": result})
messages.append({"role": "tool", "tool_name": name, "content": json.dumps(result)})
messages.append({"role": "tool", "tool_name": name, "tool_call_id": call.get("id"), "content": json.dumps(result)})
content = "I hit the tool-call limit while running this scheduled wake job. Check the job prompt or pending approvals."
messages.append({"role": "assistant", "content": content})
if self.memory:
@@ -267,6 +290,51 @@ class OllamaAgent:
self.memory.add_conversation("assistant", content, "wake")
return content
async def _chat_once(
self,
query: str = "",
messages: list[dict[str, Any]] | None = None,
previous_interaction: dict[str, Any] | None = None,
thread_id: str | None = DEFAULT_THREAD_ID,
) -> dict[str, Any]:
if self.provider == "openai":
return await self._openai_chat(
query,
messages,
previous_interaction=previous_interaction,
thread_id=thread_id,
)
return await self._ollama_chat(
query,
messages,
previous_interaction=previous_interaction,
thread_id=thread_id,
)
async def _chat_stream_once(
self,
query: str = "",
messages: list[dict[str, Any]] | None = None,
previous_interaction: dict[str, Any] | None = None,
thread_id: str | None = DEFAULT_THREAD_ID,
) -> AsyncIterator[dict[str, Any]]:
if self.provider == "openai":
async for event in self._openai_chat_stream(
query,
messages,
previous_interaction=previous_interaction,
thread_id=thread_id,
):
yield event
return
async for event in self._ollama_chat_stream(
query,
messages,
previous_interaction=previous_interaction,
thread_id=thread_id,
):
yield event
async def _ollama_chat(
self,
query: str = "",
@@ -322,6 +390,103 @@ class OllamaAgent:
if line:
yield json.loads(line)
async def _openai_chat(
self,
query: str = "",
messages: list[dict[str, Any]] | None = None,
previous_interaction: dict[str, Any] | None = None,
thread_id: str | None = DEFAULT_THREAD_ID,
) -> dict[str, Any]:
async with httpx.AsyncClient(timeout=120) as client:
response = await client.post(
f"{self.base_url}/chat/completions",
headers=self._openai_headers(),
json={
"model": self.model,
"messages": self._openai_messages(
query,
messages or self._messages_for_thread(thread_id),
previous_interaction=previous_interaction,
thread_id=thread_id,
),
"tools": self.tools.schemas,
"stream": False,
},
)
response.raise_for_status()
body = response.json()
choice = (body.get("choices") or [{}])[0]
message = choice.get("message") or {}
return {
"message": {
"role": message.get("role", "assistant"),
"content": message.get("content") or "",
"tool_calls": message.get("tool_calls") or [],
}
}
async def _openai_chat_stream(
self,
query: str = "",
messages: list[dict[str, Any]] | None = None,
previous_interaction: dict[str, Any] | None = None,
thread_id: str | None = DEFAULT_THREAD_ID,
) -> AsyncIterator[dict[str, Any]]:
tool_calls: dict[int, dict[str, Any]] = {}
async with httpx.AsyncClient(timeout=120) as client:
async with client.stream(
"POST",
f"{self.base_url}/chat/completions",
headers=self._openai_headers(),
json={
"model": self.model,
"messages": self._openai_messages(
query,
messages or self._messages_for_thread(thread_id),
previous_interaction=previous_interaction,
thread_id=thread_id,
),
"tools": self.tools.schemas,
"stream": True,
},
) as response:
response.raise_for_status()
async for line in response.aiter_lines():
if not line or not line.startswith("data:"):
continue
payload = line.removeprefix("data:").strip()
if not payload:
continue
if payload == "[DONE]":
break
event = json.loads(payload)
choice = (event.get("choices") or [{}])[0]
delta = choice.get("delta") or {}
content = delta.get("content") or ""
if content:
yield {"message": {"role": "assistant", "content": content}}
for tool_call in delta.get("tool_calls") or []:
self._merge_openai_tool_call(tool_calls, tool_call)
finish_reason = choice.get("finish_reason")
if finish_reason:
yield {
"message": {
"role": "assistant",
"content": "",
"tool_calls": self._ordered_tool_calls(tool_calls),
},
"done": True,
}
return
yield {
"message": {
"role": "assistant",
"content": "",
"tool_calls": self._ordered_tool_calls(tool_calls),
},
"done": True,
}
def _messages_with_context(
self,
query: str,
@@ -329,21 +494,146 @@ class OllamaAgent:
previous_interaction: dict[str, Any] | None = None,
thread_id: str | None = DEFAULT_THREAD_ID,
) -> list[dict[str, Any]]:
context = self._runtime_context(query, previous_interaction=previous_interaction, thread_id=thread_id)
attached_image_count = 0
for message in reversed(messages):
if message.get("role") != "user":
continue
attached_image_count = len(message.get("images") or [])
break
context = self._runtime_context(
query,
previous_interaction=previous_interaction,
thread_id=thread_id,
attached_image_count=attached_image_count,
)
if not context:
return messages
return [messages[0], {"role": "system", "content": context}, *messages[1:]]
async def _openai_health(self) -> dict[str, Any]:
if not self.api_key:
return {
"online": False,
"model": self.model,
"base_url": self.base_url,
"provider": "openai",
"model_available": False,
"models": [],
"message": "OpenAI is selected, but no OpenAI API key is configured.",
"detail": "",
}
try:
async with httpx.AsyncClient(timeout=10) as client:
response = await client.get(f"{self.base_url}/models", headers=self._openai_headers())
response.raise_for_status()
body = response.json()
except (httpx.HTTPError, ValueError) as exc:
return {
"online": False,
"model": self.model,
"base_url": self.base_url,
"provider": "openai",
"model_available": False,
"models": [],
"message": f"OpenAI is unreachable at {self.base_url} or rejected the API key.",
"detail": str(exc),
}
models = sorted(item.get("id") for item in body.get("data", []) if item.get("id"))
return {
"online": True,
"model": self.model,
"base_url": self.base_url,
"provider": "openai",
"model_available": self.model in models,
"models": models,
"message": "OpenAI is online.",
}
def _openai_headers(self) -> dict[str, str]:
return {
"Authorization": f"Bearer {self.api_key or ''}",
"Content-Type": "application/json",
}
def _openai_messages(
self,
query: str,
messages: list[dict[str, Any]],
previous_interaction: dict[str, Any] | None = None,
thread_id: str | None = DEFAULT_THREAD_ID,
) -> list[dict[str, Any]]:
normalized: list[dict[str, Any]] = []
for message in self._messages_with_context(
query,
messages,
previous_interaction=previous_interaction,
thread_id=thread_id,
):
role = message.get("role")
if role not in {"system", "user", "assistant", "tool"}:
continue
entry: dict[str, Any] = {"role": role, "content": message.get("content", "")}
if role == "user" and message.get("images"):
text_content = message.get("content", "")
content_parts: list[dict[str, Any]] = []
content_types = list(message.get("image_content_types") or [])
if text_content:
content_parts.append({"type": "text", "text": text_content})
for index, image_data in enumerate(message.get("images") or []):
content_type = content_types[index] if index < len(content_types) else "image/png"
content_parts.append(
{
"type": "image_url",
"image_url": {"url": f"data:{content_type};base64,{image_data}"},
}
)
entry["content"] = content_parts
if role == "assistant" and message.get("tool_calls"):
entry["tool_calls"] = message["tool_calls"]
if role == "tool":
entry["tool_call_id"] = message.get("tool_call_id") or message.get("tool_name") or "tool"
normalized.append(entry)
return normalized
def _provider_label(self) -> str:
return "OpenAI model" if self.provider == "openai" else "local model"
@staticmethod
def _merge_openai_tool_call(target: dict[int, dict[str, Any]], delta: dict[str, Any]) -> None:
index = int(delta.get("index") or 0)
current = target.setdefault(index, {"id": delta.get("id"), "type": "function", "function": {"name": "", "arguments": ""}})
if delta.get("id"):
current["id"] = delta["id"]
function = delta.get("function") or {}
current_function = current.setdefault("function", {"name": "", "arguments": ""})
if function.get("name"):
current_function["name"] += function["name"]
if function.get("arguments"):
current_function["arguments"] += function["arguments"]
@staticmethod
def _ordered_tool_calls(tool_calls: dict[int, dict[str, Any]]) -> list[dict[str, Any]]:
return [tool_calls[index] for index in sorted(tool_calls)]
def _runtime_context(
self,
query: str,
previous_interaction: dict[str, Any] | None = None,
thread_id: str | None = DEFAULT_THREAD_ID,
attached_image_count: int = 0,
) -> str:
local_zone = get_localzone()
parts = [
f"Current local date/time: {iso_now()} UTC; {iso_now_in_zone(local_zone)} {local_zone}.",
]
if attached_image_count:
label = "image" if attached_image_count == 1 else "images"
parts.append(
f"Current user message includes {attached_image_count} pasted {label}. "
"You can inspect them visually. If the user wants one reused in a marketplace listing draft, "
"call draft_marketplace_listing or draft_marketplace_listing_with_cornerstone_image with "
"use_attached_image=true and attached_image_index when needed."
)
uex = getattr(self.tools, "uex", None)
if uex:
auth_methods = []
@@ -433,6 +723,24 @@ class OllamaAgent:
f"Message: {first_message[:800]}"
)
try:
if self.provider == "openai":
async with httpx.AsyncClient(timeout=20) as client:
response = await client.post(
f"{self.base_url}/chat/completions",
headers=self._openai_headers(),
json={
"model": self.model,
"messages": [
{"role": "system", "content": "You write short chat titles."},
{"role": "user", "content": prompt},
],
"stream": False,
},
)
response.raise_for_status()
choice = (response.json().get("choices") or [{}])[0]
message = choice.get("message") or {}
return self._clean_generated_title(message.get("content", ""))
async with httpx.AsyncClient(timeout=20) as client:
response = await client.post(
f"{self.base_url}/api/chat",
@@ -489,10 +797,10 @@ class OllamaAgent:
@staticmethod
def _empty_response_fallback(tool_results: list[dict[str, Any]]) -> str:
if not tool_results:
return "I did not get a usable response from the local model. Please try again, or narrow the request a bit."
return "I did not get a usable response from the model. Please try again, or narrow the request a bit."
return OllamaAgent._tool_result_fallback(
tool_results,
"I completed the tool call, but the local model did not write a final answer.",
"I completed the tool call, but the model did not write a final answer.",
)
@staticmethod
@@ -633,6 +941,47 @@ class OllamaAgent:
arguments = json.loads(arguments or "{}")
return name, arguments
@staticmethod
def _normalize_images(images: list[dict[str, Any]] | None) -> list[dict[str, Any]]:
normalized: list[dict[str, Any]] = []
for image in images or []:
if not isinstance(image, dict):
continue
image_data = str(image.get("image_data") or "").strip()
if not image_data:
continue
normalized.append(
{
"name": str(image.get("name") or "").strip() or "pasted-image.png",
"content_type": str(image.get("content_type") or "image/png").strip() or "image/png",
"image_data": image_data,
}
)
return normalized
@staticmethod
def _prompt_text(content: str, image_count: int) -> str:
text = content.strip()
if text:
return text
return "Please analyze the attached image." if image_count == 1 else "Please analyze the attached images."
@staticmethod
def _conversation_content(content: str, image_count: int) -> str:
text = content.strip()
if not image_count:
return text
note = f"[Attached {image_count} pasted image{'s' if image_count != 1 else ''}]"
return f"{text}\n\n{note}" if text else note
@staticmethod
def _user_message(content: str, images: list[dict[str, Any]]) -> dict[str, Any]:
message: dict[str, Any] = {"role": "user", "content": content}
if images:
message["images"] = [image["image_data"] for image in images]
message["image_content_types"] = [image["content_type"] for image in images]
return message
class OllamaUnavailable(RuntimeError):
pass