Files
TraderAI/traderai/cornerstone_client.py
HRiggs 97c751c585
Build Release EXE / build-windows-exe (release) Successful in 52s
versioning: 0.0.4, feat: create listing, source image
2026-05-08 00:02:59 -04:00

227 lines
8.9 KiB
Python

from __future__ import annotations
from html.parser import HTMLParser
import base64
import json
from typing import Any
from urllib.parse import urljoin
import httpx
class CornerstoneError(RuntimeError):
    """Raised when a Cornerstone request fails or its response is unusable."""
class CornerstoneClient:
    """Small async HTTP client for the Cornerstone item finder site.

    Every request opens its own short-lived ``httpx.AsyncClient``. The only
    state kept on the instance is the base URL and the cached item list.
    """

    def __init__(self, base_url: str = "https://finder.cstone.space") -> None:
        """Remember *base_url* (trailing slashes removed) and start uncached."""
        self.base_url = base_url.rstrip("/")
        # Filled by the first successful list_items() call; None = not fetched.
        self._items: list[dict[str, Any]] | None = None

    async def list_items(self) -> list[dict[str, Any]]:
        """Return the searchable item list, fetching it at most once.

        Returns:
            One ``{"id", "name", "sold"}`` dict for every entry of the
            ``GetSearch`` response that is a dict with a truthy ``id`` and
            ``name``. The same list object is returned on later calls.

        Raises:
            CornerstoneError: If the request fails (see ``_get_json``), if a
                string payload is not itself valid JSON, or if the decoded
                payload is not a list.
        """
        if self._items is not None:
            return self._items
        body = await self._get_json("GetSearch")
        if isinstance(body, str):
            # The payload may be a JSON string that wraps the real JSON
            # document. Report a bad inner document the same way as any
            # other unusable response instead of leaking JSONDecodeError.
            try:
                body = json.loads(body)
            except ValueError as exc:
                raise CornerstoneError("Cornerstone search response was not valid JSON.") from exc
        if not isinstance(body, list):
            raise CornerstoneError("Cornerstone search response was not a list.")
        self._items = [
            {"id": item.get("id"), "name": item.get("name"), "sold": bool(item.get("Sold"))}
            for item in body
            if isinstance(item, dict) and item.get("id") and item.get("name")
        ]
        return self._items

    async def get_item_page(self, item_id: str) -> dict[str, Any]:
        """Fetch the HTML page for one item.

        Args:
            item_id: Item identifier; surrounding whitespace is stripped
                before it is placed in the ``/Search/<id>`` path.

        Returns:
            ``{"url": <final URL after redirects>, "html": <response text>}``.

        Raises:
            CornerstoneError: If the response status is 400 or above.
        """
        async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client:
            response = await client.get(
                f"{self.base_url}/Search/{item_id.strip()}",
                headers={"Accept": "text/html,application/xhtml+xml"},
            )
        if response.status_code >= 400:
            raise CornerstoneError(f"Cornerstone HTTP {response.status_code}: {response.text[:240]}")
        return {"url": str(response.url), "html": response.text}

    async def get_image_data(self, url: str, max_bytes: int = 10_000_000) -> dict[str, Any]:
        """Download a JPG or PNG image and return it base64-encoded.

        Args:
            url: Absolute URL of the image.
            max_bytes: Largest accepted body size. The limit is applied to
                the length of the downloaded body.

        Returns:
            A dict with the final ``url``, the normalised ``content_type``,
            ``size_bytes`` and the base64 text in ``image_data``.

        Raises:
            CornerstoneError: If the status is 400 or above, the content type
                is not JPEG/PNG, or the body is larger than *max_bytes*.
        """
        async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client:
            response = await client.get(url, headers={"Accept": "image/png,image/jpeg,image/*"})
        if response.status_code >= 400:
            raise CornerstoneError(f"Cornerstone image HTTP {response.status_code}: {response.text[:240]}")
        # Drop parameters such as "; charset=..." and compare case-insensitively.
        content_type = response.headers.get("content-type", "").split(";")[0].strip().casefold()
        if content_type not in {"image/jpeg", "image/jpg", "image/png"}:
            raise CornerstoneError(f"Cornerstone image was not JPG or PNG: {content_type or 'unknown content type'}")
        if len(response.content) > max_bytes:
            raise CornerstoneError(f"Cornerstone image is larger than {max_bytes} bytes.")
        return {
            "url": str(response.url),
            "content_type": content_type,
            "size_bytes": len(response.content),
            "image_data": base64.b64encode(response.content).decode("ascii"),
        }

    async def _get_json(self, path: str) -> Any:
        """GET ``<base_url>/<path>`` and return the decoded JSON body.

        Raises:
            CornerstoneError: If the body is not JSON, or if the status is
                400 or above (the decoded body is included in the message).
        """
        async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client:
            response = await client.get(f"{self.base_url}/{path.lstrip('/')}", headers={"Accept": "application/json"})
        # Decode first so an error status can report the server's JSON body.
        try:
            body = response.json()
        except ValueError as exc:
            raise CornerstoneError(f"Cornerstone returned non-JSON response: HTTP {response.status_code}") from exc
        if response.status_code >= 400:
            raise CornerstoneError(f"Cornerstone HTTP {response.status_code}: {body}")
        return body
