summaryrefslogtreecommitdiff
path: root/exes/rest/src
diff options
context:
space:
mode:
Diffstat (limited to 'exes/rest/src')
-rw-r--r--exes/rest/src/config.rs7
-rw-r--r--exes/rest/src/handler.rs51
-rw-r--r--exes/rest/src/lib.rs10
-rw-r--r--exes/rest/src/ratelimit_client/mod.rs130
-rw-r--r--exes/rest/src/ratelimit_client/remote_hashring.rs16
5 files changed, 125 insertions, 89 deletions
diff --git a/exes/rest/src/config.rs b/exes/rest/src/config.rs
index 4e27a30..3bfe8db 100644
--- a/exes/rest/src/config.rs
+++ b/exes/rest/src/config.rs
@@ -1,5 +1,5 @@
-use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use serde::Deserialize;
+use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
fn default_listening_address() -> SocketAddr {
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8090))
@@ -7,8 +7,7 @@ fn default_listening_address() -> SocketAddr {
#[derive(Debug, Deserialize, Clone)]
pub struct ServerSettings {
- #[serde(default = "default_listening_address")]
- pub listening_adress: SocketAddr
+ pub listening_adress: SocketAddr,
}
impl Default for ServerSettings {
fn default() -> Self {
@@ -20,7 +19,7 @@ impl Default for ServerSettings {
#[derive(Debug, Deserialize, Clone, Default)]
pub struct Discord {
- pub token: String
+ pub token: String,
}
#[derive(Debug, Deserialize, Clone, Default)]
diff --git a/exes/rest/src/handler.rs b/exes/rest/src/handler.rs
index 004f763..3ad4cea 100644
--- a/exes/rest/src/handler.rs
+++ b/exes/rest/src/handler.rs
@@ -1,11 +1,3 @@
-use std::{
- collections::hash_map::DefaultHasher,
- convert::TryFrom,
- hash::{Hash, Hasher},
- str::FromStr,
- time::Instant,
-};
-
use anyhow::bail;
use http::{
header::{AUTHORIZATION, CONNECTION, HOST, TRANSFER_ENCODING, UPGRADE},
@@ -13,7 +5,13 @@ use http::{
};
use hyper::{client::HttpConnector, Body, Client};
use hyper_tls::HttpsConnector;
-use shared::log::error;
+use std::{
+ collections::hash_map::DefaultHasher,
+ convert::TryFrom,
+ hash::{Hash, Hasher},
+ str::FromStr,
+};
+use tracing::{debug_span, error, instrument, Instrument};
use twilight_http_ratelimiting::{Method, Path};
use crate::ratelimit_client::RemoteRatelimiter;
@@ -36,6 +34,7 @@ fn normalize_path(request_path: &str) -> (&str, &str) {
}
}
+#[instrument]
pub async fn handle_request(
client: Client<HttpsConnector<HttpConnector>, Body>,
ratelimiter: RemoteRatelimiter,
@@ -72,7 +71,7 @@ pub async fn handle_request(
"Failed to parse path for {:?} {}: {:?}",
method, trimmed_path, e
);
- bail!("failed o parse");
+ bail!("failed to parse");
}
}
.hash(&mut hash);
@@ -80,21 +79,18 @@ pub async fn handle_request(
(hash.finish().to_string(), uri_string)
};
- let start_ticket_request = Instant::now();
- let header_sender = match ratelimiter.ticket(hash).await {
+ let span = debug_span!("ticket validation request");
+ let header_sender = match span
+ .in_scope(|| ratelimiter.ticket(hash))
+ .await
+ {
Ok(sender) => sender,
Err(e) => {
error!("Failed to receive ticket for ratelimiting: {:?}", e);
bail!("failed to reteive ticket");
}
};
- let time_took_ticket = Instant::now() - start_ticket_request;
-
- request.headers_mut().insert(
- AUTHORIZATION,
- HeaderValue::from_bytes(token.as_bytes())
- .expect("strings are guaranteed to be valid utf-8"),
- );
+
request
.headers_mut()
.insert(HOST, HeaderValue::from_static("discord.com"));
@@ -106,7 +102,7 @@ pub async fn handle_request(
request.headers_mut().remove("proxy-connection");
request.headers_mut().remove(TRANSFER_ENCODING);
request.headers_mut().remove(UPGRADE);
-
+
if let Some(auth) = request.headers_mut().get_mut(AUTHORIZATION) {
if auth
.to_str()
@@ -130,25 +126,14 @@ pub async fn handle_request(
}
};
*request.uri_mut() = uri;
-
- let start_upstream_req = Instant::now();
- let mut resp = match client.request(request).await {
+ let span = debug_span!("upstream request to discord");
+ let resp = match client.request(request).instrument(span).await {
Ok(response) => response,
Err(e) => {
error!("Error when requesting the Discord API: {:?}", e);
bail!("failed to request the discord api");
}
};
- let upstream_time_took = Instant::now() - start_upstream_req;
-
- resp.headers_mut().append(
- "X-TicketRequest-Ms",
- HeaderValue::from_str(&time_took_ticket.as_millis().to_string()).unwrap(),
- );
- resp.headers_mut().append(
- "X-Upstream-Ms",
- HeaderValue::from_str(&upstream_time_took.as_millis().to_string()).unwrap(),
- );
let ratelimit_headers = resp
.headers()
diff --git a/exes/rest/src/lib.rs b/exes/rest/src/lib.rs
index 02721cc..158fc97 100644
--- a/exes/rest/src/lib.rs
+++ b/exes/rest/src/lib.rs
@@ -8,6 +8,8 @@ use hyper::{
};
use hyper_tls::HttpsConnector;
use leash::{AnyhowResultFuture, Component};
+use opentelemetry::{global, trace::{Tracer}};
+use opentelemetry_http::HeaderExtractor;
use shared::config::Settings;
use std::{convert::Infallible, sync::Arc};
use tokio::sync::oneshot;
@@ -38,6 +40,12 @@ impl Component for ReverseProxyServer {
let token = token.clone();
async move {
Ok::<_, Infallible>(service_fn(move |request: Request<Body>| {
+ let parent_cx = global::get_text_map_propagator(|propagator| {
+ propagator.extract(&HeaderExtractor(request.headers()))
+ });
+ let _span = global::tracer("")
+ .start_with_context("handle_request", &parent_cx);
+
let client = client.clone();
let ratelimiter = ratelimiter.clone();
let token = token.clone();
@@ -64,4 +72,4 @@ impl Component for ReverseProxyServer {
fn new() -> Self {
Self {}
}
-} \ No newline at end of file
+}
diff --git a/exes/rest/src/ratelimit_client/mod.rs b/exes/rest/src/ratelimit_client/mod.rs
index ea34ad9..7493af9 100644
--- a/exes/rest/src/ratelimit_client/mod.rs
+++ b/exes/rest/src/ratelimit_client/mod.rs
@@ -1,10 +1,10 @@
-use self::remote_hashring::{HashRingWrapper, VNode};
-use futures_util::Future;
+use self::remote_hashring::{HashRingWrapper, MetadataMap, VNode};
+use opentelemetry::global;
use proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::{Data, Headers};
use proto::nova::ratelimit::ratelimiter::BucketSubmitTicketRequest;
-use shared::log::debug;
use std::collections::HashMap;
use std::fmt::Debug;
+use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::UNIX_EPOCH;
@@ -12,6 +12,9 @@ use std::time::{Duration, SystemTime};
use tokio::sync::oneshot::{self};
use tokio::sync::{broadcast, mpsc, RwLock};
use tokio_stream::wrappers::ReceiverStream;
+use tonic::Request;
+use tracing::{debug, debug_span, Instrument, Span, instrument};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
mod remote_hashring;
@@ -45,7 +48,7 @@ impl RemoteRatelimiter {
let mut write = self.remotes.write().await;
- for ip in ["localhost"] {
+ for ip in ["ratelimit"] {
let a = VNode::new(ip.into()).await?;
write.add(a.clone());
}
@@ -82,55 +85,80 @@ impl RemoteRatelimiter {
obj
}
+ #[instrument(name = "ticket task")]
pub fn ticket(&self, path: String) -> IssueTicket {
let remotes = self.remotes.clone();
let (tx, rx) = oneshot::channel::<HashMap<String, String>>();
-
- Box::pin(async move {
- // Get node managing this path
- let mut node = (*remotes.read().await.get(&path).unwrap()).clone();
-
- // Buffers for the gRPC streaming channel.
- let (send, remote) = mpsc::channel(5);
- let (do_request, wait) = oneshot::channel();
- // Tonic requires a stream to be used; Since we use a mpsc channel, we can create a stream from it
- let stream = ReceiverStream::new(remote);
-
- // Start the grpc streaming
- let ticket = node.submit_ticket(stream).await?;
-
- // First, send the request
- send.send(BucketSubmitTicketRequest {
- data: Some(Data::Path(path)),
- })
- .await?;
-
- // We continuously listen for events in the channel.
- tokio::spawn(async move {
- let message = ticket.into_inner().message().await.unwrap().unwrap();
-
- if message.accepted == 1 {
- do_request.send(()).unwrap();
- let headers = rx.await.unwrap();
-
- send.send(BucketSubmitTicketRequest {
- data: Some(Data::Headers(Headers {
- precise_time: SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("time went backwards")
- .as_millis() as u64,
- headers,
- })),
- })
- .await
- .unwrap();
- }
- });
-
- // Wait for the message to be sent
- wait.await?;
-
- Ok(tx)
- })
+ Box::pin(
+ async move {
+ // Get node managing this path
+ let mut node = (*remotes.read().await.get(&path).unwrap()).clone();
+
+ // Buffers for the gRPC streaming channel.
+ let (send, remote) = mpsc::channel(5);
+ let (do_request, wait) = oneshot::channel();
+ // Tonic requires a stream to be used; Since we use a mpsc channel, we can create a stream from it
+ let stream = ReceiverStream::new(remote);
+
+ let mut request = Request::new(stream);
+
+ let span = debug_span!("remote request");
+ let context = span.context();
+ global::get_text_map_propagator(|propagator| {
+ propagator.inject_context(&context, &mut MetadataMap(request.metadata_mut()))
+ });
+
+ // Start the grpc streaming
+ let ticket = node.submit_ticket(request).await?;
+
+ // First, send the request
+ send.send(BucketSubmitTicketRequest {
+ data: Some(Data::Path(path)),
+ })
+ .await?;
+
+ // We continuously listen for events in the channel.
+ let span = debug_span!("stream worker");
+ tokio::spawn(
+ async move {
+ let span = debug_span!("waiting for ticket upstream");
+ let message = ticket
+ .into_inner()
+ .message()
+ .instrument(span)
+ .await
+ .unwrap()
+ .unwrap();
+
+ if message.accepted == 1 {
+ debug!("request ticket was accepted");
+ do_request.send(()).unwrap();
+ let span = debug_span!("waiting for response headers");
+ let headers = rx.instrument(span).await.unwrap();
+
+ send.send(BucketSubmitTicketRequest {
+ data: Some(Data::Headers(Headers {
+ precise_time: SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("time went backwards")
+ .as_millis()
+ as u64,
+ headers,
+ })),
+ })
+ .await
+ .unwrap();
+ }
+ }
+ .instrument(span),
+ );
+
+ // Wait for the message to be sent
+ wait.await?;
+
+ Ok(tx)
+ }
+ .instrument(Span::current()),
+ )
}
}
diff --git a/exes/rest/src/ratelimit_client/remote_hashring.rs b/exes/rest/src/ratelimit_client/remote_hashring.rs
index 4e3fa06..d1c1702 100644
--- a/exes/rest/src/ratelimit_client/remote_hashring.rs
+++ b/exes/rest/src/ratelimit_client/remote_hashring.rs
@@ -1,4 +1,6 @@
use core::fmt::Debug;
+use std::convert::TryFrom;
+use opentelemetry::propagation::Injector;
use proto::nova::ratelimit::ratelimiter::ratelimiter_client::RatelimiterClient;
use std::hash::Hash;
use std::ops::Deref;
@@ -32,6 +34,20 @@ impl Hash for VNode {
}
}
+pub struct MetadataMap<'a>(pub &'a mut tonic::metadata::MetadataMap);
+
+impl<'a> Injector for MetadataMap<'a> {
+ /// Set a key and value in the MetadataMap. Does nothing if the key or value are not valid inputs
+ fn set(&mut self, key: &str, value: String) {
+ if let Ok(key) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) {
+ if let Ok(val) = tonic::metadata::MetadataValue::try_from(&value) {
+ self.0.insert(key, val);
+ }
+ }
+ }
+}
+
+
impl VNode {
pub async fn new(address: String) -> Result<Self, tonic::transport::Error> {
let client = RatelimiterClient::connect(format!("http://{}:8093", address.clone())).await?;