author     MatthieuCoder <matthieu@matthieu-dev.xyz>  2023-01-02 18:59:03 +0400
committer  MatthieuCoder <matthieu@matthieu-dev.xyz>  2023-01-02 18:59:03 +0400
commit     f8c2a144e2f3e47371f5e8352e7a7a0b6707bf88 (patch)
tree       8c1e6bd157ac599429c806f9aa9bc9dbc28140ed /exes/rest
parent     46fd26962ef55f8b557f7e36d3aee915a819c88c (diff)
restructure project: replace the in-process Redis ratelimiter with a remote gRPC ratelimiter client and move the REST proxy onto the leash component framework
Diffstat (limited to 'exes/rest')
-rw-r--r--  exes/rest/Cargo.toml                                15
-rw-r--r--  exes/rest/src/config.rs                             20
-rw-r--r--  exes/rest/src/handler.rs                           141
-rw-r--r--  exes/rest/src/main.rs                               90
-rw-r--r--  exes/rest/src/proxy/mod.rs                         138
-rw-r--r--  exes/rest/src/ratelimit/mod.rs                     155
-rw-r--r--  exes/rest/src/ratelimit_client/mod.rs              137
-rw-r--r--  exes/rest/src/ratelimit_client/remote_hashring.rs   67
8 files changed, 425 insertions, 338 deletions
diff --git a/exes/rest/Cargo.toml b/exes/rest/Cargo.toml
index 7b5b2b5..f4c5ecc 100644
--- a/exes/rest/Cargo.toml
+++ b/exes/rest/Cargo.toml
@@ -7,10 +7,23 @@ edition = "2018"
[dependencies]
shared = { path = "../../libs/shared" }
+proto = { path = "../../libs/proto" }
+leash = { path = "../../libs/leash" }
+
hyper = { version = "0.14", features = ["full"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0.8", features = ["derive"] }
futures-util = "0.3.17"
hyper-tls = "0.5.0"
lazy_static = "1.4.0"
-xxhash-rust = { version = "0.8.2", features = ["xxh32"] }
\ No newline at end of file
+xxhash-rust = { version = "0.8.2", features = ["xxh32"] }
+twilight-http-ratelimiting = { git = "https://github.com/MatthieuCoder/twilight.git" }
+tracing = "0.1.37"
+hashring = "0.3.0"
+anyhow = "*"
+tonic = "0.8.3"
+serde_json = { version = "1.0" }
+http = "0.2.8"
+tokio-stream = "0.1.11"
+dns-lookup = "1.0.8"
+tokio-scoped = "0.2.0"
\ No newline at end of file
diff --git a/exes/rest/src/config.rs b/exes/rest/src/config.rs
index 559929f..9261de2 100644
--- a/exes/rest/src/config.rs
+++ b/exes/rest/src/config.rs
@@ -1,9 +1,21 @@
+use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use serde::Deserialize;
-#[derive(Debug, Deserialize, Clone, Default)]
+fn default_listening_address() -> SocketAddr {
+ SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080))
+}
+
+#[derive(Debug, Deserialize, Clone)]
pub struct ServerSettings {
- pub port: u16,
- pub address: String,
+ #[serde(default = "default_listening_address")]
+ pub listening_adress: SocketAddr
+}
+impl Default for ServerSettings {
+ fn default() -> Self {
+ Self {
+ listening_adress: default_listening_address(),
+ }
+ }
}
#[derive(Debug, Deserialize, Clone, Default)]
@@ -12,7 +24,7 @@ pub struct Discord {
}
#[derive(Debug, Deserialize, Clone, Default)]
-pub struct Config {
+pub struct ReverseProxyConfig {
pub server: ServerSettings,
pub discord: Discord,
}
diff --git a/exes/rest/src/handler.rs b/exes/rest/src/handler.rs
new file mode 100644
index 0000000..8b0dd52
--- /dev/null
+++ b/exes/rest/src/handler.rs
@@ -0,0 +1,141 @@
+use std::{
+ collections::hash_map::DefaultHasher,
+ convert::TryFrom,
+ hash::{Hash, Hasher},
+ str::FromStr,
+};
+
+use anyhow::bail;
+use http::{
+ header::{AUTHORIZATION, CONNECTION, HOST, TRANSFER_ENCODING, UPGRADE},
+ HeaderValue, Method as HttpMethod, Request, Response, Uri,
+};
+use hyper::{client::HttpConnector, Body, Client};
+use hyper_tls::HttpsConnector;
+use shared::log::error;
+use twilight_http_ratelimiting::{Method, Path};
+
+use crate::ratelimit_client::RemoteRatelimiter;
+
+/// Normalizes the request path, splitting it into the API prefix (e.g. "/api/v10") and the remaining route
+fn normalize_path(request_path: &str) -> (&str, &str) {
+ if let Some(trimmed_path) = request_path.strip_prefix("/api") {
+ if let Some(maybe_api_version) = trimmed_path.split('/').nth(1) {
+ if let Some(version_number) = maybe_api_version.strip_prefix('v') {
+ if version_number.parse::<u8>().is_ok() {
+ let len = "/api/v".len() + version_number.len();
+ return (&request_path[..len], &request_path[len..]);
+ };
+ };
+ }
+
+ ("/api", trimmed_path)
+ } else {
+ ("/api", request_path)
+ }
+}
+
+pub async fn handle_request(
+ client: Client<HttpsConnector<HttpConnector>, Body>,
+ ratelimiter: RemoteRatelimiter,
+ token: String,
+ mut request: Request<Body>,
+) -> Result<Response<Body>, anyhow::Error> {
+ let (hash, uri_string) = {
+ let method = match *request.method() {
+ HttpMethod::DELETE => Method::Delete,
+ HttpMethod::GET => Method::Get,
+ HttpMethod::PATCH => Method::Patch,
+ HttpMethod::POST => Method::Post,
+ HttpMethod::PUT => Method::Put,
+ _ => {
+ error!("Unsupported HTTP method in request, {}", request.method());
+ bail!("unsupported method");
+ }
+ };
+
+ let request_path = request.uri().path();
+ let (api_path, trimmed_path) = normalize_path(&request_path);
+
+ let mut uri_string = format!("http://192.168.0.27:8000{}{}", api_path, trimmed_path);
+ if let Some(query) = request.uri().query() {
+ uri_string.push('?');
+ uri_string.push_str(query);
+ }
+
+ let mut hash = DefaultHasher::new();
+ match Path::try_from((method, trimmed_path)) {
+ Ok(path) => path,
+ Err(e) => {
+ error!(
+ "Failed to parse path for {:?} {}: {:?}",
+ method, trimmed_path, e
+ );
+ bail!("failed o parse");
+ }
+ }
+ .hash(&mut hash);
+
+ (hash.finish().to_string(), uri_string)
+ };
+
+ let header_sender = match ratelimiter.ticket(hash).await {
+ Ok(sender) => sender,
+ Err(e) => {
+ error!("Failed to receive ticket for ratelimiting: {:?}", e);
+ bail!("failed to reteive ticket");
+ }
+ };
+
+ request.headers_mut().insert(
+ AUTHORIZATION,
+ HeaderValue::from_bytes(token.as_bytes())
+ .expect("strings are guaranteed to be valid utf-8"),
+ );
+ request
+ .headers_mut()
+ .insert(HOST, HeaderValue::from_static("discord.com"));
+
+ // Remove forbidden HTTP/2 headers
+ // https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.2
+ request.headers_mut().remove(CONNECTION);
+ request.headers_mut().remove("keep-alive");
+ request.headers_mut().remove("proxy-connection");
+ request.headers_mut().remove(TRANSFER_ENCODING);
+ request.headers_mut().remove(UPGRADE);
+ request.headers_mut().remove(AUTHORIZATION);
+ request.headers_mut().append(
+ AUTHORIZATION,
+ HeaderValue::from_static(
+ "Bot ODA3MTg4MzM1NzE3Mzg0MjEy.G3sXFM.8gY2sVYDAq2WuPWwDskAAEFLfTg8htooxME-LE",
+ ),
+ );
+
+ let uri = match Uri::from_str(&uri_string) {
+ Ok(uri) => uri,
+ Err(e) => {
+ error!("Failed to create URI for requesting Discord API: {:?}", e);
+ bail!("failed to create uri");
+ }
+ };
+ *request.uri_mut() = uri;
+ let resp = match client.request(request).await {
+ Ok(response) => response,
+ Err(e) => {
+ error!("Error when requesting the Discord API: {:?}", e);
+ bail!("failed to request the discord api");
+ }
+ };
+
+ let ratelimit_headers = resp
+ .headers()
+ .into_iter()
+ .map(|(k, v)| (k.to_string(), v.to_str().unwrap().to_string()))
+ .collect();
+
+ if header_sender.send(ratelimit_headers).is_err() {
+ error!("Error when sending ratelimit headers to ratelimiter");
+ };
+
+ Ok(resp)
+}
diff --git a/exes/rest/src/main.rs b/exes/rest/src/main.rs
index 9fa6ce7..8d014ab 100644
--- a/exes/rest/src/main.rs
+++ b/exes/rest/src/main.rs
@@ -1,46 +1,56 @@
-use std::{convert::Infallible, sync::Arc};
+use config::ReverseProxyConfig;
-use crate::{config::Config, ratelimit::Ratelimiter};
-use shared::{
- config::Settings,
- log::{error, info},
- redis_crate::Client,
+use handler::handle_request;
+use hyper::{
+ server::conn::AddrStream,
+ service::{make_service_fn, service_fn},
+ Body, Client, Request, Server,
};
-use hyper::{server::conn::AddrStream, service::make_service_fn, Server};
-use std::net::ToSocketAddrs;
-use tokio::sync::Mutex;
-
-use crate::proxy::ServiceProxy;
+use hyper_tls::HttpsConnector;
+use leash::{ignite, AnyhowResultFuture, Component};
+use shared::config::Settings;
+use std::convert::Infallible;
mod config;
-mod proxy;
-mod ratelimit;
-
-#[tokio::main]
-async fn main() {
- let settings: Settings<Config> = Settings::new("rest").unwrap();
- let config = Arc::new(settings.config);
- let redis_client: Client = settings.redis.into();
- let redis = Arc::new(Mutex::new(
- redis_client.get_async_connection().await.unwrap(),
- ));
- let ratelimiter = Arc::new(Ratelimiter::new(redis));
-
- let addr = format!("{}:{}", config.server.address, config.server.port)
- .to_socket_addrs()
- .unwrap()
- .next()
- .unwrap();
-
- let service_fn = make_service_fn(move |_: &AddrStream| {
- let service_proxy = ServiceProxy::new(config.clone(), ratelimiter.clone());
- async move { Ok::<_, Infallible>(service_proxy) }
- });
-
- let server = Server::bind(&addr).serve(service_fn);
-
- info!("starting ratelimit server");
- if let Err(e) = server.await {
- error!("server error: {}", e);
+mod handler;
+mod ratelimit_client;
+
+struct ReverseProxyServer {}
+impl Component for ReverseProxyServer {
+ type Config = ReverseProxyConfig;
+ const SERVICE_NAME: &'static str = "rest";
+
+ fn start(&self, settings: Settings<Self::Config>) -> AnyhowResultFuture<()> {
+ Box::pin(async move {
+ // Client to the remote ratelimiters
+ let ratelimiter = ratelimit_client::RemoteRatelimiter::new();
+ let client = Client::builder().build(HttpsConnector::new());
+
+ let service_fn = make_service_fn(move |_: &AddrStream| {
+ let client = client.clone();
+ let ratelimiter = ratelimiter.clone();
+ async move {
+ Ok::<_, Infallible>(service_fn(move |request: Request<Body>| {
+ let client = client.clone();
+ let ratelimiter = ratelimiter.clone();
+ async move {
+ handle_request(client, ratelimiter, "token".to_string(), request).await
+ }
+ }))
+ }
+ });
+
+ let server = Server::bind(&settings.config.server.listening_adress).serve(service_fn);
+
+ server.await?;
+
+ Ok(())
+ })
+ }
+
+ fn new() -> Self {
+ Self {}
}
}
+
+ignite!(ReverseProxyServer);
diff --git a/exes/rest/src/proxy/mod.rs b/exes/rest/src/proxy/mod.rs
deleted file mode 100644
index 65d77aa..0000000
--- a/exes/rest/src/proxy/mod.rs
+++ /dev/null
@@ -1,138 +0,0 @@
-use crate::{config::Config, ratelimit::Ratelimiter};
-use hyper::{
- client::HttpConnector, header::HeaderValue, http::uri::Parts, service::Service, Body, Client,
- Request, Response, Uri,
-};
-use hyper_tls::HttpsConnector;
-use shared::{
- log::debug,
- prometheus::{labels, opts, register_counter, register_histogram_vec, Counter, HistogramVec},
-};
-use std::{future::Future, pin::Pin, sync::Arc, task::Poll};
-use tokio::sync::Mutex;
-
-lazy_static::lazy_static! {
- static ref HTTP_COUNTER: Counter = register_counter!(opts!(
- "nova_rest_http_requests_total",
- "Number of HTTP requests made.",
- labels! {"handler" => "all",}
- ))
- .unwrap();
-
- static ref HTTP_REQ_HISTOGRAM: HistogramVec = register_histogram_vec!(
- "nova_rest_http_request_duration_seconds",
- "The HTTP request latencies in seconds.",
- &["handler"]
- )
- .unwrap();
-
- static ref HTTP_COUNTER_STATUS: Counter = register_counter!(opts!(
- "nova_rest_http_requests_status",
- "Number of HTTP requests made by status",
- labels! {"" => ""}
- ))
- .unwrap();
-}
-
-#[derive(Clone)]
-pub struct ServiceProxy {
- client: Client<HttpsConnector<HttpConnector>>,
- ratelimiter: Arc<Ratelimiter>,
- config: Arc<Config>,
- fail: Arc<Mutex<i32>>,
-}
-
-impl Service<Request<Body>> for ServiceProxy {
- type Response = Response<Body>;
- type Error = hyper::Error;
- type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
-
- fn poll_ready(
- &mut self,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Result<(), Self::Error>> {
- match self.client.poll_ready(cx) {
- Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
- Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
- Poll::Pending => Poll::Pending,
- }
- }
-
- fn call(&mut self, mut req: Request<hyper::Body>) -> Self::Future {
- HTTP_COUNTER.inc();
-
- let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["all"]).start_timer();
- let host = "discord.com";
- let mut new_parts = Parts::default();
-
- let path = req.uri().path().to_string();
-
- new_parts.scheme = Some("https".parse().unwrap());
- new_parts.authority = Some(host.parse().unwrap());
- new_parts.path_and_query = Some(path.parse().unwrap());
-
- *req.uri_mut() = Uri::from_parts(new_parts).unwrap();
-
- let headers = req.headers_mut();
- headers.remove("user-agent");
- headers.insert("Host", HeaderValue::from_str("discord.com").unwrap());
- headers.insert(
- "Authorization",
- HeaderValue::from_str(&format!("Bot {}", self.config.discord.token)).unwrap(),
- );
-
- println!("{:?}", headers);
-
- let client = self.client.clone();
- let ratelimiter = self.ratelimiter.clone();
- let fail = self.fail.clone();
-
- return Box::pin(async move {
- let resp = match ratelimiter.before_request(&req).await {
- Ok(allowed) => match allowed {
- crate::ratelimit::RatelimiterResponse::Ratelimited => {
- debug!("ratelimited");
- Ok(Response::builder().body("ratelimited".into()).unwrap())
- }
- _ => {
- debug!("forwarding request");
- match client.request(req).await {
- Ok(mut response) => {
- ratelimiter.after_request(&path, &response).await;
- if response.status() != 200 {
- *fail.lock().await += 1
- }
- response.headers_mut().insert(
- "x-fails",
- HeaderValue::from_str(&format!("{}", fail.lock().await))
- .unwrap(),
- );
- Ok(response)
- }
- Err(e) => Err(e),
- }
- }
- },
- Err(e) => Ok(Response::builder()
- .body(format!("server error: {}", e).into())
- .unwrap()),
- };
- timer.observe_duration();
- resp
- });
- }
-}
-
-impl ServiceProxy {
- pub fn new(config: Arc<Config>, ratelimiter: Arc<Ratelimiter>) -> Self {
- let https = HttpsConnector::new();
- let client = Client::builder().build::<_, hyper::Body>(https);
- let fail = Arc::new(Mutex::new(0));
- ServiceProxy {
- client,
- config,
- ratelimiter,
- fail,
- }
- }
-}
diff --git a/exes/rest/src/ratelimit/mod.rs b/exes/rest/src/ratelimit/mod.rs
deleted file mode 100644
index 132bfd3..0000000
--- a/exes/rest/src/ratelimit/mod.rs
+++ /dev/null
@@ -1,155 +0,0 @@
-use shared::{
- log::debug,
- redis_crate::{aio::Connection, AsyncCommands}, error::GenericError,
-};
-use hyper::{Body, Request, Response};
-use std::{
- convert::TryInto,
- sync::Arc,
- time::{SystemTime, UNIX_EPOCH},
-};
-use tokio::sync::Mutex;
-use xxhash_rust::xxh32::xxh32;
-
-pub enum RatelimiterResponse {
- NoSuchUrl,
- Ratelimited,
- Pass,
-}
-
-pub struct Ratelimiter {
- redis: Arc<Mutex<Connection>>,
-}
-
-impl Ratelimiter {
- pub fn new(redis: Arc<Mutex<Connection>>) -> Ratelimiter {
- return Ratelimiter { redis };
- }
-
- pub async fn before_request(
- &self,
- request: &Request<Body>,
- ) -> Result<RatelimiterResponse, GenericError> {
- // we lookup if the route hash is stored in the redis table
- let path = request.uri().path();
- let hash = xxh32(path.as_bytes(), 32);
- let mut redis = self.redis.lock().await;
-
- let start = SystemTime::now();
- let since_the_epoch = start
- .duration_since(UNIX_EPOCH)
- .expect("Time went backwards");
-
- // global rate litmit
- match redis
- .get::<String, Option<i32>>(format!(
- "nova:rest:ratelimit:global:{}",
- since_the_epoch.as_secs()
- ))
- .await
- {
- Ok(value) => {
- match value {
- Some(value) => {
- debug!("incr: {}", value);
- if value >= 49 {
- return Ok(RatelimiterResponse::Ratelimited);
- }
- }
- None => {
- let key =
- format!("nova:rest:ratelimit:global:{}", since_the_epoch.as_secs());
- // init global ratelimit
- redis.set_ex::<String, i32, ()>(key, 0, 2).await.unwrap();
- }
- }
- }
- Err(_) => {
- return Err(GenericError::StepFailed("radis ratelimit check".to_string()));
- }
- };
-
- // we lookup the corresponding bucket for this url
- match redis
- .get::<String, Option<String>>(format!("nova:rest:ratelimit:url_bucket:{}", hash))
- .await
- {
- Ok(bucket) => match bucket {
- Some(bucket) => {
- match redis
- .exists::<String, bool>(format!("nova:rest:ratelimit:lock:{}", bucket))
- .await
- {
- Ok(exists) => {
- if exists {
- Ok(RatelimiterResponse::Ratelimited)
- } else {
- Ok(RatelimiterResponse::Pass)
- }
- }
- Err(_) => Err(GenericError::StepFailed("radis ratelimit check".to_string())),
- }
- }
- None => Ok(RatelimiterResponse::NoSuchUrl),
- },
- Err(_) => Err(GenericError::StepFailed("radis ratelimit check".to_string())),
- }
- }
-
- fn parse_headers(&self, response: &Response<Body>) -> Option<(String, i32, i32)> {
- if let Some(bucket) = response.headers().get("X-RateLimit-Bucket") {
- let bucket = bucket.to_str().unwrap().to_string();
-
- let remaining = response.headers().get("X-RateLimit-Remaining").unwrap();
- let reset = response.headers().get("X-RateLimit-Reset-After").unwrap();
-
- let remaining_i32 = remaining.to_str().unwrap().parse::<i32>().unwrap();
- let reset_ms_i32 = reset.to_str().unwrap().parse::<f32>().unwrap().ceil() as i32;
- return Some((bucket, remaining_i32, reset_ms_i32));
- } else {
- None
- }
- }
-
- pub async fn after_request(&self, path: &str, response: &Response<Body>) {
- let hash = xxh32(path.as_bytes(), 32);
- // verified earlier
-
- let mut redis = self.redis.lock().await;
-
- let start = SystemTime::now();
- let since_the_epoch = start
- .duration_since(UNIX_EPOCH)
- .expect("Time went backwards");
-
- redis
- .incr::<String, i32, ()>(
- format!("nova:rest:ratelimit:global:{}", since_the_epoch.as_secs()),
- 1,
- )
- .await
- .unwrap();
- if let Some((bucket, remaining, reset)) = self.parse_headers(response) {
- if remaining <= 1 {
- // we set a lock for the bucket until the timeout passes
- redis
- .set_ex::<String, bool, ()>(
- format!("nova:rest:ratelimit:lock:{}", bucket),
- true,
- reset.try_into().unwrap(),
- )
- .await
- .unwrap();
- }
-
- redis
- .set_ex::<String, String, ()>(
- format!("nova:rest:ratelimit:url_bucket:{}", hash),
- bucket,
- reset.try_into().unwrap(),
- )
- .await
- .unwrap();
- }
- }
-}
diff --git a/exes/rest/src/ratelimit_client/mod.rs b/exes/rest/src/ratelimit_client/mod.rs
new file mode 100644
index 0000000..8263d15
--- /dev/null
+++ b/exes/rest/src/ratelimit_client/mod.rs
@@ -0,0 +1,137 @@
+use self::remote_hashring::{HashRingWrapper, VNode};
+use futures_util::Future;
+use proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::{Data, Headers};
+use proto::nova::ratelimit::ratelimiter::BucketSubmitTicketRequest;
+use shared::log::debug;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::time::UNIX_EPOCH;
+use std::time::{Duration, SystemTime};
+use tokio::sync::oneshot::{self};
+use tokio::sync::{broadcast, mpsc, RwLock};
+use tokio_stream::wrappers::ReceiverStream;
+
+mod remote_hashring;
+
+#[derive(Clone, Debug)]
+pub struct RemoteRatelimiter {
+ remotes: Arc<RwLock<HashRingWrapper>>,
+ stop: Arc<tokio::sync::broadcast::Sender<()>>,
+}
+
+impl Drop for RemoteRatelimiter {
+ fn drop(&mut self) {
+ self.stop.clone().send(()).unwrap();
+ }
+}
+
+impl RemoteRatelimiter {
+ async fn get_ratelimiters(&self) -> Result<(), anyhow::Error> {
+ // get list of dns responses
+ let responses = dns_lookup::lookup_host("ratelimit")
+ .unwrap()
+ .into_iter()
+ .map(|f| f.to_string());
+
+ let mut write = self.remotes.write().await;
+
+ for ip in responses {
+ let a = VNode::new(ip.into()).await?;
+ write.add(a.clone());
+ }
+
+ return Ok(());
+ }
+
+ #[must_use]
+ pub fn new() -> Self {
+ let (rx, mut tx) = broadcast::channel(1);
+ let obj = Self {
+ remotes: Arc::new(RwLock::new(HashRingWrapper::default())),
+ stop: Arc::new(rx),
+ };
+
+ let obj_clone = obj.clone();
+ // Task to update the ratelimiters in the background
+ tokio::spawn(async move {
+ loop {
+ let sleep = tokio::time::sleep(Duration::from_secs(10));
+ tokio::pin!(sleep);
+
+ debug!("refreshing");
+ obj_clone.get_ratelimiters().await.unwrap();
+ tokio::select! {
+ () = &mut sleep => {
+ println!("timer elapsed");
+ },
+ _ = tx.recv() => {}
+ }
+ }
+ });
+
+ obj
+ }
+
+ pub fn ticket(
+ &self,
+ path: String,
+ ) -> Pin<
+ Box<
+ dyn Future<Output = anyhow::Result<oneshot::Sender<HashMap<String, String>>>>
+ + Send
+ + 'static,
+ >,
+ > {
+ let remotes = self.remotes.clone();
+ let (tx, rx) = oneshot::channel::<HashMap<String, String>>();
+
+ Box::pin(async move {
+ // Get node managing this path
+ let mut node = (*remotes.read().await.get(&path).unwrap()).clone();
+
+ // Buffers for the gRPC streaming channel.
+ let (send, remote) = mpsc::channel(5);
+ let (do_request, wait) = oneshot::channel();
+ // Tonic requires a stream; since we use an mpsc channel, we can create a stream from it
+ let stream = ReceiverStream::new(remote);
+
+ // Start the grpc streaming
+ let ticket = node.submit_ticket(stream).await?;
+
+ // First, send the request
+ send.send(BucketSubmitTicketRequest {
+ data: Some(Data::Path(path)),
+ })
+ .await?;
+
+ // We continuously listen for events in the channel.
+ tokio::spawn(async move {
+ let message = ticket.into_inner().message().await.unwrap().unwrap();
+
+ if message.accepted == 1 {
+ do_request.send(()).unwrap();
+ let headers = rx.await.unwrap();
+
+ send.send(BucketSubmitTicketRequest {
+ data: Some(Data::Headers(Headers {
+ precise_time: SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("time went backwards")
+ .as_millis() as u64,
+ headers,
+ })),
+ })
+ .await
+ .unwrap();
+ }
+ });
+
+ // Wait for the message to be sent
+ wait.await?;
+
+ Ok(tx)
+ })
+ }
+}
diff --git a/exes/rest/src/ratelimit_client/remote_hashring.rs b/exes/rest/src/ratelimit_client/remote_hashring.rs
new file mode 100644
index 0000000..b9f7800
--- /dev/null
+++ b/exes/rest/src/ratelimit_client/remote_hashring.rs
@@ -0,0 +1,67 @@
+use core::fmt::Debug;
+use proto::nova::ratelimit::ratelimiter::ratelimiter_client::RatelimiterClient;
+use std::hash::Hash;
+use std::ops::Deref;
+use std::ops::DerefMut;
+use tonic::transport::Channel;
+
+#[derive(Debug, Clone)]
+pub struct VNode {
+ address: String,
+
+ client: RatelimiterClient<Channel>,
+}
+
+impl Deref for VNode {
+ type Target = RatelimiterClient<Channel>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.client
+ }
+}
+
+impl DerefMut for VNode {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.client
+ }
+}
+
+impl Hash for VNode {
+ fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+ self.address.hash(state);
+ }
+}
+
+impl VNode {
+ pub async fn new(address: String) -> Result<Self, tonic::transport::Error> {
+ let client = RatelimiterClient::connect(format!("http://{}:8080", address.clone())).await?;
+
+ Ok(VNode { client, address })
+ }
+}
+
+unsafe impl Send for VNode {}
+
+#[repr(transparent)]
+#[derive(Default)]
+pub struct HashRingWrapper(hashring::HashRing<VNode>);
+
+impl Deref for HashRingWrapper {
+ type Target = hashring::HashRing<VNode>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+impl DerefMut for HashRingWrapper {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.0
+ }
+}
+
+impl Debug for HashRingWrapper {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_tuple("HashRing").finish()
+ }
+}