from flask import Flask, jsonify, send_from_directory from pathlib import Path import time import json from urllib.parse import urljoin import re try: import requests except ImportError: # pragma: no cover requests = None try: from bs4 import BeautifulSoup except ImportError: # pragma: no cover BeautifulSoup = None BASE_DIR = Path(__file__).resolve().parent CUSTOM_OFFERS_PATH = BASE_DIR / "custom_offers.json" REMOTE_URL = "https://www.dedicatednodes.io/solana-nodes/" CACHE_TTL = 5 * 60 OPTION_CACHE_TTL = 5 * 60 HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" } _cache = {"ts": 0, "data": None} _remote_cache = {"ts": 0, "soup": None} _instant_cache = {"ts": 0, "data": None} _option_cache = {} PRICE_PATTERN = re.compile(r"([+-])?\s*([€$£]|[A-Za-z]{1,3})?\s*([0-9.,]+)") app = Flask(__name__) def normalize_text(value: str) -> str: if not value: return "" return " ".join(value.replace("\xa0", " ").split()) def text_from_element(el): if not el: return "" return normalize_text(el.get_text(separator=" ", strip=True)) def find_card_root(node): current = node depth = 0 while current and depth < 6: current = current.parent depth += 1 if not current: break classes = current.get("class") or [] if isinstance(classes, str): classes = classes.split() if "card" in classes: return current return node.parent def collect_features(card): features = [] for li in card.select("li"): component = li.select_one(".component") if component: label = normalize_text(component.get_text()) value = text_from_element(li.select_one(".component-value")) description = text_from_element(li.select_one(".component-description")) parts = [part for part in (value, description) if part] if label and parts: features.append("|".join([label] + parts)) continue price_band = li.select_one(".price-band") if price_band: label_text = normalize_text(price_band.select_one(".label").get_text()) if price_band.select_one(".label") else "Starting from" amount_text = normalize_text(price_band.select_one(".amount").get_text()) if price_band.select_one(".amount") else "" per_text = normalize_text(price_band.select_one(".per").get_text()) if price_band.select_one(".per") else "" parts = [part for part in (amount_text, per_text) if part] if label_text and parts: features.append("|".join([label_text] + parts)) return features def parse_custom_offers(soup): if not BeautifulSoup or not soup: return [] offers = [] seen_links = set() for anchor in soup.find_all("a", string=lambda text: text and "configure now" in text.lower()): href = anchor.get("href") if not href or href in seen_links: continue seen_links.add(href) card = find_card_root(anchor) if not card: continue title_el = card.find(class_="card-title") or card.find(["h2", "h3", "h4"]) title = normalize_text(title_el.get_text()) if title_el else "Custom Server" features = collect_features(card) if not features: continue link = urljoin(REMOTE_URL, href) offer = { "title": title, "link": link, "features": features, } options = parse_product_options(link) if options: offer["options"] = options offers.append(offer) return offers def fetch_remote_html(): if not requests: return None try: response = requests.get(REMOTE_URL, headers=HEADERS, timeout=10) response.raise_for_status() return response.text except Exception as exc: # pragma: no cover print(f"[serve10000] Unable to reach {REMOTE_URL}: {exc}") return None def load_fallback_offers(): if not CUSTOM_OFFERS_PATH.exists(): return [] try: data = json.loads(CUSTOM_OFFERS_PATH.read_text(encoding="utf-8")) if isinstance(data, list): return data except json.JSONDecodeError: print("[serve10000] Invalid fallback JSON, returning empty list") return [] def fetch_remote_soup(): if not BeautifulSoup: return None now = time.time() cached = _remote_cache.get("soup") if cached and now - _remote_cache["ts"] < CACHE_TTL: return cached html = fetch_remote_html() if not html: return None soup = BeautifulSoup(html, "html.parser") _remote_cache["soup"] = soup _remote_cache["ts"] = now return soup def parse_instant_offers(soup): results = [] if not soup: return results table = soup.select_one("#instant-servers-table tbody") if not table: return results for row in table.select("tr"): cells = row.select("td") if len(cells) < 8: continue cpu = normalize_text(cells[0].get_text()) location = normalize_text(cells[1].get_text()) cores = normalize_text(cells[2].get_text()) memory = normalize_text(cells[3].get_text()) storage = normalize_text(cells[4].get_text()) network = normalize_text(cells[5].get_text()) bandwidth = normalize_text(cells[6].get_text()) price_text = normalize_text(cells[7].get_text()) price_delta, currency_symbol = parse_price_delta(price_text) order_link = "" action = row.select_one(".action-cell a") if action: order_link = urljoin(REMOTE_URL, action.get("href", "")) results.append( { "cpu": cpu, "location": location, "cores": cores, "memory": memory, "storage": storage, "network": network, "bandwidth": bandwidth, "priceText": price_text, "price": price_delta, "currencySymbol": currency_symbol or "€", "orderUrl": order_link, } ) return results def get_instant_offers(): now = time.time() if _instant_cache["data"] and now - _instant_cache["ts"] < CACHE_TTL: return _instant_cache["data"] soup = fetch_remote_soup() offers = parse_instant_offers(soup) _instant_cache["data"] = offers _instant_cache["ts"] = now return offers def format_price_delta(delta: float, symbol: str) -> str: if not delta: return "" sign = "+" if delta > 0 else "-" abs_value = abs(delta) formatted = f"{abs_value:,.2f}".replace(",", "_").replace(".", ",").replace("_", ".") return f"{sign} {symbol}{formatted}".strip() def strip_price_from_label(text: str) -> str: cleaned = re.sub( r"\s*[+-]?\s*([€$£]|[A-Za-z]{1,3}|ƒ,ª)?\s*[0-9.,]+\s*(EUR|USD|GBP)?", "", text, flags=re.IGNORECASE, ).strip() return cleaned or text def parse_price_delta(text: str) -> tuple[float, str]: match = PRICE_PATTERN.search(text) if not match: return 0.0, "" sign = match.group(1) symbol = match.group(2) or "" raw_amount = match.group(3).replace(" ", "") if raw_amount.count(",") and raw_amount.count("."): normalized = raw_amount.replace(".", "").replace(",", ".") elif "," in raw_amount: normalized = raw_amount.replace(",", ".") else: normalized = raw_amount try: amount = float(normalized) except ValueError: amount = 0.0 if sign == "-": amount = -amount return amount, symbol def parse_product_options(url: str): now = time.time() cached = _option_cache.get(url) if cached and now - cached["ts"] < OPTION_CACHE_TTL: return cached["data"] if not requests or not BeautifulSoup: return cached["data"] if cached else [] try: response = requests.get(url, headers=HEADERS, timeout=10) response.raise_for_status() except Exception as exc: print(f"[serve10000] Failed to fetch options from {url}: {exc}") return cached["data"] if cached else [] soup = BeautifulSoup(response.text, "html.parser") selects = [] for select in soup.select('select[name^="configoption"]'): label_text = "" select_id = select.get("id") if select_id: label_el = soup.find("label", attrs={"for": select_id}) if label_el: label_text = normalize_text(label_el.get_text()) if not label_text: label_text = normalize_text( select.get("data-option-name") or select.get("data-custom-option-name") or select.get("name") ) choices = [] for opt in select.select("option"): option_text = normalize_text(opt.get_text()) delta, currency = parse_price_delta(option_text) choices.append( { "value": opt.get("value"), "label": strip_price_from_label(option_text), "rawLabel": option_text, "priceDelta": delta, "priceSymbol": currency, "priceDisplay": format_price_delta(delta, currency or ""), } ) selects.append( { "name": select.get("name"), "label": label_text, "choices": choices, } ) _option_cache[url] = {"ts": now, "data": selects} return selects def get_custom_offers(): now = time.time() if _cache["data"] and now - _cache["ts"] < CACHE_TTL: return _cache["data"] soup = fetch_remote_soup() offers = parse_custom_offers(soup) if soup else [] if not offers: offers = load_fallback_offers() _cache["data"] = offers _cache["ts"] = now return offers @app.route("/") @app.route("/new-baremetal.html") def serve_baremetal(): return send_from_directory(BASE_DIR, "new-baremetal.html") @app.route("/custom_offers.json") def custom_offers_file(): return send_from_directory(BASE_DIR, "custom_offers.json") @app.route("/api/custom-offers") def custom_offers_api(): return jsonify({"offers": get_custom_offers()}) @app.route("/api/instant-offers") def instant_offers_api(): return jsonify({"offers": get_instant_offers()}) if __name__ == "__main__": print("Starting DedicatedNodes mock site at http://127.0.0.1:10000/new-baremetal.html") app.run(host="127.0.0.1", port=10000)