160 lines
5.9 KiB
Python
160 lines
5.9 KiB
Python
from __future__ import annotations
|
|
|
|
from html.parser import HTMLParser
|
|
import json
|
|
from typing import Any
|
|
|
|
import httpx
|
|
|
|
|
|
class CornerstoneError(RuntimeError):
    """Error raised for failed or malformed Cornerstone responses."""
|
|
|
|
|
|
class CornerstoneClient:
    """Small async HTTP client for a Cornerstone item-finder site.

    The item index returned by ``list_items`` is fetched once and then cached
    on the instance; item pages are requested on every call.
    """

    def __init__(self, base_url: str = "https://finder.cstone.space") -> None:
        """Remember ``base_url`` (trailing slashes removed) and start uncached."""
        self.base_url = base_url.rstrip("/")
        # None means "not fetched yet"; an empty list is a valid cached result.
        self._items: list[dict[str, Any]] | None = None

    async def list_items(self) -> list[dict[str, Any]]:
        """Return the searchable item index, fetching it on first use.

        Each entry is ``{"id": ..., "name": ..., "sold": bool}``. Entries that
        are not dicts, or that lack a truthy ``id`` or ``name``, are dropped.

        Raises:
            CornerstoneError: if the response is not JSON, is an HTTP error,
                or does not decode to a list.
        """
        if self._items is not None:
            return self._items

        body = await self._get_json("GetSearch")
        if isinstance(body, str):
            # A string body is treated as a JSON document encoded a second
            # time. Decode failures are reported the same way ``_get_json``
            # reports them instead of leaking a raw JSONDecodeError.
            try:
                body = json.loads(body)
            except ValueError as exc:
                raise CornerstoneError("Cornerstone search response was not valid JSON.") from exc
        if not isinstance(body, list):
            raise CornerstoneError("Cornerstone search response was not a list.")

        self._items = [
            {"id": item.get("id"), "name": item.get("name"), "sold": bool(item.get("Sold"))}
            for item in body
            if isinstance(item, dict) and item.get("id") and item.get("name")
        ]
        return self._items

    async def get_item_page(self, item_id: str) -> dict[str, Any]:
        """Fetch the HTML page for one item.

        Returns:
            ``{"url": <final URL after redirects>, "html": <response text>}``.

        Raises:
            CornerstoneError: if the server answers with status 400 or above.
        """
        from urllib.parse import quote

        # Percent-encode the id so "/", "?", "#" or spaces in caller-supplied
        # input cannot change the request path or smuggle in a query string.
        safe_id = quote(item_id.strip(), safe="")
        async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client:
            response = await client.get(
                f"{self.base_url}/Search/{safe_id}",
                headers={"Accept": "text/html,application/xhtml+xml"},
            )
        if response.status_code >= 400:
            raise CornerstoneError(f"Cornerstone HTTP {response.status_code}: {response.text[:240]}")
        return {"url": str(response.url), "html": response.text}

    async def _get_json(self, path: str) -> Any:
        """GET ``path`` relative to the base URL and return the decoded JSON body.

        Raises:
            CornerstoneError: if the body is not JSON, or the status is 400 or
                above (the decoded body is included in the message).
        """
        async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client:
            response = await client.get(
                f"{self.base_url}/{path.lstrip('/')}",
                headers={"Accept": "application/json"},
            )
        try:
            body = response.json()
        except ValueError as exc:
            raise CornerstoneError(
                f"Cornerstone returned non-JSON response: HTTP {response.status_code}"
            ) from exc
        if response.status_code >= 400:
            raise CornerstoneError(f"Cornerstone HTTP {response.status_code}: {body}")
        return body