class CornerstonePageParser(HTMLParser):
    """Collect the title, image references and table text of an HTML page.

    After ``feed()``:

    * ``title`` is the raw text seen inside ``<title>``.
    * ``images`` has one dict per image reference found in ``<meta>``,
      ``<link>`` and ``<img>`` tags, in document order.
    * ``tables`` has one entry per closed ``<table>`` that produced rows;
      each row is a list of whitespace-normalised cell strings, and rows
      whose cells are all empty are left out.

    The ``</td>``, ``</th>`` and ``</tr>`` end tags are optional in HTML, so
    an open cell or row is also finished when the next cell or row starts or
    when the table ends. Tables are not tracked as a stack: a ``<table>``
    opened inside another one replaces the table being collected.
    """

    def __init__(self) -> None:
        super().__init__(convert_charrefs=True)
        self.title = ""
        self.tables: list[list[list[str]]] = []
        self.images: list[dict[str, str]] = []
        # Greater than zero while inside <script>/<style>; everything is ignored there.
        self._skip_depth = 0
        self._in_title = False
        # Table / row / cell currently being collected; None when not inside one.
        self._current_table: list[list[str]] | None = None
        self._current_row: list[str] | None = None
        self._current_cell: list[str] | None = None

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        """Record image references and open table, row and cell state."""
        tag = tag.casefold()
        if tag in {"script", "style"}:
            self._skip_depth += 1
            return
        if self._skip_depth:
            return
        if tag == "title":
            self._in_title = True
        elif tag == "meta":
            attr_map = self._attrs(attrs)
            name = (attr_map.get("property") or attr_map.get("name") or "").casefold()
            content = attr_map.get("content") or ""
            if content and name in {"og:image", "twitter:image", "twitter:image:src"}:
                self.images.append({"url": content, "source": name})
        elif tag == "link":
            attr_map = self._attrs(attrs)
            rel = (attr_map.get("rel") or "").casefold()
            href = attr_map.get("href") or ""
            if href and "image_src" in rel:
                self.images.append({"url": href, "source": "link:image_src"})
        elif tag == "img":
            attr_map = self._attrs(attrs)
            # Fall back to the lazy-loading attributes when src is absent.
            url = attr_map.get("src") or attr_map.get("data-src") or attr_map.get("data-original") or ""
            if url:
                self.images.append(
                    {
                        "url": url,
                        "alt": attr_map.get("alt") or "",
                        "source": "img",
                    }
                )
        elif tag == "table":
            # A new table supersedes whatever was being collected, including
            # any half-built row or cell of an enclosing table.
            self._current_table = []
            self._current_row = None
            self._current_cell = None
        elif tag == "tr" and self._current_table is not None:
            # A new row implicitly ends the previous one (</tr> is optional).
            self._close_row()
            self._current_row = []
        elif tag in {"td", "th"} and self._current_row is not None:
            # A new cell implicitly ends the previous one (</td> is optional).
            self._close_cell()
            self._current_cell = []

    def handle_endtag(self, tag: str) -> None:
        """Close title, cell, row and table state."""
        tag = tag.casefold()
        if tag in {"script", "style"} and self._skip_depth:
            self._skip_depth -= 1
            return
        if self._skip_depth:
            return
        if tag == "title":
            self._in_title = False
        elif tag in {"td", "th"}:
            self._close_cell()
        elif tag == "tr":
            self._close_row()
        elif tag == "table" and self._current_table is not None:
            # Flush a last row whose </tr> was omitted before storing the table.
            self._close_row()
            if self._current_table:
                self.tables.append(self._current_table)
            self._current_table = None

    def handle_data(self, data: str) -> None:
        """Append text to the title and/or the open cell."""
        if self._skip_depth:
            return
        if self._in_title:
            self.title += data
        if self._current_cell is not None:
            self._current_cell.append(data)

    def _close_cell(self) -> None:
        """Append the open cell's whitespace-normalised text to the open row."""
        if self._current_cell is None:
            return
        if self._current_row is not None:
            self._current_row.append(" ".join("".join(self._current_cell).split()))
        self._current_cell = None

    def _close_row(self) -> None:
        """Finish any open cell, then store the open row unless it is all empty."""
        self._close_cell()
        if self._current_row is None:
            return
        if self._current_table is not None and any(self._current_row):
            self._current_table.append(self._current_row)
        self._current_row = None

    @staticmethod
    def _attrs(attrs: list[tuple[str, str | None]]) -> dict[str, str]:
        """Return *attrs* as a dict with casefolded keys and ``None`` mapped to ``""``."""
        return {key.casefold(): value or "" for key, value in attrs}
