From d7ca603becbd91a9b271538c37c19442908ab598 Mon Sep 17 00:00:00 2001 From: MatthieuCoder Date: Tue, 10 Jan 2023 01:11:10 +0400 Subject: [PATCH] add config options for ratelimiter and rest --- exes/ratelimit/src/config.rs | 23 +++++++++++++++++++ exes/ratelimit/src/lib.rs | 14 +++++------ exes/rest/src/config.rs | 2 ++ exes/rest/src/lib.rs | 8 +++---- exes/rest/src/ratelimit_client/mod.rs | 20 +++++++++------- .../src/ratelimit_client/remote_hashring.rs | 8 +++---- 6 files changed, 52 insertions(+), 23 deletions(-) create mode 100644 exes/ratelimit/src/config.rs diff --git a/exes/ratelimit/src/config.rs b/exes/ratelimit/src/config.rs new file mode 100644 index 0000000..df18b76 --- /dev/null +++ b/exes/ratelimit/src/config.rs @@ -0,0 +1,23 @@ +use serde::Deserialize; +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; + +fn default_listening_address() -> SocketAddr { + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8090)) +} + +#[derive(Debug, Deserialize, Clone)] +pub struct ServerSettings { + pub listening_adress: SocketAddr, +} +impl Default for ServerSettings { + fn default() -> Self { + Self { + listening_adress: default_listening_address(), + } + } +} + +#[derive(Debug, Deserialize, Clone, Default)] +pub struct RatelimitServerConfig { + pub server: ServerSettings, +} diff --git a/exes/ratelimit/src/lib.rs b/exes/ratelimit/src/lib.rs index 7a1f98c..349c8ac 100644 --- a/exes/ratelimit/src/lib.rs +++ b/exes/ratelimit/src/lib.rs @@ -1,3 +1,4 @@ +use config::RatelimitServerConfig; use grpc::RLServer; use leash::{AnyhowResultFuture, Component}; use proto::nova::ratelimit::ratelimiter::ratelimiter_server::RatelimiterServer; @@ -9,12 +10,13 @@ use std::{net::ToSocketAddrs, pin::Pin}; use tokio::sync::oneshot; use tonic::transport::Server; +mod config; mod grpc; mod redis_global_local_bucket_ratelimiter; pub struct RatelimiterServerComponent {} impl Component for RatelimiterServerComponent { - type Config = (); + type Config = RatelimitServerConfig; const SERVICE_NAME: &'static str = "ratelimiter"; fn start( @@ -23,6 +25,7 @@ impl Component for RatelimiterServerComponent { stop: oneshot::Receiver<()>, ) -> AnyhowResultFuture<()> { Box::pin(async move { + let listening_address = settings.server.listening_adress; let redis = Into::< Pin> + Send>>, >::into(settings.redis) @@ -32,12 +35,9 @@ impl Component for RatelimiterServerComponent { Server::builder() .add_service(RatelimiterServer::new(server)) - .serve_with_shutdown( - "0.0.0.0:8093".to_socket_addrs().unwrap().next().unwrap(), - async move { - let _ = stop.await; - }, - ) + .serve_with_shutdown(listening_address, async move { + let _ = stop.await; + }) .await?; Ok(()) diff --git a/exes/rest/src/config.rs b/exes/rest/src/config.rs index 3bfe8db..e87757c 100644 --- a/exes/rest/src/config.rs +++ b/exes/rest/src/config.rs @@ -26,4 +26,6 @@ pub struct Discord { pub struct ReverseProxyConfig { pub server: ServerSettings, pub discord: Discord, + pub ratelimiter_address: String, + pub ratelimiter_port: u16, } diff --git a/exes/rest/src/lib.rs b/exes/rest/src/lib.rs index 0910d5e..19ecdde 100644 --- a/exes/rest/src/lib.rs +++ b/exes/rest/src/lib.rs @@ -7,7 +7,7 @@ use hyper::{ Body, Client, Request, Server, }; use leash::{AnyhowResultFuture, Component}; -use opentelemetry::{global, trace::{Tracer}}; +use opentelemetry::{global, trace::Tracer}; use opentelemetry_http::HeaderExtractor; use shared::config::Settings; use std::{convert::Infallible, sync::Arc}; @@ -29,7 +29,7 @@ impl Component for ReverseProxyServer { ) -> AnyhowResultFuture<()> { Box::pin(async move { // Client to the remote ratelimiters - let ratelimiter = ratelimit_client::RemoteRatelimiter::new(); + let ratelimiter = ratelimit_client::RemoteRatelimiter::new(settings.config.clone()); let https = hyper_rustls::HttpsConnectorBuilder::new() .with_native_roots() .https_only() @@ -47,8 +47,8 @@ impl Component for ReverseProxyServer { let parent_cx = global::get_text_map_propagator(|propagator| { propagator.extract(&HeaderExtractor(request.headers())) }); - let _span = global::tracer("") - .start_with_context("handle_request", &parent_cx); + let _span = + global::tracer("").start_with_context("handle_request", &parent_cx); let client = client.clone(); let ratelimiter = ratelimiter.clone(); diff --git a/exes/rest/src/ratelimit_client/mod.rs b/exes/rest/src/ratelimit_client/mod.rs index 7493af9..6212529 100644 --- a/exes/rest/src/ratelimit_client/mod.rs +++ b/exes/rest/src/ratelimit_client/mod.rs @@ -1,3 +1,5 @@ +use crate::config::ReverseProxyConfig; + use self::remote_hashring::{HashRingWrapper, MetadataMap, VNode}; use opentelemetry::global; use proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::{Data, Headers}; @@ -13,7 +15,7 @@ use tokio::sync::oneshot::{self}; use tokio::sync::{broadcast, mpsc, RwLock}; use tokio_stream::wrappers::ReceiverStream; use tonic::Request; -use tracing::{debug, debug_span, Instrument, Span, instrument}; +use tracing::{debug, debug_span, instrument, Instrument, Span}; use tracing_opentelemetry::OpenTelemetrySpanExt; mod remote_hashring; @@ -22,6 +24,7 @@ mod remote_hashring; pub struct RemoteRatelimiter { remotes: Arc>, stop: Arc>, + config: ReverseProxyConfig, } impl Drop for RemoteRatelimiter { @@ -41,15 +44,15 @@ type IssueTicket = Pin< impl RemoteRatelimiter { async fn get_ratelimiters(&self) -> Result<(), anyhow::Error> { // get list of dns responses - /*let responses = dns_lookup::lookup_host("localhost") - .unwrap() - .into_iter() - .map(|f| f.to_string());*/ + let responses = dns_lookup::lookup_host(&self.config.ratelimiter_address) + .unwrap() + .into_iter() + .map(|f| f.to_string()); let mut write = self.remotes.write().await; - for ip in ["ratelimit"] { - let a = VNode::new(ip.into()).await?; + for ip in responses { + let a = VNode::new(ip, self.config.ratelimiter_port).await?; write.add(a.clone()); } @@ -57,11 +60,12 @@ impl RemoteRatelimiter { } #[must_use] - pub fn new() -> Self { + pub fn new(config: ReverseProxyConfig) -> Self { let (rx, mut tx) = broadcast::channel(1); let obj = Self { remotes: Arc::new(RwLock::new(HashRingWrapper::default())), stop: Arc::new(rx), + config, }; let obj_clone = obj.clone(); diff --git a/exes/rest/src/ratelimit_client/remote_hashring.rs b/exes/rest/src/ratelimit_client/remote_hashring.rs index d1c1702..b8771e5 100644 --- a/exes/rest/src/ratelimit_client/remote_hashring.rs +++ b/exes/rest/src/ratelimit_client/remote_hashring.rs @@ -1,7 +1,7 @@ use core::fmt::Debug; -use std::convert::TryFrom; use opentelemetry::propagation::Injector; use proto::nova::ratelimit::ratelimiter::ratelimiter_client::RatelimiterClient; +use std::convert::TryFrom; use std::hash::Hash; use std::ops::Deref; use std::ops::DerefMut; @@ -47,10 +47,10 @@ impl<'a> Injector for MetadataMap<'a> { } } - impl VNode { - pub async fn new(address: String) -> Result { - let client = RatelimiterClient::connect(format!("http://{}:8093", address.clone())).await?; + pub async fn new(address: String, port: u16) -> Result { + let client = + RatelimiterClient::connect(format!("http://{}:{}", address.clone(), port)).await?; Ok(VNode { client, address }) } -- 2.39.5