Files
2026-06-09 11:24:15 -04:00

174 lines
6.5 KiB
Python

from __future__ import annotations
from typing import Any
import httpx
class UEXError(RuntimeError):
    """Raised when a UEX request cannot be authenticated or UEX answers with an error."""


class UEXClient:
    """Small asynchronous client for the UEX HTTP API.

    Every request opens a short-lived ``httpx.AsyncClient``, sends
    ``Accept: application/json`` and expects a JSON object back.
    Authenticated requests additionally carry the ``secret-key`` header
    and/or an ``Authorization: Bearer ...`` header, depending on which
    credentials were given to the constructor.
    """

    # Route used by ``close_negotiation`` when none (or a blank one) is configured.
    _DEFAULT_CLOSE_ENDPOINT = "marketplace_negotiations_close"
    # Per-request timeout, in seconds, handed to ``httpx.AsyncClient``.
    _TIMEOUT_SECONDS = 30

    def __init__(
        self,
        base_url: str,
        secret_key: str | None = None,
        bearer_token: str | None = None,
        negotiation_close_endpoint: str = _DEFAULT_CLOSE_ENDPOINT,
    ) -> None:
        """Store connection settings.

        Args:
            base_url: Root URL of the API; trailing slashes are removed.
            secret_key: Value for the ``secret-key`` header, if any.
            bearer_token: Token for the ``Authorization`` header, if any.
            negotiation_close_endpoint: Route used to close negotiations.
                Surrounding whitespace and slashes are stripped; a value
                that ends up empty falls back to the default route.
        """
        self.base_url = base_url.rstrip("/")
        self.secret_key = secret_key
        self.bearer_token = bearer_token
        cleaned_endpoint = negotiation_close_endpoint.strip().strip("/")
        self.negotiation_close_endpoint = cleaned_endpoint or self._DEFAULT_CLOSE_ENDPOINT

    def _headers(self, authenticated: bool = False) -> dict[str, str]:
        """Build the request headers.

        Raises:
            UEXError: If ``authenticated`` is true and neither a secret key
                nor a bearer token is configured.
        """
        headers = {"Accept": "application/json"}
        if not authenticated:
            return headers
        if not self.secret_key and not self.bearer_token:
            raise UEXError("UEX_SECRET_KEY or UEX_BEARER_TOKEN is required for this action.")
        # Both credentials are sent when both are configured.
        if self.secret_key:
            headers["secret-key"] = self.secret_key
        if self.bearer_token:
            headers["Authorization"] = f"Bearer {self.bearer_token}"
        return headers

    def _url(self, path: str) -> str:
        """Join ``path`` onto the base URL, always ending with one slash."""
        return f"{self.base_url}/{path.strip('/')}/"

    @staticmethod
    def _drop_none(params: dict[str, Any] | None) -> dict[str, Any]:
        """Return ``params`` without the entries whose value is ``None``."""
        return {key: value for key, value in (params or {}).items() if value is not None}

    @staticmethod
    def _data_as_list(body: dict[str, Any]) -> Any:
        """Return ``body["data"]`` for list-style endpoints.

        A missing or falsy value becomes ``[]`` and a lone dict is wrapped
        in a one-element list; any other value is returned unchanged.
        """
        data = body.get("data") or []
        return [data] if isinstance(data, dict) else data

    async def _request(
        self,
        method: str,
        path: str,
        *,
        authenticated: bool,
        params: dict[str, Any] | None = None,
        json_body: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Send one HTTP request and return the decoded JSON object.

        Raises:
            UEXError: On missing credentials, a non-JSON body, an HTTP
                status >= 400, or a JSON body that is not an object.
        """
        # Resolve headers first so a missing-credentials error is raised
        # before an HTTP client is created.
        headers = self._headers(authenticated)
        async with httpx.AsyncClient(timeout=self._TIMEOUT_SECONDS) as client:
            response = await client.request(
                method,
                self._url(path),
                params=params,
                json=json_body,
                headers=headers,
            )
        return self._handle_response(response)

    async def get(self, path: str, params: dict[str, Any] | None = None, authenticated: bool = False) -> dict[str, Any]:
        """GET ``path``; query parameters whose value is ``None`` are omitted."""
        return await self._request(
            "GET",
            path,
            authenticated=authenticated,
            params=self._drop_none(params),
        )

    async def get_user(self, username: str | None = None, authenticated: bool = False) -> dict[str, Any]:
        """Fetch a user record from the ``user`` endpoint.

        Returns:
            ``{"status": ..., "user": ...}``. When the response ``data`` is
            a list, ``user`` is its first element (``None`` for an empty
            list); otherwise ``data`` is passed through unchanged.
        """
        body = await self.get("user", {"username": username}, authenticated=authenticated)
        user = body.get("data")
        if isinstance(user, list):
            user = user[0] if user else None
        return {"status": body.get("status"), "user": user}

    async def get_user_notifications(self) -> dict[str, Any]:
        """Fetch notifications (authenticated) as ``{"status", "notifications"}``."""
        body = await self.get("user_notifications", authenticated=True)
        return {"status": body.get("status"), "notifications": self._data_as_list(body)}

    async def list_negotiations(
        self,
        id: int | None = None,
        id_listing: int | None = None,
        hash: str | None = None,
    ) -> dict[str, Any]:
        """List marketplace negotiations (authenticated), optionally filtered.

        ``id`` and ``hash`` shadow builtins; the names are kept because they
        are part of the public keyword interface.

        Returns:
            ``{"status": ..., "negotiations": [...]}``.
        """
        body = await self.get(
            "marketplace_negotiations",
            {"id": id, "id_listing": id_listing, "hash": hash},
            authenticated=True,
        )
        return {"status": body.get("status"), "negotiations": self._data_as_list(body)}

    async def get_negotiation_messages(self, hash: str | None = None, id_negotiation: int | None = None) -> dict[str, Any]:
        """Fetch the messages of a negotiation (authenticated).

        Returns:
            ``{"status": ..., "messages": [...]}``.
        """
        body = await self.get(
            "marketplace_negotiations_messages",
            {"hash": hash, "id_negotiation": id_negotiation},
            authenticated=True,
        )
        return {"status": body.get("status"), "messages": self._data_as_list(body)}

    async def send_negotiation_message(
        self,
        *,
        message: str,
        hash: str | None = None,
        id_negotiation: int | None = None,
        is_production: int = 1,
    ) -> dict[str, Any]:
        """Post ``message`` to a negotiation and return the raw response body.

        All four fields are sent as given, including ``None`` values.
        NOTE(review): UEX presumably needs ``hash`` or ``id_negotiation`` to
        identify the negotiation, and ``is_production`` presumably selects
        live vs. test mode - confirm against the UEX API documentation.
        """
        payload = {
            "hash": hash,
            "id_negotiation": id_negotiation,
            "message": message,
            "is_production": is_production,
        }
        return await self.post("marketplace_negotiations_messages", payload, authenticated=True)

    async def close_negotiation(
        self,
        *,
        hash: str | None = None,
        id_negotiation: int | None = None,
        deal_closed: bool,
        deal_value: float | None = None,
        currency: str | None = None,
        clarity_rating: int | None = None,
        speed_rating: int | None = None,
        respect_rating: int | None = None,
        fairness_rating: int | None = None,
        comment: str | None = None,
        is_production: int = 1,
    ) -> dict[str, Any]:
        """Close a negotiation through the configured close endpoint.

        ``deal_closed`` is sent as ``1``/``0``; every other field is sent as
        given, including ``None`` values.

        Raises:
            UEXError: If the request fails. The message names the endpoint
                that was used and how to override it, and the original
                error is chained as ``__cause__``.
        """
        payload = {
            "hash": hash,
            "id_negotiation": id_negotiation,
            "deal_closed": 1 if deal_closed else 0,
            "deal_value": deal_value,
            "currency": currency,
            "clarity_rating": clarity_rating,
            "speed_rating": speed_rating,
            "respect_rating": respect_rating,
            "fairness_rating": fairness_rating,
            "comment": comment,
            "is_production": is_production,
        }
        try:
            return await self.post(
                self.negotiation_close_endpoint,
                payload,
                authenticated=True,
            )
        except UEXError as exc:
            # The close route is configurable, so point the operator at the
            # setting in case the route moved on the UEX side.
            raise UEXError(
                "UEX negotiation close failed via endpoint "
                f"`{self.negotiation_close_endpoint}`. If UEX changed this route, set "
                "`UEX_NEGOTIATION_CLOSE_ENDPOINT` to the correct endpoint and retry. "
                f"Original error: {exc}"
            ) from exc

    async def post(self, path: str, payload: dict[str, Any], authenticated: bool = True) -> dict[str, Any]:
        """POST ``payload`` to ``path`` as a JSON body (``None`` values are kept)."""
        return await self._request(
            "POST",
            path,
            authenticated=authenticated,
            json_body=payload,
        )

    async def delete(self, path: str, params: dict[str, Any] | None = None, authenticated: bool = True) -> dict[str, Any]:
        """DELETE ``path``; query parameters whose value is ``None`` are omitted."""
        return await self._request(
            "DELETE",
            path,
            authenticated=authenticated,
            params=self._drop_none(params),
        )

    @staticmethod
    def _handle_response(response: httpx.Response) -> dict[str, Any]:
        """Decode a response into a JSON object or raise ``UEXError``.

        Raises:
            UEXError: If the body is not JSON, the status code is >= 400,
                or the decoded JSON is not an object.
        """
        try:
            body = response.json()
        except ValueError as exc:
            raise UEXError(f"UEX returned non-JSON response: HTTP {response.status_code}") from exc
        if response.status_code >= 400:
            raise UEXError(f"UEX HTTP {response.status_code}: {body}")
        if not isinstance(body, dict):
            # Callers rely on ``body.get(...)``; fail here with a UEXError
            # rather than letting an AttributeError surface later.
            raise UEXError(
                f"UEX returned unexpected JSON payload ({type(body).__name__}): "
                f"HTTP {response.status_code}"
            )
        return body