def parse_cornerstone_item_page(html: str, page_url: str | None = None) -> dict[str, Any]:
    """Extract title, name, media, general facts and locations from item HTML.

    Args:
        html: Markup of the item page.
        page_url: URL the page came from; passed to ``_dedupe_media`` so
            relative image URLs can be resolved against it.

    Returns:
        A dict with ``page_title``, ``name`` and ``locations`` always present,
        plus ``media`` and ``general`` when they are non-empty.
    """
    page = CornerstonePageParser()
    page.feed(html)

    details: dict[str, str] = {}
    sale_points: list[dict[str, Any]] = []
    for rows in page.tables:
        if not rows:
            continue
        head = [text.casefold() for text in rows[0]]
        is_location_table = (
            len(head) >= 3
            and "location" in head[0]
            and "price" in head[1]
            and "verified" in head[2]
        )
        if is_location_table:
            # Data rows follow the header; rows with fewer than three cells are ignored.
            sale_points.extend(
                {
                    "location": cells[0],
                    "base_price": _parse_cornerstone_price(cells[1]),
                    "base_price_display": cells[1],
                    "verified": cells[2],
                }
                for cells in rows[1:]
                if len(cells) >= 3
            )
            continue
        # Anything else counts as key/value data only if every row has two or more cells.
        if any(len(cells) < 2 for cells in rows):
            continue
        for cells in rows:
            label = cells[0].strip().lower().replace(" ", "_")
            text = cells[1].strip()
            if label and text:
                # The first occurrence of a key wins.
                details.setdefault(label, text)

    title = " ".join(page.title.split())
    result: dict[str, Any] = {"page_title": title}
    result["name"] = details.get("name") or _name_from_title(title)
    gallery = _dedupe_media(page.images, page_url)
    if gallery:
        result["media"] = gallery
    if details:
        result["general"] = details
    result["locations"] = sale_points
    return result
def _parse_cornerstone_price(value: str) -> int | None:
    """Return the integer spelled by the decimal digits in *value*, or None.

    Every other character is dropped and the remaining digits are read as a
    single number, so ``"1,200 aUEC"`` gives ``1200``. A decimal point is
    dropped like any other character. ``None`` is returned when *value*
    contains no digits.

    ``str.isdecimal`` is used instead of ``str.isdigit`` because the latter
    also accepts characters such as superscript digits, which ``int()``
    rejects with ``ValueError``.
    """
    digits = "".join(filter(str.isdecimal, value))
    return int(digits) if digits else None
def _name_from_title(title: str) -> str | None:
    """Return the text after the last ``" - "`` in *title*, or None if empty.

    A title without that separator is returned unchanged (``None`` when it
    is the empty string).
    """
    _, separator, tail = title.rpartition(" - ")
    if not separator:
        return title or None
    return tail.strip() or None
def _dedupe_media(images: list[dict[str, str]], page_url: str | None = None) -> list[dict[str, str]]:
    """Resolve image URLs against *page_url* and drop duplicates.

    Entries with an empty URL or a ``data:`` URL are skipped. The first entry
    seen for each resolved URL is kept, as a copy whose ``url`` holds the
    resolved value; the input dicts are not modified.
    """
    base = page_url or ""
    unique: dict[str, dict[str, str]] = {}
    for entry in images:
        candidate = (entry.get("url") or "").strip()
        if candidate and not candidate.startswith("data:"):
            resolved = urljoin(base, candidate)
            if resolved not in unique:
                unique[resolved] = {**entry, "url": resolved}
    return list(unique.values())