"""Async client and HTML-parsing helpers for a Cornerstone item-finder site."""

from __future__ import annotations

import base64
import json
from html.parser import HTMLParser
from typing import Any
from urllib.parse import quote, urljoin

import httpx


class CornerstoneError(RuntimeError):
    """Raised when a Cornerstone request fails or returns unusable data."""


class CornerstoneClient:
    """Small async HTTP client for a Cornerstone site.

    A fresh ``httpx.AsyncClient`` is opened for every request. The item list
    returned by :meth:`list_items` is fetched once and cached on the instance.
    """

    def __init__(self, base_url: str = "https://finder.cstone.space") -> None:
        """Store ``base_url`` without a trailing slash and start with an empty cache."""
        self.base_url = base_url.rstrip("/")
        self._items: list[dict[str, Any]] | None = None

    @staticmethod
    def _new_client() -> httpx.AsyncClient:
        """Return an HTTP client with the settings shared by every request."""
        return httpx.AsyncClient(timeout=30, follow_redirects=True)

    async def list_items(self) -> list[dict[str, Any]]:
        """Return the searchable items as ``{"id", "name", "sold"}`` dicts.

        The result of the first successful call is cached and the same list
        object is returned by later calls.

        Raises:
            CornerstoneError: if the request fails or the payload is not a
                JSON list (directly, or encoded inside a JSON string).
        """
        if self._items is not None:
            return self._items
        body = await self._get_json("GetSearch")
        if isinstance(body, str):
            # The payload decoded to a string: parse it a second time.
            try:
                body = json.loads(body)
            except ValueError as exc:
                raise CornerstoneError("Cornerstone search response was not valid JSON.") from exc
        if not isinstance(body, list):
            raise CornerstoneError("Cornerstone search response was not a list.")
        # Entries that are not dicts, or lack a truthy id or name, are skipped.
        self._items = [
            {"id": item.get("id"), "name": item.get("name"), "sold": bool(item.get("Sold"))}
            for item in body
            if isinstance(item, dict) and item.get("id") and item.get("name")
        ]
        return self._items

    async def get_item_page(self, item_id: str) -> dict[str, Any]:
        """Fetch the HTML page for ``item_id``.

        Returns:
            ``{"url": <final URL after redirects>, "html": <response text>}``.

        Raises:
            CornerstoneError: if the server answers with status >= 400.
        """
        # Percent-encode the id so characters such as "/", "?" or "#" cannot
        # introduce extra path segments, a query string or a fragment.
        safe_id = quote(item_id.strip(), safe="")
        async with self._new_client() as client:
            response = await client.get(
                f"{self.base_url}/Search/{safe_id}",
                headers={"Accept": "text/html,application/xhtml+xml"},
            )
        if response.status_code >= 400:
            raise CornerstoneError(f"Cornerstone HTTP {response.status_code}: {response.text[:240]}")
        return {"url": str(response.url), "html": response.text}

    async def get_image_data(self, url: str, max_bytes: int = 10_000_000) -> dict[str, Any]:
        """Download a JPG/PNG image and return it base64-encoded.

        The body is streamed so the download stops as soon as it exceeds
        ``max_bytes`` instead of being loaded into memory first.

        Returns:
            A dict with ``url`` (final URL after redirects), ``content_type``,
            ``size_bytes`` and ``image_data`` (base64 text).

        Raises:
            CornerstoneError: on status >= 400, on a content type other than
                JPEG/PNG, or when the body is larger than ``max_bytes``.
        """
        chunks: list[bytes] = []
        size = 0
        async with self._new_client() as client:
            async with client.stream(
                "GET", url, headers={"Accept": "image/png,image/jpeg,image/*"}
            ) as response:
                if response.status_code >= 400:
                    # A streamed body must be read before ``.text`` is available.
                    await response.aread()
                    raise CornerstoneError(
                        f"Cornerstone image HTTP {response.status_code}: {response.text[:240]}"
                    )
                # Drop parameters such as "; charset=..." and normalise case.
                content_type = response.headers.get("content-type", "").split(";")[0].strip().casefold()
                if content_type not in {"image/jpeg", "image/jpg", "image/png"}:
                    raise CornerstoneError(
                        f"Cornerstone image was not JPG or PNG: {content_type or 'unknown content type'}"
                    )
                async for chunk in response.aiter_bytes():
                    size += len(chunk)
                    if size > max_bytes:
                        raise CornerstoneError(f"Cornerstone image is larger than {max_bytes} bytes.")
                    chunks.append(chunk)
                final_url = str(response.url)
        content = b"".join(chunks)
        return {
            "url": final_url,
            "content_type": content_type,
            "size_bytes": len(content),
            "image_data": base64.b64encode(content).decode("ascii"),
        }

    async def _get_json(self, path: str) -> Any:
        """GET ``path`` relative to ``base_url`` and return the decoded JSON body.

        Raises:
            CornerstoneError: if the body is not JSON, or the status is >= 400.
        """
        async with self._new_client() as client:
            response = await client.get(
                f"{self.base_url}/{path.lstrip('/')}",
                headers={"Accept": "application/json"},
            )
        # Decode first so that an error status can report the decoded body.
        try:
            body = response.json()
        except ValueError as exc:
            raise CornerstoneError(
                f"Cornerstone returned non-JSON response: HTTP {response.status_code}"
            ) from exc
        if response.status_code >= 400:
            raise CornerstoneError(f"Cornerstone HTTP {response.status_code}: {body}")
        return body


