summaryrefslogtreecommitdiff
path: root/exes/rest
diff options
context:
space:
mode:
authorMatthieuCoder <matthieu@matthieu-dev.xyz>2023-01-15 17:03:29 +0400
committerMatthieuCoder <matthieu@matthieu-dev.xyz>2023-01-15 17:03:29 +0400
commitcbcaa3c01ec4d9ed95dc5af8232de1d10191bc44 (patch)
tree02cd56bb6eba762024c33f2df3cbec4f7a1a27e1 /exes/rest
parentc06cf52e887cc560f1fd4fa72713b239fe0102a6 (diff)
add metrics and jemalloc
Diffstat (limited to 'exes/rest')
-rw-r--r--exes/rest/Cargo.toml5
-rw-r--r--exes/rest/src/config.rs7
-rw-r--r--exes/rest/src/handler.rs144
-rw-r--r--exes/rest/src/lib.rs6
-rw-r--r--exes/rest/src/main.rs7
5 files changed, 151 insertions, 18 deletions
diff --git a/exes/rest/Cargo.toml b/exes/rest/Cargo.toml
index 5583416..3d362b2 100644
--- a/exes/rest/Cargo.toml
+++ b/exes/rest/Cargo.toml
@@ -29,4 +29,7 @@ tokio-stream = "0.1.11"
dns-lookup = "1.0.8"
opentelemetry = "0.18.0"
opentelemetry-http = "0.7.0"
-tracing-opentelemetry = "0.18.0" \ No newline at end of file
+tracing-opentelemetry = "0.18.0"
+
+[target.'cfg(not(target_env = "msvc"))'.dependencies]
+tikv-jemallocator = "0.5" \ No newline at end of file
diff --git a/exes/rest/src/config.rs b/exes/rest/src/config.rs
index 9593ac8..900d3fb 100644
--- a/exes/rest/src/config.rs
+++ b/exes/rest/src/config.rs
@@ -28,4 +28,11 @@ pub struct ReverseProxy {
pub discord: Discord,
pub ratelimiter_address: String,
pub ratelimiter_port: u16,
+ #[serde(default = "default_upstream")]
+ pub upstream: Option<String>,
+}
+
+#[allow(clippy::unnecessary_wraps)]
+fn default_upstream() -> Option<String> {
+ Some("https://discord.com".to_string())
}
diff --git a/exes/rest/src/handler.rs b/exes/rest/src/handler.rs
index f053af7..3d47935 100644
--- a/exes/rest/src/handler.rs
+++ b/exes/rest/src/handler.rs
@@ -1,23 +1,78 @@
-use anyhow::bail;
+use anyhow::{bail, Context};
+use futures_util::FutureExt;
use http::{
header::{AUTHORIZATION, CONNECTION, HOST, TRANSFER_ENCODING, UPGRADE},
HeaderValue, Method as HttpMethod, Request, Response, Uri,
};
use hyper::{client::HttpConnector, Body, Client};
use hyper_rustls::HttpsConnector;
+use opentelemetry::{
+ global,
+ metrics::{Counter, Histogram},
+ Context as OpenTelemetryContext, KeyValue,
+};
use std::{
collections::hash_map::DefaultHasher,
convert::TryFrom,
hash::{Hash, Hasher},
str::FromStr,
sync::Arc,
+ time::SystemTime,
};
-use tracing::{debug_span, error, info_span, instrument, Instrument};
+use tracing::{debug_span, error, info_span, log::trace, Instrument};
use twilight_http_ratelimiting::{Method, Path};
-use crate::ratelimit_client::RemoteRatelimiter;
+use crate::{config::ReverseProxy, ratelimit_client::RemoteRatelimiter};
+use lazy_static::lazy_static;
+
+lazy_static! {
+ static ref METER_NAME: &'static str = "";
+ static ref REQUESTS: Counter<u64> = {
+ global::meter(&METER_NAME)
+ .u64_counter("rest.http_requests_total")
+ .with_description("Amount of requests processed by the rest reverse proxy")
+ .init()
+ };
+ static ref UPSTREAM_CALLS: Counter<u64> = {
+ global::meter(&METER_NAME)
+ .u64_counter("rest.upstream_http_requests_total")
+ .with_description("Amount of requests sent to discord")
+ .init()
+ };
+ static ref TICKET_CALLS: Counter<u64> = {
+ global::meter(&METER_NAME)
+ .u64_counter("rest.ticket_http_requests_total")
+ .with_description("Amount of requests sent to the ratelimiter")
+ .init()
+ };
+ static ref HEADERS_SUBMIT_CALLS: Counter<u64> = {
+ global::meter(&METER_NAME)
+ .u64_counter("rest.header_submit_http_requests_total")
+ .with_description("Amount of requests sent to the ratelimiter")
+ .init()
+ };
+ static ref UPSTREAM_TIMES: Histogram<f64> = {
+ global::meter(&METER_NAME)
+ .f64_histogram("rest.upstream_http_request_duration_seconds")
+ .with_description("Time took to request discord")
+ .init()
+ };
+ static ref TICKET_TIMES: Histogram<f64> = {
+ global::meter(&METER_NAME)
+ .f64_histogram("rest.ticket_http_request_duration_seconds")
+ .with_description("Time took to get a ticket from the ratelimiter")
+ .init()
+ };
+ static ref HEADERS_SUBMIT_TIMES: Histogram<f64> = {
+ global::meter(&METER_NAME)
+ .f64_histogram("rest.header_submit_http_request_duration_seconds")
+ .with_description("Time took to get a ticket from the ratelimiter")
+ .init()
+ };
+}
/// Normalizes the path
+#[inline]
fn normalize_path(request_path: &str) -> (&str, &str) {
if let Some(trimmed_path) = request_path.strip_prefix("/api") {
if let Some(maybe_api_version) = trimmed_path.split('/').nth(1) {
@@ -35,14 +90,18 @@ fn normalize_path(request_path: &str) -> (&str, &str) {
}
}
-#[instrument]
+#[inline]
+#[allow(clippy::too_many_lines)]
pub async fn handle_request(
client: Client<HttpsConnector<HttpConnector>, Body>,
ratelimiter: Arc<RemoteRatelimiter>,
+ config: ReverseProxy,
token: String,
mut request: Request<Body>,
) -> Result<Response<Body>, anyhow::Error> {
- let (hash, uri_string) = {
+ let cx = OpenTelemetryContext::current();
+
+ let (bucket, uri_string) = {
let method = match *request.method() {
HttpMethod::DELETE => Method::Delete,
HttpMethod::GET => Method::Get,
@@ -50,20 +109,25 @@ pub async fn handle_request(
HttpMethod::POST => Method::Post,
HttpMethod::PUT => Method::Put,
_ => {
- error!("Unsupported HTTP method in request, {}", request.method());
+ error!(method =? request.method(), "unsupported HTTP method in request");
bail!("unsupported method");
}
};
-
let request_path = request.uri().path();
let (api_path, trimmed_path) = normalize_path(request_path);
+ trace!("normalized path to {trimmed_path}");
- let mut uri_string = format!("https://discord.com{api_path}{trimmed_path}");
+ let mut uri_string = format!(
+ "{}{api_path}{trimmed_path}",
+ config.upstream.expect("no upstream")
+ );
if let Some(query) = request.uri().query() {
uri_string.push('?');
uri_string.push_str(query);
}
+ trace!("full request uri is {uri_string}");
+
let mut hash = DefaultHasher::new();
match Path::try_from((method, trimmed_path)) {
Ok(path) => path,
@@ -76,14 +140,36 @@ pub async fn handle_request(
}
}
.hash(&mut hash);
+ let bucket = hash.finish().to_string();
+ trace!("Request bucket is {}", bucket);
- (hash.finish().to_string(), uri_string)
+ (bucket, uri_string)
};
+
+ REQUESTS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]);
+
+ let ticket_start = SystemTime::now();
+ TICKET_CALLS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]);
// waits for the request to be authorized
- ratelimiter
- .ticket(hash.clone())
+ match ratelimiter
+ .ticket(bucket.clone())
.instrument(debug_span!("ticket validation request"))
- .await?;
+ .then(|v| async {
+ TICKET_TIMES.record(
+ &cx,
+ ticket_start.elapsed()?.as_secs_f64(),
+ &[KeyValue::new("bucket", bucket.clone())],
+ );
+ v
+ })
+ .await
+ {
+ Ok(_) => {}
+ Err(e) => {
+ error!("Error when requesting the ratelimiter: {:?}", e);
+ bail!("failed to request the ratelimiter");
+ }
+ }
request
.headers_mut()
@@ -121,7 +207,21 @@ pub async fn handle_request(
};
*request.uri_mut() = uri;
let span = debug_span!("upstream request to discord");
- let resp = match client.request(request).instrument(span).await {
+ let upstream_start = SystemTime::now();
+ UPSTREAM_CALLS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]);
+ let resp = match client
+ .request(request)
+ .instrument(span)
+ .then(|v| async {
+ UPSTREAM_TIMES.record(
+ &cx,
+ upstream_start.elapsed()?.as_secs_f64(),
+ &[KeyValue::new("bucket", bucket.clone())],
+ );
+ v.context("")
+ })
+ .await
+ {
Ok(response) => response,
Err(e) => {
error!("Error when requesting the Discord API: {:?}", e);
@@ -132,14 +232,28 @@ pub async fn handle_request(
let headers = resp
.headers()
.into_iter()
- .map(|(k, v)| (k.to_string(), v.to_str().map(std::string::ToString::to_string)))
+ .map(|(k, v)| {
+ (
+ k.to_string(),
+ v.to_str().map(std::string::ToString::to_string),
+ )
+ })
.filter(|f| f.1.is_ok())
.map(|f| (f.0, f.1.expect("errors should be filtered")))
.collect();
+ HEADERS_SUBMIT_CALLS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]);
let _submit_headers = ratelimiter
- .submit_headers(hash, headers)
+ .submit_headers(bucket.clone(), headers)
.instrument(info_span!("submitting headers"))
+ .then(|v| async {
+ HEADERS_SUBMIT_TIMES.record(
+ &cx,
+ upstream_start.elapsed()?.as_secs_f64(),
+ &[KeyValue::new("bucket", bucket.clone())],
+ );
+ v
+ })
.await;
Ok(resp)
diff --git a/exes/rest/src/lib.rs b/exes/rest/src/lib.rs
index 174bea0..917f524 100644
--- a/exes/rest/src/lib.rs
+++ b/exes/rest/src/lib.rs
@@ -53,11 +53,12 @@ impl Component for ReverseProxyServer {
let client: Client<_, hyper::Body> = Client::builder().build(https);
let token = settings.config.discord.token.clone();
-
+ let config = settings.config.clone();
let service_fn = make_service_fn(move |_: &AddrStream| {
let client = client.clone();
let ratelimiter = ratelimiter.clone();
let token = token.clone();
+ let config = config.clone();
async move {
Ok::<_, Infallible>(service_fn(move |request: Request<Body>| {
let token = token.clone();
@@ -71,10 +72,11 @@ impl Component for ReverseProxyServer {
let client = client.clone();
let ratelimiter = ratelimiter.clone();
+ let config = config.clone();
async move {
let token = token.clone();
let ratelimiter = ratelimiter.clone();
- handle_request(client, ratelimiter, token, request).await
+ handle_request(client, ratelimiter, config, token, request).await
}
}))
}
diff --git a/exes/rest/src/main.rs b/exes/rest/src/main.rs
index fe8ada7..643bc89 100644
--- a/exes/rest/src/main.rs
+++ b/exes/rest/src/main.rs
@@ -1,4 +1,11 @@
use leash::ignite;
use rest::ReverseProxyServer;
+#[cfg(not(target_env = "msvc"))]
+use tikv_jemallocator::Jemalloc;
+
+#[cfg(not(target_env = "msvc"))]
+#[global_allocator]
+static GLOBAL: Jemalloc = Jemalloc;
+
ignite!(ReverseProxyServer);