Files
ClaudeCode-Roblox-Studio-MCP/serve10000.py
2025-12-04 19:32:46 +04:00

343 lines
11 KiB
Python

import json
import re
import time
from pathlib import Path
from urllib.parse import urljoin

from flask import Flask, jsonify, send_from_directory

try:
    import requests
except ImportError:  # pragma: no cover
    requests = None

try:
    from bs4 import BeautifulSoup
except ImportError:  # pragma: no cover
    BeautifulSoup = None
# Directory containing this script; the static routes serve files from here.
BASE_DIR = Path(__file__).resolve().parent
# Local JSON file used when no offers could be scraped from the remote page.
CUSTOM_OFFERS_PATH = BASE_DIR / "custom_offers.json"
# Page that is downloaded and scraped for offers.
REMOTE_URL = "https://www.dedicatednodes.io/solana-nodes/"
# Cache lifetimes in seconds (compared against differences of time.time()).
CACHE_TTL = 5 * 60
OPTION_CACHE_TTL = 5 * 60
# Headers sent with every outbound HTTP request made by this module.
HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}
# In-memory caches; "ts" holds the time.time() value of the last refresh.
# _cache: result of get_custom_offers().
_cache = {"ts": 0, "data": None}
# _remote_cache: parsed soup of REMOTE_URL, filled by fetch_remote_soup().
_remote_cache = {"ts": 0, "soup": None}
# _instant_cache: result of get_instant_offers().
_instant_cache = {"ts": 0, "data": None}
# _option_cache: maps a product URL to {"ts": ..., "data": ...} entries
# written by parse_product_options().
_option_cache = {}
# Groups: (1) optional sign, (2) optional currency symbol or 1-3 letters,
# (3) a run of digits, dots and commas.
PRICE_PATTERN = re.compile(r"([+-])?\s*([€$£]|[A-Za-z]{1,3})?\s*([0-9.,]+)")
# Flask application object that the route decorators below attach to.
app = Flask(__name__)
def normalize_text(value: str) -> str:
    """Collapse whitespace runs (non-breaking spaces included) to single spaces.

    Leading and trailing whitespace is dropped. Any falsy input, such as
    ``None`` or ``""``, yields an empty string.
    """
    if value:
        without_nbsp = value.replace("\xa0", " ")
        words = without_nbsp.split()
        return " ".join(words)
    return ""
def text_from_element(el):
    """Return the normalized text content of *el*, or ``""`` when *el* is falsy.

    The element's text pieces are joined with a single space and stripped
    before being passed through ``normalize_text``.
    """
    if el:
        raw_text = el.get_text(separator=" ", strip=True)
        return normalize_text(raw_text)
    return ""
def find_card_root(node):
    """Find the closest ancestor of *node* that carries the ``card`` CSS class.

    At most six ancestors are inspected. The ``class`` attribute may be a list
    of class names or a whitespace-separated string. When no such ancestor is
    found, the direct parent of *node* is returned instead.
    """
    if node:
        ancestor = node.parent
        for _ in range(6):
            if not ancestor:
                break
            css_classes = ancestor.get("class") or []
            if isinstance(css_classes, str):
                css_classes = css_classes.split()
            if "card" in css_classes:
                return ancestor
            ancestor = ancestor.parent
    return node.parent
