修改部署文件;
This commit is contained in:
parent
19225edaa7
commit
897d168ceb
Binary file not shown.
@ -32,3 +32,4 @@ services:
|
|||||||
SHOW_ROA6: "1"
|
SHOW_ROA6: "1"
|
||||||
volumes:
|
volumes:
|
||||||
- ./bird.conf:/config/bird.conf:ro
|
- ./bird.conf:/config/bird.conf:ro
|
||||||
|
- ../../logs/bird:/app/logs
|
||||||
|
|||||||
@ -27,6 +27,14 @@ SHOW_ROA6="${SHOW_ROA6:-1}"
|
|||||||
SSH_HOST_PUBKEY_PATH="${SSH_HOST_PUBKEY_PATH:-/config/ssh/ssh_host_rsa_key.pub}"
|
SSH_HOST_PUBKEY_PATH="${SSH_HOST_PUBKEY_PATH:-/config/ssh/ssh_host_rsa_key.pub}"
|
||||||
SSH_KNOWN_HOSTS_PATH="${SSH_KNOWN_HOSTS_PATH:-/run/bird/known_hosts}"
|
SSH_KNOWN_HOSTS_PATH="${SSH_KNOWN_HOSTS_PATH:-/run/bird/known_hosts}"
|
||||||
|
|
||||||
|
LOG_DIR="${LOG_DIR:-/app/logs}"
|
||||||
|
LOG_NAME="${LOG_NAME:-${HOSTNAME:-bird-rpki-client}}"
|
||||||
|
STDOUT_LOG="${LOG_DIR}/${LOG_NAME}.stdout.log"
|
||||||
|
STDERR_LOG="${LOG_DIR}/${LOG_NAME}.stderr.log"
|
||||||
|
|
||||||
|
mkdir -p "$LOG_DIR"
|
||||||
|
exec >>"$STDOUT_LOG" 2>>"$STDERR_LOG"
|
||||||
|
|
||||||
ensure_ssh_known_hosts() {
|
ensure_ssh_known_hosts() {
|
||||||
if [ -s "$SSH_KNOWN_HOSTS_PATH" ]; then
|
if [ -s "$SSH_KNOWN_HOSTS_PATH" ]; then
|
||||||
return
|
return
|
||||||
|
|||||||
@ -27,6 +27,7 @@ services:
|
|||||||
RPKI_RTR_SLURM_DIR: "/app/slurm"
|
RPKI_RTR_SLURM_DIR: "/app/slurm"
|
||||||
RPKI_RTR_STRICT_CCR_VALIDATION: "false"
|
RPKI_RTR_STRICT_CCR_VALIDATION: "false"
|
||||||
RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "300"
|
RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "300"
|
||||||
|
RPKI_RTR_MAX_CONCURRENT_HANDSHAKES: "128"
|
||||||
RUST_LOG: "info"
|
RUST_LOG: "info"
|
||||||
volumes:
|
volumes:
|
||||||
- ../../data:/app/data:ro
|
- ../../data:/app/data:ro
|
||||||
|
|||||||
@ -18,8 +18,9 @@ services:
|
|||||||
RPKI_RTR_CCR_DIR: "/app/data"
|
RPKI_RTR_CCR_DIR: "/app/data"
|
||||||
RPKI_RTR_SLURM_DIR: "/app/slurm"
|
RPKI_RTR_SLURM_DIR: "/app/slurm"
|
||||||
RPKI_RTR_STRICT_CCR_VALIDATION: "false"
|
RPKI_RTR_STRICT_CCR_VALIDATION: "false"
|
||||||
RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "300"
|
RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "60"
|
||||||
RPKI_RTR_MAX_CONNECTIONS: "100000"
|
RPKI_RTR_MAX_CONNECTIONS: "100000"
|
||||||
|
RPKI_RTR_MAX_CONCURRENT_HANDSHAKES: "128"
|
||||||
RUST_LOG: "info"
|
RUST_LOG: "info"
|
||||||
volumes:
|
volumes:
|
||||||
- ../../data:/app/data:ro
|
- ../../data:/app/data:ro
|
||||||
|
|||||||
@ -19,11 +19,13 @@ services:
|
|||||||
RPKI_RTR_TLS_CERT_PATH: "/app/certs/server-dns.crt"
|
RPKI_RTR_TLS_CERT_PATH: "/app/certs/server-dns.crt"
|
||||||
RPKI_RTR_TLS_KEY_PATH: "/app/certs/server-dns.key"
|
RPKI_RTR_TLS_KEY_PATH: "/app/certs/server-dns.key"
|
||||||
RPKI_RTR_TLS_CLIENT_CA_PATH: "/app/certs/client-ca.crt"
|
RPKI_RTR_TLS_CLIENT_CA_PATH: "/app/certs/client-ca.crt"
|
||||||
|
RPKI_RTR_ENFORCE_TLS_CLIENT_SAN_IP_MATCH: "false"
|
||||||
RPKI_RTR_DB_PATH: "/app/rtr-db"
|
RPKI_RTR_DB_PATH: "/app/rtr-db"
|
||||||
RPKI_RTR_CCR_DIR: "/app/data"
|
RPKI_RTR_CCR_DIR: "/app/data"
|
||||||
RPKI_RTR_SLURM_DIR: "/app/slurm"
|
RPKI_RTR_SLURM_DIR: "/app/slurm"
|
||||||
RPKI_RTR_STRICT_CCR_VALIDATION: "false"
|
RPKI_RTR_STRICT_CCR_VALIDATION: "false"
|
||||||
RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "300"
|
RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "300"
|
||||||
|
RPKI_RTR_MAX_CONCURRENT_HANDSHAKES: "128"
|
||||||
RUST_LOG: "info"
|
RUST_LOG: "info"
|
||||||
volumes:
|
volumes:
|
||||||
- ../../data:/app/data:ro
|
- ../../data:/app/data:ro
|
||||||
|
|||||||
@ -22,6 +22,7 @@ services:
|
|||||||
RPKI_RTR_SLURM_DIR: "/app/slurm"
|
RPKI_RTR_SLURM_DIR: "/app/slurm"
|
||||||
RPKI_RTR_STRICT_CCR_VALIDATION: "false"
|
RPKI_RTR_STRICT_CCR_VALIDATION: "false"
|
||||||
RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "300"
|
RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "300"
|
||||||
|
RPKI_RTR_MAX_CONCURRENT_HANDSHAKES: "128"
|
||||||
RUST_LOG: "info"
|
RUST_LOG: "info"
|
||||||
# SSH mode example:
|
# SSH mode example:
|
||||||
# RPKI_RTR_ENABLE_SSH: "true"
|
# RPKI_RTR_ENABLE_SSH: "true"
|
||||||
|
|||||||
42
src/main.rs
42
src/main.rs
@ -73,10 +73,12 @@ impl Default for AppConfig {
|
|||||||
|
|
||||||
service_config: RtrServiceConfig {
|
service_config: RtrServiceConfig {
|
||||||
max_connections: 512,
|
max_connections: 512,
|
||||||
|
max_concurrent_handshakes: 128,
|
||||||
notify_queue_size: 1024,
|
notify_queue_size: 1024,
|
||||||
tcp_keepalive: Some(Duration::from_secs(60)),
|
tcp_keepalive: Some(Duration::from_secs(60)),
|
||||||
warn_insecure_tcp: true,
|
warn_insecure_tcp: true,
|
||||||
require_tls_server_dns_name_san: false,
|
require_tls_server_dns_name_san: false,
|
||||||
|
enforce_tls_client_san_ip_match: true,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -218,6 +220,15 @@ impl AppConfig {
|
|||||||
.parse()
|
.parse()
|
||||||
.map_err(|err| anyhow!("invalid RPKI_RTR_MAX_CONNECTIONS '{}': {}", value, err))?;
|
.map_err(|err| anyhow!("invalid RPKI_RTR_MAX_CONNECTIONS '{}': {}", value, err))?;
|
||||||
}
|
}
|
||||||
|
if let Some(value) = env_var("RPKI_RTR_MAX_CONCURRENT_HANDSHAKES")? {
|
||||||
|
config.service_config.max_concurrent_handshakes = value.parse().map_err(|err| {
|
||||||
|
anyhow!(
|
||||||
|
"invalid RPKI_RTR_MAX_CONCURRENT_HANDSHAKES '{}': {}",
|
||||||
|
value,
|
||||||
|
err
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
}
|
||||||
if let Some(value) = env_var("RPKI_RTR_NOTIFY_QUEUE_SIZE")? {
|
if let Some(value) = env_var("RPKI_RTR_NOTIFY_QUEUE_SIZE")? {
|
||||||
config.service_config.notify_queue_size = value.parse().map_err(|err| {
|
config.service_config.notify_queue_size = value.parse().map_err(|err| {
|
||||||
anyhow!("invalid RPKI_RTR_NOTIFY_QUEUE_SIZE '{}': {}", value, err)
|
anyhow!("invalid RPKI_RTR_NOTIFY_QUEUE_SIZE '{}': {}", value, err)
|
||||||
@ -241,6 +252,29 @@ impl AppConfig {
|
|||||||
config.service_config.require_tls_server_dns_name_san =
|
config.service_config.require_tls_server_dns_name_san =
|
||||||
parse_bool(&value, "RPKI_RTR_REQUIRE_TLS_SERVER_DNS_NAME_SAN")?;
|
parse_bool(&value, "RPKI_RTR_REQUIRE_TLS_SERVER_DNS_NAME_SAN")?;
|
||||||
}
|
}
|
||||||
|
if let Some(value) = env_var("RPKI_RTR_ENFORCE_TLS_CLIENT_SAN_IP_MATCH")? {
|
||||||
|
config.service_config.enforce_tls_client_san_ip_match =
|
||||||
|
parse_bool(&value, "RPKI_RTR_ENFORCE_TLS_CLIENT_SAN_IP_MATCH")?;
|
||||||
|
}
|
||||||
|
if config.service_config.max_connections == 0 {
|
||||||
|
return Err(anyhow!(
|
||||||
|
"invalid RPKI_RTR_MAX_CONNECTIONS '{}': must be >= 1",
|
||||||
|
config.service_config.max_connections
|
||||||
|
));
|
||||||
|
}
|
||||||
|
if config.service_config.max_concurrent_handshakes == 0 {
|
||||||
|
return Err(anyhow!(
|
||||||
|
"invalid RPKI_RTR_MAX_CONCURRENT_HANDSHAKES '{}': must be >= 1",
|
||||||
|
config.service_config.max_concurrent_handshakes
|
||||||
|
));
|
||||||
|
}
|
||||||
|
if config.service_config.max_concurrent_handshakes > config.service_config.max_connections {
|
||||||
|
return Err(anyhow!(
|
||||||
|
"invalid handshake/connection limits: RPKI_RTR_MAX_CONCURRENT_HANDSHAKES ({}) must be <= RPKI_RTR_MAX_CONNECTIONS ({})",
|
||||||
|
config.service_config.max_concurrent_handshakes,
|
||||||
|
config.service_config.max_connections
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
Ok(config)
|
Ok(config)
|
||||||
}
|
}
|
||||||
@ -489,6 +523,10 @@ fn log_startup_config(config: &AppConfig) {
|
|||||||
info!("rtr_timing_retry_secs={}", config.timing.retry);
|
info!("rtr_timing_retry_secs={}", config.timing.retry);
|
||||||
info!("rtr_timing_expire_secs={}", config.timing.expire);
|
info!("rtr_timing_expire_secs={}", config.timing.expire);
|
||||||
info!("max_connections={}", config.service_config.max_connections);
|
info!("max_connections={}", config.service_config.max_connections);
|
||||||
|
info!(
|
||||||
|
"max_concurrent_handshakes={}",
|
||||||
|
config.service_config.max_concurrent_handshakes
|
||||||
|
);
|
||||||
info!(
|
info!(
|
||||||
"notify_queue_size={}",
|
"notify_queue_size={}",
|
||||||
config.service_config.notify_queue_size
|
config.service_config.notify_queue_size
|
||||||
@ -509,6 +547,10 @@ fn log_startup_config(config: &AppConfig) {
|
|||||||
"require_tls_server_dns_name_san={}",
|
"require_tls_server_dns_name_san={}",
|
||||||
config.service_config.require_tls_server_dns_name_san
|
config.service_config.require_tls_server_dns_name_san
|
||||||
);
|
);
|
||||||
|
info!(
|
||||||
|
"enforce_tls_client_san_ip_match={}",
|
||||||
|
config.service_config.enforce_tls_client_san_ip_match
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn init_tracing() {
|
fn init_tracing() {
|
||||||
|
|||||||
@ -239,8 +239,12 @@ fn validate_aspa(customer_asn: u32, provider_asns: &[u32]) -> Result<()> {
|
|||||||
return Err(anyhow!("provider list must not be empty"));
|
return Err(anyhow!("provider list must not be empty"));
|
||||||
}
|
}
|
||||||
|
|
||||||
if provider_asns.iter().any(|asn| *asn == 0) {
|
if provider_asns.iter().any(|asn| *asn == 0)
|
||||||
return Err(anyhow!("provider list must not contain AS0"));
|
&& !(provider_asns.len() == 1 && provider_asns[0] == 0)
|
||||||
|
{
|
||||||
|
return Err(anyhow!(
|
||||||
|
"provider list containing AS0 must be exactly [0]"
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
@ -163,10 +163,12 @@ impl Aspa {
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.provider_asns.iter().any(|asn| asn.into_u32() == 0) {
|
if self.provider_asns.iter().any(|asn| asn.into_u32() == 0)
|
||||||
|
&& !(self.provider_asns.len() == 1 && self.provider_asns[0].into_u32() == 0)
|
||||||
|
{
|
||||||
return Err(io::Error::new(
|
return Err(io::Error::new(
|
||||||
io::ErrorKind::InvalidData,
|
io::ErrorKind::InvalidData,
|
||||||
"ASPA provider list must not contain AS0",
|
"ASPA provider list containing AS0 must be exactly [0]",
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1317,10 +1317,12 @@ impl Aspa {
|
|||||||
"ASPA withdrawal must not contain provider ASNs",
|
"ASPA withdrawal must not contain provider ASNs",
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
if self.provider_asns.iter().any(|asn| *asn == 0) {
|
if self.provider_asns.iter().any(|asn| *asn == 0)
|
||||||
|
&& !(self.provider_asns.len() == 1 && self.provider_asns[0] == 0)
|
||||||
|
{
|
||||||
return Err(io::Error::new(
|
return Err(io::Error::new(
|
||||||
io::ErrorKind::InvalidData,
|
io::ErrorKind::InvalidData,
|
||||||
"ASPA provider list must not contain AS0",
|
"ASPA provider list containing AS0 must be exactly [0]",
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
if self.provider_asns.windows(2).any(|pair| pair[0] >= pair[1]) {
|
if self.provider_asns.windows(2).any(|pair| pair[0] >= pair[1]) {
|
||||||
|
|||||||
@ -3,20 +3,24 @@ use std::time::Duration;
|
|||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct RtrServiceConfig {
|
pub struct RtrServiceConfig {
|
||||||
pub max_connections: usize,
|
pub max_connections: usize,
|
||||||
|
pub max_concurrent_handshakes: usize,
|
||||||
pub notify_queue_size: usize,
|
pub notify_queue_size: usize,
|
||||||
pub tcp_keepalive: Option<Duration>,
|
pub tcp_keepalive: Option<Duration>,
|
||||||
pub warn_insecure_tcp: bool,
|
pub warn_insecure_tcp: bool,
|
||||||
pub require_tls_server_dns_name_san: bool,
|
pub require_tls_server_dns_name_san: bool,
|
||||||
|
pub enforce_tls_client_san_ip_match: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for RtrServiceConfig {
|
impl Default for RtrServiceConfig {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
max_connections: 1024,
|
max_connections: 1024,
|
||||||
|
max_concurrent_handshakes: 128,
|
||||||
notify_queue_size: 1024,
|
notify_queue_size: 1024,
|
||||||
tcp_keepalive: Some(Duration::from_secs(60)),
|
tcp_keepalive: Some(Duration::from_secs(60)),
|
||||||
warn_insecure_tcp: true,
|
warn_insecure_tcp: true,
|
||||||
require_tls_server_dns_name_san: false,
|
require_tls_server_dns_name_san: false,
|
||||||
|
enforce_tls_client_san_ip_match: true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -60,8 +60,10 @@ pub async fn handle_tls_connection(
|
|||||||
stream: TcpStream,
|
stream: TcpStream,
|
||||||
peer_addr: SocketAddr,
|
peer_addr: SocketAddr,
|
||||||
acceptor: TlsAcceptor,
|
acceptor: TlsAcceptor,
|
||||||
|
enforce_client_san_ip_match: bool,
|
||||||
notify_rx: broadcast::Receiver<()>,
|
notify_rx: broadcast::Receiver<()>,
|
||||||
shutdown_rx: watch::Receiver<bool>,
|
shutdown_rx: watch::Receiver<bool>,
|
||||||
|
handshake_permit: Option<OwnedSemaphorePermit>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
info!("RTR TLS handshake started for {}", peer_addr);
|
info!("RTR TLS handshake started for {}", peer_addr);
|
||||||
let tls_stream = acceptor
|
let tls_stream = acceptor
|
||||||
@ -69,13 +71,24 @@ pub async fn handle_tls_connection(
|
|||||||
.await
|
.await
|
||||||
.with_context(|| format!("TLS handshake failed for {}", peer_addr))?;
|
.with_context(|| format!("TLS handshake failed for {}", peer_addr))?;
|
||||||
info!("RTR TLS handshake completed for {}", peer_addr);
|
info!("RTR TLS handshake completed for {}", peer_addr);
|
||||||
verify_peer_certificate_ip(&tls_stream, peer_addr.ip()).with_context(|| {
|
match verify_peer_certificate_ip(&tls_stream, peer_addr.ip()) {
|
||||||
format!(
|
Ok(()) => info!("RTR TLS client certificate SAN IP validated for {}", peer_addr),
|
||||||
"TLS client certificate SAN IP validation failed for {}",
|
Err(err) => {
|
||||||
peer_addr
|
if enforce_client_san_ip_match {
|
||||||
)
|
return Err(err).with_context(|| {
|
||||||
})?;
|
format!(
|
||||||
info!("RTR TLS client certificate validated for {}", peer_addr);
|
"TLS client certificate SAN IP validation failed for {}",
|
||||||
|
peer_addr
|
||||||
|
)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
warn!(
|
||||||
|
"RTR TLS client certificate SAN IP validation failed but allowed by configuration for {}: {}",
|
||||||
|
peer_addr, err
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
drop(handshake_permit);
|
||||||
|
|
||||||
let session = RtrSession::new(cache, tls_stream, notify_rx, shutdown_rx);
|
let session = RtrSession::new(cache, tls_stream, notify_rx, shutdown_rx);
|
||||||
session.run().await?;
|
session.run().await?;
|
||||||
|
|||||||
@ -11,7 +11,7 @@ use russh::server::{self, Msg, Session};
|
|||||||
use russh::{Channel, ChannelId, Disconnect};
|
use russh::{Channel, ChannelId, Disconnect};
|
||||||
use socket2::{SockRef, TcpKeepalive};
|
use socket2::{SockRef, TcpKeepalive};
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
use tokio::sync::{Semaphore, broadcast, watch};
|
use tokio::sync::{OwnedSemaphorePermit, Semaphore, broadcast, watch};
|
||||||
use tokio_rustls::TlsAcceptor;
|
use tokio_rustls::TlsAcceptor;
|
||||||
use tracing::{debug, info, warn};
|
use tracing::{debug, info, warn};
|
||||||
|
|
||||||
@ -30,6 +30,9 @@ type TransportFuture = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
|
|||||||
|
|
||||||
pub trait TransportAcceptor: Clone + Send + Sync + 'static {
|
pub trait TransportAcceptor: Clone + Send + Sync + 'static {
|
||||||
fn name(&self) -> &'static str;
|
fn name(&self) -> &'static str;
|
||||||
|
fn requires_handshake_limit(&self) -> bool {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
fn handle_connection(
|
fn handle_connection(
|
||||||
&self,
|
&self,
|
||||||
@ -38,6 +41,7 @@ pub trait TransportAcceptor: Clone + Send + Sync + 'static {
|
|||||||
peer_addr: SocketAddr,
|
peer_addr: SocketAddr,
|
||||||
notify_tx: broadcast::Sender<()>,
|
notify_tx: broadcast::Sender<()>,
|
||||||
shutdown_tx: watch::Sender<bool>,
|
shutdown_tx: watch::Sender<bool>,
|
||||||
|
handshake_permit: Option<OwnedSemaphorePermit>,
|
||||||
) -> TransportFuture;
|
) -> TransportFuture;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -56,6 +60,7 @@ impl TransportAcceptor for TcpTransport {
|
|||||||
peer_addr: SocketAddr,
|
peer_addr: SocketAddr,
|
||||||
notify_tx: broadcast::Sender<()>,
|
notify_tx: broadcast::Sender<()>,
|
||||||
shutdown_tx: watch::Sender<bool>,
|
shutdown_tx: watch::Sender<bool>,
|
||||||
|
_handshake_permit: Option<OwnedSemaphorePermit>,
|
||||||
) -> TransportFuture {
|
) -> TransportFuture {
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
handle_tcp_connection(
|
handle_tcp_connection(
|
||||||
@ -73,12 +78,16 @@ impl TransportAcceptor for TcpTransport {
|
|||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct TlsTransport {
|
struct TlsTransport {
|
||||||
acceptor: TlsAcceptor,
|
acceptor: TlsAcceptor,
|
||||||
|
enforce_client_san_ip_match: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TransportAcceptor for TlsTransport {
|
impl TransportAcceptor for TlsTransport {
|
||||||
fn name(&self) -> &'static str {
|
fn name(&self) -> &'static str {
|
||||||
"TLS"
|
"TLS"
|
||||||
}
|
}
|
||||||
|
fn requires_handshake_limit(&self) -> bool {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
fn handle_connection(
|
fn handle_connection(
|
||||||
&self,
|
&self,
|
||||||
@ -87,16 +96,20 @@ impl TransportAcceptor for TlsTransport {
|
|||||||
peer_addr: SocketAddr,
|
peer_addr: SocketAddr,
|
||||||
notify_tx: broadcast::Sender<()>,
|
notify_tx: broadcast::Sender<()>,
|
||||||
shutdown_tx: watch::Sender<bool>,
|
shutdown_tx: watch::Sender<bool>,
|
||||||
|
handshake_permit: Option<OwnedSemaphorePermit>,
|
||||||
) -> TransportFuture {
|
) -> TransportFuture {
|
||||||
let acceptor = self.acceptor.clone();
|
let acceptor = self.acceptor.clone();
|
||||||
|
let enforce_client_san_ip_match = self.enforce_client_san_ip_match;
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
handle_tls_connection(
|
handle_tls_connection(
|
||||||
cache,
|
cache,
|
||||||
stream,
|
stream,
|
||||||
peer_addr,
|
peer_addr,
|
||||||
acceptor,
|
acceptor,
|
||||||
|
enforce_client_san_ip_match,
|
||||||
notify_tx.subscribe(),
|
notify_tx.subscribe(),
|
||||||
shutdown_tx.subscribe(),
|
shutdown_tx.subscribe(),
|
||||||
|
handshake_permit,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
})
|
})
|
||||||
@ -120,6 +133,7 @@ impl TransportAcceptor for SshTransport {
|
|||||||
peer_addr: SocketAddr,
|
peer_addr: SocketAddr,
|
||||||
notify_tx: broadcast::Sender<()>,
|
notify_tx: broadcast::Sender<()>,
|
||||||
shutdown_tx: watch::Sender<bool>,
|
shutdown_tx: watch::Sender<bool>,
|
||||||
|
_handshake_permit: Option<OwnedSemaphorePermit>,
|
||||||
) -> TransportFuture {
|
) -> TransportFuture {
|
||||||
let runtime = self.runtime.clone();
|
let runtime = self.runtime.clone();
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
@ -171,6 +185,7 @@ pub struct RtrServer {
|
|||||||
notify_tx: broadcast::Sender<()>,
|
notify_tx: broadcast::Sender<()>,
|
||||||
shutdown_tx: watch::Sender<bool>,
|
shutdown_tx: watch::Sender<bool>,
|
||||||
connection_limiter: Arc<Semaphore>,
|
connection_limiter: Arc<Semaphore>,
|
||||||
|
handshake_limiter: Arc<Semaphore>,
|
||||||
active_connections: Arc<AtomicUsize>,
|
active_connections: Arc<AtomicUsize>,
|
||||||
config: RtrServiceConfig,
|
config: RtrServiceConfig,
|
||||||
}
|
}
|
||||||
@ -182,6 +197,7 @@ impl RtrServer {
|
|||||||
notify_tx: broadcast::Sender<()>,
|
notify_tx: broadcast::Sender<()>,
|
||||||
shutdown_tx: watch::Sender<bool>,
|
shutdown_tx: watch::Sender<bool>,
|
||||||
connection_limiter: Arc<Semaphore>,
|
connection_limiter: Arc<Semaphore>,
|
||||||
|
handshake_limiter: Arc<Semaphore>,
|
||||||
active_connections: Arc<AtomicUsize>,
|
active_connections: Arc<AtomicUsize>,
|
||||||
config: RtrServiceConfig,
|
config: RtrServiceConfig,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
@ -191,6 +207,7 @@ impl RtrServer {
|
|||||||
notify_tx,
|
notify_tx,
|
||||||
shutdown_tx,
|
shutdown_tx,
|
||||||
connection_limiter,
|
connection_limiter,
|
||||||
|
handshake_limiter,
|
||||||
active_connections,
|
active_connections,
|
||||||
config,
|
config,
|
||||||
}
|
}
|
||||||
@ -231,6 +248,7 @@ impl RtrServer {
|
|||||||
pub async fn run_tls(self, tls_config: Arc<ServerConfig>) -> Result<()> {
|
pub async fn run_tls(self, tls_config: Arc<ServerConfig>) -> Result<()> {
|
||||||
let transport = TlsTransport {
|
let transport = TlsTransport {
|
||||||
acceptor: TlsAcceptor::from(tls_config),
|
acceptor: TlsAcceptor::from(tls_config),
|
||||||
|
enforce_client_san_ip_match: self.config.enforce_tls_client_san_ip_match,
|
||||||
};
|
};
|
||||||
self.run_with_transport(transport).await
|
self.run_with_transport(transport).await
|
||||||
}
|
}
|
||||||
@ -317,6 +335,24 @@ impl RtrServer {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
let handshake_permit = if transport.requires_handshake_limit() {
|
||||||
|
match self.handshake_limiter.clone().try_acquire_owned() {
|
||||||
|
Ok(permit) => Some(permit),
|
||||||
|
Err(_) => {
|
||||||
|
warn!(
|
||||||
|
"RTR {} connection rejected for {}: max concurrent handshakes reached ({})",
|
||||||
|
transport.name(),
|
||||||
|
peer_addr,
|
||||||
|
self.config.max_concurrent_handshakes
|
||||||
|
);
|
||||||
|
drop(stream);
|
||||||
|
drop(permit);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
let cache = self.cache.clone();
|
let cache = self.cache.clone();
|
||||||
let notify_tx = self.notify_tx.clone();
|
let notify_tx = self.notify_tx.clone();
|
||||||
@ -341,7 +377,14 @@ impl RtrServer {
|
|||||||
);
|
);
|
||||||
|
|
||||||
if let Err(err) = transport_instance
|
if let Err(err) = transport_instance
|
||||||
.handle_connection(cache, stream, peer_addr, notify_tx, shutdown_tx)
|
.handle_connection(
|
||||||
|
cache,
|
||||||
|
stream,
|
||||||
|
peer_addr,
|
||||||
|
notify_tx,
|
||||||
|
shutdown_tx,
|
||||||
|
handshake_permit,
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
let active_after_close = guard.active_count().saturating_sub(1);
|
let active_after_close = guard.active_count().saturating_sub(1);
|
||||||
|
|||||||
@ -20,6 +20,7 @@ pub struct RtrService {
|
|||||||
notify_tx: broadcast::Sender<()>,
|
notify_tx: broadcast::Sender<()>,
|
||||||
shutdown_tx: watch::Sender<bool>,
|
shutdown_tx: watch::Sender<bool>,
|
||||||
connection_limiter: Arc<Semaphore>,
|
connection_limiter: Arc<Semaphore>,
|
||||||
|
handshake_limiter: Arc<Semaphore>,
|
||||||
active_connections: Arc<AtomicUsize>,
|
active_connections: Arc<AtomicUsize>,
|
||||||
config: RtrServiceConfig,
|
config: RtrServiceConfig,
|
||||||
}
|
}
|
||||||
@ -38,6 +39,7 @@ impl RtrService {
|
|||||||
notify_tx,
|
notify_tx,
|
||||||
shutdown_tx,
|
shutdown_tx,
|
||||||
connection_limiter: Arc::new(Semaphore::new(config.max_connections)),
|
connection_limiter: Arc::new(Semaphore::new(config.max_connections)),
|
||||||
|
handshake_limiter: Arc::new(Semaphore::new(config.max_concurrent_handshakes)),
|
||||||
active_connections: Arc::new(AtomicUsize::new(0)),
|
active_connections: Arc::new(AtomicUsize::new(0)),
|
||||||
config,
|
config,
|
||||||
}
|
}
|
||||||
@ -70,6 +72,7 @@ impl RtrService {
|
|||||||
self.notify_tx.clone(),
|
self.notify_tx.clone(),
|
||||||
self.shutdown_tx.clone(),
|
self.shutdown_tx.clone(),
|
||||||
self.connection_limiter.clone(),
|
self.connection_limiter.clone(),
|
||||||
|
self.handshake_limiter.clone(),
|
||||||
self.active_connections.clone(),
|
self.active_connections.clone(),
|
||||||
self.config.clone(),
|
self.config.clone(),
|
||||||
)
|
)
|
||||||
@ -82,6 +85,7 @@ impl RtrService {
|
|||||||
self.notify_tx.clone(),
|
self.notify_tx.clone(),
|
||||||
self.shutdown_tx.clone(),
|
self.shutdown_tx.clone(),
|
||||||
self.connection_limiter.clone(),
|
self.connection_limiter.clone(),
|
||||||
|
self.handshake_limiter.clone(),
|
||||||
self.active_connections.clone(),
|
self.active_connections.clone(),
|
||||||
self.config.clone(),
|
self.config.clone(),
|
||||||
)
|
)
|
||||||
@ -94,6 +98,7 @@ impl RtrService {
|
|||||||
self.notify_tx.clone(),
|
self.notify_tx.clone(),
|
||||||
self.shutdown_tx.clone(),
|
self.shutdown_tx.clone(),
|
||||||
self.connection_limiter.clone(),
|
self.connection_limiter.clone(),
|
||||||
|
self.handshake_limiter.clone(),
|
||||||
self.active_connections.clone(),
|
self.active_connections.clone(),
|
||||||
self.config.clone(),
|
self.config.clone(),
|
||||||
)
|
)
|
||||||
|
|||||||
@ -157,7 +157,9 @@ async fn start_tls_session_server_with_cert(
|
|||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = handle_tls_connection(cache, stream, peer_addr, acceptor, notify_rx, shutdown_rx).await;
|
let _ =
|
||||||
|
handle_tls_connection(cache, stream, peer_addr, acceptor, notify_rx, shutdown_rx, None)
|
||||||
|
.await;
|
||||||
});
|
});
|
||||||
|
|
||||||
(addr, shutdown_tx, handle)
|
(addr, shutdown_tx, handle)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user