class CornerstonePageParser(HTMLParser):
    """Collect the title, table cell text and image URLs from an HTML page.

    After feeding HTML, read:

    * ``title``  - concatenated text found inside ``<title>``.
    * ``tables`` - one list of rows per ``<table>``; each row is a list of
      whitespace-normalised cell strings. Rows whose cells are all empty and
      tables without any kept row are omitted.
    * ``images`` - dicts with ``url`` and ``source`` (plus ``alt`` for
      ``<img>``) taken from ``<meta>``, ``<link>`` and ``<img>`` tags.

    Content inside ``<script>`` and ``<style>`` is ignored. A ``<table>`` start
    tag always begins a fresh table, so nested tables are not tracked.
    """

    def __init__(self) -> None:
        super().__init__(convert_charrefs=True)
        self.title = ""
        self.tables: list[list[list[str]]] = []
        self.images: list[dict[str, str]] = []
        # Number of currently open <script>/<style> elements.
        self._skip_depth = 0
        self._in_title = False
        # Table / row / cell under construction; None when not inside one.
        self._current_table: list[list[str]] | None = None
        self._current_row: list[str] | None = None
        self._current_cell: list[str] | None = None

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        """Track skip/title/table state and record image URLs from start tags."""
        tag = tag.casefold()
        if tag in {"script", "style"}:
            self._skip_depth += 1
            return
        if self._skip_depth:
            return
        if tag == "title":
            self._in_title = True
        elif tag == "meta":
            attr_map = self._attrs(attrs)
            name = (attr_map.get("property") or attr_map.get("name") or "").casefold()
            content = attr_map.get("content") or ""
            if content and name in {"og:image", "twitter:image", "twitter:image:src"}:
                self.images.append({"url": content, "source": name})
        elif tag == "link":
            attr_map = self._attrs(attrs)
            rel = (attr_map.get("rel") or "").casefold()
            href = attr_map.get("href") or ""
            if href and "image_src" in rel:
                self.images.append({"url": href, "source": "link:image_src"})
        elif tag == "img":
            attr_map = self._attrs(attrs)
            # Fall back to the lazy-loading attributes when ``src`` is empty.
            url = attr_map.get("src") or attr_map.get("data-src") or attr_map.get("data-original") or ""
            if url:
                self.images.append(
                    {
                        "url": url,
                        "alt": attr_map.get("alt") or "",
                        "source": "img",
                    }
                )
        elif tag == "table":
            self._current_table = []
        elif tag == "tr" and self._current_table is not None:
            self._current_row = []
        elif tag in {"td", "th"} and self._current_row is not None:
            self._current_cell = []

    def handle_endtag(self, tag: str) -> None:
        """Close the matching cell, row or table and store what was collected."""
        tag = tag.casefold()
        if tag in {"script", "style"} and self._skip_depth:
            self._skip_depth -= 1
            return
        if self._skip_depth:
            return
        if tag == "title":
            self._in_title = False
        elif tag in {"td", "th"} and self._current_cell is not None and self._current_row is not None:
            # Join the text fragments and collapse all whitespace runs.
            text = " ".join("".join(self._current_cell).split())
            self._current_row.append(text)
            self._current_cell = None
        elif tag == "tr" and self._current_row is not None and self._current_table is not None:
            # Keep the row only if at least one cell has text.
            if any(cell for cell in self._current_row):
                self._current_table.append(self._current_row)
            self._current_row = None
        elif tag == "table" and self._current_table is not None:
            if self._current_table:
                self.tables.append(self._current_table)
            self._current_table = None

    def handle_data(self, data: str) -> None:
        """Append text to the title and/or the open table cell."""
        if self._skip_depth:
            return
        if self._in_title:
            self.title += data
        if self._current_cell is not None:
            self._current_cell.append(data)

    @staticmethod
    def _attrs(attrs: list[tuple[str, str | None]]) -> dict[str, str]:
        """Return ``attrs`` as a dict with case-folded keys and ``None`` values as ``""``."""
        return {key.casefold(): value or "" for key, value in attrs}


