#!/usr/bin/env python3
"""Normalize RPKI validator output into sorted, line-oriented sets.

Two kinds of records are extracted:

* VRPs, written one per line as ``AS<asn>|<prefix>|<max_length>``
* VAPs, written one per line as ``AS<customer>|AS<p1>,AS<p2>,...`` with the
  providers de-duplicated and sorted numerically

The ``--kind`` option selects how ``--input`` is interpreted: a single JSON
file for ``routinator``; for ``rpki-client`` either a directory holding files
named ``json`` and/or ``csv``, or a single JSON or CSV file.
"""

from __future__ import annotations

import argparse
import csv
import json
from pathlib import Path
from typing import Any

# Top-level JSON keys that may hold the list of records.
_VRP_CONTAINER_KEYS = ("roas", "routeOrigins", "valid_roas")
_VAP_CONTAINER_KEYS = ("aspas", "aspaAssertions", "vaps")

# Per-record field aliases, tried in order.
_JSON_ASN_KEYS = ("asn", "asID", "origin")
_JSON_PREFIX_KEYS = ("prefix",)
_JSON_MAX_LENGTH_KEYS = ("maxLength", "max_length", "maxlen", "maxLengthPrefix")
_JSON_CUSTOMER_KEYS = (
    "customer",
    "customer_asid",
    "customerASID",
    "customerAsid",
    "customerAsn",
)
_JSON_PROVIDER_KEYS = ("providers", "provider_asns", "providerASNs")
_CSV_ASN_KEYS = ("ASN", "asn", "AS")
_CSV_PREFIX_KEYS = ("IP Prefix", "prefix", "Prefix")
_CSV_MAX_LENGTH_KEYS = ("Max Length", "maxLength", "max_length")


def normalize_asn(value: Any) -> str:
    """Return *value* in canonical ``AS<number>`` form.

    Accepts integers or strings, with or without an ``AS`` prefix in any
    letter case; surrounding whitespace and leading zeros are dropped.

    Raises:
        ValueError: if the remaining text is not an integer.
    """
    text = str(value).strip().upper()
    if text.startswith("AS"):
        text = text[2:]
    return f"AS{int(text)}"


