Files
2026-06-09 11:24:15 -04:00

249 lines
11 KiB
Python

from __future__ import annotations
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any
from urllib.parse import urlparse
from traderai.memory import MemoryStore, iso_now, unix_to_iso
from traderai.uex_client import UEXClient
UEX_NEGOTIATION_CLOSE_ENDPOINT = "marketplace_negotiations_close"
def extract_negotiation_hash(redir: str | None) -> str | None:
if not redir:
return None
parsed = urlparse(redir)
path = parsed.path or str(redir)
cleaned = path.strip("/")
parts = cleaned.split("/")
for index, part in enumerate(parts):
if part == "hash" and index + 1 < len(parts):
return parts[index + 1].strip() or None
if len(parts) >= 3 and parts[-3:-1] == ["marketplace", "negotiations"]:
return parts[-1].strip() or None
return None
@dataclass
class NegotiationRefreshResult:
hash: str
refreshed: bool
summary: dict[str, Any] | None = None
messages_count: int = 0
class NegotiationSyncService:
def __init__(self, memory: MemoryStore, uex: UEXClient) -> None:
self.memory = memory
self.uex = uex
self.recent_days = 30
async def startup_sync(self) -> dict[str, Any]:
return await self.refresh_negotiations(seed_open_messages=True)
async def refresh_negotiations(self, *, seed_open_messages: bool = False) -> dict[str, Any]:
response = await self.uex.list_negotiations()
negotiations = response.get("negotiations") or response.get("data") or []
kept_hashes: list[str] = []
refreshed = 0
for item in negotiations:
normalized = self._normalize_negotiation_summary(item)
if not normalized:
continue
cached = self.memory.get_negotiation(normalized["negotiation_hash"])
if not self._should_keep_thread(normalized, cached):
continue
kept_hashes.append(normalized["negotiation_hash"])
self.memory.upsert_negotiation(**normalized)
if seed_open_messages and (normalized["status"] == "open" or cached is None):
result = await self.refresh_negotiation(normalized["negotiation_hash"], mark_read=False, summary=normalized)
if result.refreshed:
refreshed += 1
self.memory.set_negotiation_sync_state("last_full_negotiation_sync_at", iso_now())
return {
"count": len(kept_hashes),
"refreshed_threads": refreshed,
"negotiations": self.memory.list_negotiations(limit=200),
}
async def refresh_negotiation(
self,
negotiation_hash: str,
*,
mark_read: bool = False,
summary: dict[str, Any] | None = None,
) -> NegotiationRefreshResult:
summary_data = summary or await self._fetch_summary_by_hash(negotiation_hash)
if summary_data:
self.memory.upsert_negotiation(**summary_data)
response = await self.uex.get_negotiation_messages(hash=negotiation_hash)
messages = response.get("messages") or response.get("data") or []
normalized_messages = [self._normalize_message(item) for item in messages if isinstance(item, dict)]
normalized_messages = [item for item in normalized_messages if item]
self.memory.replace_negotiation_messages(negotiation_hash, normalized_messages, mark_read=mark_read)
if mark_read:
self.memory.mark_negotiation_read(negotiation_hash)
return NegotiationRefreshResult(
hash=negotiation_hash,
refreshed=True,
summary=self.memory.get_negotiation(negotiation_hash),
messages_count=len(normalized_messages),
)
async def handle_notifications(self, notifications: list[dict[str, Any]]) -> list[dict[str, Any]]:
if not notifications:
return []
grouped: dict[str, list[dict[str, Any]]] = {}
passthrough: list[dict[str, Any]] = []
for item in notifications:
negotiation_hash = extract_negotiation_hash(item.get("redir"))
if not negotiation_hash:
passthrough.append(item)
continue
grouped.setdefault(negotiation_hash, []).append(item)
for negotiation_hash, items in grouped.items():
latest = max(items, key=lambda item: int(item.get("date_added") or 0))
await self.refresh_negotiation(negotiation_hash, mark_read=False)
self.memory.mark_negotiation_notified(
negotiation_hash,
notification_id=self._int_or_none(latest.get("id")),
notification_at=unix_to_iso(latest.get("date_added")) or iso_now(),
)
for item in passthrough:
self.memory.add_outbox(self._notification_text(item))
self.memory.set_negotiation_sync_state("last_notification_sync_at", iso_now())
self.memory.set_negotiation_sync_state(
"last_seen_notification_ids",
sorted(self._int_or_none(item.get("id")) for item in notifications if self._int_or_none(item.get("id")) is not None),
)
return notifications
async def manual_send_message(self, negotiation_hash: str, message: str) -> dict[str, Any]:
result = await self.uex.send_negotiation_message(hash=negotiation_hash, message=message, is_production=1)
await self.refresh_negotiation(negotiation_hash, mark_read=True)
return result
async def manual_close_negotiation(self, negotiation_hash: str, payload: dict[str, Any]) -> dict[str, Any]:
result = await self.uex.close_negotiation(hash=negotiation_hash, **payload)
await self.refresh_negotiation(negotiation_hash, mark_read=True)
self.memory.store_negotiation_rating(negotiation_hash, payload, raw_json=result)
return result
def list_negotiations(self, *, status: str = "all", unread_only: bool = False, search: str = "", limit: int = 50) -> list[dict[str, Any]]:
return self.memory.list_negotiations(status=status, unread_only=unread_only, search=search, limit=limit)
def unread_count(self) -> int:
return sum(int(item.get("unread_count") or 0) for item in self.memory.list_negotiations(unread_only=True, limit=500))
def get_negotiation(self, negotiation_hash: str, *, mark_read: bool = True) -> dict[str, Any] | None:
negotiation = self.memory.get_negotiation(negotiation_hash)
if negotiation and mark_read:
self.memory.mark_negotiation_read(negotiation_hash)
negotiation["unread_count"] = 0
return negotiation
def search_messages(self, query: str, limit: int = 8) -> list[dict[str, Any]]:
return self.memory.search_negotiation_messages(query, limit=limit)
async def _fetch_summary_by_hash(self, negotiation_hash: str) -> dict[str, Any] | None:
response = await self.uex.list_negotiations(hash=negotiation_hash)
negotiations = response.get("negotiations") or response.get("data") or []
for item in negotiations:
normalized = self._normalize_negotiation_summary(item)
if normalized and normalized["negotiation_hash"] == negotiation_hash:
return normalized
return None
def _normalize_negotiation_summary(self, item: dict[str, Any]) -> dict[str, Any] | None:
negotiation_hash = str(item.get("hash") or item.get("negotiation_hash") or "").strip()
if not negotiation_hash:
return None
user = self.memory.get_profile().get("uex_user") or {}
current_username = str(user.get("username") or user.get("user_username") or "").strip().casefold()
advertiser_username = str(item.get("advertiser_username") or "").strip()
client_username = str(item.get("client_username") or "").strip()
is_listing_advertiser = bool(item.get("is_listing_advertiser"))
if current_username:
if advertiser_username.casefold() == current_username:
counterparty = client_username
elif client_username.casefold() == current_username:
counterparty = advertiser_username
else:
counterparty = client_username if is_listing_advertiser else advertiser_username
else:
counterparty = client_username if is_listing_advertiser else advertiser_username
closed_at = unix_to_iso(item.get("date_closed") or item.get("date_closed_client"))
metadata = {
"advertiser_name": item.get("advertiser_name"),
"advertiser_username": advertiser_username or None,
"client_name": item.get("client_name"),
"client_username": client_username or None,
"deal_value": item.get("deal_value"),
"deal_value_currency": item.get("deal_value_currency"),
"price": item.get("price"),
"unit": item.get("unit"),
"currency": item.get("currency"),
"raw": item,
}
return {
"negotiation_hash": negotiation_hash,
"uex_negotiation_id": self._int_or_none(item.get("id") or item.get("id_negotiation")),
"listing_id": self._int_or_none(item.get("id_listing")),
"listing_slug": str(item.get("listing_slug") or "").strip() or None,
"title": str(item.get("listing_title") or item.get("title") or "").strip() or None,
"counterparty_username": counterparty or None,
"status": "closed" if closed_at else "open",
"last_message_at": unix_to_iso(item.get("date_modified") or item.get("date_added")),
"last_synced_at": iso_now(),
"closed_at": closed_at,
"metadata": metadata,
}
def _normalize_message(self, item: dict[str, Any]) -> dict[str, Any] | None:
negotiation_hash = str(item.get("negotiation_hash") or "").strip()
if not negotiation_hash:
return None
user = self.memory.get_profile().get("uex_user") or {}
current_username = str(user.get("username") or user.get("user_username") or "").strip().casefold()
username = str(item.get("user_username") or "").strip()
normalized = dict(item)
normalized["is_me"] = bool(current_username and username.casefold() == current_username)
normalized["author"] = item.get("user_name") or username or "UEX"
normalized["source"] = item.get("api_name") or "uex"
normalized["body"] = item.get("message") or item.get("event") or ""
return normalized
def _should_keep_thread(self, normalized: dict[str, Any], cached: dict[str, Any] | None) -> bool:
if cached:
return True
if normalized["status"] == "open":
return True
last_message_at = normalized.get("last_message_at")
if not last_message_at:
return False
try:
age_seconds = max(0.0, (datetime.now(timezone.utc) - datetime.fromisoformat(last_message_at)).total_seconds())
except ValueError:
return False
return age_seconds <= self.recent_days * 24 * 60 * 60
@staticmethod
def _notification_text(item: dict[str, Any]) -> str:
message = item.get("message") or "You have a pending UEX notification."
redir = item.get("redir")
return f"UEX notification: {message}" + (f" (path `{redir}`)" if redir else "")
@staticmethod
def _int_or_none(value: Any) -> int | None:
try:
return int(value)
except (TypeError, ValueError):
return None