def parse_cornerstone_item_page(html: str, page_url: str | None = None) -> dict[str, Any]:
    """Extract structured item information from an item page's HTML.

    Args:
        html: The page markup.
        page_url: URL the page was loaded from; used to resolve relative
            image URLs.

    Returns:
        A dict with ``page_title``, ``name``, ``locations`` and, when
        non-empty, ``media`` and ``general``. A table whose first row starts
        with location / price / verified headers supplies ``locations``; any
        other table in which every row has at least two cells supplies
        ``general`` key/value pairs (first occurrence of a key wins).
    """
    parser = CornerstonePageParser()
    parser.feed(html)
    # Flush anything the parser is still buffering (e.g. text after a bare "&").
    parser.close()

    info: dict[str, Any] = {"page_title": " ".join(parser.title.split())}
    general: dict[str, str] = {}
    locations: list[dict[str, Any]] = []

    for table in parser.tables:
        if not table:
            continue
        header = [cell.casefold() for cell in table[0]]
        is_location_table = (
            len(header) >= 3
            and "location" in header[0]
            and "price" in header[1]
            and "verified" in header[2]
        )
        if is_location_table:
            for row in table[1:]:
                if len(row) < 3:
                    continue
                locations.append(
                    {
                        "location": row[0],
                        "base_price": _parse_cornerstone_price(row[1]),
                        "base_price_display": row[1],
                        "verified": row[2],
                    }
                )
        elif all(len(row) >= 2 for row in table):
            for row in table:
                key = row[0].strip().lower().replace(" ", "_")
                value = row[1].strip()
                if key and value and key not in general:
                    general[key] = value

    info["name"] = general.get("name") or _name_from_title(info["page_title"])
    media = _dedupe_media(parser.images, page_url)
    if media:
        info["media"] = media
    if general:
        info["general"] = general
    info["locations"] = locations
    return info


def _parse_cornerstone_price(value: str) -> int | None:
    """Return the decimal digits of ``value`` joined into an int, or ``None``.

    Every non-digit character is dropped, including separators and decimal
    points, so ``"1,234"`` gives ``1234`` and ``"12.5"`` gives ``125``.
    """
    # ``str.isdigit`` also matches characters such as the superscript "²",
    # which ``int`` rejects; ``str.isdecimal`` only matches digits ``int`` accepts.
    digits = "".join(char for char in value if char.isdecimal())
    return int(digits) if digits else None


def _name_from_title(title: str) -> str | None:
    """Return the text after the last ``" - "`` in ``title``.

    Without a separator the whole title is returned. An empty result is
    reported as ``None``.
    """
    if " - " not in title:
        return title or None
    return title.rsplit(" - ", 1)[-1].strip() or None


def _dedupe_media(images: list[dict[str, str]], page_url: str | None = None) -> list[dict[str, str]]:
    """Return copies of ``images`` with absolute URLs, dropping duplicates.

    Entries with an empty URL or a ``data:`` URL are skipped. URLs are
    resolved against ``page_url``; the first entry for each resolved URL is
    kept and the original order is preserved.
    """
    media: list[dict[str, str]] = []
    seen: set[str] = set()
    for image in images:
        raw_url = (image.get("url") or "").strip()
        if not raw_url or raw_url.startswith("data:"):
            continue
        url = urljoin(page_url or "", raw_url)
        if url in seen:
            continue
        seen.add(url)
        item = dict(image)
        item["url"] = url
        media.append(item)
    return media