177 lines
6.4 KiB
JavaScript
177 lines
6.4 KiB
JavaScript
import { mkdir, readdir, readFile, writeFile } from 'node:fs/promises';
|
|
import path from 'node:path';
|
|
import process from 'node:process';
|
|
|
|
const ROOT = path.resolve(path.dirname(new URL(import.meta.url).pathname), '..', '..');
|
|
const DATASET_DIR = path.join(ROOT, 'scripts/comms/datasets');
|
|
const OUTPUT_DIR = path.join(ROOT, 'artifacts/comms');
|
|
const OUTPUT_FILE = path.join(OUTPUT_DIR, 'current-metrics.json');
|
|
|
|
export function percentile(values, p) {
|
|
if (values.length === 0) return 0;
|
|
const sorted = [...values].sort((a, b) => a - b);
|
|
const idx = Math.min(sorted.length - 1, Math.ceil((p / 100) * sorted.length) - 1);
|
|
return sorted[idx];
|
|
}
|
|
|
|
export function dedupeKey(event) {
|
|
if (event.type !== 'gateway_event') return null;
|
|
const runId = event.runId ?? '';
|
|
const sessionKey = event.sessionKey ?? '';
|
|
const seq = event.seq ?? '';
|
|
const state = event.state ?? '';
|
|
if (!runId && !sessionKey && !seq && !state) return null;
|
|
return `${runId}|${sessionKey}|${seq}|${state}`;
|
|
}
|
|
|
|
export function calculateScenarioMetrics(events) {
|
|
let totalGatewayEvents = 0;
|
|
let uniqueGatewayEvents = 0;
|
|
let fanoutTotal = 0;
|
|
let duplicateGatewayEvents = 0;
|
|
let gatewayReconnectCount = 0;
|
|
let messageLossCount = 0;
|
|
let messageOrderViolationCount = 0;
|
|
let rpcTimeoutCount = 0;
|
|
const rpcLatencies = [];
|
|
const dedupeSet = new Set();
|
|
const historyInFlight = new Map();
|
|
let historyInflightMax = 0;
|
|
let historyLoadCount = 0;
|
|
|
|
const sorted = [...events].sort((a, b) => (a.ts ?? 0) - (b.ts ?? 0));
|
|
const startTs = sorted.length > 0 ? (sorted[0].ts ?? 0) : 0;
|
|
const endTs = sorted.length > 0 ? (sorted[sorted.length - 1].ts ?? 0) : 0;
|
|
const durationSec = Math.max(1, endTs - startTs);
|
|
|
|
for (const event of sorted) {
|
|
if (event.type === 'gateway_event') {
|
|
totalGatewayEvents += 1;
|
|
fanoutTotal += Number(event.fanout ?? 1);
|
|
const key = dedupeKey(event);
|
|
if (!key || !dedupeSet.has(key)) {
|
|
uniqueGatewayEvents += 1;
|
|
if (key) dedupeSet.add(key);
|
|
} else {
|
|
duplicateGatewayEvents += 1;
|
|
}
|
|
continue;
|
|
}
|
|
|
|
if (event.type === 'history_load') {
|
|
const sessionKey = String(event.sessionKey ?? 'unknown');
|
|
if (event.action === 'start') {
|
|
const next = (historyInFlight.get(sessionKey) ?? 0) + 1;
|
|
historyInFlight.set(sessionKey, next);
|
|
historyInflightMax = Math.max(historyInflightMax, next);
|
|
historyLoadCount += 1;
|
|
} else if (event.action === 'end') {
|
|
const current = historyInFlight.get(sessionKey) ?? 0;
|
|
historyInFlight.set(sessionKey, Math.max(0, current - 1));
|
|
}
|
|
continue;
|
|
}
|
|
|
|
if (event.type === 'rpc') {
|
|
const latency = Number(event.latencyMs ?? 0);
|
|
if (latency > 0) rpcLatencies.push(latency);
|
|
if (event.timeout === true) rpcTimeoutCount += 1;
|
|
continue;
|
|
}
|
|
|
|
if (event.type === 'gateway_reconnect') {
|
|
gatewayReconnectCount += 1;
|
|
continue;
|
|
}
|
|
|
|
if (event.type === 'message') {
|
|
if (event.lost === true) messageLossCount += 1;
|
|
if (event.orderViolation === true) messageOrderViolationCount += 1;
|
|
}
|
|
}
|
|
|
|
return {
|
|
duplicate_event_rate: totalGatewayEvents > 0 ? duplicateGatewayEvents / totalGatewayEvents : 0,
|
|
event_fanout_ratio: uniqueGatewayEvents > 0 ? fanoutTotal / uniqueGatewayEvents : 0,
|
|
history_inflight_max: historyInflightMax,
|
|
history_load_qps: historyLoadCount / durationSec,
|
|
rpc_p50_ms: percentile(rpcLatencies, 50),
|
|
rpc_p95_ms: percentile(rpcLatencies, 95),
|
|
rpc_timeout_rate: rpcLatencies.length > 0 ? rpcTimeoutCount / rpcLatencies.length : 0,
|
|
gateway_reconnect_count: gatewayReconnectCount,
|
|
message_loss_count: messageLossCount,
|
|
message_order_violation_count: messageOrderViolationCount,
|
|
_meta: {
|
|
duration_sec: durationSec,
|
|
total_gateway_events: totalGatewayEvents,
|
|
unique_gateway_events: uniqueGatewayEvents,
|
|
total_rpc_calls: rpcLatencies.length,
|
|
},
|
|
};
|
|
}
|
|
|
|
export function aggregateMetrics(metricsList) {
|
|
if (metricsList.length === 0) {
|
|
return calculateScenarioMetrics([]);
|
|
}
|
|
const sum = (key) => metricsList.reduce((acc, item) => acc + Number(item[key] ?? 0), 0);
|
|
return {
|
|
duplicate_event_rate: sum('duplicate_event_rate') / metricsList.length,
|
|
event_fanout_ratio: sum('event_fanout_ratio') / metricsList.length,
|
|
history_inflight_max: Math.max(...metricsList.map((m) => Number(m.history_inflight_max ?? 0))),
|
|
history_load_qps: sum('history_load_qps') / metricsList.length,
|
|
rpc_p50_ms: sum('rpc_p50_ms') / metricsList.length,
|
|
rpc_p95_ms: sum('rpc_p95_ms') / metricsList.length,
|
|
rpc_timeout_rate: sum('rpc_timeout_rate') / metricsList.length,
|
|
gateway_reconnect_count: Math.round(sum('gateway_reconnect_count')),
|
|
message_loss_count: Math.round(sum('message_loss_count')),
|
|
message_order_violation_count: Math.round(sum('message_order_violation_count')),
|
|
};
|
|
}
|
|
|
|
export async function loadScenario(fileName) {
|
|
const fullPath = path.join(DATASET_DIR, fileName);
|
|
const raw = await readFile(fullPath, 'utf8');
|
|
const lines = raw.split('\n').map((line) => line.trim()).filter(Boolean);
|
|
return lines.map((line) => JSON.parse(line));
|
|
}
|
|
|
|
export async function main() {
|
|
const argScenario = process.argv.find((arg) => arg.startsWith('--scenario='))?.split('=')[1] ?? 'all';
|
|
const files = (await readdir(DATASET_DIR)).filter((name) => name.endsWith('.jsonl')).sort();
|
|
const selectedFiles = argScenario === 'all'
|
|
? files
|
|
: files.filter((name) => name === `${argScenario}.jsonl`);
|
|
|
|
if (selectedFiles.length === 0) {
|
|
throw new Error(`No dataset found for scenario "${argScenario}"`);
|
|
}
|
|
|
|
const scenarios = {};
|
|
for (const fileName of selectedFiles) {
|
|
const scenarioName = fileName.replace(/\.jsonl$/, '');
|
|
const events = await loadScenario(fileName);
|
|
scenarios[scenarioName] = calculateScenarioMetrics(events);
|
|
}
|
|
|
|
const aggregate = aggregateMetrics(Object.values(scenarios));
|
|
const output = {
|
|
generated_at: new Date().toISOString(),
|
|
scenario: argScenario,
|
|
scenarios,
|
|
aggregate,
|
|
};
|
|
|
|
await mkdir(OUTPUT_DIR, { recursive: true });
|
|
await writeFile(OUTPUT_FILE, JSON.stringify(output, null, 2));
|
|
console.log(`Wrote comms replay metrics to ${OUTPUT_FILE}`);
|
|
}
|
|
|
|
const isEntrypoint = process.argv[1] && path.resolve(process.argv[1]) === path.resolve(new URL(import.meta.url).pathname);
|
|
if (isEntrypoint) {
|
|
main().catch((error) => {
|
|
console.error('[comms:replay] failed:', error);
|
|
process.exitCode = 1;
|
|
});
|
|
}
|