from __future__ import annotations import json from dataclasses import dataclass from datetime import datetime, timezone from typing import Any from urllib.parse import urlparse from traderai.memory import MemoryStore, iso_now, unix_to_iso from traderai.uex_client import UEXClient UEX_NEGOTIATION_CLOSE_ENDPOINT = "marketplace_negotiations_close" def extract_negotiation_hash(redir: str | None) -> str | None: if not redir: return None parsed = urlparse(redir) path = parsed.path or str(redir) cleaned = path.strip("/") parts = cleaned.split("/") for index, part in enumerate(parts): if part == "hash" and index + 1 < len(parts): return parts[index + 1].strip() or None if len(parts) >= 3 and parts[-3:-1] == ["marketplace", "negotiations"]: return parts[-1].strip() or None return None @dataclass class NegotiationRefreshResult: hash: str refreshed: bool summary: dict[str, Any] | None = None messages_count: int = 0 class NegotiationSyncService: def __init__(self, memory: MemoryStore, uex: UEXClient) -> None: self.memory = memory self.uex = uex self.recent_days = 30 async def startup_sync(self) -> dict[str, Any]: return await self.refresh_negotiations(seed_open_messages=True) async def refresh_negotiations(self, *, seed_open_messages: bool = False) -> dict[str, Any]: response = await self.uex.list_negotiations() negotiations = response.get("negotiations") or response.get("data") or [] kept_hashes: list[str] = [] refreshed = 0 for item in negotiations: normalized = self._normalize_negotiation_summary(item) if not normalized: continue cached = self.memory.get_negotiation(normalized["negotiation_hash"]) if not self._should_keep_thread(normalized, cached): continue kept_hashes.append(normalized["negotiation_hash"]) self.memory.upsert_negotiation(**normalized) if seed_open_messages and (normalized["status"] == "open" or cached is None): result = await self.refresh_negotiation(normalized["negotiation_hash"], mark_read=False, summary=normalized) if result.refreshed: refreshed += 1 self.memory.set_negotiation_sync_state("last_full_negotiation_sync_at", iso_now()) return { "count": len(kept_hashes), "refreshed_threads": refreshed, "negotiations": self.memory.list_negotiations(limit=200), } async def refresh_negotiation( self, negotiation_hash: str, *, mark_read: bool = False, summary: dict[str, Any] | None = None, ) -> NegotiationRefreshResult: summary_data = summary or await self._fetch_summary_by_hash(negotiation_hash) if summary_data: self.memory.upsert_negotiation(**summary_data) response = await self.uex.get_negotiation_messages(hash=negotiation_hash) messages = response.get("messages") or response.get("data") or [] normalized_messages = [self._normalize_message(item) for item in messages if isinstance(item, dict)] normalized_messages = [item for item in normalized_messages if item] self.memory.replace_negotiation_messages(negotiation_hash, normalized_messages, mark_read=mark_read) if mark_read: self.memory.mark_negotiation_read(negotiation_hash) return NegotiationRefreshResult( hash=negotiation_hash, refreshed=True, summary=self.memory.get_negotiation(negotiation_hash), messages_count=len(normalized_messages), ) async def handle_notifications(self, notifications: list[dict[str, Any]]) -> list[dict[str, Any]]: if not notifications: return [] grouped: dict[str, list[dict[str, Any]]] = {} passthrough: list[dict[str, Any]] = [] for item in notifications: negotiation_hash = extract_negotiation_hash(item.get("redir")) if not negotiation_hash: passthrough.append(item) continue grouped.setdefault(negotiation_hash, []).append(item) for negotiation_hash, items in grouped.items(): latest = max(items, key=lambda item: int(item.get("date_added") or 0)) await self.refresh_negotiation(negotiation_hash, mark_read=False) self.memory.mark_negotiation_notified( negotiation_hash, notification_id=self._int_or_none(latest.get("id")), notification_at=unix_to_iso(latest.get("date_added")) or iso_now(), ) for item in passthrough: self.memory.add_outbox(self._notification_text(item)) self.memory.set_negotiation_sync_state("last_notification_sync_at", iso_now()) self.memory.set_negotiation_sync_state( "last_seen_notification_ids", sorted(self._int_or_none(item.get("id")) for item in notifications if self._int_or_none(item.get("id")) is not None), ) return notifications async def manual_send_message(self, negotiation_hash: str, message: str) -> dict[str, Any]: result = await self.uex.send_negotiation_message(hash=negotiation_hash, message=message, is_production=1) await self.refresh_negotiation(negotiation_hash, mark_read=True) return result async def manual_close_negotiation(self, negotiation_hash: str, payload: dict[str, Any]) -> dict[str, Any]: result = await self.uex.close_negotiation(hash=negotiation_hash, **payload) await self.refresh_negotiation(negotiation_hash, mark_read=True) self.memory.store_negotiation_rating(negotiation_hash, payload, raw_json=result) return result def list_negotiations(self, *, status: str = "all", unread_only: bool = False, search: str = "", limit: int = 50) -> list[dict[str, Any]]: return self.memory.list_negotiations(status=status, unread_only=unread_only, search=search, limit=limit) def unread_count(self) -> int: return sum(int(item.get("unread_count") or 0) for item in self.memory.list_negotiations(unread_only=True, limit=500)) def get_negotiation(self, negotiation_hash: str, *, mark_read: bool = True) -> dict[str, Any] | None: negotiation = self.memory.get_negotiation(negotiation_hash) if negotiation and mark_read: self.memory.mark_negotiation_read(negotiation_hash) negotiation["unread_count"] = 0 return negotiation def search_messages(self, query: str, limit: int = 8) -> list[dict[str, Any]]: return self.memory.search_negotiation_messages(query, limit=limit) async def _fetch_summary_by_hash(self, negotiation_hash: str) -> dict[str, Any] | None: response = await self.uex.list_negotiations(hash=negotiation_hash) negotiations = response.get("negotiations") or response.get("data") or [] for item in negotiations: normalized = self._normalize_negotiation_summary(item) if normalized and normalized["negotiation_hash"] == negotiation_hash: return normalized return None def _normalize_negotiation_summary(self, item: dict[str, Any]) -> dict[str, Any] | None: negotiation_hash = str(item.get("hash") or item.get("negotiation_hash") or "").strip() if not negotiation_hash: return None user = self.memory.get_profile().get("uex_user") or {} current_username = str(user.get("username") or user.get("user_username") or "").strip().casefold() advertiser_username = str(item.get("advertiser_username") or "").strip() client_username = str(item.get("client_username") or "").strip() is_listing_advertiser = bool(item.get("is_listing_advertiser")) if current_username: if advertiser_username.casefold() == current_username: counterparty = client_username elif client_username.casefold() == current_username: counterparty = advertiser_username else: counterparty = client_username if is_listing_advertiser else advertiser_username else: counterparty = client_username if is_listing_advertiser else advertiser_username closed_at = unix_to_iso(item.get("date_closed") or item.get("date_closed_client")) metadata = { "advertiser_name": item.get("advertiser_name"), "advertiser_username": advertiser_username or None, "client_name": item.get("client_name"), "client_username": client_username or None, "deal_value": item.get("deal_value"), "deal_value_currency": item.get("deal_value_currency"), "price": item.get("price"), "unit": item.get("unit"), "currency": item.get("currency"), "raw": item, } return { "negotiation_hash": negotiation_hash, "uex_negotiation_id": self._int_or_none(item.get("id") or item.get("id_negotiation")), "listing_id": self._int_or_none(item.get("id_listing")), "listing_slug": str(item.get("listing_slug") or "").strip() or None, "title": str(item.get("listing_title") or item.get("title") or "").strip() or None, "counterparty_username": counterparty or None, "status": "closed" if closed_at else "open", "last_message_at": unix_to_iso(item.get("date_modified") or item.get("date_added")), "last_synced_at": iso_now(), "closed_at": closed_at, "metadata": metadata, } def _normalize_message(self, item: dict[str, Any]) -> dict[str, Any] | None: negotiation_hash = str(item.get("negotiation_hash") or "").strip() if not negotiation_hash: return None user = self.memory.get_profile().get("uex_user") or {} current_username = str(user.get("username") or user.get("user_username") or "").strip().casefold() username = str(item.get("user_username") or "").strip() normalized = dict(item) normalized["is_me"] = bool(current_username and username.casefold() == current_username) normalized["author"] = item.get("user_name") or username or "UEX" normalized["source"] = item.get("api_name") or "uex" normalized["body"] = item.get("message") or item.get("event") or "" return normalized def _should_keep_thread(self, normalized: dict[str, Any], cached: dict[str, Any] | None) -> bool: if cached: return True if normalized["status"] == "open": return True last_message_at = normalized.get("last_message_at") if not last_message_at: return False try: age_seconds = max(0.0, (datetime.now(timezone.utc) - datetime.fromisoformat(last_message_at)).total_seconds()) except ValueError: return False return age_seconds <= self.recent_days * 24 * 60 * 60 @staticmethod def _notification_text(item: dict[str, Any]) -> str: message = item.get("message") or "You have a pending UEX notification." redir = item.get("redir") return f"UEX notification: {message}" + (f" (path `{redir}`)" if redir else "") @staticmethod def _int_or_none(value: Any) -> int | None: try: return int(value) except (TypeError, ValueError): return None