def collect_features(card):
    """Build ``"label|part|part"`` feature strings from the ``<li>`` items of *card*.

    Two kinds of list items are recognised:

    * items containing a ``.component`` element: the label is the component
      text and the parts are the ``.component-value`` and
      ``.component-description`` texts;
    * items containing a ``.price-band`` element: the label is the ``.label``
      text (``"Starting from"`` when that element is absent) and the parts are
      the ``.amount`` and ``.per`` texts.

    Empty parts are dropped, and an item is skipped when it has no label or no
    remaining parts.
    """

    def band_text(band, selector, default=""):
        # Normalized text of the first match, or *default* if nothing matches.
        found = band.select_one(selector)
        return normalize_text(found.get_text()) if found else default

    collected = []
    for item in card.select("li"):
        component = item.select_one(".component")
        if component:
            name = normalize_text(component.get_text())
            details = [
                text_from_element(item.select_one(".component-value")),
                text_from_element(item.select_one(".component-description")),
            ]
            details = [detail for detail in details if detail]
            if name and details:
                collected.append("|".join([name, *details]))
            # Component items never fall through to the price-band check.
            continue

        band = item.select_one(".price-band")
        if not band:
            continue
        heading = band_text(band, ".label", "Starting from")
        amount = band_text(band, ".amount")
        period = band_text(band, ".per")
        pieces = [piece for piece in (amount, period) if piece]
        if heading and pieces:
            collected.append("|".join([heading, *pieces]))
    return collected
def parse_custom_offers(soup):
    """Extract the configurable-server offers from the parsed remote page.

    Every ``<a>`` whose text contains "configure now" (case-insensitive) is
    treated as the entry point of one offer. Anchors without an ``href``,
    repeated ``href`` values, anchors without a card container and cards
    without any features are skipped.

    Returns a list of dicts with ``title``, ``link`` (absolute URL) and
    ``features``; ``options`` is added when ``parse_product_options`` returns
    a non-empty result for the link. An empty list is returned when
    BeautifulSoup is unavailable or *soup* is falsy.
    """
    if not (BeautifulSoup and soup):
        return []

    def is_configure_link(text):
        return text and "configure now" in text.lower()

    results = []
    visited_hrefs = set()
    for anchor in soup.find_all("a", string=is_configure_link):
        href = anchor.get("href")
        if not href or href in visited_hrefs:
            continue
        visited_hrefs.add(href)

        card = find_card_root(anchor)
        if not card:
            continue

        heading = card.find(class_="card-title") or card.find(["h2", "h3", "h4"])
        if heading:
            title = normalize_text(heading.get_text())
        else:
            title = "Custom Server"

        features = collect_features(card)
        if not features:
            continue

        link = urljoin(REMOTE_URL, href)
        entry = {
            "title": title,
            "link": link,
            "features": features,
        }
        # Each offer triggers a (cached) fetch of its own product page.
        options = parse_product_options(link)
        if options:
            entry["options"] = options
        results.append(entry)
    return results
def fetch_remote_html():
    """Download ``REMOTE_URL`` and return its body text.

    Returns ``None`` when the ``requests`` package is unavailable or when the
    request fails for any reason; failures are reported on stdout rather than
    raised.
    """
    if not requests:
        return None
    try:
        reply = requests.get(REMOTE_URL, headers=HEADERS, timeout=10)
        reply.raise_for_status()
        return reply.text
    except Exception as exc:  # pragma: no cover
        print(f"[serve10000] Unable to reach {REMOTE_URL}: {exc}")
    return None
def load_fallback_offers():
    """Load the locally stored fallback offers from ``CUSTOM_OFFERS_PATH``.

    Returns the decoded JSON value when it is a list, otherwise an empty list.
    This function is the last resort when scraping yields nothing, so it never
    raises: a missing file, an unreadable file (permissions, path is a
    directory, bad UTF-8) or malformed JSON all produce ``[]``.
    """
    # Read directly instead of checking ``exists()`` first; this avoids the
    # race between the check and the read.
    try:
        raw = CUSTOM_OFFERS_PATH.read_text(encoding="utf-8")
    except FileNotFoundError:
        return []
    except (OSError, UnicodeDecodeError) as exc:
        print(f"[serve10000] Unable to read fallback offers: {exc}")
        return []

    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        print("[serve10000] Invalid fallback JSON, returning empty list")
        return []

    return data if isinstance(data, list) else []
