343 lines
11 KiB
Python
343 lines
11 KiB
Python
from flask import Flask, jsonify, send_from_directory
|
|
from pathlib import Path
|
|
import time
|
|
import json
|
|
from urllib.parse import urljoin
|
|
import re
|
|
|
|
try:
|
|
import requests
|
|
except ImportError: # pragma: no cover
|
|
requests = None
|
|
|
|
try:
|
|
from bs4 import BeautifulSoup
|
|
except ImportError: # pragma: no cover
|
|
BeautifulSoup = None
|
|
|
|
# Directory containing this script; static files and the JSON fallback live here.
BASE_DIR = Path(__file__).resolve().parent
# On-disk fallback copy of the scraped offers, served when the remote site is unreachable.
CUSTOM_OFFERS_PATH = BASE_DIR / "custom_offers.json"
# Remote page that is scraped for offer data.
REMOTE_URL = "https://www.dedicatednodes.io/solana-nodes/"
# Cache lifetimes in seconds (5 minutes each).
CACHE_TTL = 5 * 60
OPTION_CACHE_TTL = 5 * 60
# Browser-like User-Agent so the remote site serves its normal HTML page.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}

# Module-level caches. Each holds the epoch seconds of the last refresh ("ts")
# plus the cached payload; a payload of None/empty forces a refetch.
_cache = {"ts": 0, "data": None}          # parsed custom offers
_remote_cache = {"ts": 0, "soup": None}   # parsed soup of REMOTE_URL
_instant_cache = {"ts": 0, "data": None}  # parsed instant-server table rows
_option_cache = {}                        # url -> {"ts": ..., "data": ...} per product page

# Price fragment: optional sign, optional currency token (symbol or a 1-3
# letter code), then a run of digits/separators.
PRICE_PATTERN = re.compile(r"([+-])?\s*([€$£]|[A-Za-z]{1,3})?\s*([0-9.,]+)")

app = Flask(__name__)
|
|
|
|
|
|
def normalize_text(value: str) -> str:
    """Collapse runs of whitespace (incl. non-breaking spaces) to single spaces.

    Falsy input (None, "") yields "".
    """
    if not value:
        return ""
    # Non-breaking spaces are not split by str.split(), so map them first.
    return " ".join(value.replace("\xa0", " ").split())
|
|
|
|
|
|
def text_from_element(el):
    """Whitespace-normalized text content of a soup element; "" for a falsy element."""
    return normalize_text(el.get_text(separator=" ", strip=True)) if el else ""
|
|
|
|
|
|
def find_card_root(node):
    """Walk up at most six ancestors of *node* looking for an element whose
    class list contains "card"; fall back to the node's immediate parent."""
    ancestor = node
    for _ in range(6):
        if not ancestor:
            break
        ancestor = ancestor.parent
        if not ancestor:
            break
        # bs4 may hand back the class attribute as a list or a raw string.
        class_attr = ancestor.get("class") or []
        if isinstance(class_attr, str):
            class_attr = class_attr.split()
        if "card" in class_attr:
            return ancestor
    return node.parent
|
|
|
|
|
|
def collect_features(card):
    """Extract "label|value[|description]" feature strings from a card element.

    Handles two list-item shapes: spec rows (.component / .component-value /
    .component-description) and price bands (.price-band with .label, .amount,
    .per children). Rows missing a label or any value are skipped.
    """
    rows = []
    for item in card.select("li"):
        component = item.select_one(".component")
        if component:
            name = normalize_text(component.get_text())
            segments = [
                seg
                for seg in (
                    text_from_element(item.select_one(".component-value")),
                    text_from_element(item.select_one(".component-description")),
                )
                if seg
            ]
            if name and segments:
                rows.append("|".join([name] + segments))
            continue
        band = item.select_one(".price-band")
        if not band:
            continue
        label_el = band.select_one(".label")
        amount_el = band.select_one(".amount")
        per_el = band.select_one(".per")
        band_label = normalize_text(label_el.get_text()) if label_el else "Starting from"
        segments = [
            seg
            for seg in (
                normalize_text(amount_el.get_text()) if amount_el else "",
                normalize_text(per_el.get_text()) if per_el else "",
            )
            if seg
        ]
        if band_label and segments:
            rows.append("|".join([band_label] + segments))
    return rows
