# File listing metadata: 174 lines, 6.5 KiB, Python.
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
import httpx
|
|
|
|
|
|
class UEXError(RuntimeError):
    """Error raised by the UEX client.

    Used in this module for missing credentials, responses that are not
    valid JSON, and HTTP responses with a status code of 400 or above.
    """
|
|
|
|
|
|
class UEXClient:
    """Asynchronous JSON client for the UEX HTTP API.

    Every request opens a short-lived ``httpx.AsyncClient``, targets
    ``{base_url}/{path}/`` and returns the decoded JSON object.  Failures
    detected by this class (missing credentials, non-JSON bodies, HTTP status
    >= 400, JSON that is not an object) are raised as ``UEXError``.  Errors
    raised by ``httpx`` itself (timeouts, connection failures) are not wrapped
    and propagate unchanged.

    Several methods take parameters named ``id`` and ``hash``.  They shadow
    builtins, but they are part of the public keyword interface and are kept.
    """

    # Route used when the configured close endpoint is blank after trimming.
    _DEFAULT_CLOSE_ENDPOINT = "marketplace_negotiations_close"
    # Timeout, in seconds, handed to every httpx.AsyncClient.
    _TIMEOUT_SECONDS = 30

    def __init__(
        self,
        base_url: str,
        secret_key: str | None = None,
        bearer_token: str | None = None,
        negotiation_close_endpoint: str = _DEFAULT_CLOSE_ENDPOINT,
    ) -> None:
        """Store connection settings.

        Args:
            base_url: API root; trailing slashes are removed.
            secret_key: Sent as the ``secret-key`` header on authenticated calls.
            bearer_token: Sent as ``Authorization: Bearer ...`` on authenticated calls.
            negotiation_close_endpoint: Route used by ``close_negotiation``.
                Surrounding whitespace and slashes are trimmed; a blank value
                falls back to ``marketplace_negotiations_close``.
        """
        self.base_url = base_url.rstrip("/")
        self.secret_key = secret_key
        self.bearer_token = bearer_token
        trimmed_endpoint = negotiation_close_endpoint.strip().strip("/")
        self.negotiation_close_endpoint = trimmed_endpoint or self._DEFAULT_CLOSE_ENDPOINT

    def _headers(self, authenticated: bool = False) -> dict[str, str]:
        """Build request headers.

        Credentials are attached only when ``authenticated`` is true; both the
        secret key and the bearer token are sent when both are configured.

        Raises:
            UEXError: ``authenticated`` is true and neither credential is set.
        """
        headers = {"Accept": "application/json"}
        if not authenticated:
            return headers
        if not (self.secret_key or self.bearer_token):
            raise UEXError("UEX_SECRET_KEY or UEX_BEARER_TOKEN is required for this action.")
        if self.secret_key:
            headers["secret-key"] = self.secret_key
        if self.bearer_token:
            headers["Authorization"] = f"Bearer {self.bearer_token}"
        return headers

    @staticmethod
    def _drop_none(params: dict[str, Any] | None) -> dict[str, Any]:
        """Return ``params`` without ``None`` values (``{}`` when ``params`` is ``None``)."""
        return {key: value for key, value in (params or {}).items() if value is not None}

    @staticmethod
    def _as_list(data: Any) -> Any:
        """Normalise a ``data`` field for the list-returning endpoints.

        Falsy values become ``[]`` and a single dict is wrapped in a list.
        Any other truthy value is returned unchanged.
        """
        if not data:
            return []
        if isinstance(data, dict):
            return [data]
        return data

    async def _request(
        self,
        method: str,
        path: str,
        *,
        authenticated: bool,
        params: dict[str, Any] | None = None,
        json: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Send one request to ``{base_url}/{path}/`` and decode the response."""
        url = f"{self.base_url}/{path.strip('/')}/"
        # Build headers first so a missing-credential error is raised before
        # any client is opened.
        headers = self._headers(authenticated)
        async with httpx.AsyncClient(timeout=self._TIMEOUT_SECONDS) as client:
            response = await client.request(
                method,
                url,
                params=params,
                json=json,
                headers=headers,
            )
            return self._handle_response(response)

    async def get(
        self,
        path: str,
        params: dict[str, Any] | None = None,
        authenticated: bool = False,
    ) -> dict[str, Any]:
        """GET ``path``; query parameters whose value is ``None`` are omitted."""
        return await self._request(
            "GET",
            path,
            authenticated=authenticated,
            params=self._drop_none(params),
        )

    async def get_user(self, username: str | None = None, authenticated: bool = False) -> dict[str, Any]:
        """Fetch the ``user`` endpoint.

        Returns:
            ``{"status": ..., "user": ...}``.  When the response ``data`` is a
            list, ``user`` is its first element, or ``None`` if it is empty.
        """
        body = await self.get("user", {"username": username}, authenticated=authenticated)
        user = body.get("data")
        if isinstance(user, list):
            user = user[0] if user else None
        return {"status": body.get("status"), "user": user}

    async def get_user_notifications(self) -> dict[str, Any]:
        """Fetch ``user_notifications`` (authenticated).

        Returns:
            ``{"status": ..., "notifications": [...]}``.
        """
        body = await self.get("user_notifications", authenticated=True)
        return {
            "status": body.get("status"),
            "notifications": self._as_list(body.get("data")),
        }

    async def list_negotiations(
        self,
        id: int | None = None,
        id_listing: int | None = None,
        hash: str | None = None,
    ) -> dict[str, Any]:
        """Fetch ``marketplace_negotiations`` (authenticated).

        The filters ``id``, ``id_listing`` and ``hash`` are sent as query
        parameters only when they are not ``None``.

        Returns:
            ``{"status": ..., "negotiations": [...]}``.
        """
        body = await self.get(
            "marketplace_negotiations",
            {"id": id, "id_listing": id_listing, "hash": hash},
            authenticated=True,
        )
        return {
            "status": body.get("status"),
            "negotiations": self._as_list(body.get("data")),
        }

    async def get_negotiation_messages(
        self,
        hash: str | None = None,
        id_negotiation: int | None = None,
    ) -> dict[str, Any]:
        """Fetch ``marketplace_negotiations_messages`` (authenticated).

        Returns:
            ``{"status": ..., "messages": [...]}``.
        """
        body = await self.get(
            "marketplace_negotiations_messages",
            {"hash": hash, "id_negotiation": id_negotiation},
            authenticated=True,
        )
        return {
            "status": body.get("status"),
            "messages": self._as_list(body.get("data")),
        }

    async def send_negotiation_message(
        self,
        *,
        message: str,
        hash: str | None = None,
        id_negotiation: int | None = None,
        is_production: int = 1,
    ) -> dict[str, Any]:
        """POST a message to ``marketplace_negotiations_messages`` (authenticated).

        All arguments are forwarded unchanged in the JSON body, including
        ``None`` values.
        NOTE(review): ``is_production`` presumably selects live vs. test
        handling on the server - confirm against the UEX API documentation.
        """
        return await self.post(
            "marketplace_negotiations_messages",
            {
                "hash": hash,
                "id_negotiation": id_negotiation,
                "message": message,
                "is_production": is_production,
            },
            authenticated=True,
        )

    async def close_negotiation(
        self,
        *,
        hash: str | None = None,
        id_negotiation: int | None = None,
        deal_closed: bool,
        deal_value: float | None = None,
        currency: str | None = None,
        clarity_rating: int | None = None,
        speed_rating: int | None = None,
        respect_rating: int | None = None,
        fairness_rating: int | None = None,
        comment: str | None = None,
        is_production: int = 1,
    ) -> dict[str, Any]:
        """POST a negotiation close request to the configured close endpoint.

        ``deal_closed`` is sent as ``1``/``0``; every other argument is
        forwarded unchanged in the JSON body, including ``None`` values.

        Raises:
            UEXError: The request failed.  The message names the endpoint in
                use and the ``UEX_NEGOTIATION_CLOSE_ENDPOINT`` override, and
                the original error is chained as ``__cause__``.
        """
        payload = {
            "hash": hash,
            "id_negotiation": id_negotiation,
            "deal_closed": 1 if deal_closed else 0,
            "deal_value": deal_value,
            "currency": currency,
            "clarity_rating": clarity_rating,
            "speed_rating": speed_rating,
            "respect_rating": respect_rating,
            "fairness_rating": fairness_rating,
            "comment": comment,
            "is_production": is_production,
        }
        try:
            return await self.post(
                self.negotiation_close_endpoint,
                payload,
                authenticated=True,
            )
        except UEXError as exc:
            # The close route is configurable, so point the operator at the
            # override when the call fails.
            raise UEXError(
                "UEX negotiation close failed via endpoint "
                f"`{self.negotiation_close_endpoint}`. If UEX changed this route, set "
                "`UEX_NEGOTIATION_CLOSE_ENDPOINT` to the correct endpoint and retry. "
                f"Original error: {exc}"
            ) from exc

    async def post(self, path: str, payload: dict[str, Any], authenticated: bool = True) -> dict[str, Any]:
        """POST ``payload`` as JSON to ``path``.

        Unlike ``get`` and ``delete``, ``None`` values are not filtered out;
        they are serialised as JSON ``null``.
        """
        return await self._request("POST", path, authenticated=authenticated, json=payload)

    async def delete(
        self,
        path: str,
        params: dict[str, Any] | None = None,
        authenticated: bool = True,
    ) -> dict[str, Any]:
        """DELETE ``path``; query parameters whose value is ``None`` are omitted."""
        return await self._request(
            "DELETE",
            path,
            authenticated=authenticated,
            params=self._drop_none(params),
        )

    @staticmethod
    def _handle_response(response: httpx.Response) -> dict[str, Any]:
        """Decode a response and turn failures into ``UEXError``.

        Returns:
            The decoded JSON object.

        Raises:
            UEXError: The body is not valid JSON, the status code is >= 400,
                or the decoded JSON is not an object.
        """
        try:
            body = response.json()
        except ValueError as exc:
            raise UEXError(f"UEX returned non-JSON response: HTTP {response.status_code}") from exc
        if response.status_code >= 400:
            raise UEXError(f"UEX HTTP {response.status_code}: {body}")
        if not isinstance(body, dict):
            # Callers use ``body.get(...)``; a top-level list/str/null would
            # otherwise surface as an AttributeError far from the cause.
            raise UEXError(
                "UEX returned unexpected JSON payload "
                f"(expected an object, got {type(body).__name__}): HTTP {response.status_code}"
            )
        return body
|