summaryrefslogtreecommitdiff
path: root/exes
diff options
context:
space:
mode:
authorMatthieuCoder <matthieu@matthieu-dev.xyz>2023-01-15 17:03:29 +0400
committerMatthieuCoder <matthieu@matthieu-dev.xyz>2023-01-15 17:03:29 +0400
commitcbcaa3c01ec4d9ed95dc5af8232de1d10191bc44 (patch)
tree02cd56bb6eba762024c33f2df3cbec4f7a1a27e1 /exes
parentc06cf52e887cc560f1fd4fa72713b239fe0102a6 (diff)
add metrics and jemalloc
Diffstat (limited to 'exes')
-rw-r--r--exes/cache/Cargo.toml2
-rw-r--r--exes/gateway/Cargo.toml7
-rw-r--r--exes/gateway/src/lib.rs19
-rw-r--r--exes/gateway/src/main.rs7
-rw-r--r--exes/ratelimit/Cargo.toml3
-rw-r--r--exes/ratelimit/src/main.rs7
-rw-r--r--exes/rest/Cargo.toml5
-rw-r--r--exes/rest/src/config.rs7
-rw-r--r--exes/rest/src/handler.rs144
-rw-r--r--exes/rest/src/lib.rs6
-rw-r--r--exes/rest/src/main.rs7
-rw-r--r--exes/webhook/Cargo.toml5
-rw-r--r--exes/webhook/src/main.rs7
13 files changed, 194 insertions, 32 deletions
diff --git a/exes/cache/Cargo.toml b/exes/cache/Cargo.toml
index 70cb6a3..4dd7c97 100644
--- a/exes/cache/Cargo.toml
+++ b/exes/cache/Cargo.toml
@@ -19,7 +19,7 @@ tokio-stream = "0.1.11"
serde = { version = "1.0.8", features = ["derive"] }
serde_json = { version = "1.0" }
-async-nats = "0.25.1"
+async-nats = "0.26.0"
twilight-model = "0.14"
anyhow = "1.0.68"
diff --git a/exes/gateway/Cargo.toml b/exes/gateway/Cargo.toml
index a174710..98a1ae8 100644
--- a/exes/gateway/Cargo.toml
+++ b/exes/gateway/Cargo.toml
@@ -23,8 +23,11 @@ serde_json = { version = "1.0" }
tracing = "0.1.37"
tracing-futures = "0.2.5"
-async-nats = "0.25.1"
+async-nats = "0.26.0"
tracing-opentelemetry = "0.18.0"
opentelemetry = "0.18.0"
-opentelemetry-http = "0.7.0" \ No newline at end of file
+opentelemetry-http = "0.7.0"
+
+[target.'cfg(not(target_env = "msvc"))'.dependencies]
+tikv-jemallocator = "0.5" \ No newline at end of file
diff --git a/exes/gateway/src/lib.rs b/exes/gateway/src/lib.rs
index c4da8f4..ddc4bbd 100644
--- a/exes/gateway/src/lib.rs
+++ b/exes/gateway/src/lib.rs
@@ -24,7 +24,7 @@ use tokio_stream::StreamExt;
use tracing_opentelemetry::OpenTelemetrySpanExt;
use twilight_gateway::{Event, Shard};
pub mod config;
-use tracing::{debug, error, info, trace_span};
+use tracing::{debug, error, info, info_span, instrument, Instrument};
use twilight_model::gateway::event::DispatchEvent;
struct MetadataMap<'a>(&'a mut HeaderMap);
@@ -84,13 +84,15 @@ impl Component for GatewayServer {
}
}
+#[instrument]
async fn handle_event(event: Event, nats: &Client) -> anyhow::Result<()> {
if let Event::Ready(ready) = event {
- info!("Logged in as {}", ready.user.name);
+ info!(username = ready.user.name, "logged in");
} else {
let name = event.kind().name();
if let Ok(dispatch_event) = DispatchEvent::try_from(event) {
- debug!("handling event {}", name.unwrap());
+ let name = name.unwrap();
+ debug!(event_name = name, "handling dispatch event");
let data = CachePayload {
data: DispatchEventTagged(dispatch_event),
@@ -98,7 +100,7 @@ async fn handle_event(event: Event, nats: &Client) -> anyhow::Result<()> {
let value = serde_json::to_string(&data)?;
let bytes = bytes::Bytes::from(value);
- let span = trace_span!("nats send");
+ let span = info_span!("nats send");
let mut header_map = HeaderMap::new();
let context = span.context();
@@ -106,12 +108,9 @@ async fn handle_event(event: Event, nats: &Client) -> anyhow::Result<()> {
propagator.inject_context(&context, &mut MetadataMap(&mut header_map));
});
- nats.publish_with_headers(
- format!("nova.cache.dispatch.{}", name.unwrap()),
- header_map,
- bytes,
- )
- .await?;
+ nats.publish_with_headers(format!("nova.cache.dispatch.{name}"), header_map, bytes)
+ .instrument(info_span!("sending to nats"))
+ .await?;
}
}
diff --git a/exes/gateway/src/main.rs b/exes/gateway/src/main.rs
index f1b0298..6cac7e1 100644
--- a/exes/gateway/src/main.rs
+++ b/exes/gateway/src/main.rs
@@ -1,4 +1,11 @@
use gateway::GatewayServer;
use leash::ignite;
+#[cfg(not(target_env = "msvc"))]
+use tikv_jemallocator::Jemalloc;
+
+#[cfg(not(target_env = "msvc"))]
+#[global_allocator]
+static GLOBAL: Jemalloc = Jemalloc;
+
ignite!(GatewayServer);
diff --git a/exes/ratelimit/Cargo.toml b/exes/ratelimit/Cargo.toml
index 3989735..b011d38 100644
--- a/exes/ratelimit/Cargo.toml
+++ b/exes/ratelimit/Cargo.toml
@@ -39,3 +39,6 @@ test-log = { version = "0.2.11", features = ["log", "trace"] }
[[bench]]
name = "bucket"
harness = false
+
+[target.'cfg(not(target_env = "msvc"))'.dependencies]
+tikv-jemallocator = "0.5" \ No newline at end of file
diff --git a/exes/ratelimit/src/main.rs b/exes/ratelimit/src/main.rs
index 2de812b..d16e7d3 100644
--- a/exes/ratelimit/src/main.rs
+++ b/exes/ratelimit/src/main.rs
@@ -1,4 +1,11 @@
use leash::ignite;
use ratelimit::RatelimiterServerComponent;
+#[cfg(not(target_env = "msvc"))]
+use tikv_jemallocator::Jemalloc;
+
+#[cfg(not(target_env = "msvc"))]
+#[global_allocator]
+static GLOBAL: Jemalloc = Jemalloc;
+
ignite!(RatelimiterServerComponent);
diff --git a/exes/rest/Cargo.toml b/exes/rest/Cargo.toml
index 5583416..3d362b2 100644
--- a/exes/rest/Cargo.toml
+++ b/exes/rest/Cargo.toml
@@ -29,4 +29,7 @@ tokio-stream = "0.1.11"
dns-lookup = "1.0.8"
opentelemetry = "0.18.0"
opentelemetry-http = "0.7.0"
-tracing-opentelemetry = "0.18.0" \ No newline at end of file
+tracing-opentelemetry = "0.18.0"
+
+[target.'cfg(not(target_env = "msvc"))'.dependencies]
+tikv-jemallocator = "0.5" \ No newline at end of file
diff --git a/exes/rest/src/config.rs b/exes/rest/src/config.rs
index 9593ac8..900d3fb 100644
--- a/exes/rest/src/config.rs
+++ b/exes/rest/src/config.rs
@@ -28,4 +28,11 @@ pub struct ReverseProxy {
pub discord: Discord,
pub ratelimiter_address: String,
pub ratelimiter_port: u16,
+ #[serde(default = "default_upstream")]
+ pub upstream: Option<String>,
+}
+
+#[allow(clippy::unnecessary_wraps)]
+fn default_upstream() -> Option<String> {
+ Some("https://discord.com".to_string())
}
diff --git a/exes/rest/src/handler.rs b/exes/rest/src/handler.rs
index f053af7..3d47935 100644
--- a/exes/rest/src/handler.rs
+++ b/exes/rest/src/handler.rs
@@ -1,23 +1,78 @@
-use anyhow::bail;
+use anyhow::{bail, Context};
+use futures_util::FutureExt;
use http::{
header::{AUTHORIZATION, CONNECTION, HOST, TRANSFER_ENCODING, UPGRADE},
HeaderValue, Method as HttpMethod, Request, Response, Uri,
};
use hyper::{client::HttpConnector, Body, Client};
use hyper_rustls::HttpsConnector;
+use opentelemetry::{
+ global,
+ metrics::{Counter, Histogram},
+ Context as OpenTelemetryContext, KeyValue,
+};
use std::{
collections::hash_map::DefaultHasher,
convert::TryFrom,
hash::{Hash, Hasher},
str::FromStr,
sync::Arc,
+ time::SystemTime,
};
-use tracing::{debug_span, error, info_span, instrument, Instrument};
+use tracing::{debug_span, error, info_span, log::trace, Instrument};
use twilight_http_ratelimiting::{Method, Path};
-use crate::ratelimit_client::RemoteRatelimiter;
+use crate::{config::ReverseProxy, ratelimit_client::RemoteRatelimiter};
+use lazy_static::lazy_static;
+
+lazy_static! {
+ static ref METER_NAME: &'static str = "";
+ static ref REQUESTS: Counter<u64> = {
+ global::meter(&METER_NAME)
+ .u64_counter("rest.http_requests_total")
+ .with_description("Amount of requests processed by the rest reverse proxy")
+ .init()
+ };
+ static ref UPSTREAM_CALLS: Counter<u64> = {
+ global::meter(&METER_NAME)
+ .u64_counter("rest.upstream_http_requests_total")
+ .with_description("Amount of requests sent to discord")
+ .init()
+ };
+ static ref TICKET_CALLS: Counter<u64> = {
+ global::meter(&METER_NAME)
+ .u64_counter("rest.ticket_http_requests_total")
+ .with_description("Amount of requests sent to the ratelimiter")
+ .init()
+ };
+ static ref HEADERS_SUBMIT_CALLS: Counter<u64> = {
+ global::meter(&METER_NAME)
+ .u64_counter("rest.header_submit_http_requests_total")
+ .with_description("Amount of requests sent to the ratelimiter")
+ .init()
+ };
+ static ref UPSTREAM_TIMES: Histogram<f64> = {
+ global::meter(&METER_NAME)
+ .f64_histogram("rest.upstream_http_request_duration_seconds")
+ .with_description("Time took to request discord")
+ .init()
+ };
+ static ref TICKET_TIMES: Histogram<f64> = {
+ global::meter(&METER_NAME)
+ .f64_histogram("rest.ticket_http_request_duration_seconds")
+ .with_description("Time took to get a ticket from the ratelimiter")
+ .init()
+ };
+ static ref HEADERS_SUBMIT_TIMES: Histogram<f64> = {
+ global::meter(&METER_NAME)
+ .f64_histogram("rest.header_submit_http_request_duration_seconds")
+ .with_description("Time took to get a ticket from the ratelimiter")
+ .init()
+ };
+}
/// Normalizes the path
+#[inline]
fn normalize_path(request_path: &str) -> (&str, &str) {
if let Some(trimmed_path) = request_path.strip_prefix("/api") {
if let Some(maybe_api_version) = trimmed_path.split('/').nth(1) {
@@ -35,14 +90,18 @@ fn normalize_path(request_path: &str) -> (&str, &str) {
}
}
-#[instrument]
+#[inline]
+#[allow(clippy::too_many_lines)]
pub async fn handle_request(
client: Client<HttpsConnector<HttpConnector>, Body>,
ratelimiter: Arc<RemoteRatelimiter>,
+ config: ReverseProxy,
token: String,
mut request: Request<Body>,
) -> Result<Response<Body>, anyhow::Error> {
- let (hash, uri_string) = {
+ let cx = OpenTelemetryContext::current();
+
+ let (bucket, uri_string) = {
let method = match *request.method() {
HttpMethod::DELETE => Method::Delete,
HttpMethod::GET => Method::Get,
@@ -50,20 +109,25 @@ pub async fn handle_request(
HttpMethod::POST => Method::Post,
HttpMethod::PUT => Method::Put,
_ => {
- error!("Unsupported HTTP method in request, {}", request.method());
+ error!(method =? request.method(), "unsupported HTTP method in request");
bail!("unsupported method");
}
};
-
let request_path = request.uri().path();
let (api_path, trimmed_path) = normalize_path(request_path);
+ trace!("normalized path to {trimmed_path}");
- let mut uri_string = format!("https://discord.com{api_path}{trimmed_path}");
+ let mut uri_string = format!(
+ "{}{api_path}{trimmed_path}",
+ config.upstream.expect("no upstream")
+ );
if let Some(query) = request.uri().query() {
uri_string.push('?');
uri_string.push_str(query);
}
+ trace!("full request uri is {uri_string}");
+
let mut hash = DefaultHasher::new();
match Path::try_from((method, trimmed_path)) {
Ok(path) => path,
@@ -76,14 +140,36 @@ pub async fn handle_request(
}
}
.hash(&mut hash);
+ let bucket = hash.finish().to_string();
+ trace!("Request bucket is {}", bucket);
- (hash.finish().to_string(), uri_string)
+ (bucket, uri_string)
};
+
+ REQUESTS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]);
+
+ let ticket_start = SystemTime::now();
+ TICKET_CALLS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]);
// waits for the request to be authorized
- ratelimiter
- .ticket(hash.clone())
+ match ratelimiter
+ .ticket(bucket.clone())
.instrument(debug_span!("ticket validation request"))
- .await?;
+ .then(|v| async {
+ TICKET_TIMES.record(
+ &cx,
+ ticket_start.elapsed()?.as_secs_f64(),
+ &[KeyValue::new("bucket", bucket.clone())],
+ );
+ v
+ })
+ .await
+ {
+ Ok(_) => {}
+ Err(e) => {
+ error!("Error when requesting the ratelimiter: {:?}", e);
+ bail!("failed to request the ratelimiter");
+ }
+ }
request
.headers_mut()
@@ -121,7 +207,21 @@ pub async fn handle_request(
};
*request.uri_mut() = uri;
let span = debug_span!("upstream request to discord");
- let resp = match client.request(request).instrument(span).await {
+ let upstream_start = SystemTime::now();
+ UPSTREAM_CALLS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]);
+ let resp = match client
+ .request(request)
+ .instrument(span)
+ .then(|v| async {
+ UPSTREAM_TIMES.record(
+ &cx,
+ upstream_start.elapsed()?.as_secs_f64(),
+ &[KeyValue::new("bucket", bucket.clone())],
+ );
+ v.context("")
+ })
+ .await
+ {
Ok(response) => response,
Err(e) => {
error!("Error when requesting the Discord API: {:?}", e);
@@ -132,14 +232,28 @@ pub async fn handle_request(
let headers = resp
.headers()
.into_iter()
- .map(|(k, v)| (k.to_string(), v.to_str().map(std::string::ToString::to_string)))
+ .map(|(k, v)| {
+ (
+ k.to_string(),
+ v.to_str().map(std::string::ToString::to_string),
+ )
+ })
.filter(|f| f.1.is_ok())
.map(|f| (f.0, f.1.expect("errors should be filtered")))
.collect();
+ HEADERS_SUBMIT_CALLS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]);
let _submit_headers = ratelimiter
- .submit_headers(hash, headers)
+ .submit_headers(bucket.clone(), headers)
.instrument(info_span!("submitting headers"))
+ .then(|v| async {
+ HEADERS_SUBMIT_TIMES.record(
+ &cx,
+ upstream_start.elapsed()?.as_secs_f64(),
+ &[KeyValue::new("bucket", bucket.clone())],
+ );
+ v
+ })
.await;
Ok(resp)
diff --git a/exes/rest/src/lib.rs b/exes/rest/src/lib.rs
index 174bea0..917f524 100644
--- a/exes/rest/src/lib.rs
+++ b/exes/rest/src/lib.rs
@@ -53,11 +53,12 @@ impl Component for ReverseProxyServer {
let client: Client<_, hyper::Body> = Client::builder().build(https);
let token = settings.config.discord.token.clone();
-
+ let config = settings.config.clone();
let service_fn = make_service_fn(move |_: &AddrStream| {
let client = client.clone();
let ratelimiter = ratelimiter.clone();
let token = token.clone();
+ let config = config.clone();
async move {
Ok::<_, Infallible>(service_fn(move |request: Request<Body>| {
let token = token.clone();
@@ -71,10 +72,11 @@ impl Component for ReverseProxyServer {
let client = client.clone();
let ratelimiter = ratelimiter.clone();
+ let config = config.clone();
async move {
let token = token.clone();
let ratelimiter = ratelimiter.clone();
- handle_request(client, ratelimiter, token, request).await
+ handle_request(client, ratelimiter, config, token, request).await
}
}))
}
diff --git a/exes/rest/src/main.rs b/exes/rest/src/main.rs
index fe8ada7..643bc89 100644
--- a/exes/rest/src/main.rs
+++ b/exes/rest/src/main.rs
@@ -1,4 +1,11 @@
use leash::ignite;
use rest::ReverseProxyServer;
+#[cfg(not(target_env = "msvc"))]
+use tikv_jemallocator::Jemalloc;
+
+#[cfg(not(target_env = "msvc"))]
+#[global_allocator]
+static GLOBAL: Jemalloc = Jemalloc;
+
ignite!(ReverseProxyServer);
diff --git a/exes/webhook/Cargo.toml b/exes/webhook/Cargo.toml
index 0c50009..af256c8 100644
--- a/exes/webhook/Cargo.toml
+++ b/exes/webhook/Cargo.toml
@@ -19,4 +19,7 @@ ed25519-dalek = "1"
twilight-model = { version = "0.14" }
anyhow = "1.0.68"
-async-nats = "0.25.1"
+async-nats = "0.26.0"
+
+[target.'cfg(not(target_env = "msvc"))'.dependencies]
+tikv-jemallocator = "0.5" \ No newline at end of file
diff --git a/exes/webhook/src/main.rs b/exes/webhook/src/main.rs
index f531725..9432dbd 100644
--- a/exes/webhook/src/main.rs
+++ b/exes/webhook/src/main.rs
@@ -1,4 +1,11 @@
use leash::ignite;
use webhook::WebhookServer;
+#[cfg(not(target_env = "msvc"))]
+use tikv_jemallocator::Jemalloc;
+
+#[cfg(not(target_env = "msvc"))]
+#[global_allocator]
+static GLOBAL: Jemalloc = Jemalloc;
+
ignite!(WebhookServer);