|
|
|
|
|
|
def parse_custom_offers(soup):
    """Build the custom-offer list from the remote page soup.

    Each card is located by its "Configure Now" anchor; duplicate hrefs are
    skipped. Cards without extractable features are dropped. Product options
    are scraped from each offer's configurator page when available.
    Returns [] when bs4 is missing or *soup* is falsy.
    """
    if not BeautifulSoup or not soup:
        return []
    offers = []
    visited_hrefs = set()
    anchors = soup.find_all(
        "a", string=lambda text: text and "configure now" in text.lower()
    )
    for anchor in anchors:
        href = anchor.get("href")
        if not href or href in visited_hrefs:
            continue
        visited_hrefs.add(href)
        card = find_card_root(anchor)
        if not card:
            continue
        heading = card.find(class_="card-title") or card.find(["h2", "h3", "h4"])
        features = collect_features(card)
        if not features:
            continue
        full_link = urljoin(REMOTE_URL, href)
        entry = {
            "title": normalize_text(heading.get_text()) if heading else "Custom Server",
            "link": full_link,
            "features": features,
        }
        # Optional configurator dropdowns scraped from the product page itself.
        options = parse_product_options(full_link)
        if options:
            entry["options"] = options
        offers.append(entry)
    return offers
|
|
|
|
|
|
def fetch_remote_html():
    """Download the remote offers page; None when requests is missing or the
    request fails (failures are logged to stdout)."""
    if not requests:
        return None
    try:
        response = requests.get(REMOTE_URL, headers=HEADERS, timeout=10)
        response.raise_for_status()
        return response.text
    except Exception as exc:  # pragma: no cover
        print(f"[serve10000] Unable to reach {REMOTE_URL}: {exc}")
    return None
|
|
|
|
|
|
def load_fallback_offers():
    """Read the bundled custom_offers.json; [] if it is missing, malformed,
    or does not contain a JSON array."""
    if not CUSTOM_OFFERS_PATH.exists():
        return []
    try:
        parsed = json.loads(CUSTOM_OFFERS_PATH.read_text(encoding="utf-8"))
    except json.JSONDecodeError:
        print("[serve10000] Invalid fallback JSON, returning empty list")
        return []
    return parsed if isinstance(parsed, list) else []
|
|
|
|
|
|
def fetch_remote_soup():
    """Return a parsed soup of REMOTE_URL, cached for CACHE_TTL seconds.

    Returns None when bs4 is unavailable or no page has ever been fetched.
    On a failed refresh, falls back to the stale cached soup (if any) rather
    than discarding it — consistent with parse_product_options' stale-cache
    fallback.
    """
    if not BeautifulSoup:
        return None
    now = time.time()
    cached = _remote_cache.get("soup")
    if cached and now - _remote_cache["ts"] < CACHE_TTL:
        return cached
    html = fetch_remote_html()
    if not html:
        # Fetch failed: serve the expired soup (or None if we never had one)
        # instead of dropping data we already parsed.
        return cached
    soup = BeautifulSoup(html, "html.parser")
    _remote_cache["soup"] = soup
    _remote_cache["ts"] = now
    return soup
|
|
|
|
|
|
def parse_instant_offers(soup):
    """Parse the #instant-servers-table rows into offer dicts.

    Rows with fewer than eight cells are skipped. The price cell is parsed
    into a numeric value plus currency symbol (defaulting to "€"), and the
    order link — when an .action-cell anchor exists — is resolved against
    REMOTE_URL. Returns [] for missing soup or table.
    """
    offers = []
    if not soup:
        return offers
    body = soup.select_one("#instant-servers-table tbody")
    if not body:
        return offers
    for row in body.select("tr"):
        cols = row.select("td")
        if len(cols) < 8:
            continue
        (cpu, location, cores, memory, storage, network, bandwidth,
         price_text) = [normalize_text(col.get_text()) for col in cols[:8]]
        amount, symbol = parse_price_delta(price_text)
        anchor = row.select_one(".action-cell a")
        link = urljoin(REMOTE_URL, anchor.get("href", "")) if anchor else ""
        offers.append(
            {
                "cpu": cpu,
                "location": location,
                "cores": cores,
                "memory": memory,
                "storage": storage,
                "network": network,
                "bandwidth": bandwidth,
                "priceText": price_text,
                "price": amount,
                "currencySymbol": symbol or "€",
                "orderUrl": link,
            }
        )
    return offers