|
|
|
|
|
|
class CornerstonePageParser(HTMLParser):
    """Collect the ``<title>`` text and the cell text of every table in a page.

    After feeding a document, ``title`` holds the raw title text and ``tables``
    holds one entry per non-empty table: a list of rows, each a list of
    whitespace-normalised cell strings. Content inside ``<script>`` and
    ``<style>`` is ignored. Rows whose cells are all empty are dropped.

    Omitted end tags (``</td>``, ``</th>``, ``</tr>``) are tolerated: an open
    cell or row is completed when the next one starts or when the enclosing
    row/table ends. Nested tables are not supported; a nested ``<table>``
    replaces the table currently being collected.
    """

    def __init__(self) -> None:
        super().__init__(convert_charrefs=True)
        self.title = ""
        self.tables: list[list[list[str]]] = []
        # Greater than zero while inside <script>/<style>.
        self._skip_depth = 0
        self._in_title = False
        # Collection state; None means "not currently inside one".
        self._current_table: list[list[str]] | None = None
        self._current_row: list[str] | None = None
        self._current_cell: list[str] | None = None

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        """Track entry into title, table, row and cell elements."""
        tag = tag.casefold()
        if tag in {"script", "style"}:
            self._skip_depth += 1
            return
        if self._skip_depth:
            return
        if tag == "title":
            self._in_title = True
        elif tag == "table":
            self._current_table = []
            # Do not let a stale row/cell from an enclosing table leak in.
            self._current_row = None
            self._current_cell = None
        elif tag == "tr" and self._current_table is not None:
            # A new row implicitly ends a row whose </tr> was omitted.
            self._finish_row()
            self._current_row = []
        elif tag in {"td", "th"} and self._current_row is not None:
            # A new cell implicitly ends a cell whose end tag was omitted.
            self._finish_cell()
            self._current_cell = []

    def handle_endtag(self, tag: str) -> None:
        """Commit the cell, row or table that the end tag closes."""
        tag = tag.casefold()
        if tag in {"script", "style"} and self._skip_depth:
            self._skip_depth -= 1
            return
        if self._skip_depth:
            return
        if tag == "title":
            self._in_title = False
        elif tag in {"td", "th"}:
            self._finish_cell()
        elif tag == "tr":
            self._finish_row()
        elif tag == "table":
            self._finish_table()

    def handle_data(self, data: str) -> None:
        """Append text to the title and/or the open cell, outside skipped elements."""
        if self._skip_depth:
            return
        if self._in_title:
            self.title += data
        if self._current_cell is not None:
            self._current_cell.append(data)

    def _finish_cell(self) -> None:
        """Append the open cell's normalised text to the open row, if both exist."""
        if self._current_cell is None or self._current_row is None:
            return
        # Collapse every whitespace run (including newlines) to one space.
        text = " ".join("".join(self._current_cell).split())
        self._current_row.append(text)
        self._current_cell = None

    def _finish_row(self) -> None:
        """Append the open row to the open table unless all its cells are empty."""
        if self._current_row is None or self._current_table is None:
            return
        self._finish_cell()
        if any(self._current_row):
            self._current_table.append(self._current_row)
        self._current_row = None

    def _finish_table(self) -> None:
        """Record the open table in ``tables`` if it collected any rows."""
        if self._current_table is None:
            return
        self._finish_row()
        if self._current_table:
            self.tables.append(self._current_table)
        self._current_table = None
        self._current_row = None
        self._current_cell = None
|
|
|
|
|
|
def parse_cornerstone_item_page(html: str) -> dict[str, Any]:
    """Extract the name, general attributes and sale locations from an item page.

    Tables are classified by their first row:

    * A table whose first three header cells contain "location", "price" and
      "verified" (case-insensitive, in that order) is read as a price table;
      every later row with at least three cells becomes a ``locations`` entry.
    * Any other table in which every row has at least two cells is read as
      key/value pairs into ``general``. Keys are lower-cased with spaces
      replaced by underscores, and the first occurrence of a key wins.

    Returns:
        A dict with ``page_title``, ``name`` (the ``name`` general attribute,
        else derived from the page title), ``general`` (only when non-empty)
        and ``locations``.
    """
    parser = CornerstonePageParser()
    parser.feed(html)
    # feed() may hold back trailing text while waiting for more input;
    # close() tells the parser the document is complete so it is flushed.
    parser.close()

    info: dict[str, Any] = {"page_title": " ".join(parser.title.split())}
    general: dict[str, str] = {}
    locations: list[dict[str, Any]] = []

    for table in parser.tables:
        if not table:
            continue
        header = [cell.casefold() for cell in table[0]]
        if len(header) >= 3 and "location" in header[0] and "price" in header[1] and "verified" in header[2]:
            for row in table[1:]:
                if len(row) < 3:
                    continue
                locations.append(
                    {
                        "location": row[0],
                        "base_price": _parse_cornerstone_price(row[1]),
                        # Keep the original text next to the parsed number.
                        "base_price_display": row[1],
                        "verified": row[2],
                    }
                )
        elif all(len(row) >= 2 for row in table):
            for row in table:
                key = row[0].strip().lower().replace(" ", "_")
                value = row[1].strip()
                if key and value and key not in general:
                    general[key] = value

    info["name"] = general.get("name") or _name_from_title(info["page_title"])
    if general:
        info["general"] = general
    info["locations"] = locations
    return info
|
|
|
|
|
|
def _parse_cornerstone_price(value: str) -> int | None:
    """Return the integer formed by the decimal digits in ``value``, or None.

    Every non-digit character (thousands separators, currency labels,
    whitespace) is discarded and the remaining digits are concatenated, so
    "12,500 aUEC" yields 12500. A decimal point is discarded like any other
    separator, so fractional prices are not supported.
    """
    # isdecimal(), not isdigit(): isdigit() also accepts characters such as
    # superscript "²" that int() cannot parse, which would raise ValueError.
    digits = "".join(char for char in value if char.isdecimal())
    return int(digits) if digits else None
|
|
|
|
|
|
def _name_from_title(title: str) -> str | None:
    """Derive an item name from a page title.

    The text after the last " - " separator is returned, stripped. A title
    without that separator is returned unchanged. An empty result is
    reported as None.
    """
    _, separator, tail = title.rpartition(" - ")
    if not separator:
        # No separator present: the whole title is the name.
        return title if title else None
    name = tail.strip()
    return name if name else None
|