Compare commits
1 Commits
main
...
dev_1.2_xu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
251dea8e5e |
11
Cargo.toml
11
Cargo.toml
@ -13,3 +13,14 @@ url = "2.5.8"
|
||||
asn1-rs = "0.7.1"
|
||||
asn1-rs-derive = "0.6.0"
|
||||
asn1 = "0.23.0"
|
||||
arc-swap = "1.7.0"
|
||||
chrono = "0.4.44"
|
||||
bytes = "1.11.1"
|
||||
tokio = { version = "1.49.0", features = ["full"] }
|
||||
rand = "0.10.0"
|
||||
rocksdb = "0.21"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
anyhow = "1"
|
||||
bincode = "3.0.0"
|
||||
tracing = "0.1.44"
|
||||
236
specs/10_slurm.md
Normal file
236
specs/10_slurm.md
Normal file
@ -0,0 +1,236 @@
|
||||
# 10. SLURM(Simplified Local Internet Number Resource Management with the RPKI)
|
||||
|
||||
## 10.1 对象定位
|
||||
|
||||
SLURM是一个JSON文件,允许 RPKI 依赖方在本地“覆盖/修正/忽略”来自上游RPKI数据的内容,而不需要修改或伪造原始RPKI对象。
|
||||
|
||||
## 10.2 数据格式 (RFC 8416 §3)
|
||||
|
||||
### SLURM
|
||||
|
||||
SLURM是一个只包含一个JSON对象的文件。格式要求如下(RFC 8416 §3.2):
|
||||
|
||||
```text
|
||||
A SLURM file consists of a single JSON object containing the
|
||||
following members:
|
||||
o A "slurmVersion" member that MUST be set to 1, encoded as a number
|
||||
o A "validationOutputFilters" member (Section 3.3), whose value is
|
||||
an object. The object MUST contain exactly two members:
|
||||
* A "prefixFilters" member, whose value is described in
|
||||
Section 3.3.1.
|
||||
* A "bgpsecFilters" member, whose value is described in
|
||||
Section 3.3.2.
|
||||
o A "locallyAddedAssertions" member (Section 3.4), whose value is an
|
||||
object. The object MUST contain exactly two members:
|
||||
* A "prefixAssertions" member, whose value is described in
|
||||
Section 3.4.1.
|
||||
* A "bgpsecAssertions" member, whose value is described in
|
||||
Section 3.4.2.
|
||||
```
|
||||
|
||||
一个空的SLURM json结构体如下:
|
||||
|
||||
```json
|
||||
{
|
||||
"slurmVersion": 1,
|
||||
"validationOutputFilters": {
|
||||
"prefixFilters": [],
|
||||
"bgpsecFilters": []
|
||||
},
|
||||
"locallyAddedAssertions": {
|
||||
"prefixAssertions": [],
|
||||
"bgpsecAssertions": []
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### prefixFilters
|
||||
其中`prefixFilters`格式要求如下(RFC 8416 §3.3.1):
|
||||
|
||||
```text
|
||||
The above is expressed as a value of the "prefixFilters" member, as
|
||||
an array of zero or more objects. Each object MUST contain either 1)
|
||||
one of the following members or 2) one of each of the following
|
||||
members.
|
||||
o A "prefix" member, whose value is a string representing either an
|
||||
IPv4 prefix (see Section 3.1 of [RFC4632]) or an IPv6 prefix (see
|
||||
[RFC5952]).
|
||||
o An "asn" member, whose value is a number.
|
||||
In addition, each object MAY contain one optional "comment" member,
|
||||
whose value is a string.
|
||||
```
|
||||
示例:
|
||||
```json
|
||||
"prefixFilters": [
|
||||
{
|
||||
"prefix": "192.0.2.0/24",
|
||||
"comment": "All VRPs encompassed by prefix"
|
||||
},
|
||||
{
|
||||
"asn": 64496,
|
||||
"comment": "All VRPs matching ASN"
|
||||
},
|
||||
{
|
||||
"prefix": "198.51.100.0/24",
|
||||
"asn": 64497,
|
||||
"comment": "All VRPs encompassed by prefix, matching ASN"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
### bgpsecFilters
|
||||
`bgpsecFilters`格式要求如下(RFC 8416 §3.3.2)
|
||||
|
||||
```text
|
||||
The above is expressed as a value of the "bgpsecFilters" member, as
|
||||
an array of zero or more objects. Each object MUST contain one of
|
||||
either, or one each of both following members:
|
||||
o An "asn" member, whose value is a number
|
||||
o An "SKI" member, whose value is the Base64 encoding without
|
||||
trailing ’=’ (Section 5 of [RFC4648]) of the certificate’s Subject
|
||||
Key Identifier as described in Section 4.8.2 of [RFC6487]. (This
|
||||
is the value of the ASN.1 OCTET STRING without the ASN.1 tag or
|
||||
length fields.)
|
||||
In addition, each object MAY contain one optional "comment" member,
|
||||
whose value is a string.
|
||||
```
|
||||
|
||||
示例:
|
||||
```json
|
||||
"bgpsecFilters": [
|
||||
{
|
||||
"asn": 64496,
|
||||
"comment": "All keys for ASN"
|
||||
},
|
||||
{
|
||||
"SKI": "<Base 64 of some SKI>",
|
||||
"comment": "Key matching Router SKI"
|
||||
},
|
||||
{
|
||||
"asn": 64497,
|
||||
"SKI": "<Base 64 of some SKI>",
|
||||
"comment": "Key for ASN 64497 matching Router SKI"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
### prefixAssertions
|
||||
`prefixAssertions`格式要求如下(RFC 8416 §3.4.1)
|
||||
```text
|
||||
The above is expressed as a value of the "prefixAssertions" member,
|
||||
as an array of zero or more objects. Each object MUST contain one of
|
||||
each of the following members:
|
||||
o A "prefix" member, whose value is a string representing either an
|
||||
IPv4 prefix (see Section 3.1 of [RFC4632]) or an IPv6 prefix (see
|
||||
[RFC5952]).
|
||||
o An "asn" member, whose value is a number.
|
||||
In addition, each object MAY contain one of each of the following
|
||||
members:
|
||||
o A "maxPrefixLength" member, whose value is a number.
|
||||
o A "comment" member, whose value is a string.
|
||||
```
|
||||
|
||||
示例:
|
||||
```json
|
||||
"prefixAssertions": [
|
||||
{
|
||||
"asn": 64496,
|
||||
"prefix": "198.51.100.0/24",
|
||||
"comment": "My other important route"
|
||||
},
|
||||
{
|
||||
"asn": 64496,
|
||||
"prefix": "2001:DB8::/32",
|
||||
"maxPrefixLength": 48,
|
||||
"comment": "My other important de-aggregated routes"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
### bgpsecAssertions
|
||||
`bgpsecAssertions`格式要求如下(RFC 8416 §3.4.2)
|
||||
```text
|
||||
The above is expressed as a value of the "bgpsecAssertions" member,
|
||||
as an array of zero or more objects. Each object MUST contain one
|
||||
each of all of the following members:
|
||||
o An "asn" member, whose value is a number.
|
||||
o An "SKI" member, whose value is the Base64 encoding without
|
||||
trailing ’=’ (Section 5 of [RFC4648]) of the certificate’s Subject
|
||||
Key Identifier as described in Section 4.8.2 of [RFC6487] (This is
|
||||
the value of the ASN.1 OCTET STRING without the ASN.1 tag or
|
||||
length fields.)
|
||||
o A "routerPublicKey" member, whose value is the Base64 encoding
|
||||
without trailing ’=’ (Section 5 of [RFC4648]) of the equivalent to
|
||||
the subjectPublicKeyInfo value of the router certificate’s public
|
||||
key, as described in [RFC8208]. This is the full ASN.1 DER
|
||||
encoding of the subjectPublicKeyInfo, including the ASN.1 tag and
|
||||
length values of the subjectPublicKeyInfo SEQUENCE.
|
||||
```
|
||||
示例:
|
||||
```json
|
||||
"bgpsecAssertions": [
|
||||
{
|
||||
"asn": 64496,
|
||||
"SKI": "<some base64 SKI>",
|
||||
"routerPublicKey": "<some base64 public key>",
|
||||
"comment": "My known key for my important ASN"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
## 10.3 抽象数据结构
|
||||
|
||||
### SLURM
|
||||
| 字段 | 类型 | 语义 | 约束/解析规则 | RFC 引用 |
|
||||
|---------------------------|------------------------|---------|---------|---------------|
|
||||
| slurm_version | number | SLURM版本 | 版本必须为1 | RFC 8416 §3.2 |
|
||||
| validation_output_filters | ValidationOutputFilter | 过滤条件 | | |
|
||||
| locally_added_assertions | LocallyAddedAssertions | 本地添加断言 | | |
|
||||
|
||||
### ValidationOutputFilter
|
||||
| 字段 | 类型 | 语义 | 约束/解析规则 | RFC 引用 |
|
||||
|----------------|-------------------|-----------|---------|---------------|
|
||||
| prefix_filters | Vec<PrefixFilter> | 前缀过滤 | 可以为空数组 | RFC 8416 §3.3 |
|
||||
| bgpsec_filters | Vec<BgpsecFilter> | BGPsec过滤 | 可以为空数组 | RFC 8416 §3.3 |
|
||||
|
||||
### LocallyAddedAssertions
|
||||
| 字段 | 类型 | 语义 | 约束/解析规则 | RFC 引用 |
|
||||
|-------------------|----------------------|-----------|---------|---------------|
|
||||
| prefix_assertions | Vec<PrefixAssertion> | 前缀断言 | 可以为空数组 | RFC 8416 §3.4 |
|
||||
| bgpsec_assertions | Vec<BgpsecAssertion> | BGPsec断言 | 可以为空数组 | RFC 8416 §3.4 |
|
||||
|
||||
### PrefixFilter
|
||||
| 字段 | 类型 | 语义 | 约束/解析规则 | RFC 引用 |
|
||||
|---------|--------|------|--------------------------------|-----------------|
|
||||
| prefix | string | 前缀 | IPv4前缀或IPv6前缀,prefix和asn至少存在一个 | RFC 8416 §3.3.1 |
|
||||
| asn | number | ASN | prefix和asn至少存在一个 | RFC 8416 §3.3.1 |
|
||||
| comment | string | 备注说明 | 可选字段 | RFC 8416 §3.3.1 |
|
||||
|
||||
### BgpsecFilter
|
||||
| 字段 | 类型 | 语义 | 约束/解析规则 | RFC 引用 |
|
||||
|---------|--------|------|------------------|------------------|
|
||||
| asn | number | ASN | prefix和asn至少存在一个 | RFC 8416 §3.3.1 |
|
||||
| ski | u8 | | 证书的SKI | RFC 8416 §3.3.1 |
|
||||
| comment | string | 备注说明 | 可选字段 | RFC 8416 §3.3.1 |
|
||||
|
||||
### PrefixAssertion
|
||||
| 字段 | 类型 | 语义 | 约束/解析规则 | RFC 引用 |
|
||||
|-------------------|--------|--------|---------------|-----------------|
|
||||
| prefix | string | 前缀 | IPv4前缀或IPv6前缀 | RFC 8416 §3.4.1 |
|
||||
| asn | number | ASN | | RFC 8416 §3.4.1 |
|
||||
| max_prefix_length | number | 最大前缀长度 | 可选字段 | RFC 8416 §3.4.1 |
|
||||
| comment | string | 备注说明 | 可选字段 | RFC 8416 §3.4.1 |
|
||||
|
||||
|
||||
### BgpsecAssertion
|
||||
| 字段 | 类型 | 语义 | 约束/解析规则 | RFC 引用 |
|
||||
|-------------------|--------|--------|------------------|-----------------|
|
||||
| asn | number | ASN | prefix和asn至少存在一个 | RFC 8416 §3.4.2 |
|
||||
| ski | u8 | | 证书的SKI | RFC 8416 §3.4.2 |
|
||||
| router_public_key | u8 | 证书的SKI | | RFC 8416 §3.4.2 |
|
||||
| comment | string | 备注说明 | 可选字段 | RFC 8416 §3.4.2 |
|
||||
|
||||
> 注:BGPsec部分可以在第一版考虑先留空
|
||||
|
||||
## 10.4 规则
|
||||
|
||||
65
specs/11_rtr.md
Normal file
65
specs/11_rtr.md
Normal file
@ -0,0 +1,65 @@
|
||||
# 11. RTR (The Resource Public Key Infrastructure (RPKI) to Router Protocol)
|
||||
|
||||
## 11.1 Cache Server
|
||||
|
||||
### 11.1.1 功能需求
|
||||
|
||||
- 支持Full Sync(Reset Query)
|
||||
- 支持Incremental Sync(Serial Query)
|
||||
- 支持多客户端并发
|
||||
- 支持Serial递增
|
||||
- 保留一定数量的delta
|
||||
- 支持原子更新
|
||||
|
||||
### 11.1.2 架构设计
|
||||
采用一级缓存+二级缓存并存的方式。
|
||||
|
||||

|
||||
|
||||
其中,一级缓存为运行时缓存,主要职责:
|
||||
- 存储当前完整的snapshot
|
||||
- 历史Delta队列管理
|
||||
- Serial管理
|
||||
- RTR查询响应
|
||||
|
||||
二级缓存为持久化缓存,主要职责:
|
||||
- snapshot持久化
|
||||
- 缓存重启后的快速恢复(snapshot和serial)
|
||||
- 不参与实时查询
|
||||
- 异步写入
|
||||
|
||||
### 11.1.3 核心数据结构设计
|
||||
|
||||
#### 11.1.3.1 总cache
|
||||
```rust
|
||||
struct RtrCache {
|
||||
serial: AtomicU32,
|
||||
snapshot: ArcSwap<Snapshot>,
|
||||
deltas: RwLock<VecDeque<Arc<Delta>>>,
|
||||
max_delta: usize,
|
||||
}
|
||||
```
|
||||
|
||||
#### 11.1.3.2 Snapshot
|
||||
```rust
|
||||
struct Snapshot {
|
||||
origins: Vec<RouteOrigin>,
|
||||
router_keys: Vec<RouterKey>,
|
||||
aspas: Vec<Aspa>,
|
||||
created_at: Instant,
|
||||
}
|
||||
```
|
||||
|
||||
#### 11.1.3.3 Delta
|
||||
```rust
|
||||
struct Delta {
|
||||
serial: u32,
|
||||
announced: Vec<Payload>,
|
||||
withdrawn: Vec<Payload>,
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
## 11.2 Transport
|
||||
|
||||
初版实现RTR over TLS(可外网)和RTR over TCP(内网)两种方式。
|
||||
BIN
specs/img/img.png
Normal file
BIN
specs/img/img.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 14 KiB |
@ -2,5 +2,5 @@ pub mod crl;
|
||||
mod rc;
|
||||
mod tal;
|
||||
mod ta;
|
||||
mod resources;
|
||||
pub mod resources;
|
||||
mod oids;
|
||||
|
||||
1278
src/data_model/rc.rs
1278
src/data_model/rc.rs
File diff suppressed because it is too large
Load Diff
@ -54,7 +54,7 @@ impl ASRange {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
|
||||
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Default)]
|
||||
pub struct Asn(u32);
|
||||
|
||||
impl Asn {
|
||||
|
||||
@ -42,6 +42,30 @@ pub struct IPAddressRange {
|
||||
pub max: IPAddress,
|
||||
}
|
||||
|
||||
use std::net::{Ipv4Addr, Ipv6Addr};
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
|
||||
pub struct IPAddress(u128);
|
||||
|
||||
impl IPAddress {
|
||||
pub fn to_ipv4(self) -> Option<Ipv4Addr> {
|
||||
if self.0 <= u32::MAX as u128 {
|
||||
Some(Ipv4Addr::from(self.0 as u32))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn to_ipv6(self) -> Ipv6Addr {
|
||||
Ipv6Addr::from(self.0)
|
||||
}
|
||||
|
||||
pub fn is_ipv4(self) -> bool {
|
||||
self.0 <= u32::MAX as u128
|
||||
}
|
||||
|
||||
pub fn as_u128(self) -> u128 {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -1 +1,3 @@
|
||||
pub mod data_model;
|
||||
mod slurm;
|
||||
mod rtr;
|
||||
|
||||
538
src/rtr/cache.rs
Normal file
538
src/rtr/cache.rs
Normal file
@ -0,0 +1,538 @@
|
||||
use std::collections::{BTreeSet, VecDeque};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use chrono::{DateTime, NaiveDateTime, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::rtr::payload::{Aspa, Payload, RouteOrigin, RouterKey};
|
||||
use crate::rtr::store_db::RtrStore;
|
||||
|
||||
const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_secs(600);
|
||||
const DEFAULT_EXPIRE_INTERVAL: Duration = Duration::from_secs(7200);
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DualTime {
|
||||
instant: Instant,
|
||||
utc: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl DualTime {
|
||||
/// Create current time.
|
||||
pub fn now() -> Self {
|
||||
Self {
|
||||
instant: Instant::now(),
|
||||
utc: Utc::now(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get UTC time for logs.
|
||||
pub fn utc(&self) -> DateTime<Utc> {
|
||||
self.utc
|
||||
}
|
||||
|
||||
/// Elapsed duration since creation/reset.
|
||||
pub fn elapsed(&self) -> Duration {
|
||||
self.instant.elapsed()
|
||||
}
|
||||
|
||||
/// Whether duration is expired.
|
||||
pub fn is_expired(&self, duration: Duration) -> bool {
|
||||
self.elapsed() >= duration
|
||||
}
|
||||
|
||||
/// Reset to now.
|
||||
pub fn reset(&mut self) {
|
||||
self.instant = Instant::now();
|
||||
self.utc = Utc::now();
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for DualTime {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
self.utc.timestamp_millis().serialize(serializer)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for DualTime {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
let millis = i64::deserialize(deserializer)?;
|
||||
let naive = NaiveDateTime::from_timestamp_millis(millis)
|
||||
.ok_or_else(|| serde::de::Error::custom("invalid timestamp"))?;
|
||||
let utc = DateTime::<Utc>::from_utc(naive, Utc);
|
||||
Ok(Self {
|
||||
instant: Instant::now(),
|
||||
utc,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Snapshot {
|
||||
origins: BTreeSet<RouteOrigin>,
|
||||
router_keys: BTreeSet<RouterKey>,
|
||||
aspas: BTreeSet<Aspa>,
|
||||
created_at: DualTime,
|
||||
}
|
||||
|
||||
impl Snapshot {
|
||||
pub fn new(
|
||||
origins: BTreeSet<RouteOrigin>,
|
||||
router_keys: BTreeSet<RouterKey>,
|
||||
aspas: BTreeSet<Aspa>,
|
||||
) -> Self {
|
||||
Snapshot {
|
||||
origins,
|
||||
router_keys,
|
||||
aspas,
|
||||
created_at: DualTime::now(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn empty() -> Self {
|
||||
Self::new(BTreeSet::new(), BTreeSet::new(), BTreeSet::new())
|
||||
}
|
||||
|
||||
pub fn from_payloads(payloads: Vec<Payload>) -> Self {
|
||||
let mut origins = BTreeSet::new();
|
||||
let mut router_keys = BTreeSet::new();
|
||||
let mut aspas = BTreeSet::new();
|
||||
|
||||
for p in payloads {
|
||||
match p {
|
||||
Payload::RouteOrigin(o) => {
|
||||
origins.insert(o);
|
||||
}
|
||||
Payload::RouterKey(k) => {
|
||||
router_keys.insert(k);
|
||||
}
|
||||
Payload::Aspa(a) => {
|
||||
aspas.insert(a);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Snapshot {
|
||||
origins,
|
||||
router_keys,
|
||||
aspas,
|
||||
created_at: DualTime::now(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn diff(&self, new_snapshot: &Snapshot) -> (Vec<Payload>, Vec<Payload>) {
|
||||
let mut announced = Vec::new();
|
||||
let mut withdrawn = Vec::new();
|
||||
|
||||
for origin in new_snapshot.origins.difference(&self.origins) {
|
||||
announced.push(Payload::RouteOrigin(origin.clone()));
|
||||
}
|
||||
for origin in self.origins.difference(&new_snapshot.origins) {
|
||||
withdrawn.push(Payload::RouteOrigin(origin.clone()));
|
||||
}
|
||||
|
||||
for key in new_snapshot.router_keys.difference(&self.router_keys) {
|
||||
announced.push(Payload::RouterKey(key.clone()));
|
||||
}
|
||||
for key in self.router_keys.difference(&new_snapshot.router_keys) {
|
||||
withdrawn.push(Payload::RouterKey(key.clone()));
|
||||
}
|
||||
|
||||
for aspa in new_snapshot.aspas.difference(&self.aspas) {
|
||||
announced.push(Payload::Aspa(aspa.clone()));
|
||||
}
|
||||
for aspa in self.aspas.difference(&new_snapshot.aspas) {
|
||||
withdrawn.push(Payload::Aspa(aspa.clone()));
|
||||
}
|
||||
|
||||
(announced, withdrawn)
|
||||
}
|
||||
|
||||
pub fn created_at(&self) -> DualTime {
|
||||
self.created_at.clone()
|
||||
}
|
||||
|
||||
pub fn payloads(&self) -> Vec<Payload> {
|
||||
let mut v = Vec::with_capacity(
|
||||
self.origins.len() + self.router_keys.len() + self.aspas.len(),
|
||||
);
|
||||
|
||||
v.extend(self.origins.iter().cloned().map(Payload::RouteOrigin));
|
||||
v.extend(self.router_keys.iter().cloned().map(Payload::RouterKey));
|
||||
v.extend(self.aspas.iter().cloned().map(Payload::Aspa));
|
||||
|
||||
v
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct Delta {
|
||||
serial: u32,
|
||||
announced: Vec<Payload>,
|
||||
withdrawn: Vec<Payload>,
|
||||
created_at: DualTime,
|
||||
}
|
||||
|
||||
impl Delta {
|
||||
pub fn new(serial: u32, announced: Vec<Payload>, withdrawn: Vec<Payload>) -> Self {
|
||||
Delta {
|
||||
serial,
|
||||
announced,
|
||||
withdrawn,
|
||||
created_at: DualTime::now(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn serial(&self) -> u32 {
|
||||
self.serial
|
||||
}
|
||||
|
||||
pub fn announced(&self) -> &[Payload] {
|
||||
&self.announced
|
||||
}
|
||||
|
||||
pub fn withdrawn(&self) -> &[Payload] {
|
||||
&self.withdrawn
|
||||
}
|
||||
|
||||
pub fn created_at(self) -> DualTime {
|
||||
self.created_at
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RtrCache {
|
||||
// Session ID created at cache startup.
|
||||
session_id: u16,
|
||||
// Current serial.
|
||||
pub serial: u32,
|
||||
// Full snapshot.
|
||||
pub snapshot: Snapshot,
|
||||
// Delta window.
|
||||
deltas: VecDeque<Arc<Delta>>,
|
||||
// Max number of deltas to keep.
|
||||
max_delta: u8,
|
||||
// Refresh interval.
|
||||
refresh_interval: Duration,
|
||||
// Last update begin time.
|
||||
last_update_begin: DualTime,
|
||||
// Last update end time.
|
||||
last_update_end: DualTime,
|
||||
// Cache created time.
|
||||
created_at: DualTime,
|
||||
}
|
||||
|
||||
impl Default for RtrCache {
|
||||
fn default() -> Self {
|
||||
let now = DualTime::now();
|
||||
Self {
|
||||
session_id: rand::random(),
|
||||
serial: 0,
|
||||
snapshot: Snapshot::empty(),
|
||||
deltas: VecDeque::with_capacity(100),
|
||||
max_delta: 100,
|
||||
refresh_interval: Duration::from_secs(600),
|
||||
last_update_begin: now.clone(),
|
||||
last_update_end: now.clone(),
|
||||
created_at: now,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RtrCacheBuilder {
|
||||
session_id: Option<u16>,
|
||||
max_delta: Option<u8>,
|
||||
refresh_interval: Option<Duration>,
|
||||
serial: Option<u32>,
|
||||
snapshot: Option<Snapshot>,
|
||||
created_at: Option<DualTime>,
|
||||
}
|
||||
|
||||
impl RtrCacheBuilder {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
session_id: None,
|
||||
max_delta: None,
|
||||
refresh_interval: None,
|
||||
serial: None,
|
||||
snapshot: None,
|
||||
created_at: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn session_id(mut self, v: u16) -> Self {
|
||||
self.session_id = Some(v);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn max_delta(mut self, v: u8) -> Self {
|
||||
self.max_delta = Some(v);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn refresh_interval(mut self, v: Duration) -> Self {
|
||||
self.refresh_interval = Some(v);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn serial(mut self, v: u32) -> Self {
|
||||
self.serial = Some(v);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn snapshot(mut self, v: Snapshot) -> Self {
|
||||
self.snapshot = Some(v);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn created_at(mut self, v: DualTime) -> Self {
|
||||
self.created_at = Some(v);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn build(self) -> RtrCache {
|
||||
let now = DualTime::now();
|
||||
let max_delta = self.max_delta.unwrap_or(100);
|
||||
let refresh_interval = self.refresh_interval.unwrap_or(Duration::from_secs(600));
|
||||
let snapshot = self.snapshot.unwrap_or_else(Snapshot::empty);
|
||||
let serial = self.serial.unwrap_or(0);
|
||||
let created_at = self.created_at.unwrap_or_else(|| now.clone());
|
||||
let session_id = self.session_id.unwrap_or_else(rand::random);
|
||||
|
||||
RtrCache {
|
||||
session_id,
|
||||
serial,
|
||||
snapshot,
|
||||
deltas: VecDeque::with_capacity(max_delta.into()),
|
||||
max_delta,
|
||||
refresh_interval,
|
||||
last_update_begin: now.clone(),
|
||||
last_update_end: now,
|
||||
created_at,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RtrCache {
|
||||
/// Initialize cache from DB if possible; otherwise from file loader.
|
||||
pub fn init(
|
||||
self,
|
||||
store: &RtrStore,
|
||||
max_delta: u8,
|
||||
refresh_interval: Duration,
|
||||
file_loader: impl Fn() -> anyhow::Result<Vec<Payload>>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let snapshot = store.get_snapshot()?;
|
||||
let session_id = store.get_session_id()?;
|
||||
let serial = store.get_serial()?;
|
||||
|
||||
if let (Some(snapshot), Some(session_id), Some(serial)) =
|
||||
(snapshot, session_id, serial)
|
||||
{
|
||||
let mut cache = RtrCacheBuilder::new()
|
||||
.session_id(session_id)
|
||||
.max_delta(max_delta)
|
||||
.refresh_interval(refresh_interval)
|
||||
.serial(serial)
|
||||
.snapshot(snapshot)
|
||||
.build();
|
||||
|
||||
if let Some((min_serial, _max_serial)) = store.get_delta_window()? {
|
||||
let deltas = store.load_deltas_since(min_serial.wrapping_sub(1))?;
|
||||
for delta in deltas {
|
||||
cache.push_delta(Arc::new(delta));
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(cache);
|
||||
}
|
||||
|
||||
let payloads = file_loader()?;
|
||||
let snapshot = Snapshot::from_payloads(payloads);
|
||||
let serial = 1;
|
||||
let session_id: u16 = rand::random();
|
||||
|
||||
let store = store.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = store.save_snapshot_and_meta(&snapshot, session_id, serial) {
|
||||
tracing::error!("persist failed: {:?}", e);
|
||||
}
|
||||
});
|
||||
|
||||
Ok(RtrCacheBuilder::new()
|
||||
.session_id(session_id)
|
||||
.max_delta(max_delta)
|
||||
.refresh_interval(refresh_interval)
|
||||
.serial(serial)
|
||||
.snapshot(snapshot)
|
||||
.build())
|
||||
}
|
||||
|
||||
fn next_serial(&mut self) -> u32 {
|
||||
self.serial = self.serial.wrapping_add(1);
|
||||
self.serial
|
||||
}
|
||||
|
||||
fn push_delta(&mut self, delta: Arc<Delta>) {
|
||||
if self.deltas.len() >= self.max_delta as usize {
|
||||
self.deltas.pop_front();
|
||||
}
|
||||
self.deltas.push_back(delta);
|
||||
}
|
||||
|
||||
fn replace_snapshot(&mut self, snapshot: Snapshot) {
|
||||
self.snapshot = snapshot;
|
||||
}
|
||||
|
||||
fn delta_window(&self) -> Option<(u32, u32)> {
|
||||
let min = self.deltas.front().map(|d| d.serial());
|
||||
let max = self.deltas.back().map(|d| d.serial());
|
||||
match (min, max) {
|
||||
(Some(min), Some(max)) => Some((min, max)),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn store_sync(
|
||||
&mut self,
|
||||
store: &RtrStore,
|
||||
snapshot: Snapshot,
|
||||
serial: u32,
|
||||
session_id: u16,
|
||||
delta: Arc<Delta>,
|
||||
) {
|
||||
let window = self.delta_window();
|
||||
let store = store.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = store.save_delta(&delta) {
|
||||
tracing::error!("persist delta failed: {:?}", e);
|
||||
}
|
||||
if let Err(e) = store.save_snapshot_and_meta(&snapshot, session_id, serial) {
|
||||
tracing::error!("persist snapshot/meta failed: {:?}", e);
|
||||
}
|
||||
if let Some((min_serial, max_serial)) = window {
|
||||
if let Err(e) = store.set_delta_window(min_serial, max_serial) {
|
||||
tracing::error!("persist delta window failed: {:?}", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Update cache.
|
||||
pub fn update(
|
||||
&mut self,
|
||||
new_payloads: Vec<Payload>,
|
||||
store: &RtrStore,
|
||||
) -> anyhow::Result<()> {
|
||||
let new_snapshot = Snapshot::from_payloads(new_payloads);
|
||||
let (announced, withdrawn) = self.snapshot.diff(&new_snapshot);
|
||||
|
||||
if announced.is_empty() && withdrawn.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let new_serial = self.next_serial();
|
||||
let delta = Arc::new(Delta::new(new_serial, announced, withdrawn));
|
||||
|
||||
self.push_delta(delta.clone());
|
||||
self.replace_snapshot(new_snapshot.clone());
|
||||
self.last_update_end = DualTime::now();
|
||||
|
||||
self.store_sync(store, new_snapshot, new_serial, self.session_id, delta);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn session_id(&self) -> u16 {
|
||||
self.session_id
|
||||
}
|
||||
|
||||
pub fn snapshot(&self) -> Snapshot {
|
||||
self.snapshot.clone()
|
||||
}
|
||||
|
||||
pub fn serial(&self) -> u32 {
|
||||
self.serial
|
||||
}
|
||||
|
||||
pub fn refresh_interval(&self) -> Duration {
|
||||
self.refresh_interval
|
||||
}
|
||||
|
||||
pub fn retry_interval(&self) -> Duration {
|
||||
DEFAULT_RETRY_INTERVAL
|
||||
}
|
||||
|
||||
pub fn expire_interval(&self) -> Duration {
|
||||
DEFAULT_EXPIRE_INTERVAL
|
||||
}
|
||||
|
||||
pub fn current_snapshot(&self) -> (&Snapshot, u32, u16) {
|
||||
(&self.snapshot, self.serial, self.session_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl RtrCache {
|
||||
pub fn get_deltas_since(
|
||||
&self,
|
||||
client_session: u16,
|
||||
client_serial: u32,
|
||||
) -> SerialResult {
|
||||
if client_session != self.session_id {
|
||||
return SerialResult::ResetRequired;
|
||||
}
|
||||
|
||||
if client_serial == self.serial {
|
||||
return SerialResult::UpToDate;
|
||||
}
|
||||
|
||||
if self.deltas.is_empty() {
|
||||
return SerialResult::ResetRequired;
|
||||
}
|
||||
|
||||
let oldest_serial = self.deltas.front().unwrap().serial;
|
||||
let newest_serial = self.deltas.back().unwrap().serial;
|
||||
|
||||
let min_supported = oldest_serial.wrapping_sub(1);
|
||||
if client_serial < min_supported {
|
||||
return SerialResult::ResetRequired;
|
||||
}
|
||||
|
||||
if client_serial > self.serial {
|
||||
return SerialResult::ResetRequired;
|
||||
}
|
||||
|
||||
let mut result = Vec::new();
|
||||
for delta in &self.deltas {
|
||||
if delta.serial > client_serial {
|
||||
result.push(delta.clone());
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(first) = result.first() {
|
||||
if first.serial != client_serial.wrapping_add(1) {
|
||||
return SerialResult::ResetRequired;
|
||||
}
|
||||
} else {
|
||||
return SerialResult::UpToDate;
|
||||
}
|
||||
|
||||
SerialResult::Deltas(result)
|
||||
}
|
||||
}
|
||||
|
||||
pub enum SerialResult {
|
||||
/// Client is up to date.
|
||||
UpToDate,
|
||||
/// Return applicable deltas.
|
||||
Deltas(Vec<Arc<Delta>>),
|
||||
/// Delta window cannot cover; reset required.
|
||||
ResetRequired,
|
||||
}
|
||||
98
src/rtr/error_type.rs
Normal file
98
src/rtr/error_type.rs
Normal file
@ -0,0 +1,98 @@
|
||||
use std::convert::TryFrom;
|
||||
use std::fmt;
|
||||
|
||||
#[repr(u16)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum ErrorCode {
|
||||
CorruptData = 0,
|
||||
InternalError = 1,
|
||||
NoDataAvailable = 2,
|
||||
InvalidRequest = 3,
|
||||
UnsupportedProtocolVersion = 4,
|
||||
UnsupportedPduType = 5,
|
||||
WithdrawalOfUnknownRecord = 6,
|
||||
DuplicateAnnouncement = 7,
|
||||
UnexpectedProtocolVersion = 8,
|
||||
AspaProviderListError = 9,
|
||||
TransportFailed = 10,
|
||||
OrderingError = 11,
|
||||
}
|
||||
|
||||
impl ErrorCode {
|
||||
|
||||
#[inline]
|
||||
pub fn as_u16(self) -> u16 {
|
||||
self as u16
|
||||
}
|
||||
|
||||
pub fn description(self) -> &'static str {
|
||||
match self {
|
||||
ErrorCode::CorruptData =>
|
||||
"Corrupt Data",
|
||||
|
||||
ErrorCode::InternalError =>
|
||||
"Internal Error",
|
||||
|
||||
ErrorCode::NoDataAvailable =>
|
||||
"No Data Available",
|
||||
|
||||
ErrorCode::InvalidRequest =>
|
||||
"Invalid Request",
|
||||
|
||||
ErrorCode::UnsupportedProtocolVersion =>
|
||||
"Unsupported Protocol Version",
|
||||
|
||||
ErrorCode::UnsupportedPduType =>
|
||||
"Unsupported PDU Type",
|
||||
|
||||
ErrorCode::WithdrawalOfUnknownRecord =>
|
||||
"Withdrawal of Unknown Record",
|
||||
|
||||
ErrorCode::DuplicateAnnouncement =>
|
||||
"Duplicate Announcement Received",
|
||||
|
||||
ErrorCode::UnexpectedProtocolVersion =>
|
||||
"Unexpected Protocol Version",
|
||||
|
||||
ErrorCode::AspaProviderListError =>
|
||||
"ASPA Provider List Error",
|
||||
|
||||
ErrorCode::TransportFailed =>
|
||||
"Transport Failed",
|
||||
|
||||
ErrorCode::OrderingError =>
|
||||
"Ordering Error",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<u16> for ErrorCode {
|
||||
type Error = ();
|
||||
|
||||
fn try_from(value: u16) -> Result<Self, Self::Error> {
|
||||
match value {
|
||||
0 => Ok(ErrorCode::CorruptData),
|
||||
1 => Ok(ErrorCode::InternalError),
|
||||
2 => Ok(ErrorCode::NoDataAvailable),
|
||||
3 => Ok(ErrorCode::InvalidRequest),
|
||||
4 => Ok(ErrorCode::UnsupportedProtocolVersion),
|
||||
5 => Ok(ErrorCode::UnsupportedPduType),
|
||||
6 => Ok(ErrorCode::WithdrawalOfUnknownRecord),
|
||||
7 => Ok(ErrorCode::DuplicateAnnouncement),
|
||||
8 => Ok(ErrorCode::UnexpectedProtocolVersion),
|
||||
9 => Ok(ErrorCode::AspaProviderListError),
|
||||
10 => Ok(ErrorCode::TransportFailed),
|
||||
11 => Ok(ErrorCode::OrderingError),
|
||||
_ => Err(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for ErrorCode {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{} ({})",
|
||||
self.description(),
|
||||
*self as u16
|
||||
)
|
||||
}
|
||||
}
|
||||
7
src/rtr/mod.rs
Normal file
7
src/rtr/mod.rs
Normal file
@ -0,0 +1,7 @@
|
||||
pub mod pdu;
|
||||
pub mod cache;
|
||||
pub mod payload;
|
||||
mod store_db;
|
||||
mod session;
|
||||
mod error_type;
|
||||
mod state;
|
||||
106
src/rtr/payload.rs
Normal file
106
src/rtr/payload.rs
Normal file
@ -0,0 +1,106 @@
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use asn1_rs::nom::character::streaming::u64;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use crate::data_model::resources::as_resources::Asn;
|
||||
use crate::data_model::resources::ip_resources::IPAddressPrefix;
|
||||
|
||||
|
||||
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
|
||||
pub struct Ski([u8; 20]);
|
||||
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
|
||||
pub struct RouteOrigin {
|
||||
prefix: IPAddressPrefix,
|
||||
max_length: u8,
|
||||
asn: Asn,
|
||||
}
|
||||
|
||||
impl RouteOrigin {
|
||||
pub fn prefix(&self) -> &IPAddressPrefix {
|
||||
&self.prefix
|
||||
}
|
||||
|
||||
pub fn max_length(&self) -> u8 {
|
||||
self.max_length
|
||||
}
|
||||
|
||||
pub fn asn(&self) -> Asn {
|
||||
self.asn
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
|
||||
pub struct RouterKey {
|
||||
subject_key_identifier: Ski,
|
||||
asn: Asn,
|
||||
subject_public_key_info: Arc<[u8]>,
|
||||
}
|
||||
|
||||
impl RouterKey {
|
||||
pub fn asn(&self) -> Asn {
|
||||
self.asn
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
|
||||
pub struct Aspa {
|
||||
customer_asn: Asn,
|
||||
provider_asns: Vec<Asn>,
|
||||
}
|
||||
|
||||
impl Aspa {
|
||||
pub fn customer_asn(&self) -> Asn {
|
||||
self.customer_asn
|
||||
}
|
||||
|
||||
pub fn provider_asns(&self) -> &[Asn] {
|
||||
&self.provider_asns
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
|
||||
pub enum Payload {
|
||||
/// A route origin authorisation.
|
||||
RouteOrigin(RouteOrigin),
|
||||
|
||||
/// A BGPsec router key.
|
||||
RouterKey(RouterKey),
|
||||
|
||||
/// An ASPA unit.
|
||||
Aspa(Aspa),
|
||||
}
|
||||
|
||||
|
||||
// Timing
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct Timing {
|
||||
/// The number of seconds until a client should refresh its data.
|
||||
pub refresh: u32,
|
||||
|
||||
/// The number of seconds a client whould wait before retrying to connect.
|
||||
pub retry: u32,
|
||||
|
||||
/// The number of secionds before data expires if not refreshed.
|
||||
pub expire: u32
|
||||
}
|
||||
|
||||
impl Timing {
|
||||
pub fn refresh(self) -> Duration {
|
||||
Duration::from_secs(u64::from(self.refresh))
|
||||
}
|
||||
|
||||
pub fn retry(self) -> Duration {
|
||||
Duration::from_secs(u64::from(self.retry))
|
||||
}
|
||||
|
||||
pub fn expire(self) -> Duration {
|
||||
Duration::from_secs(u64::from(self.expire))
|
||||
}
|
||||
|
||||
}
|
||||
868
src/rtr/pdu.rs
Normal file
868
src/rtr/pdu.rs
Normal file
@ -0,0 +1,868 @@
|
||||
use std::{cmp, mem};
|
||||
use std::net::{Ipv4Addr, Ipv6Addr};
|
||||
use std::sync::Arc;
|
||||
use crate::data_model::resources::as_resources::Asn;
|
||||
use crate::rtr::payload::{Ski, Timing};
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
use tokio::io::{AsyncWrite};
|
||||
use anyhow::Result;
|
||||
|
||||
use std::slice;
|
||||
use anyhow::bail;
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::TcpStream;
|
||||
|
||||
pub const HEADER_LEN: u32 = 8;
|
||||
pub const MAX_PDU_LEN: u32 = 65535;
|
||||
pub const IPV4_PREFIX_LEN: u32 = 20;
|
||||
pub const IPV6_PREFIX_LEN: u32 = 32;
|
||||
pub const END_OF_DATA_V0_LEN: u32 = 12;
|
||||
pub const END_OF_DATA_V1_LEN: u32 = 24;
|
||||
pub const ZERO_16: u16 = 0;
|
||||
pub const ZERO_8: u8 = 0;
|
||||
|
||||
macro_rules! common {
|
||||
( $type:ident ) => {
|
||||
#[allow(dead_code)]
|
||||
impl $type {
|
||||
/// Writes a value to a writer.
|
||||
pub async fn write<A: AsyncWrite + Unpin>(
|
||||
&self,
|
||||
a: &mut A
|
||||
) -> Result<(), io::Error> {
|
||||
a.write_all(self.as_ref()).await
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<[u8]> for $type {
|
||||
fn as_ref(&self) -> &[u8] {
|
||||
unsafe {
|
||||
slice::from_raw_parts(
|
||||
self as *const Self as *const u8,
|
||||
mem::size_of::<Self>()
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AsMut<[u8]> for $type {
|
||||
fn as_mut(&mut self) -> &mut [u8] {
|
||||
unsafe {
|
||||
slice::from_raw_parts_mut(
|
||||
self as *mut Self as *mut u8,
|
||||
mem::size_of::<Self>()
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! concrete {
|
||||
( $type:ident ) => {
|
||||
common!($type);
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl $type {
|
||||
/// Returns the value of the version field of the header.
|
||||
pub fn version(&self) -> u8 {
|
||||
self.header.version()
|
||||
}
|
||||
|
||||
/// Returns the value of the session field of the header.
|
||||
///
|
||||
/// Note that this field is used for other purposes in some PDU
|
||||
/// types.
|
||||
pub fn session_id(&self) -> u16 {
|
||||
self.header.session_id()
|
||||
}
|
||||
|
||||
/// Returns the PDU size.
|
||||
///
|
||||
/// The size is returned as a `u32` since that type is used in
|
||||
/// the header.
|
||||
pub fn size() -> u32 {
|
||||
mem::size_of::<Self>() as u32
|
||||
}
|
||||
|
||||
/// Reads a value from a reader.
|
||||
///
|
||||
/// If a value with a different PDU type is received, returns an
|
||||
/// error.
|
||||
pub async fn read<Sock: AsyncRead + Unpin>(
|
||||
sock: &mut Sock
|
||||
) -> Result<Self, io::Error> {
|
||||
let mut res = Self::default();
|
||||
sock.read_exact(res.header.as_mut()).await?;
|
||||
if res.header.pdu() != Self::PDU {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
concat!(
|
||||
"PDU type mismatch when expecting ",
|
||||
stringify!($type)
|
||||
)
|
||||
))
|
||||
}
|
||||
if res.header.length() as usize != res.as_ref().len() {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
concat!(
|
||||
"invalid length for ",
|
||||
stringify!($type)
|
||||
)
|
||||
))
|
||||
}
|
||||
sock.read_exact(&mut res.as_mut()[Header::LEN..]).await?;
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
/// Tries to read a value from a reader.
|
||||
///
|
||||
/// If a different PDU type is received, returns the header as
|
||||
/// the error case of the ok case.
|
||||
pub async fn try_read<Sock: AsyncRead + Unpin>(
|
||||
sock: &mut Sock
|
||||
) -> Result<Result<Self, Header>, io::Error> {
|
||||
let mut res = Self::default();
|
||||
sock.read_exact(res.header.as_mut()).await?;
|
||||
if res.header.pdu() == Error::PDU {
|
||||
// Since we should drop the session after an error, we
|
||||
// can safely ignore all the rest of the error for now.
|
||||
return Ok(Err(res.header))
|
||||
}
|
||||
if res.header.pdu() != Self::PDU {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
concat!(
|
||||
"PDU type mismatch when expecting ",
|
||||
stringify!($type)
|
||||
)
|
||||
))
|
||||
}
|
||||
if res.header.length() as usize != res.as_ref().len() {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
concat!(
|
||||
"invalid length for ",
|
||||
stringify!($type)
|
||||
)
|
||||
))
|
||||
}
|
||||
sock.read_exact(&mut res.as_mut()[Header::LEN..]).await?;
|
||||
Ok(Ok(res))
|
||||
}
|
||||
|
||||
/// Reads only the payload part of a value from a reader.
|
||||
///
|
||||
/// Assuming that the header was already read and is passed via
|
||||
/// `header`, the function reads the rest of the PUD from the
|
||||
/// reader and returns the complete value.
|
||||
pub async fn read_payload<Sock: AsyncRead + Unpin>(
|
||||
header: Header, sock: &mut Sock
|
||||
) -> Result<Self, io::Error> {
|
||||
if header.length() as usize != mem::size_of::<Self>() {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
concat!(
|
||||
"invalid length for ",
|
||||
stringify!($type),
|
||||
" PDU"
|
||||
)
|
||||
))
|
||||
}
|
||||
let mut res = Self::default();
|
||||
sock.read_exact(&mut res.as_mut()[Header::LEN..]).await?;
|
||||
res.header = header;
|
||||
Ok(res)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// 所有PDU公共头部信息
|
||||
#[repr(C, packed)]
|
||||
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
|
||||
pub struct Header {
|
||||
version: u8,
|
||||
pdu: u8,
|
||||
session_id: u16,
|
||||
length: u32,
|
||||
}
|
||||
|
||||
impl Header {
|
||||
|
||||
const LEN: usize = mem::size_of::<Self>();
|
||||
pub fn new(version: u8, pdu: u8, session: u16, length: u32) -> Self {
|
||||
Header {
|
||||
version,
|
||||
pdu,
|
||||
session_id: session.to_be(),
|
||||
length: length.to_be(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn read(sock: &mut TcpStream) -> Result<Self> {
|
||||
let mut buf = [0u8; HEADER_LEN];
|
||||
|
||||
// 1. 精确读取 8 字节
|
||||
sock.read_exact(&mut buf).await?;
|
||||
|
||||
// 2. 手动解析(大端)
|
||||
let version = buf[0];
|
||||
let pdu = buf[1];
|
||||
let reserved = u16::from_be_bytes([buf[2], buf[3]]);
|
||||
let length = u32::from_be_bytes([
|
||||
buf[4], buf[5], buf[6], buf[7],
|
||||
]);
|
||||
|
||||
// 3. 基础合法性校验
|
||||
|
||||
if length < HEADER_LEN{
|
||||
bail!("Invalid PDU length");
|
||||
}
|
||||
|
||||
// 限制最大长度
|
||||
if length > MAX_PDU_LEN {
|
||||
bail!("PDU too large");
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
version,
|
||||
pdu,
|
||||
session_id: reserved,
|
||||
length,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn version(self) -> u8{self.version}
|
||||
|
||||
pub fn pdu(self) -> u8{self.pdu}
|
||||
|
||||
pub fn session_id(self) -> u16{u16::from_be(self.session_id)}
|
||||
|
||||
pub fn length(self) -> u32{u32::from_be(self.length)}
|
||||
|
||||
pub fn pdu_len(self) -> Result<usize, io::Error> {
|
||||
usize::try_from(self.length()).map_err(|_| {
|
||||
io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
"PDU too large for this system to handle",
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
common!(Header);
|
||||
|
||||
#[repr(C, packed)]
|
||||
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
|
||||
pub struct HeaderWithFlags {
|
||||
version: u8,
|
||||
pdu: u8,
|
||||
flags: u8,
|
||||
zero: u8,
|
||||
length: u32,
|
||||
}
|
||||
|
||||
impl HeaderWithFlags {
|
||||
pub fn new(version: u8, pdu: u8, flags: Flags, length: u32) -> Self {
|
||||
HeaderWithFlags {
|
||||
version,
|
||||
pdu,
|
||||
flags: flags.into_u8(),
|
||||
zero: ZERO_8,
|
||||
length: length.to_be(),
|
||||
}
|
||||
}
|
||||
pub async fn read(sock: &mut TcpStream) -> Result<Self> {
|
||||
let mut buf = [0u8; HEADER_LEN];
|
||||
|
||||
// 1. 精确读取 8 字节
|
||||
sock.read_exact(&mut buf).await?;
|
||||
|
||||
// 2. 手动解析(大端)
|
||||
let version = buf[0];
|
||||
let pdu = buf[1];
|
||||
let flags = buf[2];
|
||||
let zero = buf[3];
|
||||
let length = u32::from_be_bytes([
|
||||
buf[4], buf[5], buf[6], buf[7],
|
||||
]);
|
||||
|
||||
// 3. 基础合法性校验
|
||||
if length < HEADER_LEN{
|
||||
bail!("Invalid PDU length");
|
||||
}
|
||||
|
||||
// 限制最大长度
|
||||
if length > MAX_PDU_LEN {
|
||||
bail!("PDU too large");
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
version,
|
||||
pdu,
|
||||
flags,
|
||||
zero,
|
||||
length,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn version(self) -> u8{self.version}
|
||||
|
||||
pub fn pdu(self) -> u8{self.pdu}
|
||||
|
||||
pub fn flags(self) -> Flags{Flags(self.flags)}
|
||||
|
||||
pub fn length(self) -> u32{u32::from_be(self.length)}
|
||||
}
|
||||
|
||||
|
||||
// Serial Notify
|
||||
#[repr(C, packed)]
|
||||
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
|
||||
pub struct SerialNotify {
|
||||
header: Header,
|
||||
serial_number: u32,
|
||||
}
|
||||
|
||||
impl SerialNotify {
|
||||
pub const PDU: u8 = 0;
|
||||
|
||||
pub fn new(version: u8, session_id: u16, serial_number: u32) -> Self {
|
||||
SerialNotify{
|
||||
header: Header::new(version, Self::PDU, session_id, Self::size()),
|
||||
serial_number: serial_number.to_be()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
concrete!(SerialNotify);
|
||||
|
||||
|
||||
// Serial Query
|
||||
#[repr(C, packed)]
|
||||
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
|
||||
pub struct SerialQuery {
|
||||
header: Header,
|
||||
serial_number: u32,
|
||||
}
|
||||
|
||||
impl SerialQuery {
|
||||
pub const PDU: u8 = 1;
|
||||
|
||||
pub fn new(version: u8, session_id: u16, serial_number: u32) -> Self {
|
||||
SerialQuery{
|
||||
header: Header::new(version, Self::PDU, session_id, Self::size()),
|
||||
serial_number: serial_number.to_be()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn serial_number(self) -> u32 {
|
||||
self.serial_number
|
||||
}
|
||||
}
|
||||
|
||||
concrete!(SerialQuery);
|
||||
|
||||
|
||||
// Reset Query
|
||||
#[repr(C, packed)]
|
||||
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
|
||||
pub struct ResetQuery {
|
||||
header: Header
|
||||
}
|
||||
|
||||
impl ResetQuery {
|
||||
pub const PDU: u8 = 2;
|
||||
|
||||
pub fn new(version: u8) -> Self {
|
||||
ResetQuery {
|
||||
header: Header::new(version, Self::PDU, ZERO_16, HEADER_LEN),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
concrete!(ResetQuery);
|
||||
|
||||
|
||||
// Cache Response
|
||||
#[repr(C, packed)]
|
||||
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
|
||||
pub struct CacheResponse {
|
||||
header: Header,
|
||||
}
|
||||
|
||||
impl CacheResponse {
|
||||
pub const PDU: u8 = 3;
|
||||
|
||||
pub fn new(version: u8, session_id: u16) -> Self {
|
||||
CacheResponse {
|
||||
header: Header::new(version, Self::PDU, session_id, HEADER_LEN),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
concrete!(CacheResponse);
|
||||
|
||||
|
||||
// Flags
|
||||
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
|
||||
pub struct Flags(u8);
|
||||
|
||||
impl Flags {
|
||||
pub fn new(raw: u8) -> Self {
|
||||
Self(raw)
|
||||
}
|
||||
|
||||
pub fn is_announce(self) -> bool {
|
||||
self.0 & 0x01 == 1
|
||||
}
|
||||
|
||||
pub fn is_withdraw(self) -> bool {
|
||||
!self.is_announce()
|
||||
}
|
||||
|
||||
pub fn into_u8(self) -> u8 {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
// IPv4 Prefix
|
||||
#[repr(C, packed)]
|
||||
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
|
||||
pub struct IPv4Prefix {
|
||||
header: Header,
|
||||
|
||||
flags: Flags,
|
||||
prefix_len: u8,
|
||||
max_len: u8,
|
||||
zero: u8,
|
||||
prefix: u32,
|
||||
asn: u32
|
||||
}
|
||||
|
||||
impl IPv4Prefix {
|
||||
pub const PDU: u8 = 4;
|
||||
pub fn new(
|
||||
version: u8,
|
||||
flags: Flags,
|
||||
prefix_len: u8,
|
||||
max_len: u8,
|
||||
prefix: Ipv4Addr,
|
||||
asn: Asn
|
||||
) -> Self {
|
||||
IPv4Prefix {
|
||||
header: Header::new(version, Self::PDU, ZERO_16, IPV4_PREFIX_LEN),
|
||||
flags,
|
||||
prefix_len,
|
||||
max_len,
|
||||
zero: ZERO_8,
|
||||
prefix: u32::from(prefix).to_be(),
|
||||
asn: asn.into_u32().to_be(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn flag(self) -> Flags{self.flags}
|
||||
|
||||
pub fn prefix_len(self) -> u8{self.prefix_len}
|
||||
pub fn max_len(self) -> u8{self.max_len}
|
||||
pub fn prefix(self) -> Ipv4Addr{u32::from_be(self.prefix).into()}
|
||||
pub fn asn(self) -> Asn{u32::from_be(self.asn).into()}
|
||||
}
|
||||
|
||||
concrete!(IPv4Prefix);
|
||||
|
||||
// IPv6 Prefix
|
||||
#[repr(C, packed)]
|
||||
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
|
||||
pub struct IPv6Prefix {
|
||||
header: Header,
|
||||
|
||||
flags: Flags,
|
||||
prefix_len: u8,
|
||||
max_len: u8,
|
||||
zero: u8,
|
||||
prefix: u128,
|
||||
asn: u32
|
||||
}
|
||||
|
||||
impl IPv6Prefix {
|
||||
pub const PDU: u8 = 6;
|
||||
pub fn new(
|
||||
version: u8,
|
||||
flags: Flags,
|
||||
prefix_len: u8,
|
||||
max_len: u8,
|
||||
prefix: Ipv6Addr,
|
||||
asn: Asn
|
||||
) -> Self {
|
||||
IPv6Prefix {
|
||||
header: Header::new(version, Self::PDU, ZERO_16, IPV6_PREFIX_LEN),
|
||||
flags,
|
||||
prefix_len,
|
||||
max_len,
|
||||
zero: ZERO_8,
|
||||
prefix: u128::from(prefix).to_be(),
|
||||
asn: asn.into_u32().to_be(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn flag(self) -> Flags{self.flags}
|
||||
|
||||
pub fn prefix_len(self) -> u8{self.prefix_len}
|
||||
pub fn max_len(self) -> u8{self.max_len}
|
||||
pub fn prefix(self) -> Ipv6Addr{u128::from_be(self.prefix).into()}
|
||||
pub fn asn(self) -> Asn{u32::from_be(self.asn).into()}
|
||||
}
|
||||
|
||||
concrete!(IPv6Prefix);
|
||||
|
||||
|
||||
// End of Data
|
||||
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
|
||||
pub enum EndOfData {
|
||||
V0(EndOfDataV0),
|
||||
V1(EndOfDataV1),
|
||||
}
|
||||
|
||||
impl EndOfData {
|
||||
pub fn new(version: u8, session_id: u16, serial_number: u32, timing: Timing) -> Self {
|
||||
if version == 0 {
|
||||
EndOfData::V0(EndOfDataV0::new(version, session_id, serial_number))
|
||||
}
|
||||
else {
|
||||
EndOfData::V1(EndOfDataV1::new(version, session_id, serial_number, timing))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
#[repr(C, packed)]
|
||||
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
|
||||
pub struct EndOfDataV0 {
|
||||
header: Header,
|
||||
|
||||
serial_number: u32,
|
||||
}
|
||||
|
||||
impl EndOfDataV0 {
|
||||
pub const PDU: u8 = 7;
|
||||
|
||||
pub fn new(version: u8, session_id: u16, serial_number: u32) -> Self {
|
||||
EndOfDataV0 {
|
||||
header: Header::new(version, Self::PDU, session_id, END_OF_DATA_V0_LEN),
|
||||
serial_number: serial_number.to_be(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn serial_number(self) -> u32{u32::from_be(self.serial_number)}
|
||||
}
|
||||
concrete!(EndOfDataV0);
|
||||
|
||||
|
||||
#[repr(C, packed)]
|
||||
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
|
||||
pub struct EndOfDataV1 {
|
||||
header: Header,
|
||||
|
||||
serial_number: u32,
|
||||
refresh_interval: u32,
|
||||
retry_interval: u32,
|
||||
expire_interval: u32,
|
||||
|
||||
}
|
||||
|
||||
impl EndOfDataV1 {
|
||||
pub const PDU: u8 = 7;
|
||||
|
||||
pub fn new(version: u8, session_id: u16, serial_number: u32, timing: Timing) -> Self {
|
||||
EndOfDataV1 {
|
||||
header: Header::new(version, Self::PDU, session_id, END_OF_DATA_V1_LEN),
|
||||
serial_number: serial_number.to_be(),
|
||||
refresh_interval: timing.refresh.to_be(),
|
||||
retry_interval: timing.retry.to_be(),
|
||||
expire_interval: timing.expire.to_be(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn serial_number(self) -> u32{u32::from_be(self.serial_number)}
|
||||
|
||||
pub fn timing(self) -> Timing{
|
||||
Timing {
|
||||
refresh: u32::from_be(self.refresh_interval),
|
||||
retry: u32::from_be(self.retry_interval),
|
||||
expire: u32::from_be(self.expire_interval),
|
||||
}
|
||||
}
|
||||
}
|
||||
concrete!(EndOfDataV1);
|
||||
|
||||
// Cache Reset
|
||||
#[repr(C, packed)]
|
||||
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
|
||||
pub struct CacheReset {
|
||||
header: Header,
|
||||
}
|
||||
|
||||
impl CacheReset {
|
||||
pub const PDU: u8 = 8;
|
||||
|
||||
pub fn new(version: u8) -> Self{
|
||||
CacheReset {
|
||||
header: Header::new(version, Self::PDU, ZERO_16, HEADER_LEN)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
concrete!(CacheReset);
|
||||
|
||||
|
||||
// Error Report
|
||||
#[derive(Clone, Debug, Default, Eq, Hash, PartialEq)]
|
||||
pub struct ErrorReport {
|
||||
octets: Vec<u8>,
|
||||
}
|
||||
|
||||
|
||||
impl ErrorReport {
|
||||
/// The PDU type of an error PDU.
|
||||
pub const PDU: u8 = 10;
|
||||
|
||||
/// Creates a new error PDU from components.
|
||||
pub fn new(
|
||||
version: u8,
|
||||
error_code: u16,
|
||||
pdu: impl AsRef<[u8]>,
|
||||
text: impl AsRef<[u8]>,
|
||||
) -> Self {
|
||||
let pdu = pdu.as_ref();
|
||||
let text = text.as_ref();
|
||||
|
||||
let size =
|
||||
mem::size_of::<Header>()
|
||||
+ 2 * mem::size_of::<u32>()
|
||||
+ pdu.len() + text.len()
|
||||
;
|
||||
let header = Header::new(
|
||||
version, 10, error_code, u32::try_from(size).unwrap()
|
||||
);
|
||||
|
||||
let mut octets = Vec::with_capacity(size);
|
||||
octets.extend_from_slice(header.as_ref());
|
||||
octets.extend_from_slice(
|
||||
u32::try_from(pdu.len()).unwrap().to_be_bytes().as_ref()
|
||||
);
|
||||
octets.extend_from_slice(pdu);
|
||||
octets.extend_from_slice(
|
||||
u32::try_from(text.len()).unwrap().to_be_bytes().as_ref()
|
||||
);
|
||||
octets.extend_from_slice(text);
|
||||
|
||||
ErrorReport { octets }
|
||||
}
|
||||
|
||||
/// Skips over the payload of the error PDU.
|
||||
pub async fn skip_payload<Sock: AsyncRead + Unpin>(
|
||||
header: Header, sock: &mut Sock
|
||||
) -> Result<(), io::Error> {
|
||||
let Some(mut remaining) = header.pdu_len()?.checked_sub(
|
||||
mem::size_of::<Header>()
|
||||
) else {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
"PDU size smaller than header size",
|
||||
))
|
||||
};
|
||||
|
||||
let mut buf = [0u8; 1024];
|
||||
while remaining > 0 {
|
||||
let read_len = cmp::min(remaining, mem::size_of_val(&buf));
|
||||
let read = sock.read(
|
||||
// Safety: We limited the length to the buffer size.
|
||||
unsafe { buf.get_unchecked_mut(..read_len) }
|
||||
).await?;
|
||||
remaining -= read;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Writes the PUD to a writer.
|
||||
pub async fn write<A: AsyncWrite + Unpin>(
|
||||
&self, a: &mut A
|
||||
) -> Result<(), io::Error> {
|
||||
a.write_all(self.as_ref()).await
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// TODO: 补全
|
||||
// Router Key
|
||||
#[derive(Clone, Debug, Default, Eq, Hash, PartialEq)]
|
||||
pub struct RouterKey {
|
||||
header: HeaderWithFlags,
|
||||
|
||||
flags: Flags,
|
||||
|
||||
ski: Ski,
|
||||
asn: Asn,
|
||||
subject_public_key_info: Arc<[u8]>,
|
||||
}
|
||||
|
||||
impl RouterKey {
|
||||
|
||||
pub const PDU: u8 = 9;
|
||||
|
||||
pub async fn write<A: AsyncWrite + Unpin>(
|
||||
&self,
|
||||
w: &mut A,
|
||||
) -> Result<(), io::Error> {
|
||||
|
||||
let length = HEADER_LEN
|
||||
+ 1 // flags
|
||||
// + self.ski.as_ref().len()
|
||||
+ 4 // ASN
|
||||
+ self.subject_public_key_info.len() as u32;
|
||||
|
||||
let header = HeaderWithFlags::new(
|
||||
self.header.version(),
|
||||
Self::PDU,
|
||||
self.flags,
|
||||
length,
|
||||
);
|
||||
|
||||
w.write_all(&[
|
||||
header.version(),
|
||||
header.pdu(),
|
||||
header.flags().into_u8(),
|
||||
ZERO_8,
|
||||
]).await?;
|
||||
|
||||
w.write_all(&length.to_be_bytes()).await?;
|
||||
// w.write_all(self.ski.as_ref()).await?;
|
||||
w.write_all(&self.asn.into_u32().to_be_bytes()).await?;
|
||||
w.write_all(&self.subject_public_key_info).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
// ASPA
|
||||
pub struct Aspa{
|
||||
header: HeaderWithFlags,
|
||||
|
||||
customer_asn: u32,
|
||||
provider_asns: Vec<u32>
|
||||
}
|
||||
|
||||
impl Aspa {
|
||||
pub const PDU: u8 = 11;
|
||||
|
||||
pub async fn write<A: AsyncWrite + Unpin>(
|
||||
&self,
|
||||
w: &mut A,
|
||||
) -> Result<(), io::Error> {
|
||||
|
||||
let length = HEADER_LEN
|
||||
+ 1
|
||||
+ 4
|
||||
+ (self.provider_asns.len() as u32 * 4);
|
||||
|
||||
let header = HeaderWithFlags::new(
|
||||
self.header.version(),
|
||||
Self::PDU,
|
||||
Flags::new(self.header.flags),
|
||||
length,
|
||||
);
|
||||
|
||||
w.write_all(&[
|
||||
header.version(),
|
||||
header.pdu(),
|
||||
header.flags().into_u8(),
|
||||
ZERO_8,
|
||||
]).await?;
|
||||
|
||||
w.write_all(&length.to_be_bytes()).await?;
|
||||
w.write_all(&self.customer_asn.to_be_bytes()).await?;
|
||||
|
||||
for asn in &self.provider_asns {
|
||||
w.write_all(&asn.to_be_bytes()).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
//--- AsRef and AsMut
|
||||
impl AsRef<[u8]> for ErrorReport {
|
||||
fn as_ref(&self) -> &[u8] {
|
||||
self.octets.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl AsMut<[u8]> for ErrorReport {
|
||||
fn as_mut(&mut self) -> &mut [u8] {
|
||||
self.octets.as_mut()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use tokio::io::duplex;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_serial_notify_roundtrip() {
|
||||
let (mut client, mut server) = duplex(1024);
|
||||
|
||||
let original = SerialNotify::new(1, 42, 100);
|
||||
|
||||
// 写入
|
||||
tokio::spawn(async move {
|
||||
original.write(&mut client).await.unwrap();
|
||||
});
|
||||
|
||||
// 读取
|
||||
let decoded = SerialNotify::read(&mut server).await.unwrap();
|
||||
|
||||
assert_eq!(decoded.version(), 1);
|
||||
assert_eq!(decoded.session_id(), 42);
|
||||
assert_eq!(decoded.serial_number, 100u32.to_be());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_ipv4_prefix_roundtrip() {
|
||||
use std::net::Ipv4Addr;
|
||||
|
||||
let (mut client, mut server) = duplex(1024);
|
||||
|
||||
let prefix = IPv4Prefix::new(
|
||||
1,
|
||||
Flags::new(1),
|
||||
24,
|
||||
24,
|
||||
Ipv4Addr::new(192,168,0,0),
|
||||
65000u32.into(),
|
||||
);
|
||||
|
||||
tokio::spawn(async move {
|
||||
prefix.write(&mut client).await.unwrap();
|
||||
});
|
||||
|
||||
let decoded = IPv4Prefix::read(&mut server).await.unwrap();
|
||||
|
||||
assert_eq!(decoded.prefix_len(), 24);
|
||||
assert_eq!(decoded.max_len(), 24);
|
||||
assert_eq!(decoded.prefix(), Ipv4Addr::new(192,168,0,0));
|
||||
assert_eq!(decoded.flag().is_announce(), true);
|
||||
}
|
||||
}
|
||||
280
src/rtr/session.rs
Normal file
280
src/rtr/session.rs
Normal file
@ -0,0 +1,280 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{bail, Result};
|
||||
use tokio::io;
|
||||
use tokio::net::TcpStream;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::rtr::cache::{Delta, RtrCache, SerialResult};
|
||||
use crate::rtr::error_type::ErrorCode;
|
||||
use crate::rtr::payload::{Payload, RouteOrigin, Timing};
|
||||
use crate::rtr::pdu::{
|
||||
CacheReset, CacheResponse, EndOfData, ErrorReport, Flags, Header, IPv4Prefix, IPv6Prefix,
|
||||
ResetQuery, SerialQuery,
|
||||
};
|
||||
|
||||
const SUPPORTED_MAX_VERSION: u8 = 2;
|
||||
const SUPPORTED_MIN_VERSION: u8 = 0;
|
||||
|
||||
const ANNOUNCE_FLAG: u8 = 1;
|
||||
const WITHDRAW_FLAG: u8 = 0;
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
enum SessionState {
|
||||
Connected,
|
||||
Established,
|
||||
Closed,
|
||||
}
|
||||
|
||||
pub struct RtrSession {
|
||||
cache: Arc<RtrCache>,
|
||||
version: Option<u8>,
|
||||
stream: TcpStream,
|
||||
state: SessionState,
|
||||
}
|
||||
|
||||
impl RtrSession {
|
||||
pub fn new(cache: Arc<RtrCache>, stream: TcpStream) -> Self {
|
||||
Self {
|
||||
cache,
|
||||
version: None,
|
||||
stream,
|
||||
state: SessionState::Connected,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run(mut self) -> Result<()> {
|
||||
loop {
|
||||
let header = match Header::read(&mut self.stream).await {
|
||||
Ok(h) => h,
|
||||
Err(_) => return Ok(()),
|
||||
};
|
||||
|
||||
if self.version.is_none() {
|
||||
self.negotiate_version(header.version()).await?;
|
||||
} else if header.version() != self.version.unwrap() {
|
||||
self.send_unsupported_version(self.version.unwrap()).await?;
|
||||
bail!("version changed within session");
|
||||
}
|
||||
|
||||
match header.pdu() {
|
||||
ResetQuery::PDU => {
|
||||
let _ = ResetQuery::read_payload(header, &mut self.stream).await?;
|
||||
self.handle_reset_query().await?;
|
||||
}
|
||||
SerialQuery::PDU => {
|
||||
let query = SerialQuery::read_payload(header, &mut self.stream).await?;
|
||||
let session_id = query.session_id();
|
||||
let serial = u32::from_be(query.serial_number());
|
||||
self.handle_serial(session_id, serial).await?;
|
||||
}
|
||||
ErrorReport::PDU => {
|
||||
let _ = ErrorReport::skip_payload(header, &mut self.stream).await;
|
||||
self.state = SessionState::Closed;
|
||||
return Ok(());
|
||||
}
|
||||
_ => {
|
||||
self.send_error(header.version(), ErrorCode::UnsupportedPduType, Some(&header), &[])
|
||||
.await?;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn negotiate_version(&mut self, router_version: u8) -> io::Result<u8> {
|
||||
if router_version < SUPPORTED_MIN_VERSION {
|
||||
self.send_unsupported_version(SUPPORTED_MIN_VERSION).await?;
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
"unsupported lower protocol version",
|
||||
));
|
||||
}
|
||||
|
||||
if router_version > SUPPORTED_MAX_VERSION {
|
||||
self.send_unsupported_version(SUPPORTED_MAX_VERSION).await?;
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
"router version higher than cache",
|
||||
));
|
||||
}
|
||||
|
||||
self.version = Some(router_version);
|
||||
Ok(router_version)
|
||||
}
|
||||
|
||||
async fn send_unsupported_version(&mut self, cache_version: u8) -> io::Result<()> {
|
||||
ErrorReport::new(
|
||||
cache_version,
|
||||
ErrorCode::UnsupportedProtocolVersion.as_u16(),
|
||||
&[],
|
||||
ErrorCode::UnsupportedProtocolVersion.description(),
|
||||
)
|
||||
.write(&mut self.stream)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn handle_reset_query(&mut self) -> Result<()> {
|
||||
self.state = SessionState::Established;
|
||||
|
||||
let snapshot = self.cache.snapshot();
|
||||
self.write_cache_response().await?;
|
||||
self.send_payloads(snapshot.payloads(), true).await?;
|
||||
self.write_end_of_data(self.cache.session_id(), self.cache.serial())
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_serial(&mut self, client_session: u16, client_serial: u32) -> Result<()> {
|
||||
let current_session = self.cache.session_id();
|
||||
let current_serial = self.cache.serial();
|
||||
|
||||
match self.cache.get_deltas_since(client_session, client_serial) {
|
||||
SerialResult::ResetRequired => {
|
||||
self.write_cache_reset().await?;
|
||||
return Ok(());
|
||||
}
|
||||
SerialResult::UpToDate => {
|
||||
self.write_end_of_data(current_session, current_serial)
|
||||
.await?;
|
||||
return Ok(());
|
||||
}
|
||||
SerialResult::Deltas(deltas) => {
|
||||
self.write_cache_response().await?;
|
||||
for delta in deltas {
|
||||
self.send_delta(&delta).await?;
|
||||
}
|
||||
self.write_end_of_data(current_session, current_serial)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
self.state = SessionState::Established;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_cache_response(&mut self) -> Result<()> {
|
||||
let version = self.version.ok_or_else(|| {
|
||||
io::Error::new(io::ErrorKind::InvalidData, "version not negotiated")
|
||||
})?;
|
||||
|
||||
CacheResponse::new(version, self.cache.session_id())
|
||||
.write(&mut self.stream)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_cache_reset(&mut self) -> Result<()> {
|
||||
let version = self.version.ok_or_else(|| {
|
||||
io::Error::new(io::ErrorKind::InvalidData, "version not negotiated")
|
||||
})?;
|
||||
|
||||
CacheReset::new(version).write(&mut self.stream).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_end_of_data(&mut self, session_id: u16, serial: u32) -> Result<()> {
|
||||
let version = self.version.ok_or_else(|| {
|
||||
io::Error::new(io::ErrorKind::InvalidData, "version not negotiated")
|
||||
})?;
|
||||
|
||||
let timing = self.timing();
|
||||
let end = EndOfData::new(version, session_id, serial, timing);
|
||||
match end {
|
||||
EndOfData::V0(pdu) => pdu.write(&mut self.stream).await?,
|
||||
EndOfData::V1(pdu) => pdu.write(&mut self.stream).await?,
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_payloads(&mut self, payloads: Vec<Payload>, announce: bool) -> Result<()> {
|
||||
for payload in payloads {
|
||||
self.send_payload(&payload, announce).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_delta(&mut self, delta: &Arc<Delta>) -> Result<()> {
|
||||
for payload in delta.withdrawn() {
|
||||
self.send_payload(payload, false).await?;
|
||||
}
|
||||
for payload in delta.announced() {
|
||||
self.send_payload(payload, true).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_payload(&mut self, payload: &Payload, announce: bool) -> Result<()> {
|
||||
match payload {
|
||||
Payload::RouteOrigin(origin) => {
|
||||
self.send_route_origin(origin, announce).await?;
|
||||
}
|
||||
Payload::RouterKey(_) => {
|
||||
warn!("router key payload not supported yet");
|
||||
}
|
||||
Payload::Aspa(_) => {
|
||||
warn!("aspa payload not supported yet");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_route_origin(&mut self, origin: &RouteOrigin, announce: bool) -> Result<()> {
|
||||
let version = self.version.ok_or_else(|| {
|
||||
io::Error::new(io::ErrorKind::InvalidData, "version not negotiated")
|
||||
})?;
|
||||
|
||||
let flags = Flags::new(if announce {
|
||||
ANNOUNCE_FLAG
|
||||
} else {
|
||||
WITHDRAW_FLAG
|
||||
});
|
||||
|
||||
let prefix = origin.prefix();
|
||||
let prefix_len = prefix.prefix_length;
|
||||
let max_len = origin.max_length();
|
||||
|
||||
if let Some(v4) = prefix.address.to_ipv4() {
|
||||
IPv4Prefix::new(version, flags, prefix_len, max_len, v4, origin.asn())
|
||||
.write(&mut self.stream)
|
||||
.await?;
|
||||
} else {
|
||||
let v6 = prefix.address.to_ipv6();
|
||||
IPv6Prefix::new(version, flags, prefix_len, max_len, v6, origin.asn())
|
||||
.write(&mut self.stream)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_error(
|
||||
&mut self,
|
||||
version: u8,
|
||||
code: ErrorCode,
|
||||
offending_header: Option<&Header>,
|
||||
text: &[u8],
|
||||
) -> io::Result<()> {
|
||||
let offending = offending_header
|
||||
.map(|h| h.as_ref())
|
||||
.unwrap_or(&[]);
|
||||
|
||||
ErrorReport::new(version, code.as_u16(), offending, text)
|
||||
.write(&mut self.stream)
|
||||
.await
|
||||
}
|
||||
|
||||
fn timing(&self) -> Timing {
|
||||
let refresh = self.cache.refresh_interval().as_secs();
|
||||
let retry = self.cache.retry_interval().as_secs();
|
||||
let expire = self.cache.expire_interval().as_secs();
|
||||
|
||||
Timing {
|
||||
refresh: refresh.min(u32::MAX as u64) as u32,
|
||||
retry: retry.min(u32::MAX as u64) as u32,
|
||||
expire: expire.min(u32::MAX as u64) as u32,
|
||||
}
|
||||
}
|
||||
}
|
||||
17
src/rtr/state.rs
Normal file
17
src/rtr/state.rs
Normal file
@ -0,0 +1,17 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct State {
|
||||
session_id: u16,
|
||||
serial: u32,
|
||||
}
|
||||
|
||||
impl State {
|
||||
pub fn session_id(self) -> u16 {
|
||||
self.session_id
|
||||
}
|
||||
|
||||
pub fn serial(self) -> u32 {
|
||||
self.serial
|
||||
}
|
||||
}
|
||||
300
src/rtr/store_db.rs
Normal file
300
src/rtr/store_db.rs
Normal file
@ -0,0 +1,300 @@
|
||||
use rocksdb::{ColumnFamilyDescriptor, DB, Direction, IteratorMode, Options, WriteBatch};
|
||||
use anyhow::{anyhow, Result};
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use tokio::task;
|
||||
|
||||
use crate::rtr::cache::{Delta, Snapshot};
|
||||
use crate::rtr::state::State;
|
||||
|
||||
const CF_META: &str = "meta";
|
||||
const CF_SNAPSHOT: &str = "snapshot";
|
||||
const CF_DELTA: &str = "delta";
|
||||
|
||||
const META_STATE: &[u8] = b"state";
|
||||
const META_SESSION_ID: &[u8] = b"session_id";
|
||||
const META_SERIAL: &[u8] = b"serial";
|
||||
const META_DELTA_MIN: &[u8] = b"delta_min";
|
||||
const META_DELTA_MAX: &[u8] = b"delta_max";
|
||||
|
||||
const DELTA_KEY_PREFIX: u8 = b'd';
|
||||
|
||||
fn delta_key(serial: u32) -> [u8; 5] {
|
||||
let mut key = [0u8; 5];
|
||||
key[0] = DELTA_KEY_PREFIX;
|
||||
key[1..].copy_from_slice(&serial.to_be_bytes());
|
||||
key
|
||||
}
|
||||
|
||||
fn delta_key_serial(key: &[u8]) -> Option<u32> {
|
||||
if key.len() != 5 || key[0] != DELTA_KEY_PREFIX {
|
||||
return None;
|
||||
}
|
||||
let mut bytes = [0u8; 4];
|
||||
bytes.copy_from_slice(&key[1..]);
|
||||
Some(u32::from_be_bytes(bytes))
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RtrStore {
|
||||
db: Arc<DB>,
|
||||
}
|
||||
|
||||
impl RtrStore {
|
||||
/// Open or create DB with required column families.
|
||||
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
|
||||
let mut opts = Options::default();
|
||||
opts.create_if_missing(true);
|
||||
opts.create_missing_column_families(true);
|
||||
|
||||
let cfs = vec![
|
||||
ColumnFamilyDescriptor::new(CF_META, Options::default()),
|
||||
ColumnFamilyDescriptor::new(CF_SNAPSHOT, Options::default()),
|
||||
ColumnFamilyDescriptor::new(CF_DELTA, Options::default()),
|
||||
];
|
||||
|
||||
let db = Arc::new(DB::open_cf_descriptors(&opts, path, cfs)?);
|
||||
|
||||
Ok(Self { db })
|
||||
}
|
||||
|
||||
/// Common serialize/put.
|
||||
fn put_cf<T: Serialize>(&self, cf: &str, key: &[u8], value: &T) -> Result<()> {
|
||||
let cf_handle = self.db.cf_handle(cf).ok_or_else(|| anyhow!("CF not found"))?;
|
||||
let data = serde_json::to_vec(value)?;
|
||||
self.db.put_cf(cf_handle, key, data)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Common get/deserialize.
|
||||
fn get_cf<T: DeserializeOwned>(&self, cf: &str, key: &[u8]) -> Result<Option<T>> {
|
||||
let cf_handle = self.db.cf_handle(cf).ok_or_else(|| anyhow!("CF not found"))?;
|
||||
if let Some(value) = self.db.get_cf(cf_handle, key)? {
|
||||
let obj = serde_json::from_slice(&value)?;
|
||||
Ok(Some(obj))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
/// Common delete.
|
||||
fn delete_cf(&self, cf: &str, key: &[u8]) -> Result<()> {
|
||||
let cf_handle = self.db.cf_handle(cf).ok_or_else(|| anyhow!("CF not found"))?;
|
||||
self.db.delete_cf(cf_handle, key)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ===============================
|
||||
// Meta/state
|
||||
// ===============================
|
||||
|
||||
pub fn set_state(&self, state: &State) -> Result<()> {
|
||||
self.put_cf(CF_META, META_STATE, &state)
|
||||
}
|
||||
|
||||
pub fn get_state(&self) -> Result<Option<State>> {
|
||||
self.get_cf(CF_META, META_STATE)
|
||||
}
|
||||
|
||||
pub fn set_meta(&self, meta: &State) -> Result<()> {
|
||||
self.set_state(meta)
|
||||
}
|
||||
|
||||
pub fn get_meta(&self) -> Result<Option<State>> {
|
||||
self.get_state()
|
||||
}
|
||||
|
||||
pub fn set_session_id(&self, session_id: u16) -> Result<()> {
|
||||
self.put_cf(CF_META, META_SESSION_ID, &session_id)
|
||||
}
|
||||
|
||||
pub fn get_session_id(&self) -> Result<Option<u16>> {
|
||||
self.get_cf(CF_META, META_SESSION_ID)
|
||||
}
|
||||
|
||||
pub fn set_serial(&self, serial: u32) -> Result<()> {
|
||||
self.put_cf(CF_META, META_SERIAL, &serial)
|
||||
}
|
||||
|
||||
pub fn get_serial(&self) -> Result<Option<u32>> {
|
||||
self.get_cf(CF_META, META_SERIAL)
|
||||
}
|
||||
|
||||
pub fn set_delta_window(&self, min_serial: u32, max_serial: u32) -> Result<()> {
|
||||
let meta_cf = self.db.cf_handle(CF_META).ok_or_else(|| anyhow!("CF_META not found"))?;
|
||||
let mut batch = WriteBatch::default();
|
||||
batch.put_cf(meta_cf, META_DELTA_MIN, serde_json::to_vec(&min_serial)?);
|
||||
batch.put_cf(meta_cf, META_DELTA_MAX, serde_json::to_vec(&max_serial)?);
|
||||
self.db.write(batch)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_delta_window(&self) -> Result<Option<(u32, u32)>> {
|
||||
let min: Option<u32> = self.get_cf(CF_META, META_DELTA_MIN)?;
|
||||
let max: Option<u32> = self.get_cf(CF_META, META_DELTA_MAX)?;
|
||||
|
||||
match (min, max) {
|
||||
(Some(min), Some(max)) => Ok(Some((min, max))),
|
||||
(None, None) => Ok(None),
|
||||
_ => Err(anyhow!("Inconsistent DB state: delta window mismatch")),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn delete_state(&self) -> Result<()> {
|
||||
self.delete_cf(CF_META, META_STATE)
|
||||
}
|
||||
|
||||
pub fn delete_serial(&self) -> Result<()> {
|
||||
self.delete_cf(CF_META, META_SERIAL)
|
||||
}
|
||||
|
||||
// ===============================
|
||||
// Snapshot
|
||||
// ===============================
|
||||
|
||||
pub fn save_snapshot(&self, snapshot: &Snapshot) -> Result<()> {
|
||||
let cf_handle = self.db.cf_handle(CF_SNAPSHOT).ok_or_else(|| anyhow!("CF_SNAPSHOT not found"))?;
|
||||
let mut batch = WriteBatch::default();
|
||||
let data = serde_json::to_vec(snapshot)?;
|
||||
batch.put_cf(cf_handle, b"current", data);
|
||||
self.db.write(batch)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_snapshot(&self) -> Result<Option<Snapshot>> {
|
||||
self.get_cf(CF_SNAPSHOT, b"current")
|
||||
}
|
||||
|
||||
pub fn delete_snapshot(&self) -> Result<()> {
|
||||
self.delete_cf(CF_SNAPSHOT, b"current")
|
||||
}
|
||||
|
||||
pub fn save_snapshot_and_state(&self, snapshot: &Snapshot, state: &State) -> Result<()> {
|
||||
let snapshot_cf = self.db.cf_handle(CF_SNAPSHOT).ok_or_else(|| anyhow!("CF_SNAPSHOT not found"))?;
|
||||
let meta_cf = self.db.cf_handle(CF_META).ok_or_else(|| anyhow!("CF_META not found"))?;
|
||||
let mut batch = WriteBatch::default();
|
||||
|
||||
batch.put_cf(snapshot_cf, b"current", serde_json::to_vec(snapshot)?);
|
||||
batch.put_cf(meta_cf, META_STATE, serde_json::to_vec(state)?);
|
||||
batch.put_cf(
|
||||
meta_cf,
|
||||
META_SESSION_ID,
|
||||
serde_json::to_vec(&state.clone().session_id())?,
|
||||
);
|
||||
batch.put_cf(
|
||||
meta_cf,
|
||||
META_SERIAL,
|
||||
serde_json::to_vec(&state.clone().serial())?,
|
||||
);
|
||||
|
||||
self.db.write(batch)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn save_snapshot_and_meta(
|
||||
&self,
|
||||
snapshot: &Snapshot,
|
||||
session_id: u16,
|
||||
serial: u32,
|
||||
) -> Result<()> {
|
||||
let mut batch = WriteBatch::default();
|
||||
let snapshot_cf = self.db.cf_handle(CF_SNAPSHOT).ok_or_else(|| anyhow!("CF_SNAPSHOT not found"))?;
|
||||
let meta_cf = self.db.cf_handle(CF_META).ok_or_else(|| anyhow!("CF_META not found"))?;
|
||||
|
||||
batch.put_cf(snapshot_cf, b"current", serde_json::to_vec(snapshot)?);
|
||||
batch.put_cf(meta_cf, META_SESSION_ID, serde_json::to_vec(&session_id)?);
|
||||
batch.put_cf(meta_cf, META_SERIAL, serde_json::to_vec(&serial)?);
|
||||
self.db.write(batch)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn save_snapshot_and_serial(&self, snapshot: &Snapshot, serial: u32) -> Result<()> {
|
||||
let mut batch = WriteBatch::default();
|
||||
let snapshot_cf = self.db.cf_handle(CF_SNAPSHOT).ok_or_else(|| anyhow!("CF_SNAPSHOT not found"))?;
|
||||
let meta_cf = self.db.cf_handle(CF_META).ok_or_else(|| anyhow!("CF_META not found"))?;
|
||||
batch.put_cf(snapshot_cf, b"current", serde_json::to_vec(snapshot)?);
|
||||
batch.put_cf(meta_cf, META_SERIAL, serde_json::to_vec(&serial)?);
|
||||
self.db.write(batch)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn save_snapshot_and_serial_async(
|
||||
self: Arc<Self>,
|
||||
snapshot: Snapshot,
|
||||
serial: u32,
|
||||
) -> Result<()> {
|
||||
let snapshot_bytes = serde_json::to_vec(&snapshot)?;
|
||||
let serial_bytes = serde_json::to_vec(&serial)?;
|
||||
|
||||
task::spawn_blocking(move || {
|
||||
let mut batch = WriteBatch::default();
|
||||
let snapshot_cf = self.db.cf_handle(CF_SNAPSHOT).ok_or_else(|| anyhow!("CF_SNAPSHOT not found"))?;
|
||||
let meta_cf = self.db.cf_handle(CF_META).ok_or_else(|| anyhow!("CF_META not found"))?;
|
||||
batch.put_cf(snapshot_cf, b"current", snapshot_bytes);
|
||||
batch.put_cf(meta_cf, META_SERIAL, serial_bytes);
|
||||
self.db.write(batch)?;
|
||||
Ok::<_, anyhow::Error>(())
|
||||
})
|
||||
.await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn load_snapshot_and_state(&self) -> Result<Option<(Snapshot, State)>> {
|
||||
let snapshot: Option<Snapshot> = self.get_snapshot()?;
|
||||
let state: Option<State> = self.get_state()?;
|
||||
match (snapshot, state) {
|
||||
(Some(snap), Some(state)) => Ok(Some((snap, state))),
|
||||
(None, None) => Ok(None),
|
||||
_ => Err(anyhow!("Inconsistent DB state: snapshot and state mismatch")),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn load_snapshot_and_serial(&self) -> Result<Option<(Snapshot, u32)>> {
|
||||
let snapshot: Option<Snapshot> = self.get_snapshot()?;
|
||||
let serial: Option<u32> = self.get_serial()?;
|
||||
match (snapshot, serial) {
|
||||
(Some(snap), Some(serial)) => Ok(Some((snap, serial))),
|
||||
(None, None) => Ok(None),
|
||||
_ => Err(anyhow!("Inconsistent DB state: snapshot and serial mismatch")),
|
||||
}
|
||||
}
|
||||
|
||||
// ===============================
|
||||
// Delta
|
||||
// ===============================
|
||||
|
||||
pub fn save_delta(&self, delta: &Delta) -> Result<()> {
|
||||
self.put_cf(CF_DELTA, &delta_key(delta.serial()), delta)
|
||||
}
|
||||
|
||||
pub fn get_delta(&self, serial: u32) -> Result<Option<Delta>> {
|
||||
self.get_cf(CF_DELTA, &delta_key(serial))
|
||||
}
|
||||
|
||||
pub fn load_deltas_since(&self, serial: u32) -> Result<Vec<Delta>> {
|
||||
let cf_handle = self.db.cf_handle(CF_DELTA).ok_or_else(|| anyhow!("CF_DELTA not found"))?;
|
||||
let mut out = Vec::new();
|
||||
let start_key = delta_key(serial.wrapping_add(1));
|
||||
|
||||
let iter = self
|
||||
.db
|
||||
.iterator_cf(cf_handle, IteratorMode::From(&start_key, Direction::Forward));
|
||||
|
||||
for (key, value) in iter {
|
||||
let parsed = delta_key_serial(&key).ok_or_else(|| anyhow!("Invalid delta key"))?;
|
||||
if parsed <= serial {
|
||||
continue;
|
||||
}
|
||||
let delta: Delta = serde_json::from_slice(&value)?;
|
||||
out.push(delta);
|
||||
}
|
||||
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
pub fn delete_delta(&self, serial: u32) -> Result<()> {
|
||||
self.delete_cf(CF_DELTA, &delta_key(serial))
|
||||
}
|
||||
}
|
||||
1
src/slurm/mod.rs
Normal file
1
src/slurm/mod.rs
Normal file
@ -0,0 +1 @@
|
||||
mod slurm;
|
||||
80
src/slurm/slurm.rs
Normal file
80
src/slurm/slurm.rs
Normal file
@ -0,0 +1,80 @@
|
||||
use std::io;
|
||||
use crate::data_model::resources::as_resources::Asn;
|
||||
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum SlurmError {
|
||||
#[error("Read slurm from reader error")]
|
||||
SlurmFromReader(),
|
||||
}
|
||||
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct SlurmFile {
|
||||
pub version: u32,
|
||||
pub validation_output_filters: ValidationOutputFilters,
|
||||
pub locally_added_assertions: LocallyAddedAssertions,
|
||||
}
|
||||
|
||||
impl SlurmFile {
|
||||
pub fn new(filters: ValidationOutputFilters,
|
||||
assertions: LocallyAddedAssertions,) -> Self {
|
||||
let version = 1;
|
||||
SlurmFile {
|
||||
version,
|
||||
validation_output_filters: filters,
|
||||
locally_added_assertions: assertions,
|
||||
}
|
||||
}
|
||||
|
||||
// pub fn from_reader(reader: impl io::Read)-> Result<Self, SlurmError> {
|
||||
//
|
||||
// }
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct ValidationOutputFilters {
|
||||
pub prefix_filters: Vec<PrefixFilter>,
|
||||
pub bgpset_filters: Vec<BgpsecFilter>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct Comment(String);
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct PrefixFilter {
|
||||
pub prefix: String,
|
||||
pub asn: Asn,
|
||||
pub comment: Option<Comment>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct BgpsecFilter {
|
||||
pub asn: Asn,
|
||||
pub ski: u8,
|
||||
pub comment: Option<Comment>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct LocallyAddedAssertions {
|
||||
pub prefix_assertions: Vec<PrefixAssertion>,
|
||||
pub bgpsec_assertions: Vec<BgpsecAssertion>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct PrefixAssertion {
|
||||
pub prefix: String,
|
||||
pub asn: Asn,
|
||||
pub max_prefix_length: u8,
|
||||
pub comment: Option<Comment>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct BgpsecAssertion {
|
||||
pub asn: Asn,
|
||||
pub ski: u8,
|
||||
pub router_public_key: u8,
|
||||
pub comment: Option<Comment>,
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user