summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--exes/ratelimit/src/config.rs23
-rw-r--r--exes/ratelimit/src/lib.rs14
-rw-r--r--exes/rest/src/config.rs2
-rw-r--r--exes/rest/src/lib.rs8
-rw-r--r--exes/rest/src/ratelimit_client/mod.rs20
-rw-r--r--exes/rest/src/ratelimit_client/remote_hashring.rs8
6 files changed, 52 insertions, 23 deletions
diff --git a/exes/ratelimit/src/config.rs b/exes/ratelimit/src/config.rs
new file mode 100644
index 0000000..df18b76
--- /dev/null
+++ b/exes/ratelimit/src/config.rs
@@ -0,0 +1,23 @@
+use serde::Deserialize;
+use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
+
+fn default_listening_address() -> SocketAddr {
+ SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8090))
+}
+
+#[derive(Debug, Deserialize, Clone)]
+pub struct ServerSettings {
+ pub listening_adress: SocketAddr,
+}
+impl Default for ServerSettings {
+ fn default() -> Self {
+ Self {
+ listening_adress: default_listening_address(),
+ }
+ }
+}
+
+#[derive(Debug, Deserialize, Clone, Default)]
+pub struct RatelimitServerConfig {
+ pub server: ServerSettings,
+}
diff --git a/exes/ratelimit/src/lib.rs b/exes/ratelimit/src/lib.rs
index 7a1f98c..349c8ac 100644
--- a/exes/ratelimit/src/lib.rs
+++ b/exes/ratelimit/src/lib.rs
@@ -1,3 +1,4 @@
+use config::RatelimitServerConfig;
use grpc::RLServer;
use leash::{AnyhowResultFuture, Component};
use proto::nova::ratelimit::ratelimiter::ratelimiter_server::RatelimiterServer;
@@ -9,12 +10,13 @@ use std::{net::ToSocketAddrs, pin::Pin};
use tokio::sync::oneshot;
use tonic::transport::Server;
+mod config;
mod grpc;
mod redis_global_local_bucket_ratelimiter;
pub struct RatelimiterServerComponent {}
impl Component for RatelimiterServerComponent {
- type Config = ();
+ type Config = RatelimitServerConfig;
const SERVICE_NAME: &'static str = "ratelimiter";
fn start(
@@ -23,6 +25,7 @@ impl Component for RatelimiterServerComponent {
stop: oneshot::Receiver<()>,
) -> AnyhowResultFuture<()> {
Box::pin(async move {
+ let listening_address = settings.server.listening_adress;
let redis = Into::<
Pin<Box<dyn Future<Output = anyhow::Result<MultiplexedConnection>> + Send>>,
>::into(settings.redis)
@@ -32,12 +35,9 @@ impl Component for RatelimiterServerComponent {
Server::builder()
.add_service(RatelimiterServer::new(server))
- .serve_with_shutdown(
- "0.0.0.0:8093".to_socket_addrs().unwrap().next().unwrap(),
- async move {
- let _ = stop.await;
- },
- )
+ .serve_with_shutdown(listening_address, async move {
+ let _ = stop.await;
+ })
.await?;
Ok(())
diff --git a/exes/rest/src/config.rs b/exes/rest/src/config.rs
index 3bfe8db..e87757c 100644
--- a/exes/rest/src/config.rs
+++ b/exes/rest/src/config.rs
@@ -26,4 +26,6 @@ pub struct Discord {
pub struct ReverseProxyConfig {
pub server: ServerSettings,
pub discord: Discord,
+ pub ratelimiter_address: String,
+ pub ratelimiter_port: u16,
}
diff --git a/exes/rest/src/lib.rs b/exes/rest/src/lib.rs
index 0910d5e..19ecdde 100644
--- a/exes/rest/src/lib.rs
+++ b/exes/rest/src/lib.rs
@@ -7,7 +7,7 @@ use hyper::{
Body, Client, Request, Server,
};
use leash::{AnyhowResultFuture, Component};
-use opentelemetry::{global, trace::{Tracer}};
+use opentelemetry::{global, trace::Tracer};
use opentelemetry_http::HeaderExtractor;
use shared::config::Settings;
use std::{convert::Infallible, sync::Arc};
@@ -29,7 +29,7 @@ impl Component for ReverseProxyServer {
) -> AnyhowResultFuture<()> {
Box::pin(async move {
// Client to the remote ratelimiters
- let ratelimiter = ratelimit_client::RemoteRatelimiter::new();
+ let ratelimiter = ratelimit_client::RemoteRatelimiter::new(settings.config.clone());
let https = hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
.https_only()
@@ -47,8 +47,8 @@ impl Component for ReverseProxyServer {
let parent_cx = global::get_text_map_propagator(|propagator| {
propagator.extract(&HeaderExtractor(request.headers()))
});
- let _span = global::tracer("")
- .start_with_context("handle_request", &parent_cx);
+ let _span =
+ global::tracer("").start_with_context("handle_request", &parent_cx);
let client = client.clone();
let ratelimiter = ratelimiter.clone();
diff --git a/exes/rest/src/ratelimit_client/mod.rs b/exes/rest/src/ratelimit_client/mod.rs
index 7493af9..6212529 100644
--- a/exes/rest/src/ratelimit_client/mod.rs
+++ b/exes/rest/src/ratelimit_client/mod.rs
@@ -1,3 +1,5 @@
+use crate::config::ReverseProxyConfig;
+
use self::remote_hashring::{HashRingWrapper, MetadataMap, VNode};
use opentelemetry::global;
use proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::{Data, Headers};
@@ -13,7 +15,7 @@ use tokio::sync::oneshot::{self};
use tokio::sync::{broadcast, mpsc, RwLock};
use tokio_stream::wrappers::ReceiverStream;
use tonic::Request;
-use tracing::{debug, debug_span, Instrument, Span, instrument};
+use tracing::{debug, debug_span, instrument, Instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;
mod remote_hashring;
@@ -22,6 +24,7 @@ mod remote_hashring;
pub struct RemoteRatelimiter {
remotes: Arc<RwLock<HashRingWrapper>>,
stop: Arc<tokio::sync::broadcast::Sender<()>>,
+ config: ReverseProxyConfig,
}
impl Drop for RemoteRatelimiter {
@@ -41,15 +44,15 @@ type IssueTicket = Pin<
impl RemoteRatelimiter {
async fn get_ratelimiters(&self) -> Result<(), anyhow::Error> {
// get list of dns responses
- /*let responses = dns_lookup::lookup_host("localhost")
- .unwrap()
- .into_iter()
- .map(|f| f.to_string());*/
+ let responses = dns_lookup::lookup_host(&self.config.ratelimiter_address)
+ .unwrap()
+ .into_iter()
+ .map(|f| f.to_string());
let mut write = self.remotes.write().await;
- for ip in ["ratelimit"] {
- let a = VNode::new(ip.into()).await?;
+ for ip in responses {
+ let a = VNode::new(ip, self.config.ratelimiter_port).await?;
write.add(a.clone());
}
@@ -57,11 +60,12 @@ impl RemoteRatelimiter {
}
#[must_use]
- pub fn new() -> Self {
+ pub fn new(config: ReverseProxyConfig) -> Self {
let (rx, mut tx) = broadcast::channel(1);
let obj = Self {
remotes: Arc::new(RwLock::new(HashRingWrapper::default())),
stop: Arc::new(rx),
+ config,
};
let obj_clone = obj.clone();
diff --git a/exes/rest/src/ratelimit_client/remote_hashring.rs b/exes/rest/src/ratelimit_client/remote_hashring.rs
index d1c1702..b8771e5 100644
--- a/exes/rest/src/ratelimit_client/remote_hashring.rs
+++ b/exes/rest/src/ratelimit_client/remote_hashring.rs
@@ -1,7 +1,7 @@
use core::fmt::Debug;
-use std::convert::TryFrom;
use opentelemetry::propagation::Injector;
use proto::nova::ratelimit::ratelimiter::ratelimiter_client::RatelimiterClient;
+use std::convert::TryFrom;
use std::hash::Hash;
use std::ops::Deref;
use std::ops::DerefMut;
@@ -47,10 +47,10 @@ impl<'a> Injector for MetadataMap<'a> {
}
}
-
impl VNode {
- pub async fn new(address: String) -> Result<Self, tonic::transport::Error> {
- let client = RatelimiterClient::connect(format!("http://{}:8093", address.clone())).await?;
+ pub async fn new(address: String, port: u16) -> Result<Self, tonic::transport::Error> {
+ let client =
+ RatelimiterClient::connect(format!("http://{}:{}", address.clone(), port)).await?;
Ok(VNode { client, address })
}