diff options
| author | MatthieuCoder <matthieu@matthieu-dev.xyz> | 2023-01-08 21:56:15 +0400 | 
|---|---|---|
| committer | MatthieuCoder <matthieu@matthieu-dev.xyz> | 2023-01-08 21:56:15 +0400 | 
| commit | c925e01d4a96d88d9cbbe4f3d55aceaf5bff017f (patch) | |
| tree | c3129cc8f71c342385b085dbcc48a2bf7914d2c1 /exes/rest | |
| parent | 19dbd364caf8f3d701a17a06846716dcf6765e58 (diff) | |
| parent | 038add4d5e8465f8bb36f1a1fa5817a02cab833b (diff) | |
merge conflicts
Diffstat (limited to 'exes/rest')
| -rw-r--r-- | exes/rest/Cargo.toml | 15 | ||||
| -rw-r--r-- | exes/rest/src/config.rs | 7 | ||||
| -rw-r--r-- | exes/rest/src/handler.rs | 44 | ||||
| -rw-r--r-- | exes/rest/src/lib.rs | 8 | ||||
| -rw-r--r-- | exes/rest/src/ratelimit_client/mod.rs | 130 | ||||
| -rw-r--r-- | exes/rest/src/ratelimit_client/remote_hashring.rs | 16 | 
6 files changed, 132 insertions, 88 deletions
diff --git a/exes/rest/Cargo.toml b/exes/rest/Cargo.toml index f28018e..5583416 100644 --- a/exes/rest/Cargo.toml +++ b/exes/rest/Cargo.toml @@ -10,20 +10,23 @@ shared = { path = "../../libs/shared" }  proto = { path = "../../libs/proto" }  leash = { path = "../../libs/leash" } -hyper = { version = "0.14", features = ["full"] } -tokio = { version = "1", features = ["full"] } +hyper= "0.14" +http = "0.2.8" + +tokio = { version = "1", features = ["rt"] }  serde = { version = "1.0.8", features = ["derive"] }  futures-util = "0.3.17"  hyper-rustls = "0.23.2"  lazy_static = "1.4.0"  xxhash-rust = { version = "0.8.2", features = ["xxh32"] }  twilight-http-ratelimiting = { git = "https://github.com/MatthieuCoder/twilight.git" } +  tracing = "0.1.37" +anyhow = "1.0.68"  hashring = "0.3.0" -anyhow = "*"  tonic = "0.8.3" -serde_json = { version = "1.0" } -http = "0.2.8"  tokio-stream = "0.1.11"  dns-lookup = "1.0.8" -tokio-scoped = "0.2.0"
\ No newline at end of file +opentelemetry = "0.18.0" +opentelemetry-http = "0.7.0" +tracing-opentelemetry = "0.18.0"
\ No newline at end of file diff --git a/exes/rest/src/config.rs b/exes/rest/src/config.rs index 4e27a30..3bfe8db 100644 --- a/exes/rest/src/config.rs +++ b/exes/rest/src/config.rs @@ -1,5 +1,5 @@ -use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};  use serde::Deserialize; +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};  fn default_listening_address() -> SocketAddr {      SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8090)) @@ -7,8 +7,7 @@ fn default_listening_address() -> SocketAddr {  #[derive(Debug, Deserialize, Clone)]  pub struct ServerSettings { -    #[serde(default = "default_listening_address")] -    pub listening_adress: SocketAddr +    pub listening_adress: SocketAddr,  }  impl Default for ServerSettings {      fn default() -> Self { @@ -20,7 +19,7 @@ impl Default for ServerSettings {  #[derive(Debug, Deserialize, Clone, Default)]  pub struct Discord { -    pub token: String +    pub token: String,  }  #[derive(Debug, Deserialize, Clone, Default)] diff --git a/exes/rest/src/handler.rs b/exes/rest/src/handler.rs index 8b9463f..6d58dce 100644 --- a/exes/rest/src/handler.rs +++ b/exes/rest/src/handler.rs @@ -1,11 +1,3 @@ -use std::{ -    collections::hash_map::DefaultHasher, -    convert::TryFrom, -    hash::{Hash, Hasher}, -    str::FromStr, -    time::Instant, -}; -  use anyhow::bail;  use http::{      header::{AUTHORIZATION, CONNECTION, HOST, TRANSFER_ENCODING, UPGRADE}, @@ -13,7 +5,13 @@ use http::{  };  use hyper::{client::HttpConnector, Body, Client};  use hyper_rustls::HttpsConnector; -use shared::log::error; +use std::{ +    collections::hash_map::DefaultHasher, +    convert::TryFrom, +    hash::{Hash, Hasher}, +    str::FromStr, +}; +use tracing::{debug_span, error, instrument, Instrument};  use twilight_http_ratelimiting::{Method, Path};  use crate::ratelimit_client::RemoteRatelimiter; @@ -36,6 +34,7 @@ fn normalize_path(request_path: &str) -> (&str, &str) {      }  } +#[instrument]  pub async fn handle_request(      client: Client<HttpsConnector<HttpConnector>, Body>,      ratelimiter: RemoteRatelimiter, @@ -72,7 +71,7 @@ pub async fn handle_request(                      "Failed to parse path for {:?} {}: {:?}",                      method, trimmed_path, e                  ); -                bail!("failed o parse"); +                bail!("failed to parse");              }          }          .hash(&mut hash); @@ -80,15 +79,17 @@ pub async fn handle_request(          (hash.finish().to_string(), uri_string)      }; -    let start_ticket_request = Instant::now(); -    let header_sender = match ratelimiter.ticket(hash).await { +    let span = debug_span!("ticket validation request"); +    let header_sender = match span +        .in_scope(|| ratelimiter.ticket(hash)) +        .await +    {          Ok(sender) => sender,          Err(e) => {              error!("Failed to receive ticket for ratelimiting: {:?}", e);              bail!("failed to reteive ticket");          }      }; -    let time_took_ticket = Instant::now() - start_ticket_request;      request          .headers_mut() @@ -101,7 +102,7 @@ pub async fn handle_request(      request.headers_mut().remove("proxy-connection");      request.headers_mut().remove(TRANSFER_ENCODING);      request.headers_mut().remove(UPGRADE); -     +      if let Some(auth) = request.headers_mut().get_mut(AUTHORIZATION) {          if auth              .to_str() @@ -125,25 +126,14 @@ pub async fn handle_request(          }      };      *request.uri_mut() = uri; - -    let start_upstream_req = Instant::now(); -    let mut resp = match client.request(request).await { +    let span = debug_span!("upstream request to discord"); +    let resp = match client.request(request).instrument(span).await {          Ok(response) => response,          Err(e) => {              error!("Error when requesting the Discord API: {:?}", e);              bail!("failed to request the discord api");          }      }; -    let upstream_time_took = Instant::now() - start_upstream_req; - -    resp.headers_mut().append( -        "X-TicketRequest-Ms", -        HeaderValue::from_str(&time_took_ticket.as_millis().to_string()).unwrap(), -    ); -    resp.headers_mut().append( -        "X-Upstream-Ms", -        HeaderValue::from_str(&upstream_time_took.as_millis().to_string()).unwrap(), -    );      let ratelimit_headers = resp          .headers() diff --git a/exes/rest/src/lib.rs b/exes/rest/src/lib.rs index 02196f4..0910d5e 100644 --- a/exes/rest/src/lib.rs +++ b/exes/rest/src/lib.rs @@ -7,6 +7,8 @@ use hyper::{      Body, Client, Request, Server,  };  use leash::{AnyhowResultFuture, Component}; +use opentelemetry::{global, trace::{Tracer}}; +use opentelemetry_http::HeaderExtractor;  use shared::config::Settings;  use std::{convert::Infallible, sync::Arc};  use tokio::sync::oneshot; @@ -42,6 +44,12 @@ impl Component for ReverseProxyServer {                  let token = token.clone();                  async move {                      Ok::<_, Infallible>(service_fn(move |request: Request<Body>| { +                        let parent_cx = global::get_text_map_propagator(|propagator| { +                            propagator.extract(&HeaderExtractor(request.headers())) +                        }); +                        let _span = global::tracer("") +                            .start_with_context("handle_request", &parent_cx); +                          let client = client.clone();                          let ratelimiter = ratelimiter.clone();                          let token = token.clone(); diff --git a/exes/rest/src/ratelimit_client/mod.rs b/exes/rest/src/ratelimit_client/mod.rs index ea34ad9..7493af9 100644 --- a/exes/rest/src/ratelimit_client/mod.rs +++ b/exes/rest/src/ratelimit_client/mod.rs @@ -1,10 +1,10 @@ -use self::remote_hashring::{HashRingWrapper, VNode}; -use futures_util::Future; +use self::remote_hashring::{HashRingWrapper, MetadataMap, VNode}; +use opentelemetry::global;  use proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::{Data, Headers};  use proto::nova::ratelimit::ratelimiter::BucketSubmitTicketRequest; -use shared::log::debug;  use std::collections::HashMap;  use std::fmt::Debug; +use std::future::Future;  use std::pin::Pin;  use std::sync::Arc;  use std::time::UNIX_EPOCH; @@ -12,6 +12,9 @@ use std::time::{Duration, SystemTime};  use tokio::sync::oneshot::{self};  use tokio::sync::{broadcast, mpsc, RwLock};  use tokio_stream::wrappers::ReceiverStream; +use tonic::Request; +use tracing::{debug, debug_span, Instrument, Span, instrument}; +use tracing_opentelemetry::OpenTelemetrySpanExt;  mod remote_hashring; @@ -45,7 +48,7 @@ impl RemoteRatelimiter {          let mut write = self.remotes.write().await; -        for ip in ["localhost"] { +        for ip in ["ratelimit"] {              let a = VNode::new(ip.into()).await?;              write.add(a.clone());          } @@ -82,55 +85,80 @@ impl RemoteRatelimiter {          obj      } +    #[instrument(name = "ticket task")]      pub fn ticket(&self, path: String) -> IssueTicket {          let remotes = self.remotes.clone();          let (tx, rx) = oneshot::channel::<HashMap<String, String>>(); - -        Box::pin(async move { -            // Get node managing this path -            let mut node = (*remotes.read().await.get(&path).unwrap()).clone(); - -            // Buffers for the gRPC streaming channel. -            let (send, remote) = mpsc::channel(5); -            let (do_request, wait) = oneshot::channel(); -            // Tonic requires a stream to be used; Since we use a mpsc channel, we can create a stream from it -            let stream = ReceiverStream::new(remote); - -            // Start the grpc streaming -            let ticket = node.submit_ticket(stream).await?; - -            // First, send the request -            send.send(BucketSubmitTicketRequest { -                data: Some(Data::Path(path)), -            }) -            .await?; - -            // We continuously listen for events in the channel. -            tokio::spawn(async move { -                let message = ticket.into_inner().message().await.unwrap().unwrap(); - -                if message.accepted == 1 { -                    do_request.send(()).unwrap(); -                    let headers = rx.await.unwrap(); - -                    send.send(BucketSubmitTicketRequest { -                        data: Some(Data::Headers(Headers { -                            precise_time: SystemTime::now() -                                .duration_since(UNIX_EPOCH) -                                .expect("time went backwards") -                                .as_millis() as u64, -                            headers, -                        })), -                    }) -                    .await -                    .unwrap(); -                } -            }); - -            // Wait for the message to be sent -            wait.await?; - -            Ok(tx) -        }) +        Box::pin( +            async move { +                // Get node managing this path +                let mut node = (*remotes.read().await.get(&path).unwrap()).clone(); + +                // Buffers for the gRPC streaming channel. +                let (send, remote) = mpsc::channel(5); +                let (do_request, wait) = oneshot::channel(); +                // Tonic requires a stream to be used; Since we use a mpsc channel, we can create a stream from it +                let stream = ReceiverStream::new(remote); + +                let mut request = Request::new(stream); + +                let span = debug_span!("remote request"); +                let context = span.context(); +                global::get_text_map_propagator(|propagator| { +                    propagator.inject_context(&context, &mut MetadataMap(request.metadata_mut())) +                }); + +                // Start the grpc streaming +                let ticket = node.submit_ticket(request).await?; + +                // First, send the request +                send.send(BucketSubmitTicketRequest { +                    data: Some(Data::Path(path)), +                }) +                .await?; + +                // We continuously listen for events in the channel. +                let span = debug_span!("stream worker"); +                tokio::spawn( +                    async move { +                        let span = debug_span!("waiting for ticket upstream"); +                        let message = ticket +                            .into_inner() +                            .message() +                            .instrument(span) +                            .await +                            .unwrap() +                            .unwrap(); + +                        if message.accepted == 1 { +                            debug!("request ticket was accepted"); +                            do_request.send(()).unwrap(); +                            let span = debug_span!("waiting for response headers"); +                            let headers = rx.instrument(span).await.unwrap(); + +                            send.send(BucketSubmitTicketRequest { +                                data: Some(Data::Headers(Headers { +                                    precise_time: SystemTime::now() +                                        .duration_since(UNIX_EPOCH) +                                        .expect("time went backwards") +                                        .as_millis() +                                        as u64, +                                    headers, +                                })), +                            }) +                            .await +                            .unwrap(); +                        } +                    } +                    .instrument(span), +                ); + +                // Wait for the message to be sent +                wait.await?; + +                Ok(tx) +            } +            .instrument(Span::current()), +        )      }  } diff --git a/exes/rest/src/ratelimit_client/remote_hashring.rs b/exes/rest/src/ratelimit_client/remote_hashring.rs index 4e3fa06..d1c1702 100644 --- a/exes/rest/src/ratelimit_client/remote_hashring.rs +++ b/exes/rest/src/ratelimit_client/remote_hashring.rs @@ -1,4 +1,6 @@  use core::fmt::Debug; +use std::convert::TryFrom; +use opentelemetry::propagation::Injector;  use proto::nova::ratelimit::ratelimiter::ratelimiter_client::RatelimiterClient;  use std::hash::Hash;  use std::ops::Deref; @@ -32,6 +34,20 @@ impl Hash for VNode {      }  } +pub struct MetadataMap<'a>(pub &'a mut tonic::metadata::MetadataMap); + +impl<'a> Injector for MetadataMap<'a> { +    /// Set a key and value in the MetadataMap.  Does nothing if the key or value are not valid inputs +    fn set(&mut self, key: &str, value: String) { +        if let Ok(key) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) { +            if let Ok(val) = tonic::metadata::MetadataValue::try_from(&value) { +                self.0.insert(key, val); +            } +        } +    } +} + +  impl VNode {      pub async fn new(address: String) -> Result<Self, tonic::transport::Error> {          let client = RatelimiterClient::connect(format!("http://{}:8093", address.clone())).await?;  | 