def fetch_remote_soup():
    """Return the parsed remote page, reusing a cached parse while it is fresh.

    The cached soup is returned when it is younger than ``CACHE_TTL`` seconds.
    Otherwise the page is downloaded again, parsed with ``html.parser`` and
    stored in ``_remote_cache``. Returns ``None`` when BeautifulSoup is not
    installed or the download produced no HTML.
    """
    if not BeautifulSoup:
        return None

    now = time.time()
    cached_soup = _remote_cache.get("soup")
    still_fresh = now - _remote_cache["ts"] < CACHE_TTL
    if cached_soup and still_fresh:
        return cached_soup

    html = fetch_remote_html()
    if html:
        parsed = BeautifulSoup(html, "html.parser")
        _remote_cache.update(soup=parsed, ts=now)
        return parsed
    return None
def parse_instant_offers(soup):
    """Turn the rows of ``#instant-servers-table`` into offer dictionaries.

    Rows with fewer than eight ``<td>`` cells are ignored. The first eight
    cells map, in order, to ``cpu``, ``location``, ``cores``, ``memory``,
    ``storage``, ``network``, ``bandwidth`` and ``priceText``. ``price`` and
    ``currencySymbol`` are derived from the price text, and ``orderUrl`` is
    the absolute URL of the ``.action-cell a`` link (``""`` when the row has
    no such link).

    Returns an empty list when *soup* is falsy or the table body is missing.
    """
    if not soup:
        return []
    body = soup.select_one("#instant-servers-table tbody")
    if not body:
        return []

    column_keys = (
        "cpu",
        "location",
        "cores",
        "memory",
        "storage",
        "network",
        "bandwidth",
        "priceText",
    )
    offers = []
    for row in body.select("tr"):
        cells = row.select("td")
        if len(cells) < 8:
            continue
        # zip() stops after the eight named columns; extra cells are ignored.
        record = {
            key: normalize_text(cell.get_text())
            for key, cell in zip(column_keys, cells)
        }
        amount, symbol = parse_price_delta(record["priceText"])
        order_anchor = row.select_one(".action-cell a")
        if order_anchor:
            order_url = urljoin(REMOTE_URL, order_anchor.get("href", ""))
        else:
            order_url = ""
        record["price"] = amount
        record["currencySymbol"] = symbol or ""
        record["orderUrl"] = order_url
        offers.append(record)
    return offers
def get_instant_offers():
    """Return the instant-server offers, served from cache while it is fresh.

    A non-empty cached list younger than ``CACHE_TTL`` seconds is returned
    as-is. Otherwise the remote page is parsed again and the result, even an
    empty one, is written to ``_instant_cache``.
    """
    now = time.time()
    cached = _instant_cache["data"]
    if cached and now - _instant_cache["ts"] < CACHE_TTL:
        return cached

    refreshed = parse_instant_offers(fetch_remote_soup())
    _instant_cache.update(data=refreshed, ts=now)
    return refreshed
def format_price_delta(delta: float, symbol: str) -> str:
    """Render *delta* as a signed amount such as ``"+ €1.234,50"``.

    The magnitude uses ``.`` as the thousands separator and ``,`` as the
    decimal separator, with two decimals. A falsy *delta* (``0``, ``0.0``,
    ``None``) produces an empty string.
    """
    if not delta:
        return ""

    if delta > 0:
        sign = "+"
    else:
        sign = "-"

    # Format with "," thousands / "." decimals first, then swap the two
    # separators in a single pass.
    english_style = f"{abs(delta):,.2f}"
    swapped = english_style.translate(str.maketrans(",.", ".,"))
    return "{} {}{}".format(sign, symbol, swapped).strip()
