diff options
| author | MatthieuCoder <matthieu@matthieu-dev.xyz> | 2023-01-02 18:59:03 +0400 | 
|---|---|---|
| committer | MatthieuCoder <matthieu@matthieu-dev.xyz> | 2023-01-02 18:59:03 +0400 | 
| commit | f8c2a144e2f3e47371f5e8352e7a7a0b6707bf88 (patch) | |
| tree | 8c1e6bd157ac599429c806f9aa9bc9dbc28140ed /exes/rest | |
| parent | 46fd26962ef55f8b557f7e36d3aee915a819c88c (diff) | |
restructure project
Diffstat (limited to 'exes/rest')
| -rw-r--r-- | exes/rest/Cargo.toml | 15 | ||||
| -rw-r--r-- | exes/rest/src/config.rs | 20 | ||||
| -rw-r--r-- | exes/rest/src/handler.rs | 141 | ||||
| -rw-r--r-- | exes/rest/src/main.rs | 90 | ||||
| -rw-r--r-- | exes/rest/src/proxy/mod.rs | 138 | ||||
| -rw-r--r-- | exes/rest/src/ratelimit/mod.rs | 155 | ||||
| -rw-r--r-- | exes/rest/src/ratelimit_client/mod.rs | 137 | ||||
| -rw-r--r-- | exes/rest/src/ratelimit_client/remote_hashring.rs | 67 | 
8 files changed, 425 insertions, 338 deletions
diff --git a/exes/rest/Cargo.toml b/exes/rest/Cargo.toml index 7b5b2b5..f4c5ecc 100644 --- a/exes/rest/Cargo.toml +++ b/exes/rest/Cargo.toml @@ -7,10 +7,23 @@ edition = "2018"  [dependencies]  shared = { path = "../../libs/shared" } +proto = { path = "../../libs/proto" } +leash = { path = "../../libs/leash" } +  hyper = { version = "0.14", features = ["full"] }  tokio = { version = "1", features = ["full"] }  serde = { version = "1.0.8", features = ["derive"] }  futures-util = "0.3.17"  hyper-tls = "0.5.0"  lazy_static = "1.4.0" -xxhash-rust = { version = "0.8.2", features = ["xxh32"] }
\ No newline at end of file +xxhash-rust = { version = "0.8.2", features = ["xxh32"] } +twilight-http-ratelimiting = { git = "https://github.com/MatthieuCoder/twilight.git" } +tracing = "0.1.37" +hashring = "0.3.0" +anyhow = "*" +tonic = "0.8.3" +serde_json = { version = "1.0" } +http = "0.2.8" +tokio-stream = "0.1.11" +dns-lookup = "1.0.8" +tokio-scoped = "0.2.0"
\ No newline at end of file diff --git a/exes/rest/src/config.rs b/exes/rest/src/config.rs index 559929f..9261de2 100644 --- a/exes/rest/src/config.rs +++ b/exes/rest/src/config.rs @@ -1,9 +1,21 @@ +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};  use serde::Deserialize; -#[derive(Debug, Deserialize, Clone, Default)] +fn default_listening_address() -> SocketAddr { +    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)) +} + +#[derive(Debug, Deserialize, Clone)]  pub struct ServerSettings { -    pub port: u16, -    pub address: String, +    #[serde(default = "default_listening_address")] +    pub listening_adress: SocketAddr +} +impl Default for ServerSettings { +    fn default() -> Self { +        Self { +            listening_adress: default_listening_address(), +        } +    }  }  #[derive(Debug, Deserialize, Clone, Default)] @@ -12,7 +24,7 @@ pub struct Discord {  }  #[derive(Debug, Deserialize, Clone, Default)] -pub struct Config { +pub struct ReverseProxyConfig {      pub server: ServerSettings,      pub discord: Discord,  } diff --git a/exes/rest/src/handler.rs b/exes/rest/src/handler.rs new file mode 100644 index 0000000..8b0dd52 --- /dev/null +++ b/exes/rest/src/handler.rs @@ -0,0 +1,141 @@ +use std::{ +    collections::hash_map::DefaultHasher, +    convert::TryFrom, +    hash::{Hash, Hasher}, +    str::FromStr, +}; + +use anyhow::bail; +use http::{ +    header::{AUTHORIZATION, CONNECTION, HOST, TRANSFER_ENCODING, UPGRADE}, +    HeaderValue, Method as HttpMethod, Request, Response, Uri, +}; +use hyper::{client::HttpConnector, Body, Client}; +use hyper_tls::HttpsConnector; +use shared::log::error; +use twilight_http_ratelimiting::{Method, Path}; + +use crate::ratelimit_client::RemoteRatelimiter; + +/// Normalizes the path +fn normalize_path(request_path: &str) -> (&str, &str) { +    if let Some(trimmed_path) = request_path.strip_prefix("/api") { +        if let Some(maybe_api_version) = trimmed_path.split('/').nth(1) { +            if let Some(version_number) = maybe_api_version.strip_prefix('v') { +                if version_number.parse::<u8>().is_ok() { +                    let len = "/api/v".len() + version_number.len(); +                    return (&request_path[..len], &request_path[len..]); +                }; +            }; +        } + +        ("/api", trimmed_path) +    } else { +        ("/api", request_path) +    } +} + +pub async fn handle_request( +    client: Client<HttpsConnector<HttpConnector>, Body>, +    ratelimiter: RemoteRatelimiter, +    token: String, +    mut request: Request<Body>, +) -> Result<Response<Body>, anyhow::Error> { +    let (hash, uri_string) = { +        let method = match *request.method() { +            HttpMethod::DELETE => Method::Delete, +            HttpMethod::GET => Method::Get, +            HttpMethod::PATCH => Method::Patch, +            HttpMethod::POST => Method::Post, +            HttpMethod::PUT => Method::Put, +            _ => { +                error!("Unsupported HTTP method in request, {}", request.method()); +                bail!("unsupported method"); +            } +        }; + +        let request_path = request.uri().path(); +        let (api_path, trimmed_path) = normalize_path(&request_path); + +        let mut uri_string = format!("http://192.168.0.27:8000{}{}", api_path, trimmed_path); +        if let Some(query) = request.uri().query() { +            uri_string.push('?'); +            uri_string.push_str(query); +        } + +        let mut hash = DefaultHasher::new(); +        match Path::try_from((method, trimmed_path)) { +            Ok(path) => path, +            Err(e) => { +                error!( +                    "Failed to parse path for {:?} {}: {:?}", +                    method, trimmed_path, e +                ); +                bail!("failed o parse"); +            } +        } +        .hash(&mut hash); + +        (hash.finish().to_string(), uri_string) +    }; + +    let header_sender = match ratelimiter.ticket(hash).await { +        Ok(sender) => sender, +        Err(e) => { +            error!("Failed to receive ticket for ratelimiting: {:?}", e); +            bail!("failed to reteive ticket"); +        } +    }; + +    request.headers_mut().insert( +        AUTHORIZATION, +        HeaderValue::from_bytes(token.as_bytes()) +            .expect("strings are guaranteed to be valid utf-8"), +    ); +    request +        .headers_mut() +        .insert(HOST, HeaderValue::from_static("discord.com")); + +    // Remove forbidden HTTP/2 headers +    // https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.2 +    request.headers_mut().remove(CONNECTION); +    request.headers_mut().remove("keep-alive"); +    request.headers_mut().remove("proxy-connection"); +    request.headers_mut().remove(TRANSFER_ENCODING); +    request.headers_mut().remove(UPGRADE); +    request.headers_mut().remove(AUTHORIZATION); +    request.headers_mut().append( +        AUTHORIZATION, +        HeaderValue::from_static( +            "Bot ODA3MTg4MzM1NzE3Mzg0MjEy.G3sXFM.8gY2sVYDAq2WuPWwDskAAEFLfTg8htooxME-LE", +        ), +    ); + +    let uri = match Uri::from_str(&uri_string) { +        Ok(uri) => uri, +        Err(e) => { +            error!("Failed to create URI for requesting Discord API: {:?}", e); +            bail!("failed to create uri"); +        } +    }; +    *request.uri_mut() = uri; +    let resp = match client.request(request).await { +        Ok(response) => response, +        Err(e) => { +            error!("Error when requesting the Discord API: {:?}", e); +            bail!("failed to request the discord api"); +        } +    }; + +    let ratelimit_headers = resp +        .headers() +        .into_iter() +        .map(|(k, v)| (k.to_string(), v.to_str().unwrap().to_string())) +        .collect(); + +    if header_sender.send(ratelimit_headers).is_err() { +        error!("Error when sending ratelimit headers to ratelimiter"); +    }; + +    Ok(resp) +} diff --git a/exes/rest/src/main.rs b/exes/rest/src/main.rs index 9fa6ce7..8d014ab 100644 --- a/exes/rest/src/main.rs +++ b/exes/rest/src/main.rs @@ -1,46 +1,56 @@ -use std::{convert::Infallible, sync::Arc}; +use config::ReverseProxyConfig; -use crate::{config::Config, ratelimit::Ratelimiter}; -use shared::{ -    config::Settings, -    log::{error, info}, -    redis_crate::Client, +use handler::handle_request; +use hyper::{ +    server::conn::AddrStream, +    service::{make_service_fn, service_fn}, +    Body, Client, Request, Server,  }; -use hyper::{server::conn::AddrStream, service::make_service_fn, Server}; -use std::net::ToSocketAddrs; -use tokio::sync::Mutex; - -use crate::proxy::ServiceProxy; +use hyper_tls::HttpsConnector; +use leash::{ignite, AnyhowResultFuture, Component}; +use shared::config::Settings; +use std::convert::Infallible;  mod config; -mod proxy; -mod ratelimit; - -#[tokio::main] -async fn main() { -    let settings: Settings<Config> = Settings::new("rest").unwrap(); -    let config = Arc::new(settings.config); -    let redis_client: Client = settings.redis.into(); -    let redis = Arc::new(Mutex::new( -        redis_client.get_async_connection().await.unwrap(), -    )); -    let ratelimiter = Arc::new(Ratelimiter::new(redis)); - -    let addr = format!("{}:{}", config.server.address, config.server.port) -        .to_socket_addrs() -        .unwrap() -        .next() -        .unwrap(); - -    let service_fn = make_service_fn(move |_: &AddrStream| { -        let service_proxy = ServiceProxy::new(config.clone(), ratelimiter.clone()); -        async move { Ok::<_, Infallible>(service_proxy) } -    }); - -    let server = Server::bind(&addr).serve(service_fn); - -    info!("starting ratelimit server"); -    if let Err(e) = server.await { -        error!("server error: {}", e); +mod handler; +mod ratelimit_client; + +struct ReverseProxyServer {} +impl Component for ReverseProxyServer { +    type Config = ReverseProxyConfig; +    const SERVICE_NAME: &'static str = "rest"; + +    fn start(&self, settings: Settings<Self::Config>) -> AnyhowResultFuture<()> { +        Box::pin(async move { +            // Client to the remote ratelimiters +            let ratelimiter = ratelimit_client::RemoteRatelimiter::new(); +            let client = Client::builder().build(HttpsConnector::new()); + +            let service_fn = make_service_fn(move |_: &AddrStream| { +                let client = client.clone(); +                let ratelimiter = ratelimiter.clone(); +                async move { +                    Ok::<_, Infallible>(service_fn(move |request: Request<Body>| { +                        let client = client.clone(); +                        let ratelimiter = ratelimiter.clone(); +                        async move { +                            handle_request(client, ratelimiter, "token".to_string(), request).await +                        } +                    })) +                } +            }); + +            let server = Server::bind(&settings.config.server.listening_adress).serve(service_fn); + +            server.await?; + +            Ok(()) +        }) +    } + +    fn new() -> Self { +        Self {}      }  } + +ignite!(ReverseProxyServer); diff --git a/exes/rest/src/proxy/mod.rs b/exes/rest/src/proxy/mod.rs deleted file mode 100644 index 65d77aa..0000000 --- a/exes/rest/src/proxy/mod.rs +++ /dev/null @@ -1,138 +0,0 @@ -use crate::{config::Config, ratelimit::Ratelimiter}; -use hyper::{ -    client::HttpConnector, header::HeaderValue, http::uri::Parts, service::Service, Body, Client, -    Request, Response, Uri, -}; -use hyper_tls::HttpsConnector; -use shared::{ -    log::debug, -    prometheus::{labels, opts, register_counter, register_histogram_vec, Counter, HistogramVec}, -}; -use std::{future::Future, pin::Pin, sync::Arc, task::Poll}; -use tokio::sync::Mutex; - -lazy_static::lazy_static! { -    static ref HTTP_COUNTER: Counter = register_counter!(opts!( -        "nova_rest_http_requests_total", -        "Number of HTTP requests made.", -        labels! {"handler" => "all",} -    )) -    .unwrap(); - -    static ref HTTP_REQ_HISTOGRAM: HistogramVec = register_histogram_vec!( -        "nova_rest_http_request_duration_seconds", -        "The HTTP request latencies in seconds.", -        &["handler"] -    ) -    .unwrap(); - -    static ref HTTP_COUNTER_STATUS: Counter = register_counter!(opts!( -        "nova_rest_http_requests_status", -        "Number of HTTP requests made by status", -        labels! {"" => ""} -    )) -    .unwrap(); -} - -#[derive(Clone)] -pub struct ServiceProxy { -    client: Client<HttpsConnector<HttpConnector>>, -    ratelimiter: Arc<Ratelimiter>, -    config: Arc<Config>, -    fail: Arc<Mutex<i32>>, -} - -impl Service<Request<Body>> for ServiceProxy { -    type Response = Response<Body>; -    type Error = hyper::Error; -    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>; - -    fn poll_ready( -        &mut self, -        cx: &mut std::task::Context<'_>, -    ) -> std::task::Poll<Result<(), Self::Error>> { -        match self.client.poll_ready(cx) { -            Poll::Ready(Ok(())) => Poll::Ready(Ok(())), -            Poll::Ready(Err(e)) => Poll::Ready(Err(e)), -            Poll::Pending => Poll::Pending, -        } -    } - -    fn call(&mut self, mut req: Request<hyper::Body>) -> Self::Future { -        HTTP_COUNTER.inc(); - -        let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["all"]).start_timer(); -        let host = "discord.com"; -        let mut new_parts = Parts::default(); - -        let path = req.uri().path().to_string(); - -        new_parts.scheme = Some("https".parse().unwrap()); -        new_parts.authority = Some(host.parse().unwrap()); -        new_parts.path_and_query = Some(path.parse().unwrap()); - -        *req.uri_mut() = Uri::from_parts(new_parts).unwrap(); - -        let headers = req.headers_mut(); -        headers.remove("user-agent"); -        headers.insert("Host", HeaderValue::from_str("discord.com").unwrap()); -        headers.insert( -            "Authorization", -            HeaderValue::from_str(&format!("Bot {}", self.config.discord.token)).unwrap(), -        ); - -        println!("{:?}", headers); - -        let client = self.client.clone(); -        let ratelimiter = self.ratelimiter.clone(); -        let fail = self.fail.clone(); - -        return Box::pin(async move { -            let resp = match ratelimiter.before_request(&req).await { -                Ok(allowed) => match allowed { -                    crate::ratelimit::RatelimiterResponse::Ratelimited => { -                        debug!("ratelimited"); -                        Ok(Response::builder().body("ratelimited".into()).unwrap()) -                    } -                    _ => { -                        debug!("forwarding request"); -                        match client.request(req).await { -                            Ok(mut response) => { -                                ratelimiter.after_request(&path, &response).await; -                                if response.status() != 200 { -                                    *fail.lock().await += 1 -                                } -                                response.headers_mut().insert( -                                    "x-fails", -                                    HeaderValue::from_str(&format!("{}", fail.lock().await)) -                                        .unwrap(), -                                ); -                                Ok(response) -                            } -                            Err(e) => Err(e), -                        } -                    } -                }, -                Err(e) => Ok(Response::builder() -                    .body(format!("server error: {}", e).into()) -                    .unwrap()), -            }; -            timer.observe_duration(); -            resp -        }); -    } -} - -impl ServiceProxy { -    pub fn new(config: Arc<Config>, ratelimiter: Arc<Ratelimiter>) -> Self { -        let https = HttpsConnector::new(); -        let client = Client::builder().build::<_, hyper::Body>(https); -        let fail = Arc::new(Mutex::new(0)); -        ServiceProxy { -            client, -            config, -            ratelimiter, -            fail, -        } -    } -} diff --git a/exes/rest/src/ratelimit/mod.rs b/exes/rest/src/ratelimit/mod.rs deleted file mode 100644 index 132bfd3..0000000 --- a/exes/rest/src/ratelimit/mod.rs +++ /dev/null @@ -1,155 +0,0 @@ -use shared::{ -    log::debug, -    redis_crate::{aio::Connection, AsyncCommands}, error::GenericError, -}; -use hyper::{Body, Request, Response}; -use std::{ -    convert::TryInto, -    sync::Arc, -    time::{SystemTime, UNIX_EPOCH}, -}; -use tokio::sync::Mutex; -use xxhash_rust::xxh32::xxh32; - -pub enum RatelimiterResponse { -    NoSuchUrl, -    Ratelimited, -    Pass, -} - -pub struct Ratelimiter { -    redis: Arc<Mutex<Connection>>, -} - -impl Ratelimiter { -    pub fn new(redis: Arc<Mutex<Connection>>) -> Ratelimiter { -        return Ratelimiter { redis }; -    } - -    pub async fn before_request( -        &self, -        request: &Request<Body>, -    ) -> Result<RatelimiterResponse, GenericError> { -        // we lookup if the route hash is stored in the redis table -        let path = request.uri().path(); -        let hash = xxh32(path.as_bytes(), 32); -        let mut redis = self.redis.lock().await; - -        let start = SystemTime::now(); -        let since_the_epoch = start -            .duration_since(UNIX_EPOCH) -            .expect("Time went backwards"); - -        // global rate litmit -        match redis -            .get::<String, Option<i32>>(format!( -                "nova:rest:ratelimit:global:{}", -                since_the_epoch.as_secs() -            )) -            .await -        { -            Ok(value) => { -                match value { -                    Some(value) => { -                        debug!("incr: {}", value); -                        if value >= 49 { -                            return Ok(RatelimiterResponse::Ratelimited); -                        } -                    } -                    None => { -                        let key = -                            format!("nova:rest:ratelimit:global:{}", since_the_epoch.as_secs()); -                        // init global ratelimit -                        redis.set_ex::<String, i32, ()>(key, 0, 2).await.unwrap(); -                    } -                } -            } -            Err(_) => { -                return Err(GenericError::StepFailed("radis ratelimit check".to_string())); -            } -        }; - -        // we lookup the corresponding bucket for this url -        match redis -            .get::<String, Option<String>>(format!("nova:rest:ratelimit:url_bucket:{}", hash)) -            .await -        { -            Ok(bucket) => match bucket { -                Some(bucket) => { -                    match redis -                        .exists::<String, bool>(format!("nova:rest:ratelimit:lock:{}", bucket)) -                        .await -                    { -                        Ok(exists) => { -                            if exists { -                                Ok(RatelimiterResponse::Ratelimited) -                            } else { -                                Ok(RatelimiterResponse::Pass) -                            } -                        } -                        Err(_) =>  Err(GenericError::StepFailed("radis ratelimit check".to_string())), -                    } -                } -                None => Ok(RatelimiterResponse::NoSuchUrl), -            }, -            Err(_) => Err(GenericError::StepFailed("radis ratelimit check".to_string())), -        } -    } - -    fn parse_headers(&self, response: &Response<Body>) -> Option<(String, i32, i32)> { -        if let Some(bucket) = response.headers().get("X-RateLimit-Bucket") { -            let bucket = bucket.to_str().unwrap().to_string(); - -            let remaining = response.headers().get("X-RateLimit-Remaining").unwrap(); -            let reset = response.headers().get("X-RateLimit-Reset-After").unwrap(); - -            let remaining_i32 = remaining.to_str().unwrap().parse::<i32>().unwrap(); -            let reset_ms_i32 = reset.to_str().unwrap().parse::<f32>().unwrap().ceil() as i32; -            return Some((bucket, remaining_i32, reset_ms_i32)); -        } else { -            None -        } -    } - -    pub async fn after_request(&self, path: &str, response: &Response<Body>) { -        let hash = xxh32(path.as_bytes(), 32); -        // verified earlier - -        let mut redis = self.redis.lock().await; - -        let start = SystemTime::now(); -        let since_the_epoch = start -            .duration_since(UNIX_EPOCH) -            .expect("Time went backwards"); - -        redis -            .incr::<String, i32, ()>( -                format!("nova:rest:ratelimit:global:{}", since_the_epoch.as_secs()), -                1, -            ) -            .await -            .unwrap(); -        if let Some((bucket, remaining, reset)) = self.parse_headers(response) { -            if remaining <= 1 { -                // we set a lock for the bucket until the timeout passes -                redis -                    .set_ex::<String, bool, ()>( -                        format!("nova:rest:ratelimit:lock:{}", bucket), -                        true, -                        reset.try_into().unwrap(), -                    ) -                    .await -                    .unwrap(); -            } - -            redis -                .set_ex::<String, String, ()>( -                    format!("nova:rest:ratelimit:url_bucket:{}", hash), -                    bucket, -                    reset.try_into().unwrap(), -                ) -                .await -                .unwrap(); -        } -    } -} diff --git a/exes/rest/src/ratelimit_client/mod.rs b/exes/rest/src/ratelimit_client/mod.rs new file mode 100644 index 0000000..8263d15 --- /dev/null +++ b/exes/rest/src/ratelimit_client/mod.rs @@ -0,0 +1,137 @@ +use self::remote_hashring::{HashRingWrapper, VNode}; +use futures_util::Future; +use proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::{Data, Headers}; +use proto::nova::ratelimit::ratelimiter::BucketSubmitTicketRequest; +use shared::log::debug; +use std::collections::HashMap; +use std::fmt::Debug; +use std::pin::Pin; +use std::sync::Arc; +use std::time::UNIX_EPOCH; +use std::time::{Duration, SystemTime}; +use tokio::sync::oneshot::{self}; +use tokio::sync::{broadcast, mpsc, RwLock}; +use tokio_stream::wrappers::ReceiverStream; + +mod remote_hashring; + +#[derive(Clone, Debug)] +pub struct RemoteRatelimiter { +    remotes: Arc<RwLock<HashRingWrapper>>, +    stop: Arc<tokio::sync::broadcast::Sender<()>>, +} + +impl Drop for RemoteRatelimiter { +    fn drop(&mut self) { +        self.stop.clone().send(()).unwrap(); +    } +} + +impl RemoteRatelimiter { +    async fn get_ratelimiters(&self) -> Result<(), anyhow::Error> { +        // get list of dns responses +        let responses = dns_lookup::lookup_host("ratelimit") +            .unwrap() +            .into_iter() +            .map(|f| f.to_string()); + +        let mut write = self.remotes.write().await; + +        for ip in responses { +            let a = VNode::new(ip.into()).await?; +            write.add(a.clone()); +        } + +        return Ok(()); +    } + +    #[must_use] +    pub fn new() -> Self { +        let (rx, mut tx) = broadcast::channel(1); +        let obj = Self { +            remotes: Arc::new(RwLock::new(HashRingWrapper::default())), +            stop: Arc::new(rx), +        }; + +        let obj_clone = obj.clone(); +        // Task to update the ratelimiters in the background +        tokio::spawn(async move { +            loop { +                let sleep = tokio::time::sleep(Duration::from_secs(10)); +                tokio::pin!(sleep); + +                debug!("refreshing"); +                obj_clone.get_ratelimiters().await.unwrap(); +                tokio::select! { +                    () = &mut sleep => { +                        println!("timer elapsed"); +                    }, +                    _ = tx.recv() => {} +                } +            } +        }); + +        obj +    } + +    pub fn ticket( +        &self, +        path: String, +    ) -> Pin< +        Box< +            dyn Future<Output = anyhow::Result<oneshot::Sender<HashMap<String, String>>>> +                + Send +                + 'static, +        >, +    > { +        let remotes = self.remotes.clone(); +        let (tx, rx) = oneshot::channel::<HashMap<String, String>>(); + +        Box::pin(async move { +            // Get node managing this path +            let mut node = (*remotes.read().await.get(&path).unwrap()).clone(); + +            // Buffers for the gRPC streaming channel. +            let (send, remote) = mpsc::channel(5); +            let (do_request, wait) = oneshot::channel(); +            // Tonic requires a stream to be used; Since we use a mpsc channel, we can create a stream from it +            let stream = ReceiverStream::new(remote); + +            // Start the grpc streaming +            let ticket = node.submit_ticket(stream).await?; + +            // First, send the request +            send.send(BucketSubmitTicketRequest { +                data: Some(Data::Path(path)), +            }) +            .await?; + +            // We continuously listen for events in the channel. +            tokio::spawn(async move { +                let message = ticket.into_inner().message().await.unwrap().unwrap(); + +                if message.accepted == 1 { +                    do_request.send(()).unwrap(); +                    let headers = rx.await.unwrap(); + +                    send.send(BucketSubmitTicketRequest { +                        data: Some(Data::Headers(Headers { +                            precise_time: SystemTime::now() +                                .duration_since(UNIX_EPOCH) +                                .expect("time went backwards") +                                .as_millis() as u64, +                            headers, +                        })), +                    }) +                    .await +                    .unwrap(); +                } +            }); + +            // Wait for the message to be sent +            wait.await?; + +            Ok(tx) +        }) +    } +} diff --git a/exes/rest/src/ratelimit_client/remote_hashring.rs b/exes/rest/src/ratelimit_client/remote_hashring.rs new file mode 100644 index 0000000..b9f7800 --- /dev/null +++ b/exes/rest/src/ratelimit_client/remote_hashring.rs @@ -0,0 +1,67 @@ +use core::fmt::Debug; +use proto::nova::ratelimit::ratelimiter::ratelimiter_client::RatelimiterClient; +use std::hash::Hash; +use std::ops::Deref; +use std::ops::DerefMut; +use tonic::transport::Channel; + +#[derive(Debug, Clone)] +pub struct VNode { +    address: String, + +    client: RatelimiterClient<Channel>, +} + +impl Deref for VNode { +    type Target = RatelimiterClient<Channel>; + +    fn deref(&self) -> &Self::Target { +        &self.client +    } +} + +impl DerefMut for VNode { +    fn deref_mut(&mut self) -> &mut Self::Target { +        &mut self.client +    } +} + +impl Hash for VNode { +    fn hash<H: std::hash::Hasher>(&self, state: &mut H) { +        self.address.hash(state); +    } +} + +impl VNode { +    pub async fn new(address: String) -> Result<Self, tonic::transport::Error> { +        let client = RatelimiterClient::connect(format!("http://{}:8080", address.clone())).await?; + +        Ok(VNode { client, address }) +    } +} + +unsafe impl Send for VNode {} + +#[repr(transparent)] +#[derive(Default)] +pub struct HashRingWrapper(hashring::HashRing<VNode>); + +impl Deref for HashRingWrapper { +    type Target = hashring::HashRing<VNode>; + +    fn deref(&self) -> &Self::Target { +        &self.0 +    } +} + +impl DerefMut for HashRingWrapper { +    fn deref_mut(&mut self) -> &mut Self::Target { +        &mut self.0 +    } +} + +impl Debug for HashRingWrapper { +    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +        f.debug_tuple("HashRing").finish() +    } +}  | 
