diff options
Diffstat (limited to 'exes/rest/src')
| -rw-r--r-- | exes/rest/src/config.rs | 7 | ||||
| -rw-r--r-- | exes/rest/src/handler.rs | 51 | ||||
| -rw-r--r-- | exes/rest/src/lib.rs | 10 | ||||
| -rw-r--r-- | exes/rest/src/ratelimit_client/mod.rs | 130 | ||||
| -rw-r--r-- | exes/rest/src/ratelimit_client/remote_hashring.rs | 16 |
5 files changed, 125 insertions, 89 deletions
diff --git a/exes/rest/src/config.rs b/exes/rest/src/config.rs index 4e27a30..3bfe8db 100644 --- a/exes/rest/src/config.rs +++ b/exes/rest/src/config.rs @@ -1,5 +1,5 @@ -use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use serde::Deserialize; +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; fn default_listening_address() -> SocketAddr { SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8090)) @@ -7,8 +7,7 @@ fn default_listening_address() -> SocketAddr { #[derive(Debug, Deserialize, Clone)] pub struct ServerSettings { - #[serde(default = "default_listening_address")] - pub listening_adress: SocketAddr + pub listening_adress: SocketAddr, } impl Default for ServerSettings { fn default() -> Self { @@ -20,7 +19,7 @@ impl Default for ServerSettings { #[derive(Debug, Deserialize, Clone, Default)] pub struct Discord { - pub token: String + pub token: String, } #[derive(Debug, Deserialize, Clone, Default)] diff --git a/exes/rest/src/handler.rs b/exes/rest/src/handler.rs index 004f763..3ad4cea 100644 --- a/exes/rest/src/handler.rs +++ b/exes/rest/src/handler.rs @@ -1,11 +1,3 @@ -use std::{ - collections::hash_map::DefaultHasher, - convert::TryFrom, - hash::{Hash, Hasher}, - str::FromStr, - time::Instant, -}; - use anyhow::bail; use http::{ header::{AUTHORIZATION, CONNECTION, HOST, TRANSFER_ENCODING, UPGRADE}, @@ -13,7 +5,13 @@ use http::{ }; use hyper::{client::HttpConnector, Body, Client}; use hyper_tls::HttpsConnector; -use shared::log::error; +use std::{ + collections::hash_map::DefaultHasher, + convert::TryFrom, + hash::{Hash, Hasher}, + str::FromStr, +}; +use tracing::{debug_span, error, instrument, Instrument}; use twilight_http_ratelimiting::{Method, Path}; use crate::ratelimit_client::RemoteRatelimiter; @@ -36,6 +34,7 @@ fn normalize_path(request_path: &str) -> (&str, &str) { } } +#[instrument] pub async fn handle_request( client: Client<HttpsConnector<HttpConnector>, Body>, ratelimiter: RemoteRatelimiter, @@ -72,7 +71,7 @@ pub async fn handle_request( "Failed to parse path for {:?} {}: {:?}", method, trimmed_path, e ); - bail!("failed o parse"); + bail!("failed to parse"); } } .hash(&mut hash); @@ -80,21 +79,18 @@ pub async fn handle_request( (hash.finish().to_string(), uri_string) }; - let start_ticket_request = Instant::now(); - let header_sender = match ratelimiter.ticket(hash).await { + let span = debug_span!("ticket validation request"); + let header_sender = match span + .in_scope(|| ratelimiter.ticket(hash)) + .await + { Ok(sender) => sender, Err(e) => { error!("Failed to receive ticket for ratelimiting: {:?}", e); bail!("failed to reteive ticket"); } }; - let time_took_ticket = Instant::now() - start_ticket_request; - - request.headers_mut().insert( - AUTHORIZATION, - HeaderValue::from_bytes(token.as_bytes()) - .expect("strings are guaranteed to be valid utf-8"), - ); + request .headers_mut() .insert(HOST, HeaderValue::from_static("discord.com")); @@ -106,7 +102,7 @@ pub async fn handle_request( request.headers_mut().remove("proxy-connection"); request.headers_mut().remove(TRANSFER_ENCODING); request.headers_mut().remove(UPGRADE); - + if let Some(auth) = request.headers_mut().get_mut(AUTHORIZATION) { if auth .to_str() @@ -130,25 +126,14 @@ pub async fn handle_request( } }; *request.uri_mut() = uri; - - let start_upstream_req = Instant::now(); - let mut resp = match client.request(request).await { + let span = debug_span!("upstream request to discord"); + let resp = match client.request(request).instrument(span).await { Ok(response) => response, Err(e) => { error!("Error when requesting the Discord API: {:?}", e); bail!("failed to request the discord api"); } }; - let upstream_time_took = Instant::now() - start_upstream_req; - - resp.headers_mut().append( - "X-TicketRequest-Ms", - HeaderValue::from_str(&time_took_ticket.as_millis().to_string()).unwrap(), - ); - resp.headers_mut().append( - "X-Upstream-Ms", - HeaderValue::from_str(&upstream_time_took.as_millis().to_string()).unwrap(), - ); let ratelimit_headers = resp .headers() diff --git a/exes/rest/src/lib.rs b/exes/rest/src/lib.rs index 02721cc..158fc97 100644 --- a/exes/rest/src/lib.rs +++ b/exes/rest/src/lib.rs @@ -8,6 +8,8 @@ use hyper::{ }; use hyper_tls::HttpsConnector; use leash::{AnyhowResultFuture, Component}; +use opentelemetry::{global, trace::{Tracer}}; +use opentelemetry_http::HeaderExtractor; use shared::config::Settings; use std::{convert::Infallible, sync::Arc}; use tokio::sync::oneshot; @@ -38,6 +40,12 @@ impl Component for ReverseProxyServer { let token = token.clone(); async move { Ok::<_, Infallible>(service_fn(move |request: Request<Body>| { + let parent_cx = global::get_text_map_propagator(|propagator| { + propagator.extract(&HeaderExtractor(request.headers())) + }); + let _span = global::tracer("") + .start_with_context("handle_request", &parent_cx); + let client = client.clone(); let ratelimiter = ratelimiter.clone(); let token = token.clone(); @@ -64,4 +72,4 @@ impl Component for ReverseProxyServer { fn new() -> Self { Self {} } -}
\ No newline at end of file +} diff --git a/exes/rest/src/ratelimit_client/mod.rs b/exes/rest/src/ratelimit_client/mod.rs index ea34ad9..7493af9 100644 --- a/exes/rest/src/ratelimit_client/mod.rs +++ b/exes/rest/src/ratelimit_client/mod.rs @@ -1,10 +1,10 @@ -use self::remote_hashring::{HashRingWrapper, VNode}; -use futures_util::Future; +use self::remote_hashring::{HashRingWrapper, MetadataMap, VNode}; +use opentelemetry::global; use proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::{Data, Headers}; use proto::nova::ratelimit::ratelimiter::BucketSubmitTicketRequest; -use shared::log::debug; use std::collections::HashMap; use std::fmt::Debug; +use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::time::UNIX_EPOCH; @@ -12,6 +12,9 @@ use std::time::{Duration, SystemTime}; use tokio::sync::oneshot::{self}; use tokio::sync::{broadcast, mpsc, RwLock}; use tokio_stream::wrappers::ReceiverStream; +use tonic::Request; +use tracing::{debug, debug_span, Instrument, Span, instrument}; +use tracing_opentelemetry::OpenTelemetrySpanExt; mod remote_hashring; @@ -45,7 +48,7 @@ impl RemoteRatelimiter { let mut write = self.remotes.write().await; - for ip in ["localhost"] { + for ip in ["ratelimit"] { let a = VNode::new(ip.into()).await?; write.add(a.clone()); } @@ -82,55 +85,80 @@ impl RemoteRatelimiter { obj } + #[instrument(name = "ticket task")] pub fn ticket(&self, path: String) -> IssueTicket { let remotes = self.remotes.clone(); let (tx, rx) = oneshot::channel::<HashMap<String, String>>(); - - Box::pin(async move { - // Get node managing this path - let mut node = (*remotes.read().await.get(&path).unwrap()).clone(); - - // Buffers for the gRPC streaming channel. - let (send, remote) = mpsc::channel(5); - let (do_request, wait) = oneshot::channel(); - // Tonic requires a stream to be used; Since we use a mpsc channel, we can create a stream from it - let stream = ReceiverStream::new(remote); - - // Start the grpc streaming - let ticket = node.submit_ticket(stream).await?; - - // First, send the request - send.send(BucketSubmitTicketRequest { - data: Some(Data::Path(path)), - }) - .await?; - - // We continuously listen for events in the channel. - tokio::spawn(async move { - let message = ticket.into_inner().message().await.unwrap().unwrap(); - - if message.accepted == 1 { - do_request.send(()).unwrap(); - let headers = rx.await.unwrap(); - - send.send(BucketSubmitTicketRequest { - data: Some(Data::Headers(Headers { - precise_time: SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("time went backwards") - .as_millis() as u64, - headers, - })), - }) - .await - .unwrap(); - } - }); - - // Wait for the message to be sent - wait.await?; - - Ok(tx) - }) + Box::pin( + async move { + // Get node managing this path + let mut node = (*remotes.read().await.get(&path).unwrap()).clone(); + + // Buffers for the gRPC streaming channel. + let (send, remote) = mpsc::channel(5); + let (do_request, wait) = oneshot::channel(); + // Tonic requires a stream to be used; Since we use a mpsc channel, we can create a stream from it + let stream = ReceiverStream::new(remote); + + let mut request = Request::new(stream); + + let span = debug_span!("remote request"); + let context = span.context(); + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&context, &mut MetadataMap(request.metadata_mut())) + }); + + // Start the grpc streaming + let ticket = node.submit_ticket(request).await?; + + // First, send the request + send.send(BucketSubmitTicketRequest { + data: Some(Data::Path(path)), + }) + .await?; + + // We continuously listen for events in the channel. + let span = debug_span!("stream worker"); + tokio::spawn( + async move { + let span = debug_span!("waiting for ticket upstream"); + let message = ticket + .into_inner() + .message() + .instrument(span) + .await + .unwrap() + .unwrap(); + + if message.accepted == 1 { + debug!("request ticket was accepted"); + do_request.send(()).unwrap(); + let span = debug_span!("waiting for response headers"); + let headers = rx.instrument(span).await.unwrap(); + + send.send(BucketSubmitTicketRequest { + data: Some(Data::Headers(Headers { + precise_time: SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("time went backwards") + .as_millis() + as u64, + headers, + })), + }) + .await + .unwrap(); + } + } + .instrument(span), + ); + + // Wait for the message to be sent + wait.await?; + + Ok(tx) + } + .instrument(Span::current()), + ) } } diff --git a/exes/rest/src/ratelimit_client/remote_hashring.rs b/exes/rest/src/ratelimit_client/remote_hashring.rs index 4e3fa06..d1c1702 100644 --- a/exes/rest/src/ratelimit_client/remote_hashring.rs +++ b/exes/rest/src/ratelimit_client/remote_hashring.rs @@ -1,4 +1,6 @@ use core::fmt::Debug; +use std::convert::TryFrom; +use opentelemetry::propagation::Injector; use proto::nova::ratelimit::ratelimiter::ratelimiter_client::RatelimiterClient; use std::hash::Hash; use std::ops::Deref; @@ -32,6 +34,20 @@ impl Hash for VNode { } } +pub struct MetadataMap<'a>(pub &'a mut tonic::metadata::MetadataMap); + +impl<'a> Injector for MetadataMap<'a> { + /// Set a key and value in the MetadataMap. Does nothing if the key or value are not valid inputs + fn set(&mut self, key: &str, value: String) { + if let Ok(key) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) { + if let Ok(val) = tonic::metadata::MetadataValue::try_from(&value) { + self.0.insert(key, val); + } + } + } +} + + impl VNode { pub async fn new(address: String) -> Result<Self, tonic::transport::Error> { let client = RatelimiterClient::connect(format!("http://{}:8093", address.clone())).await?; |