# Fragments that are recognisably a price. A bare number is NOT treated as a
# price, so hardware figures such as "64GB" or "Ryzen 9 7950X" are preserved.
_LABEL_PRICE_RE = re.compile(
    r"""
    \s*                                   # swallow the gap before the price
    (?:
        # currency symbol in front: "+ €10,00", "$25.00 USD", "-£3"
        # NOTE(review): the "ƒ,ª" alternative is carried over verbatim from
        # the previous pattern; it looks like a mis-encoded currency sign.
        # Confirm whether it is still needed.
        [+-]?\s*(?:[€$£]|ƒ,ª)\s*[0-9][0-9.,]*(?:\s*(?:EUR|USD|GBP)\b)?
      |
        # currency code behind a stand-alone number: "49.00 EUR"
        [+-]?\s*(?<![0-9A-Za-z.,])[0-9][0-9.,]*\s*(?:EUR|USD|GBP)\b
      |
        # explicitly signed stand-alone number: "+ 5.00"
        (?<![^\s(])[+-]\s*[0-9][0-9.,]*(?![0-9A-Za-z%])
    )
    """,
    re.IGNORECASE | re.VERBOSE,
)


def strip_price_from_label(text: str) -> str:
    """Remove price fragments from an option label and return the rest.

    A fragment counts as a price only when it is marked as one: it carries a
    currency symbol, is followed by ``EUR``/``USD``/``GBP``, or is a
    stand-alone number with an explicit ``+``/``-`` sign. Other numbers are
    part of the product description and are kept, so
    ``"Ryzen 9 7950X + €10,00"`` becomes ``"Ryzen 9 7950X"``.

    When stripping would leave nothing (the label is only a price), the
    original *text* is returned unchanged.
    """
    cleaned = _LABEL_PRICE_RE.sub("", text).strip()
    return cleaned or text
def parse_price_delta(text: str) -> tuple[float, str]:
    """Extract a signed amount and its currency symbol from *text*.

    Candidates are the matches of ``PRICE_PATTERN``. Matches whose amount
    group holds no digit (stray ``,`` or ``.``) are ignored. Among the rest,
    the first one carrying a real currency symbol (``€``, ``$`` or ``£``)
    wins, so ``"64GB RAM + €10,00"`` yields ``(10.0, "€")`` rather than
    ``(64.0, "")``. Without such a match, the first digit-bearing candidate
    is used.

    Number format: when both ``.`` and ``,`` occur, the one appearing last is
    the decimal separator (``"1.234,56"`` and ``"1,234.56"`` both give
    ``1234.56``). A lone ``,`` is read as the decimal separator.

    Returns ``(0.0, "")`` when nothing price-like is found. The amount is
    ``0.0`` when the digits cannot be converted, and is negated when the
    match carries a ``-`` sign.
    """
    chosen = None
    for candidate in PRICE_PATTERN.finditer(text or ""):
        if not any(ch.isdigit() for ch in candidate.group(3)):
            # Pure punctuation; must not shadow a real price later on.
            continue
        if chosen is None:
            chosen = candidate
        if candidate.group(2) in ("€", "$", "£"):
            chosen = candidate
            break
    if chosen is None:
        return 0.0, ""

    sign = chosen.group(1)
    symbol = chosen.group(2) or ""
    raw_amount = chosen.group(3)

    if "," in raw_amount and "." in raw_amount:
        if raw_amount.rfind(",") > raw_amount.rfind("."):
            # "1.234,56": dots group thousands, comma is the decimal mark.
            normalized = raw_amount.replace(".", "").replace(",", ".")
        else:
            # "1,234.56": commas group thousands, dot is the decimal mark.
            normalized = raw_amount.replace(",", "")
    elif "," in raw_amount:
        normalized = raw_amount.replace(",", ".")
    else:
        normalized = raw_amount

    try:
        amount = float(normalized)
    except ValueError:
        amount = 0.0
    if sign == "-":
        amount = -amount
    return amount, symbol