def write_set(path: Path, rows: set[str]) -> None:
    """Write *rows* to *path*, sorted, one per line, creating parent dirs.

    An empty set produces an empty file (no trailing newline).
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text("\n".join(sorted(rows)) + ("\n" if rows else ""), encoding="utf-8")


def load_json(path: Path) -> Any:
    """Read *path* as UTF-8 and return the decoded JSON document."""
    return json.loads(path.read_text(encoding="utf-8"))


def _first_present(record: dict[str, Any], keys: tuple[str, ...]) -> Any:
    """Return the first value under *keys* that is neither ``None`` nor ``""``.

    Unlike an ``a or b or c`` chain this keeps falsy-but-meaningful values
    such as integer ``0`` (for example an ASN of 0 or a max length of 0).
    """
    for key in keys:
        value = record.get(key)
        if value is not None and value != "":
            return value
    return None


def _collect_objects(data: Any, container_keys: tuple[str, ...]) -> list[dict[str, Any]]:
    """Return the dict records found in *data*.

    *data* may be a list of records, or a dict whose *container_keys* map to
    lists of records. Anything that is not a dict is ignored.
    """
    if isinstance(data, list):
        return [item for item in data if isinstance(item, dict)]
    objects: list[dict[str, Any]] = []
    if isinstance(data, dict):
        for key in container_keys:
            value = data.get(key)
            if isinstance(value, list):
                objects.extend(item for item in value if isinstance(item, dict))
    return objects


def _vrps_from_data(data: Any) -> set[str]:
    """Build the VRP row set from an already-decoded JSON document."""
    rows: set[str] = set()
    for item in _collect_objects(data, _VRP_CONTAINER_KEYS):
        asn = _first_present(item, _JSON_ASN_KEYS)
        prefix = _first_present(item, _JSON_PREFIX_KEYS)
        max_length = _first_present(item, _JSON_MAX_LENGTH_KEYS)
        if asn is None or prefix is None or max_length is None:
            continue  # incomplete record
        rows.add(f"{normalize_asn(asn)}|{prefix}|{int(max_length)}")
    return rows


def _vaps_from_data(data: Any) -> set[str]:
    """Build the VAP row set from an already-decoded JSON document."""
    rows: set[str] = set()
    for item in _collect_objects(data, _VAP_CONTAINER_KEYS):
        customer = _first_present(item, _JSON_CUSTOMER_KEYS)
        if customer is None:
            continue
        # First non-empty provider list wins; a record without one is kept
        # with an empty provider field.
        providers = next(
            (item[key] for key in _JSON_PROVIDER_KEYS if item.get(key)), []
        )
        provider_asns = {normalize_asn(provider) for provider in providers}
        # Sort numerically so that e.g. AS9 precedes AS10.
        ordered = sorted(provider_asns, key=lambda value: int(value[2:]))
        rows.add(f"{normalize_asn(customer)}|{','.join(ordered)}")
    return rows


def normalize_vrps_from_json(path: Path) -> set[str]:
    """Return ``AS|prefix|maxlen`` rows for every complete VRP in *path*.

    Records missing an ASN, prefix or max length are skipped.

    Raises:
        ValueError: if the file is not valid JSON, or an ASN / max length
            is not an integer.
    """
    return _vrps_from_data(load_json(path))


def normalize_vaps_from_json(path: Path) -> set[str]:
    """Return ``customer|provider,provider,...`` rows for every VAP in *path*.

    Records without a customer ASN are skipped.

    Raises:
        ValueError: if the file is not valid JSON, or an ASN is not an
            integer.
    """
    return _vaps_from_data(load_json(path))


def normalize_vrps_from_csv(path: Path) -> set[str]:
    """Return ``AS|prefix|maxlen`` rows from a CSV file with a header row.

    Rows lacking an ASN, prefix or max length are skipped. Undecodable bytes
    are replaced rather than raising.

    Raises:
        ValueError: if an ASN or max length is not an integer.
    """
    rows: set[str] = set()
    with path.open(newline="", encoding="utf-8", errors="replace") as handle:
        for row in csv.DictReader(handle):
            asn = _first_present(row, _CSV_ASN_KEYS)
            prefix = _first_present(row, _CSV_PREFIX_KEYS)
            max_length = _first_present(row, _CSV_MAX_LENGTH_KEYS)
            if asn is None or prefix is None or max_length is None:
                continue
            rows.add(f"{normalize_asn(asn)}|{prefix}|{int(max_length)}")
    return rows


def normalize_routinator(input_path: Path) -> tuple[set[str], set[str]]:
    """Return ``(vrps, vaps)`` from a single JSON file, parsed once."""
    data = load_json(input_path)
    return _vrps_from_data(data), _vaps_from_data(data)


def normalize_rpki_client(input_path: Path) -> tuple[set[str], set[str]]:
    """Return ``(vrps, vaps)`` from an rpki-client output directory or file.

    * Directory: reads ``<dir>/json`` (VRPs and VAPs) and ``<dir>/csv``
      (VRPs only) when they exist, and merges the results.
    * File: parsed as JSON; if it is not JSON it is read as CSV instead.
    * Anything else (e.g. a missing path): two empty sets.
    """
    vrps: set[str] = set()
    vaps: set[str] = set()

    if input_path.is_dir():
        json_path = input_path / "json"
        csv_path = input_path / "csv"
        if json_path.is_file():
            data = load_json(json_path)
            vrps |= _vrps_from_data(data)
            vaps |= _vaps_from_data(data)
        if csv_path.is_file():
            vrps |= normalize_vrps_from_csv(csv_path)
    elif input_path.is_file():
        try:
            data = load_json(input_path)
        except (json.JSONDecodeError, UnicodeDecodeError):
            # Not a JSON document, so treat the file as CSV.
            vrps |= normalize_vrps_from_csv(input_path)
        else:
            vrps |= _vrps_from_data(data)
            vaps |= _vaps_from_data(data)

    return vrps, vaps


def main(argv: list[str] | None = None) -> int:
    """Parse arguments, write both output files and print a JSON summary.

    Args:
        argv: argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        Process exit status (always 0 on success).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--kind", choices=["routinator", "rpki-client"], required=True)
    parser.add_argument("--input", type=Path, required=True)
    parser.add_argument("--vrps-out", type=Path, required=True)
    parser.add_argument("--vaps-out", type=Path, required=True)
    args = parser.parse_args(argv)

    if args.kind == "routinator":
        vrps, vaps = normalize_routinator(args.input)
    else:
        vrps, vaps = normalize_rpki_client(args.input)

    write_set(args.vrps_out, vrps)
    write_set(args.vaps_out, vaps)
    print(json.dumps({"vrps": len(vrps), "vaps": len(vaps)}, sort_keys=True))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())