feat: chat sidebar and inbox, feat: saved chats, fix: wake jobs, fix: sandbox sends, ux: negotiation replies and draft box

This commit is contained in:
2026-05-06 22:53:19 -04:00
parent 58a57ddc6a
commit 3b6e3c34d5
18 changed files with 1797 additions and 105 deletions
+271 -58
View File
@@ -1,13 +1,14 @@
from __future__ import annotations
import json
import re
from collections.abc import AsyncIterator
from typing import Any
import httpx
from tzlocal import get_localzone
from traderai.memory import MemoryStore, iso_now, iso_now_in_zone, time_since
from traderai.memory import DEFAULT_THREAD_ID, MemoryStore, iso_now, iso_now_in_zone, time_since
from traderai.tools import ToolRegistry
@@ -19,6 +20,7 @@ When the user asks for history, trends, changes over time, or past prices, prefe
Prefer open and current UEX marketplace information. Do not use historical sale data, completed sale records, or sale/average-history information unless the user explicitly asks for historical sales.
Treat UEX marketplace prices as in-game aUEC/UEC credits, never real-world dollars, unless the user explicitly says otherwise.
For marketplace writes, draft the exact pending action and tell the user what will be sent; never claim it was sent until approval succeeds.
When a scheduled wake job fires, always write a concise Inbox-ready result that says what you checked, the key findings, and the suggested next action.
Keep prices, listing ids, slugs, users, and UEX status codes precise. If data is missing, say what you need next."""
@@ -38,7 +40,7 @@ class OllamaAgent:
self.memory = memory
self.user_name = user_name
self.num_ctx = num_ctx
self.messages: list[dict[str, Any]] = [{"role": "system", "content": SYSTEM_PROMPT}]
self.thread_messages: dict[str, list[dict[str, Any]]] = {}
async def health(self) -> dict[str, Any]:
try:
@@ -70,107 +72,214 @@ class OllamaAgent:
if not health["online"]:
raise OllamaUnavailable(health["message"])
async def chat(self, content: str) -> dict[str, Any]:
async def chat(self, content: str, thread_id: str | None = DEFAULT_THREAD_ID) -> dict[str, Any]:
await self.ensure_available()
previous_interaction = self.memory.last_interaction() if self.memory else None
resolved_thread_id = self._thread_id(thread_id)
messages = self._messages_for_thread(resolved_thread_id)
previous_interaction = self.memory.last_interaction(resolved_thread_id) if self.memory else None
if self.memory:
self.memory.add_conversation("user", content)
self.messages.append({"role": "user", "content": content})
self.memory.add_conversation("user", content, resolved_thread_id)
await self._title_first_message(resolved_thread_id, content, previous_interaction)
messages.append({"role": "user", "content": content})
last_tool_results: list[dict[str, Any]] = []
for _ in range(5):
response = await self._ollama_chat(content, previous_interaction=previous_interaction)
try:
response = await self._ollama_chat(
content,
messages,
previous_interaction=previous_interaction,
thread_id=resolved_thread_id,
)
except Exception as exc:
if not last_tool_results:
raise
answer = self._tool_result_fallback(
last_tool_results,
f"The local model stopped after the tool call: {exc}",
)
messages.append({"role": "assistant", "content": answer})
if self.memory:
self.memory.add_conversation("assistant", answer, resolved_thread_id)
return {"message": answer, "pending_actions": self._pending_payloads(), "thread_id": resolved_thread_id}
message = response.get("message") or {}
tool_calls = message.get("tool_calls") or []
if not tool_calls:
self.messages.append({"role": "assistant", "content": message.get("content", "")})
answer = message.get("content", "")
if not answer.strip():
answer = self._empty_response_fallback(last_tool_results)
messages.append({"role": "assistant", "content": answer})
if self.memory:
self.memory.add_conversation("assistant", message.get("content", ""))
return {"message": message.get("content", ""), "pending_actions": self._pending_payloads()}
self.memory.add_conversation("assistant", answer, resolved_thread_id)
return {"message": answer, "pending_actions": self._pending_payloads(), "thread_id": resolved_thread_id}
self.messages.append(message)
messages.append(message)
for call in tool_calls:
name, arguments = self._extract_call(call)
result = await self.tools.execute(name, arguments)
self.messages.append({"role": "tool", "tool_name": name, "content": json.dumps(result)})
last_tool_results.append({"tool": name, "result": result})
messages.append({"role": "tool", "tool_name": name, "content": json.dumps(result)})
fallback = "I hit the tool-call limit while working on that. Try narrowing the request or approve any pending action first."
self.messages.append({"role": "assistant", "content": fallback})
messages.append({"role": "assistant", "content": fallback})
if self.memory:
self.memory.add_conversation("assistant", fallback)
return {"message": fallback, "pending_actions": self._pending_payloads()}
self.memory.add_conversation("assistant", fallback, resolved_thread_id)
return {"message": fallback, "pending_actions": self._pending_payloads(), "thread_id": resolved_thread_id}
async def chat_events(self, content: str) -> AsyncIterator[dict[str, Any]]:
async def chat_events(self, content: str, thread_id: str | None = DEFAULT_THREAD_ID) -> AsyncIterator[dict[str, Any]]:
health = await self.health()
if not health["online"]:
yield {"type": "warning", "message": health["message"]}
yield {"type": "done", "pending_actions": self._pending_payloads()}
return
previous_interaction = self.memory.last_interaction() if self.memory else None
resolved_thread_id = self._thread_id(thread_id)
messages = self._messages_for_thread(resolved_thread_id)
previous_interaction = self.memory.last_interaction(resolved_thread_id) if self.memory else None
if self.memory:
self.memory.add_conversation("user", content)
self.messages.append({"role": "user", "content": content})
self.memory.add_conversation("user", content, resolved_thread_id)
await self._title_first_message(resolved_thread_id, content, previous_interaction)
messages.append({"role": "user", "content": content})
yield {"type": "status", "message": "Thinking"}
last_tool_results: list[dict[str, Any]] = []
for _ in range(5):
assistant_message: dict[str, Any] = {"role": "assistant", "content": ""}
tool_calls: list[dict[str, Any]] = []
async for event in self._ollama_chat_stream(content, previous_interaction=previous_interaction):
message = event.get("message") or {}
chunk = message.get("content") or ""
if chunk:
assistant_message["content"] += chunk
yield {"type": "token", "content": chunk}
if message.get("tool_calls"):
tool_calls.extend(message["tool_calls"])
if event.get("done"):
metrics = self._stream_metrics(event)
if metrics:
yield {"type": "metrics", **metrics}
try:
async for event in self._ollama_chat_stream(
content,
messages,
previous_interaction=previous_interaction,
thread_id=resolved_thread_id,
):
message = event.get("message") or {}
chunk = message.get("content") or ""
if chunk:
assistant_message["content"] += chunk
yield {"type": "token", "content": chunk}
if message.get("tool_calls"):
tool_calls.extend(message["tool_calls"])
if event.get("done"):
metrics = self._stream_metrics(event)
if metrics:
yield {"type": "metrics", **metrics}
except Exception as exc:
if not last_tool_results:
yield {"type": "warning", "message": f"Chat failed before any tool result was available: {exc}"}
yield {"type": "done", "pending_actions": self._pending_payloads(), "thread_id": resolved_thread_id}
return
fallback = self._tool_result_fallback(
last_tool_results,
f"The local model stopped after the tool call: {exc}",
)
assistant_message["content"] = fallback
messages.append(assistant_message)
if self.memory:
self.memory.add_conversation("assistant", fallback, resolved_thread_id)
yield {"type": "token", "content": fallback}
yield {"type": "done", "pending_actions": self._pending_payloads(), "thread_id": resolved_thread_id}
return
if not tool_calls:
self.messages.append(assistant_message)
if not assistant_message.get("content", "").strip():
fallback = self._empty_response_fallback(last_tool_results)
assistant_message["content"] = fallback
yield {"type": "token", "content": fallback}
messages.append(assistant_message)
if self.memory:
self.memory.add_conversation("assistant", assistant_message.get("content", ""))
yield {"type": "done", "pending_actions": self._pending_payloads()}
self.memory.add_conversation("assistant", assistant_message.get("content", ""), resolved_thread_id)
yield {"type": "done", "pending_actions": self._pending_payloads(), "thread_id": resolved_thread_id}
return
assistant_message["tool_calls"] = tool_calls
self.messages.append(assistant_message)
messages.append(assistant_message)
for call in tool_calls:
name, arguments = self._extract_call(call)
yield {"type": "status", "message": self._tool_status(name)}
result = await self.tools.execute(name, arguments)
self.messages.append({"role": "tool", "tool_name": name, "content": json.dumps(result)})
last_tool_results.append({"tool": name, "result": result})
messages.append({"role": "tool", "tool_name": name, "content": json.dumps(result)})
yield {"type": "status", "message": "Writing response"}
fallback = "I hit the tool-call limit while working on that. Try narrowing the request or approve any pending action first."
self.messages.append({"role": "assistant", "content": fallback})
messages.append({"role": "assistant", "content": fallback})
if self.memory:
self.memory.add_conversation("assistant", fallback)
self.memory.add_conversation("assistant", fallback, resolved_thread_id)
yield {"type": "token", "content": fallback}
yield {"type": "done", "pending_actions": self._pending_payloads()}
yield {"type": "done", "pending_actions": self._pending_payloads(), "thread_id": resolved_thread_id}
async def generate_wake_response(self, wake_message: str) -> str:
await self.ensure_available()
self.messages.append({"role": "user", "content": wake_message})
response = await self._ollama_chat(wake_message)
message = response.get("message") or {}
content = message.get("content", "")
self.messages.append({"role": "assistant", "content": content})
if self.memory:
self.memory.add_conversation("system", wake_message)
self.memory.add_conversation("assistant", content)
return content or wake_message
messages = self._messages_for_thread("wake")
previous_interaction = self.memory.last_interaction("wake") if self.memory else None
messages.append({"role": "user", "content": wake_message})
last_tool_results: list[dict[str, Any]] = []
for _ in range(5):
try:
response = await self._ollama_chat(
wake_message,
messages,
previous_interaction=previous_interaction,
thread_id="wake",
)
except Exception as exc:
if not last_tool_results:
raise
content = self._tool_result_fallback(
last_tool_results,
f"The local model stopped after the wake-job tool call: {exc}",
)
messages.append({"role": "assistant", "content": content})
if self.memory:
self.memory.add_conversation("system", wake_message, "wake")
self.memory.add_conversation("assistant", content, "wake")
return content
message = response.get("message") or {}
tool_calls = message.get("tool_calls") or []
if not tool_calls:
content = message.get("content", "")
if not content.strip():
content = self._empty_response_fallback(last_tool_results)
messages.append({"role": "assistant", "content": content})
if self.memory:
self.memory.add_conversation("system", wake_message, "wake")
self.memory.add_conversation("assistant", content, "wake")
return content
async def _ollama_chat(self, query: str = "", previous_interaction: dict[str, Any] | None = None) -> dict[str, Any]:
messages.append(message)
for call in tool_calls:
name, arguments = self._extract_call(call)
result = await self.tools.execute(name, arguments)
last_tool_results.append({"tool": name, "result": result})
messages.append({"role": "tool", "tool_name": name, "content": json.dumps(result)})
content = "I hit the tool-call limit while running this scheduled wake job. Check the job prompt or pending approvals."
messages.append({"role": "assistant", "content": content})
if self.memory:
self.memory.add_conversation("system", wake_message, "wake")
self.memory.add_conversation("assistant", content, "wake")
return content
async def _ollama_chat(
self,
query: str = "",
messages: list[dict[str, Any]] | None = None,
previous_interaction: dict[str, Any] | None = None,
thread_id: str | None = DEFAULT_THREAD_ID,
) -> dict[str, Any]:
async with httpx.AsyncClient(timeout=120) as client:
response = await client.post(
f"{self.base_url}/api/chat",
json={
"model": self.model,
"messages": self._messages_with_context(query, previous_interaction=previous_interaction),
"messages": self._messages_with_context(
query,
messages or self._messages_for_thread(thread_id),
previous_interaction=previous_interaction,
thread_id=thread_id,
),
"tools": self.tools.schemas,
"options": self._ollama_options(),
"stream": False,
@@ -182,7 +291,9 @@ class OllamaAgent:
async def _ollama_chat_stream(
self,
query: str = "",
messages: list[dict[str, Any]] | None = None,
previous_interaction: dict[str, Any] | None = None,
thread_id: str | None = DEFAULT_THREAD_ID,
) -> AsyncIterator[dict[str, Any]]:
async with httpx.AsyncClient(timeout=120) as client:
async with client.stream(
@@ -190,7 +301,12 @@ class OllamaAgent:
f"{self.base_url}/api/chat",
json={
"model": self.model,
"messages": self._messages_with_context(query, previous_interaction=previous_interaction),
"messages": self._messages_with_context(
query,
messages or self._messages_for_thread(thread_id),
previous_interaction=previous_interaction,
thread_id=thread_id,
),
"tools": self.tools.schemas,
"options": self._ollama_options(),
"stream": True,
@@ -204,14 +320,21 @@ class OllamaAgent:
def _messages_with_context(
self,
query: str,
messages: list[dict[str, Any]],
previous_interaction: dict[str, Any] | None = None,
thread_id: str | None = DEFAULT_THREAD_ID,
) -> list[dict[str, Any]]:
context = self._runtime_context(query, previous_interaction=previous_interaction)
context = self._runtime_context(query, previous_interaction=previous_interaction, thread_id=thread_id)
if not context:
return self.messages
return [self.messages[0], {"role": "system", "content": context}, *self.messages[1:]]
return messages
return [messages[0], {"role": "system", "content": context}, *messages[1:]]
def _runtime_context(self, query: str, previous_interaction: dict[str, Any] | None = None) -> str:
def _runtime_context(
self,
query: str,
previous_interaction: dict[str, Any] | None = None,
thread_id: str | None = DEFAULT_THREAD_ID,
) -> str:
local_zone = get_localzone()
parts = [
f"Current local date/time: {iso_now()} UTC; {iso_now_in_zone(local_zone)} {local_zone}.",
@@ -244,7 +367,7 @@ class OllamaAgent:
parts.append(identity)
parts.append(f"Known user profile JSON: {json.dumps(self._profile_for_prompt(profile), ensure_ascii=True)}.")
last = previous_interaction if previous_interaction is not None else self.memory.last_interaction()
last = previous_interaction if previous_interaction is not None else self.memory.last_interaction(thread_id)
if last:
parts.append(
f"Previous interaction before this message: {last['created_at']} "
@@ -261,16 +384,85 @@ class OllamaAgent:
)
parts.append(f"Relevant long-term memories:\n{memory_text}")
recent = self.memory.recent_conversation(limit=6)
recent = self.memory.recent_conversation(limit=6, thread_id=thread_id)
if recent:
recent_text = "\n".join(
f"- {item['created_at']} {item['role']}: {item['content'][:500]}"
for item in recent
)
parts.append(f"Recent conversation excerpts:\n{recent_text}")
parts.append(f"Recent conversation excerpts from this chat:\n{recent_text}")
return "\n".join(parts)
def _messages_for_thread(self, thread_id: str | None) -> list[dict[str, Any]]:
resolved_thread_id = self._thread_id(thread_id)
if resolved_thread_id not in self.thread_messages:
messages: list[dict[str, Any]] = [{"role": "system", "content": SYSTEM_PROMPT}]
if self.memory:
self.memory.ensure_thread(resolved_thread_id)
for item in self.memory.recent_conversation(limit=30, thread_id=resolved_thread_id):
role = item.get("role")
if role in {"user", "assistant"} and item.get("content"):
messages.append({"role": role, "content": item["content"]})
self.thread_messages[resolved_thread_id] = messages
return self.thread_messages[resolved_thread_id]
async def _title_first_message(
self,
thread_id: str,
first_message: str,
previous_interaction: dict[str, Any] | None,
) -> None:
if self.memory is None or previous_interaction is not None:
return
thread = self.memory.get_thread(thread_id)
if not thread or thread.get("title") != "New chat":
return
title = await self._generate_chat_title(first_message)
self.memory.rename_thread(thread_id, title or MemoryStore._thread_title(first_message))
async def _generate_chat_title(self, first_message: str) -> str:
prompt = (
"Create a concise chat title for this first user message. "
"Use 2 to 6 words. No quotes, no punctuation at the end, no preamble.\n\n"
f"Message: {first_message[:800]}"
)
try:
async with httpx.AsyncClient(timeout=20) as client:
response = await client.post(
f"{self.base_url}/api/chat",
json={
"model": self.model,
"messages": [
{"role": "system", "content": "You write short chat titles."},
{"role": "user", "content": prompt},
],
"options": self._ollama_options(),
"stream": False,
},
)
response.raise_for_status()
message = response.json().get("message") or {}
return self._clean_generated_title(message.get("content", ""))
except Exception:
return ""
@staticmethod
def _thread_id(thread_id: str | None) -> str:
return (thread_id or DEFAULT_THREAD_ID).strip() or DEFAULT_THREAD_ID
@staticmethod
def _clean_generated_title(title: str) -> str:
text = re.sub(r"[\r\n]+", " ", title).strip().strip('"').strip("'")
text = re.sub(r"^(title|chat title)\s*:\s*", "", text, flags=re.IGNORECASE).strip()
text = text.rstrip(".!?;:-").strip()
if not text:
return ""
words = text.split()
if len(words) > 8:
text = " ".join(words[:8])
return text[:64]
def _pending_payloads(self) -> list[dict[str, Any]]:
return [
{
@@ -288,6 +480,27 @@ class OllamaAgent:
return {}
return {"num_ctx": self.num_ctx}
@staticmethod
def _empty_response_fallback(tool_results: list[dict[str, Any]]) -> str:
if not tool_results:
return "I did not get a usable response from the local model. Please try again, or narrow the request a bit."
return OllamaAgent._tool_result_fallback(
tool_results,
"I completed the tool call, but the local model did not write a final answer.",
)
@staticmethod
def _tool_result_fallback(tool_results: list[dict[str, Any]], reason: str) -> str:
last = tool_results[-1]
text = json.dumps(last, indent=2, ensure_ascii=True)
if len(text) > 1800:
text = text[:1800] + "\n..."
return (
f"{reason} "
"Here is the last tool result so you are not left staring at a blank response:\n\n"
f"```json\n{text}\n```"
)
@staticmethod
def _tool_status(name: str) -> str:
if name.startswith("get_uex_"):
+195 -16
View File
@@ -8,6 +8,9 @@ from typing import Any
from zoneinfo import ZoneInfo
DEFAULT_THREAD_ID = "default"
def utc_now() -> datetime:
return datetime.now(timezone.utc)
@@ -65,8 +68,16 @@ class MemoryStore:
with self._connect() as db:
db.executescript(
"""
CREATE TABLE IF NOT EXISTS chat_threads (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS conversations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
thread_id TEXT,
role TEXT NOT NULL,
content TEXT NOT NULL,
created_at TEXT NOT NULL
@@ -129,27 +140,152 @@ class MemoryStore:
);
"""
)
def add_conversation(self, role: str, content: str) -> None:
with self._connect() as db:
self._ensure_column(db, "conversations", "thread_id", "TEXT")
now = iso_now()
db.execute(
"INSERT INTO conversations(role, content, created_at) VALUES (?, ?, ?)",
(role, content, iso_now()),
"""
INSERT INTO chat_threads(id, title, created_at, updated_at)
VALUES (?, ?, ?, ?)
ON CONFLICT(id) DO NOTHING
""",
(DEFAULT_THREAD_ID, "New chat", now, now),
)
db.execute(
"UPDATE conversations SET thread_id = ? WHERE thread_id IS NULL OR thread_id = ''",
(DEFAULT_THREAD_ID,),
)
def last_interaction(self) -> dict[str, Any] | None:
@staticmethod
def _ensure_column(db: sqlite3.Connection, table: str, column: str, definition: str) -> None:
columns = {row["name"] for row in db.execute(f"PRAGMA table_info({table})").fetchall()}
if column not in columns:
db.execute(f"ALTER TABLE {table} ADD COLUMN {column} {definition}")
def ensure_thread(self, thread_id: str | None = None, title: str | None = None) -> dict[str, Any]:
now = iso_now()
resolved_id = (thread_id or DEFAULT_THREAD_ID).strip() or DEFAULT_THREAD_ID
resolved_title = (title or "New chat").strip() or "New chat"
with self._connect() as db:
db.execute(
"""
INSERT INTO chat_threads(id, title, created_at, updated_at)
VALUES (?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET updated_at=excluded.updated_at
""",
(resolved_id, resolved_title, now, now),
)
row = db.execute(
"SELECT role, content, created_at FROM conversations ORDER BY id DESC LIMIT 1"
"SELECT id, title, created_at, updated_at FROM chat_threads WHERE id = ?",
(resolved_id,),
).fetchone()
return dict(row)
def create_thread(self, title: str | None = None) -> dict[str, Any]:
import uuid
thread_id = f"chat-{uuid.uuid4()}"
return self.ensure_thread(thread_id, title or "New chat")
def list_threads(self) -> list[dict[str, Any]]:
with self._connect() as db:
rows = db.execute(
"""
SELECT
t.id,
t.title,
t.created_at,
t.updated_at,
COUNT(c.id) AS message_count,
MAX(c.created_at) AS last_message_at
FROM chat_threads t
LEFT JOIN conversations c ON c.thread_id = t.id
GROUP BY t.id
ORDER BY COALESCE(MAX(c.created_at), t.updated_at) DESC
"""
).fetchall()
return [dict(row) for row in rows]
def delete_thread(self, thread_id: str) -> bool:
with self._connect() as db:
db.execute("DELETE FROM conversations WHERE thread_id = ?", (thread_id,))
cursor = db.execute("DELETE FROM chat_threads WHERE id = ?", (thread_id,))
return cursor.rowcount > 0
def rename_thread(self, thread_id: str, title: str) -> dict[str, Any] | None:
clean_title = self._clean_thread_title(title)
if not clean_title:
return None
now = iso_now()
with self._connect() as db:
db.execute(
"UPDATE chat_threads SET title = ?, updated_at = ? WHERE id = ?",
(clean_title, now, thread_id),
)
row = db.execute(
"SELECT id, title, created_at, updated_at FROM chat_threads WHERE id = ?",
(thread_id,),
).fetchone()
return dict(row) if row else None
def recent_conversation(self, limit: int = 8) -> list[dict[str, Any]]:
def get_thread(self, thread_id: str) -> dict[str, Any] | None:
with self._connect() as db:
rows = db.execute(
"SELECT role, content, created_at FROM conversations ORDER BY id DESC LIMIT ?",
(limit,),
).fetchall()
row = db.execute(
"SELECT id, title, created_at, updated_at FROM chat_threads WHERE id = ?",
(thread_id,),
).fetchone()
return dict(row) if row else None
def add_conversation(self, role: str, content: str, thread_id: str | None = DEFAULT_THREAD_ID) -> None:
resolved_thread_id = (thread_id or DEFAULT_THREAD_ID).strip() or DEFAULT_THREAD_ID
self.ensure_thread(resolved_thread_id)
now = iso_now()
with self._connect() as db:
db.execute(
"INSERT INTO conversations(thread_id, role, content, created_at) VALUES (?, ?, ?, ?)",
(resolved_thread_id, role, content, now),
)
db.execute(
"UPDATE chat_threads SET updated_at = ? WHERE id = ?",
(now, resolved_thread_id),
)
def last_interaction(self, thread_id: str | None = None) -> dict[str, Any] | None:
with self._connect() as db:
if thread_id:
row = db.execute(
"""
SELECT thread_id, role, content, created_at
FROM conversations
WHERE thread_id = ?
ORDER BY id DESC
LIMIT 1
""",
(thread_id,),
).fetchone()
else:
row = db.execute(
"SELECT thread_id, role, content, created_at FROM conversations ORDER BY id DESC LIMIT 1"
).fetchone()
return dict(row) if row else None
def recent_conversation(self, limit: int = 8, thread_id: str | None = None) -> list[dict[str, Any]]:
with self._connect() as db:
if thread_id:
rows = db.execute(
"""
SELECT id, thread_id, role, content, created_at
FROM conversations
WHERE thread_id = ?
ORDER BY id DESC
LIMIT ?
""",
(thread_id, limit),
).fetchall()
else:
rows = db.execute(
"SELECT id, thread_id, role, content, created_at FROM conversations ORDER BY id DESC LIMIT ?",
(limit,),
).fetchall()
return [dict(row) for row in reversed(rows)]
def remember(self, kind: str, content: str, importance: int = 3, metadata: dict[str, Any] | None = None) -> dict[str, Any]:
@@ -222,13 +358,22 @@ class MemoryStore:
).fetchall()
conversations = db.execute(
"""
SELECT id, role, content, created_at
SELECT id, thread_id, role, content, created_at
FROM conversations
ORDER BY id DESC
LIMIT ?
""",
(limit,),
).fetchall()
threads = db.execute(
"""
SELECT id, title, created_at, updated_at
FROM chat_threads
ORDER BY updated_at DESC
LIMIT ?
""",
(limit,),
).fetchall()
profile_rows = db.execute(
"SELECT key, value, updated_at FROM user_profile ORDER BY key"
).fetchall()
@@ -252,6 +397,7 @@ class MemoryStore:
return {
"path": str(self.path),
"memories": [self._memory_row(row) for row in memories],
"chat_threads": [dict(row) for row in threads],
"conversations": [dict(row) for row in conversations],
"profile": profile,
"scheduled_jobs": [dict(row) for row in jobs],
@@ -339,17 +485,38 @@ class MemoryStore:
).fetchall()
return [dict(row) for row in rows]
def mark_job_run(self, job_id: str, next_run_at: str | None = None) -> None:
def mark_job_run(self, job_id: str, next_run_at: str | None = None, enabled: bool = True) -> None:
with self._connect() as db:
db.execute(
"UPDATE scheduled_jobs SET last_run_at = ?, next_run_at = ? WHERE id = ?",
(iso_now(), next_run_at, job_id),
"UPDATE scheduled_jobs SET last_run_at = ?, next_run_at = ?, enabled = ? WHERE id = ?",
(iso_now(), next_run_at, 1 if enabled else 0, job_id),
)
def add_outbox(self, content: str) -> None:
with self._connect() as db:
db.execute("INSERT INTO outbox(content, created_at) VALUES (?, ?)", (content, iso_now()))
def list_outbox(self, limit: int = 100) -> list[dict[str, Any]]:
with self._connect() as db:
rows = db.execute(
"SELECT id, content, created_at, delivered_at FROM outbox ORDER BY id DESC LIMIT ?",
(limit,),
).fetchall()
return [dict(row) for row in rows]
def get_outbox(self, inbox_id: int) -> dict[str, Any] | None:
with self._connect() as db:
row = db.execute(
"SELECT id, content, created_at, delivered_at FROM outbox WHERE id = ?",
(inbox_id,),
).fetchone()
return dict(row) if row else None
def delete_outbox(self, inbox_id: int) -> bool:
with self._connect() as db:
cursor = db.execute("DELETE FROM outbox WHERE id = ?", (inbox_id,))
return cursor.rowcount > 0
def undelivered_outbox(self) -> list[dict[str, Any]]:
now = iso_now()
with self._connect() as db:
@@ -362,6 +529,18 @@ class MemoryStore:
)
return [dict(row) for row in rows]
@staticmethod
def _thread_title(content: str) -> str:
text = " ".join(content.strip().split())
if not text:
return "New chat"
return text[:42] + ("..." if len(text) > 42 else "")
@staticmethod
def _clean_thread_title(title: str) -> str:
text = " ".join(title.strip().strip('"').strip("'").split())
return text[:64]
@staticmethod
def _fts_query(query: str) -> str:
tokens = [token.replace('"', "") for token in query.split() if token.strip()]
+17 -3
View File
@@ -83,11 +83,20 @@ class WakeScheduler:
)
if self.agent is None:
self.memory.add_outbox(wake_message)
self._mark_job_finished(job_id)
return
text = await self.agent.generate_wake_response(wake_message)
try:
text = await self.agent.generate_wake_response(wake_message)
except Exception as exc:
text = f"Wake job failed: {exc}. Job instruction: {prompt}"
self.memory.add_outbox(text)
self.memory.mark_job_run(job_id)
self._mark_job_finished(job_id)
def _mark_job_finished(self, job_id: str) -> None:
job = self.scheduler.get_job(job_id)
next_run = job.next_run_time if job else None
self.memory.mark_job_run(job_id, next_run.isoformat() if next_run else None, enabled=bool(next_run))
def _schedule_notification_poll(self) -> None:
if self.uex is None:
@@ -104,7 +113,12 @@ class WakeScheduler:
if self.uex is None:
return []
response = await self.uex.get_user_notifications()
try:
response = await self.uex.get_user_notifications()
except Exception as exc:
self.memory.add_outbox(f"UEX notification poll failed: {exc}")
self.memory.set_profile("uex_last_notification_error", str(exc))
return []
notifications = response.get("notifications") or []
pending = [item for item in notifications if not item.get("date_read")]
profile = self.memory.get_profile()
+79 -3
View File
@@ -21,7 +21,7 @@ from pydantic import BaseModel
from traderai.agent import OllamaAgent, OllamaUnavailable
from traderai.config import save_settings, settings_payload
from traderai.config import get_settings
from traderai.memory import MemoryStore
from traderai.memory import DEFAULT_THREAD_ID, MemoryStore
from traderai.scheduler import WakeScheduler
from traderai.tools import ToolRegistry
from traderai.uex_client import UEXClient
@@ -35,6 +35,19 @@ def resource_path(*parts: str) -> Path:
class ChatRequest(BaseModel):
message: str
thread_id: str | None = DEFAULT_THREAD_ID
class ChatThreadRequest(BaseModel):
title: str | None = None
class RenameChatThreadRequest(BaseModel):
title: str
class DirectNegotiationMessageRequest(BaseModel):
message: str
class ClearMemoryRequest(BaseModel):
@@ -246,18 +259,43 @@ def create_app() -> FastAPI:
@app.post("/api/chat")
async def chat(request: ChatRequest) -> dict:
try:
return await agent.chat(request.message)
return await agent.chat(request.message, thread_id=request.thread_id)
except OllamaUnavailable as exc:
raise HTTPException(status_code=503, detail=str(exc)) from exc
@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest) -> StreamingResponse:
async def events():
async for event in agent.chat_events(request.message):
async for event in agent.chat_events(request.message, thread_id=request.thread_id):
yield f"data: {json.dumps(event)}\n\n"
return StreamingResponse(events(), media_type="text/event-stream")
@app.get("/api/chats")
async def chats() -> dict:
return {"chats": memory.list_threads()}
@app.post("/api/chats")
async def create_chat(request: ChatThreadRequest) -> dict:
return {"chat": memory.create_thread(request.title)}
@app.get("/api/chats/{thread_id}/messages")
async def chat_messages(thread_id: str) -> dict:
memory.ensure_thread(thread_id)
return {"thread_id": thread_id, "messages": memory.recent_conversation(limit=200, thread_id=thread_id)}
@app.delete("/api/chats/{thread_id}")
async def delete_chat(thread_id: str) -> dict:
deleted = memory.delete_thread(thread_id)
return {"deleted": deleted, "chats": memory.list_threads()}
@app.patch("/api/chats/{thread_id}")
async def rename_chat(thread_id: str, request: RenameChatThreadRequest) -> dict:
chat = memory.rename_thread(thread_id, request.title)
if not chat:
raise HTTPException(status_code=400, detail="A non-empty chat title is required.")
return {"chat": chat, "chats": memory.list_threads()}
@app.get("/api/pending-actions")
async def pending_actions() -> dict:
return {"pending_actions": agent._pending_payloads()}
@@ -266,6 +304,35 @@ def create_app() -> FastAPI:
async def notifications() -> dict:
return {"notifications": memory.undelivered_outbox()}
@app.get("/api/inbox")
async def inbox() -> dict:
return {"inbox": memory.list_outbox()}
@app.post("/api/inbox/{inbox_id}/continue")
async def continue_inbox(inbox_id: int) -> dict:
item = memory.get_outbox(inbox_id)
if not item:
raise HTTPException(status_code=404, detail="Inbox item not found.")
thread = memory.create_thread("Inbox follow-up")
memory.add_conversation("assistant", item["content"], thread["id"])
return {"chat": thread, "message": item}
@app.delete("/api/inbox/{inbox_id}")
async def delete_inbox(inbox_id: int) -> dict:
deleted = memory.delete_outbox(inbox_id)
return {"deleted": deleted, "inbox": memory.list_outbox()}
@app.get("/api/negotiations/{identifier}/messages")
async def negotiation_messages(identifier: str) -> dict:
params = negotiation_identifier_params(identifier)
return await uex.get("marketplace_negotiations_messages", params, authenticated=True)
@app.post("/api/negotiations/{identifier}/messages")
async def send_negotiation_message(identifier: str, request: DirectNegotiationMessageRequest) -> dict:
params = negotiation_identifier_params(identifier)
payload = {**params, "message": request.message, "is_production": 1}
return await uex.post("marketplace_negotiations_messages", payload, authenticated=True)
@app.get("/api/wake-jobs")
async def wake_jobs() -> dict:
return {"scheduled_jobs": scheduler.list_jobs()}
@@ -300,6 +367,15 @@ def create_app() -> FastAPI:
return app
def negotiation_identifier_params(identifier: str) -> dict[str, Any]:
value = identifier.strip()
if not value:
raise HTTPException(status_code=400, detail="Negotiation id or hash is required.")
if value.isdigit():
return {"id_negotiation": int(value)}
return {"hash": value}
async def inspect_ollama() -> dict[str, Any]:
settings = get_settings()
executable = find_ollama_executable()
+80 -5
View File
@@ -129,9 +129,15 @@ UEX_DELETE_RESOURCES = {
UEX_RESOURCE_DESCRIPTIONS = {
"commodities_prices_history": "Historical commodity prices at a terminal. Requires id_terminal and id_commodity; accepts game_version. UEX limits this to 500 rows.",
"marketplace_prices_history": "Historical marketplace price snapshots, one row per listing per price change. Requires at least one filter; supports date_start/date_end and up to 1000 records.",
"marketplace_trends": "Current UEX marketplace trend metrics for an item. Use this when the user asks for trends, price movement, demand, or what the market is doing now.",
"currencies_index_history": "Historical UEX currency index snapshots with basket component detail. Supports currency, date_from, and date_to timestamps.",
}
UEX_PRODUCTION_WRITE_RESOURCES = {
"marketplace_advertise",
"marketplace_negotiations_messages",
}
@dataclass
class PendingAction:
@@ -266,7 +272,7 @@ class ToolRegistry:
"message": {"type": "string"},
"hash": {"type": "string"},
"id_negotiation": {"type": "integer"},
"is_production": {"type": "integer", "enum": [0, 1], "default": 0},
"is_production": {"type": "integer", "enum": [0, 1], "default": 1},
},
},
},
@@ -298,7 +304,7 @@ class ToolRegistry:
"in_stock": {"type": "integer"},
"hours_expiration": {"type": "integer"},
"is_hidden": {"type": "integer", "enum": [0, 1]},
"is_production": {"type": "integer", "enum": [0, 1], "default": 0},
"is_production": {"type": "integer", "enum": [0, 1], "default": 1},
},
},
},
@@ -382,7 +388,7 @@ class ToolRegistry:
return {"error": f"Pending action not found: {action_id}"}
if action.method == "DELETE":
return await self.uex.delete(action.endpoint, action.payload, authenticated=True)
return await self.uex.post(action.endpoint, action.payload, authenticated=True)
return await self.uex.post(action.endpoint, self._production_payload(action.endpoint, action.payload), authenticated=True)
async def decline(self, action_id: str) -> dict[str, Any]:
action = self.pending_actions.pop(action_id, None)
@@ -915,7 +921,13 @@ class ToolRegistry:
id_listing: int | None = None,
hash: str | None = None,
) -> dict[str, Any]:
return await self.uex.get("marketplace_negotiations", {"id": id, "id_listing": id_listing, "hash": hash}, authenticated=True)
response = await self.uex.get("marketplace_negotiations", {"id": id, "id_listing": id_listing, "hash": hash}, authenticated=True)
negotiations = [
self._summarize_negotiation(item)
for item in self._as_list(response.get("data"))
if isinstance(item, dict)
]
return {**response, "data": negotiations, "negotiations": negotiations}
async def get_negotiation_messages(self, hash: str | None = None, id_negotiation: int | None = None) -> dict[str, Any]:
return await self.uex.get("marketplace_negotiations_messages", {"hash": hash, "id_negotiation": id_negotiation}, authenticated=True)
@@ -925,7 +937,7 @@ class ToolRegistry:
message: str,
hash: str | None = None,
id_negotiation: int | None = None,
is_production: int = 0,
is_production: int = 1,
) -> dict[str, Any]:
payload = {"message": message, "hash": hash, "id_negotiation": id_negotiation, "is_production": is_production}
return self._pending("Send negotiation message", "marketplace_negotiations_messages", payload)
@@ -971,6 +983,7 @@ class ToolRegistry:
def _pending(self, label: str, endpoint: str, payload: dict[str, Any], method: str = "POST") -> dict[str, Any]:
action_id = str(uuid.uuid4())
payload = {key: value for key, value in payload.items() if value is not None}
payload = self._production_payload(endpoint, payload)
self.pending_actions[action_id] = PendingAction(action_id, label, endpoint, payload, method)
return {
"pending_action": {
@@ -983,6 +996,14 @@ class ToolRegistry:
}
}
@staticmethod
def _production_payload(endpoint: str, payload: dict[str, Any]) -> dict[str, Any]:
if endpoint not in UEX_PRODUCTION_WRITE_RESOURCES:
return payload
next_payload = dict(payload)
next_payload["is_production"] = 1
return next_payload
@staticmethod
def _validate_resource(resource: str, allowed: dict[str, Any] | set[str]) -> str:
normalized = resource.strip().strip("/").casefold()
@@ -1167,3 +1188,57 @@ class ToolRegistry:
"advertiser": listing.get("user_username"),
"expires_at": listing.get("date_expiration"),
}
@classmethod
def _summarize_negotiation(cls, negotiation: dict[str, Any]) -> dict[str, Any]:
summary = cls._project_item(negotiation, mode="summary")
state = cls._negotiation_state(negotiation)
summary.update(
{
"state": state["state"],
"is_open": state["is_open"],
"state_reason": state["reason"],
}
)
for key in ("hash", "id_listing", "id_user", "id_user_seller", "id_user_buyer", "date_closed"):
if key in negotiation and key not in summary:
summary[key] = negotiation.get(key)
return summary
@staticmethod
def _negotiation_state(negotiation: dict[str, Any]) -> dict[str, Any]:
closed_flags = [
"is_closed",
"closed",
"is_cancelled",
"is_canceled",
"is_archived",
"marked_closed",
]
for key in closed_flags:
value = negotiation.get(key)
if value in (True, 1, "1", "true", "True", "yes", "closed"):
return {"state": "closed", "is_open": False, "reason": f"{key} is set"}
closed_dates = [
"date_closed",
"date_completed",
"date_cancelled",
"date_canceled",
"closed_at",
"completed_at",
"cancelled_at",
"canceled_at",
]
for key in closed_dates:
value = negotiation.get(key)
if value not in (None, "", 0, "0", False):
return {"state": "closed", "is_open": False, "reason": f"{key} is populated"}
status = str(negotiation.get("status") or negotiation.get("state") or "").casefold()
if status in {"closed", "cancelled", "canceled", "completed", "declined", "accepted", "rejected"}:
return {"state": "closed", "is_open": False, "reason": f"status is {status}"}
if status in {"open", "active", "pending", "new"}:
return {"state": "open", "is_open": True, "reason": f"status is {status}"}
return {"state": "open", "is_open": True, "reason": "no closed flag, closed date, or closed status was present"}
+2 -1
View File
@@ -1,6 +1,6 @@
from __future__ import annotations
__version__ = "0.0.2"
__version__ = "0.0.3"
RELEASES_URL = "https://git.hudsonriggs.systems/LambdaBankingConglomerate/TraderAI/releases"
RELEASES_API_URL = "https://git.hudsonriggs.systems/api/v1/repos/LambdaBankingConglomerate/TraderAI/releases"
@@ -8,3 +8,4 @@ RELEASES_API_URL = "https://git.hudsonriggs.systems/api/v1/repos/LambdaBankingCo