rpki/scripts/local_repo_replay/normalize_rp_outputs.py

124 lines
4.5 KiB
Python
Executable File

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import csv
import json
from pathlib import Path
from typing import Any
def normalize_asn(value: Any) -> str:
text = str(value).strip().upper()
if text.startswith("AS"):
text = text[2:]
return f"AS{int(text)}"
def write_set(path: Path, rows: set[str]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text("\n".join(sorted(rows)) + ("\n" if rows else ""), encoding="utf-8")
def load_json(path: Path) -> Any:
return json.loads(path.read_text(encoding="utf-8"))
def normalize_vrps_from_json(path: Path) -> set[str]:
data = load_json(path)
rows: set[str] = set()
objects: list[dict[str, Any]] = []
if isinstance(data, dict):
for key in ("roas", "routeOrigins", "valid_roas"):
value = data.get(key)
if isinstance(value, list):
objects.extend(item for item in value if isinstance(item, dict))
elif isinstance(data, list):
objects = [item for item in data if isinstance(item, dict)]
for item in objects:
asn = item.get("asn") or item.get("asID") or item.get("origin")
prefix = item.get("prefix")
max_length = item.get("maxLength") or item.get("max_length") or item.get("maxlen") or item.get("maxLengthPrefix")
if asn is None or prefix is None or max_length is None:
continue
rows.add(f"{normalize_asn(asn)}|{prefix}|{int(max_length)}")
return rows
def normalize_vaps_from_json(path: Path) -> set[str]:
data = load_json(path)
rows: set[str] = set()
objects: list[dict[str, Any]] = []
if isinstance(data, dict):
for key in ("aspas", "aspaAssertions", "vaps"):
value = data.get(key)
if isinstance(value, list):
objects.extend(item for item in value if isinstance(item, dict))
elif isinstance(data, list):
objects = [item for item in data if isinstance(item, dict)]
for item in objects:
customer = (
item.get("customer")
or item.get("customer_asid")
or item.get("customerASID")
or item.get("customerAsid")
or item.get("customerAsn")
)
providers = item.get("providers") or item.get("provider_asns") or item.get("providerASNs") or []
if customer is None:
continue
provider_asns = [normalize_asn(provider) for provider in providers]
rows.add(f"{normalize_asn(customer)}|{','.join(sorted(set(provider_asns), key=lambda value: int(value[2:])))}")
return rows
def normalize_vrps_from_csv(path: Path) -> set[str]:
rows: set[str] = set()
with path.open(newline="", encoding="utf-8", errors="replace") as handle:
for row in csv.DictReader(handle):
asn = row.get("ASN") or row.get("asn") or row.get("AS")
prefix = row.get("IP Prefix") or row.get("prefix") or row.get("Prefix")
max_length = row.get("Max Length") or row.get("maxLength") or row.get("max_length")
if asn and prefix and max_length:
rows.add(f"{normalize_asn(asn)}|{prefix}|{int(max_length)}")
return rows
def normalize_routinator(input_path: Path) -> tuple[set[str], set[str]]:
return normalize_vrps_from_json(input_path), normalize_vaps_from_json(input_path)
def normalize_rpki_client(input_path: Path) -> tuple[set[str], set[str]]:
json_path = input_path / "json" if input_path.is_dir() else input_path
csv_path = input_path / "csv" if input_path.is_dir() else input_path
vrps: set[str] = set()
vaps: set[str] = set()
if json_path.is_file():
vrps |= normalize_vrps_from_json(json_path)
vaps |= normalize_vaps_from_json(json_path)
if csv_path.is_file():
vrps |= normalize_vrps_from_csv(csv_path)
return vrps, vaps
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--kind", choices=["routinator", "rpki-client"], required=True)
parser.add_argument("--input", type=Path, required=True)
parser.add_argument("--vrps-out", type=Path, required=True)
parser.add_argument("--vaps-out", type=Path, required=True)
args = parser.parse_args()
if args.kind == "routinator":
vrps, vaps = normalize_routinator(args.input)
else:
vrps, vaps = normalize_rpki_client(args.input)
write_set(args.vrps_out, vrps)
write_set(args.vaps_out, vaps)
print(json.dumps({"vrps": len(vrps), "vaps": len(vaps)}, sort_keys=True))
return 0
if __name__ == "__main__":
raise SystemExit(main())