|
|
|
|
|
|
def get_instant_offers():
    """Instant-server offers, refreshed from the remote page at most every
    CACHE_TTL seconds. An empty result is not treated as fresh, so the next
    call retries."""
    now = time.time()
    cached = _instant_cache["data"]
    if cached and now - _instant_cache["ts"] < CACHE_TTL:
        return cached
    offers = parse_instant_offers(fetch_remote_soup())
    _instant_cache.update(data=offers, ts=now)
    return offers
|
|
|
|
|
|
def format_price_delta(delta: float, symbol: str) -> str:
    """Render a signed delta in European notation, e.g. ``+ €1.234,50``.

    A zero (or falsy) delta yields the empty string.
    """
    if not delta:
        return ""
    magnitude = f"{abs(delta):,.2f}"
    # Swap US separators (1,234.50) for European ones (1.234,50) using a
    # throwaway placeholder so the two replacements don't collide.
    european = magnitude.replace(",", "_").replace(".", ",").replace("_", ".")
    prefix = "+" if delta > 0 else "-"
    return f"{prefix} {symbol}{european}".strip()
|
|
|
|
|
|
def strip_price_from_label(text: str) -> str:
    """Remove an embedded price fragment (e.g. "+ €10.00 EUR") from an option
    label, returning the original text when stripping would leave nothing.

    The previous pattern made the sign and currency token optional, so any
    bare number was deleted too — "16 GB RAM + €10.00" became "GB RAM".
    A price must now carry a sign, a currency symbol, or a trailing ISO code;
    plain spec numbers (RAM sizes, disk counts, ...) are left untouched.
    (This also drops a mojibake alternative, "ƒ,ª", from the old pattern.)
    """
    # Accepted price shapes:
    #   1. sign, optional currency token, amount, optional ISO code: "+ €10.00 EUR"
    #   2. currency symbol then amount:                              "€10.00"
    #   3. amount followed by a mandatory ISO code:                  "10.00 EUR"
    cleaned = re.sub(
        r"\s*[+-]\s*(?:[€$£]|[A-Za-z]{1,3})?\s*[0-9.,]+\s*(?:EUR|USD|GBP)?"
        r"|\s*[€$£]\s*[0-9.,]+\s*(?:EUR|USD|GBP)?"
        r"|\s*[0-9.,]+\s*(?:EUR|USD|GBP)\b",
        "",
        text,
        flags=re.IGNORECASE,
    ).strip()
    return cleaned or text
|
|
|
|
|
|
def parse_price_delta(text: str) -> tuple[float, str]:
    """Extract ``(signed amount, currency symbol)`` from free-form price text.

    Amounts use European conventions: when both separators appear, dots are
    thousands markers and the comma is the decimal point; a lone comma is a
    decimal point. Returns ``(0.0, "")`` when no number is found or the
    number fails to parse.
    """
    found = re.search(r"([+-])?\s*([€$£]|[A-Za-z]{1,3})?\s*([0-9.,]+)", text)
    if found is None:
        return 0.0, ""
    sign, symbol, raw = found.group(1), found.group(2) or "", found.group(3)
    raw = raw.replace(" ", "")
    if "," in raw and "." in raw:
        # "1.234,56" -> "1234.56"
        normalized = raw.replace(".", "").replace(",", ".")
    elif "," in raw:
        normalized = raw.replace(",", ".")
    else:
        normalized = raw
    try:
        amount = float(normalized)
    except ValueError:
        amount = 0.0
    return (-amount if sign == "-" else amount), symbol