def parse_product_options(url: str):
    """Scrape the ``configoption`` ``<select>`` elements of a product page.

    Results are cached per URL for ``OPTION_CACHE_TTL`` seconds. When the
    required packages are missing or the request fails, the previously cached
    data for *url* is returned if there is any, otherwise an empty list.

    Returns a list of dicts with ``name``, ``label`` and ``choices``. Each
    choice holds ``value``, ``label`` (price stripped), ``rawLabel``,
    ``priceDelta``, ``priceSymbol`` and ``priceDisplay``.
    """
    now = time.time()
    entry = _option_cache.get(url)
    if entry and now - entry["ts"] < OPTION_CACHE_TTL:
        return entry["data"]

    # Whatever we had before (possibly expired) is the best-effort answer.
    stale = entry["data"] if entry else []
    if not requests or not BeautifulSoup:
        return stale
    try:
        response = requests.get(url, headers=HEADERS, timeout=10)
        response.raise_for_status()
    except Exception as exc:
        print(f"[serve10000] Failed to fetch options from {url}: {exc}")
        return stale

    page = BeautifulSoup(response.text, "html.parser")

    def label_for(select):
        # Prefer the <label for="..."> text, then fall back to attributes.
        caption = ""
        select_id = select.get("id")
        if select_id:
            label_el = page.find("label", attrs={"for": select_id})
            if label_el:
                caption = normalize_text(label_el.get_text())
        if caption:
            return caption
        return normalize_text(
            select.get("data-option-name")
            or select.get("data-custom-option-name")
            or select.get("name")
        )

    def describe(option):
        option_text = normalize_text(option.get_text())
        delta, currency = parse_price_delta(option_text)
        return {
            "value": option.get("value"),
            "label": strip_price_from_label(option_text),
            "rawLabel": option_text,
            "priceDelta": delta,
            "priceSymbol": currency,
            "priceDisplay": format_price_delta(delta, currency or ""),
        }

    parsed = []
    for select in page.select('select[name^="configoption"]'):
        caption = label_for(select)
        choices = [describe(option) for option in select.select("option")]
        parsed.append(
            {
                "name": select.get("name"),
                "label": caption,
                "choices": choices,
            }
        )

    _option_cache[url] = {"ts": now, "data": parsed}
    return parsed
def get_custom_offers():
    """Return the custom offers, served from cache while it is fresh.

    A non-empty cached list younger than ``CACHE_TTL`` seconds is returned
    directly. Otherwise the remote page is scraped; if that yields nothing,
    the local fallback file is used. The result is stored in ``_cache``.
    """
    now = time.time()
    cached = _cache["data"]
    if cached and now - _cache["ts"] < CACHE_TTL:
        return cached

    offers = []
    soup = fetch_remote_soup()
    if soup:
        offers = parse_custom_offers(soup)
    if not offers:
        offers = load_fallback_offers()

    _cache["data"], _cache["ts"] = offers, now
    return offers
@app.route("/")
@app.route("/new-baremetal.html")
def serve_baremetal():
    """Serve ``new-baremetal.html`` from ``BASE_DIR`` for both routes."""
    page_name = "new-baremetal.html"
    return send_from_directory(BASE_DIR, page_name)
@app.route("/custom_offers.json")
def custom_offers_file():
    """Serve the raw ``custom_offers.json`` file from ``BASE_DIR``."""
    file_name = "custom_offers.json"
    return send_from_directory(BASE_DIR, file_name)
@app.route("/api/custom-offers")
def custom_offers_api():
    """Return the custom offers as JSON under the ``offers`` key."""
    payload = {"offers": get_custom_offers()}
    return jsonify(payload)
@app.route("/api/instant-offers")
def instant_offers_api():
    """Return the instant-server offers as JSON under the ``offers`` key."""
    payload = {"offers": get_instant_offers()}
    return jsonify(payload)
if __name__ == "__main__":
    # Script entry point: announce the URL, then start the Flask server
    # bound to the loopback interface on port 10000.
    print("Starting DedicatedNodes mock site at http://127.0.0.1:10000/new-baremetal.html")
    app.run(port=10000, host="127.0.0.1")