|
|
|
|
|
|
def parse_product_options(url: str):
    """Scrape a product configurator page for its option dropdowns.

    Looks for WHMCS-style ``<select name="configoption...">`` elements and
    returns a list of {"name", "label", "choices"} dicts. Results are cached
    per URL for OPTION_CACHE_TTL seconds; on any failure the stale cached
    value (if one exists) is returned as a best-effort fallback.
    """
    now = time.time()
    cached = _option_cache.get(url)
    if cached and now - cached["ts"] < OPTION_CACHE_TTL:
        return cached["data"]
    if not requests or not BeautifulSoup:
        # Optional dependencies missing: serve stale data if we have it.
        return cached["data"] if cached else []
    try:
        response = requests.get(url, headers=HEADERS, timeout=10)
        response.raise_for_status()
    except Exception as exc:
        print(f"[serve10000] Failed to fetch options from {url}: {exc}")
        return cached["data"] if cached else []
    soup = BeautifulSoup(response.text, "html.parser")
    selects = []
    for select in soup.select('select[name^="configoption"]'):
        label_text = ""
        select_id = select.get("id")
        if select_id:
            # Prefer the text of an explicit <label for=...> when present.
            label_el = soup.find("label", attrs={"for": select_id})
            if label_el:
                label_text = normalize_text(label_el.get_text())
        if not label_text:
            # Fall back to data attributes, then the raw field name.
            label_text = normalize_text(
                select.get("data-option-name")
                or select.get("data-custom-option-name")
                or select.get("name")
            )
        choices = []
        for opt in select.select("option"):
            option_text = normalize_text(opt.get_text())
            # Option labels may embed a price delta such as "+ €10.00".
            delta, currency = parse_price_delta(option_text)
            choices.append(
                {
                    "value": opt.get("value"),
                    "label": strip_price_from_label(option_text),
                    "rawLabel": option_text,
                    "priceDelta": delta,
                    "priceSymbol": currency,
                    "priceDisplay": format_price_delta(delta, currency or ""),
                }
            )
        selects.append(
            {
                "name": select.get("name"),
                "label": label_text,
                "choices": choices,
            }
        )
    _option_cache[url] = {"ts": now, "data": selects}
    return selects
|
|
|
|
|
|
def get_custom_offers():
    """Custom offers, refreshed at most every CACHE_TTL seconds.

    Falls back to the bundled JSON file when the remote page yields nothing.
    An empty result is not treated as fresh, so the next call retries.
    """
    now = time.time()
    cached = _cache["data"]
    if cached and now - _cache["ts"] < CACHE_TTL:
        return cached
    soup = fetch_remote_soup()
    offers = parse_custom_offers(soup) if soup else []
    offers = offers or load_fallback_offers()
    _cache.update(data=offers, ts=now)
    return offers
|
|
|
|
|
|
@app.route("/")
@app.route("/new-baremetal.html")
def serve_baremetal():
    """Serve the static landing page from the script's directory."""
    return send_from_directory(BASE_DIR, "new-baremetal.html")
|
|
|
|
|
|
@app.route("/custom_offers.json")
def custom_offers_file():
    """Serve the raw fallback JSON file directly."""
    return send_from_directory(BASE_DIR, "custom_offers.json")
|
|
|
|
|
|
@app.route("/api/custom-offers")
def custom_offers_api():
    """JSON API: cached custom offers under an "offers" key."""
    return jsonify({"offers": get_custom_offers()})
|
|
|
|
|
|
@app.route("/api/instant-offers")
def instant_offers_api():
    """JSON API: cached instant-server offers under an "offers" key."""
    return jsonify({"offers": get_instant_offers()})
|
|
|
|
|
|
if __name__ == "__main__":
    # Development server only — bound to loopback on port 10000.
    print("Starting DedicatedNodes mock site at http://127.0.0.1:10000/new-baremetal.html")
    app.run(host="127.0.0.1", port=10000)
|