diff options
| author | MatthieuCoder <matthieu@matthieu-dev.xyz> | 2023-01-15 17:03:29 +0400 | 
|---|---|---|
| committer | MatthieuCoder <matthieu@matthieu-dev.xyz> | 2023-01-15 17:03:29 +0400 | 
| commit | cbcaa3c01ec4d9ed95dc5af8232de1d10191bc44 (patch) | |
| tree | 02cd56bb6eba762024c33f2df3cbec4f7a1a27e1 /exes/rest | |
| parent | c06cf52e887cc560f1fd4fa72713b239fe0102a6 (diff) | |
add metrics and jemalloc
Diffstat (limited to 'exes/rest')
| -rw-r--r-- | exes/rest/Cargo.toml | 5 | ||||
| -rw-r--r-- | exes/rest/src/config.rs | 7 | ||||
| -rw-r--r-- | exes/rest/src/handler.rs | 144 | ||||
| -rw-r--r-- | exes/rest/src/lib.rs | 6 | ||||
| -rw-r--r-- | exes/rest/src/main.rs | 7 | 
5 files changed, 151 insertions, 18 deletions
diff --git a/exes/rest/Cargo.toml b/exes/rest/Cargo.toml index 5583416..3d362b2 100644 --- a/exes/rest/Cargo.toml +++ b/exes/rest/Cargo.toml @@ -29,4 +29,7 @@ tokio-stream = "0.1.11"  dns-lookup = "1.0.8"  opentelemetry = "0.18.0"  opentelemetry-http = "0.7.0" -tracing-opentelemetry = "0.18.0"
\ No newline at end of file +tracing-opentelemetry = "0.18.0" + +[target.'cfg(not(target_env = "msvc"))'.dependencies] +tikv-jemallocator = "0.5"
\ No newline at end of file diff --git a/exes/rest/src/config.rs b/exes/rest/src/config.rs index 9593ac8..900d3fb 100644 --- a/exes/rest/src/config.rs +++ b/exes/rest/src/config.rs @@ -28,4 +28,11 @@ pub struct ReverseProxy {      pub discord: Discord,      pub ratelimiter_address: String,      pub ratelimiter_port: u16, +    #[serde(default = "default_upstream")] +    pub upstream: Option<String>, +} + +#[allow(clippy::unnecessary_wraps)] +fn default_upstream() -> Option<String> { +    Some("https://discord.com".to_string())  } diff --git a/exes/rest/src/handler.rs b/exes/rest/src/handler.rs index f053af7..3d47935 100644 --- a/exes/rest/src/handler.rs +++ b/exes/rest/src/handler.rs @@ -1,23 +1,78 @@ -use anyhow::bail; +use anyhow::{bail, Context}; +use futures_util::FutureExt;  use http::{      header::{AUTHORIZATION, CONNECTION, HOST, TRANSFER_ENCODING, UPGRADE},      HeaderValue, Method as HttpMethod, Request, Response, Uri,  };  use hyper::{client::HttpConnector, Body, Client};  use hyper_rustls::HttpsConnector; +use opentelemetry::{ +    global, +    metrics::{Counter, Histogram}, +    Context as OpenTelemetryContext, KeyValue, +};  use std::{      collections::hash_map::DefaultHasher,      convert::TryFrom,      hash::{Hash, Hasher},      str::FromStr,      sync::Arc, +    time::SystemTime,  }; -use tracing::{debug_span, error, info_span, instrument, Instrument}; +use tracing::{debug_span, error, info_span, log::trace, Instrument};  use twilight_http_ratelimiting::{Method, Path}; -use crate::ratelimit_client::RemoteRatelimiter; +use crate::{config::ReverseProxy, ratelimit_client::RemoteRatelimiter}; +use lazy_static::lazy_static; + +lazy_static! { +    static ref METER_NAME: &'static str = ""; +    static ref REQUESTS: Counter<u64> = { +        global::meter(&METER_NAME) +            .u64_counter("rest.http_requests_total") +            .with_description("Amount of requests processed by the rest reverse proxy") +            .init() +    }; +    static ref UPSTREAM_CALLS: Counter<u64> = { +        global::meter(&METER_NAME) +            .u64_counter("rest.upstream_http_requests_total") +            .with_description("Amount of requests sent to discord") +            .init() +    }; +    static ref TICKET_CALLS: Counter<u64> = { +        global::meter(&METER_NAME) +            .u64_counter("rest.ticket_http_requests_total") +            .with_description("Amount of requests sent to the ratelimiter") +            .init() +    }; +    static ref HEADERS_SUBMIT_CALLS: Counter<u64> = { +        global::meter(&METER_NAME) +            .u64_counter("rest.header_submit_http_requests_total") +            .with_description("Amount of requests sent to the ratelimiter") +            .init() +    }; +    static ref UPSTREAM_TIMES: Histogram<f64> = { +        global::meter(&METER_NAME) +            .f64_histogram("rest.upstream_http_request_duration_seconds") +            .with_description("Time took to request discord") +            .init() +    }; +    static ref TICKET_TIMES: Histogram<f64> = { +        global::meter(&METER_NAME) +            .f64_histogram("rest.ticket_http_request_duration_seconds") +            .with_description("Time took to get a ticket from the ratelimiter") +            .init() +    }; +    static ref HEADERS_SUBMIT_TIMES: Histogram<f64> = { +        global::meter(&METER_NAME) +            .f64_histogram("rest.header_submit_http_request_duration_seconds") +            .with_description("Time took to get a ticket from the ratelimiter") +            .init() +    }; +}  /// Normalizes the path +#[inline]  fn normalize_path(request_path: &str) -> (&str, &str) {      if let Some(trimmed_path) = request_path.strip_prefix("/api") {          if let Some(maybe_api_version) = trimmed_path.split('/').nth(1) { @@ -35,14 +90,18 @@ fn normalize_path(request_path: &str) -> (&str, &str) {      }  } -#[instrument] +#[inline] +#[allow(clippy::too_many_lines)]  pub async fn handle_request(      client: Client<HttpsConnector<HttpConnector>, Body>,      ratelimiter: Arc<RemoteRatelimiter>, +    config: ReverseProxy,      token: String,      mut request: Request<Body>,  ) -> Result<Response<Body>, anyhow::Error> { -    let (hash, uri_string) = { +    let cx = OpenTelemetryContext::current(); + +    let (bucket, uri_string) = {          let method = match *request.method() {              HttpMethod::DELETE => Method::Delete,              HttpMethod::GET => Method::Get, @@ -50,20 +109,25 @@ pub async fn handle_request(              HttpMethod::POST => Method::Post,              HttpMethod::PUT => Method::Put,              _ => { -                error!("Unsupported HTTP method in request, {}", request.method()); +                error!(method =? request.method(), "unsupported HTTP method in request");                  bail!("unsupported method");              }          }; -          let request_path = request.uri().path();          let (api_path, trimmed_path) = normalize_path(request_path); +        trace!("normalized path to {trimmed_path}"); -        let mut uri_string = format!("https://discord.com{api_path}{trimmed_path}"); +        let mut uri_string = format!( +            "{}{api_path}{trimmed_path}", +            config.upstream.expect("no upstream") +        );          if let Some(query) = request.uri().query() {              uri_string.push('?');              uri_string.push_str(query);          } +        trace!("full request uri is {uri_string}"); +          let mut hash = DefaultHasher::new();          match Path::try_from((method, trimmed_path)) {              Ok(path) => path, @@ -76,14 +140,36 @@ pub async fn handle_request(              }          }          .hash(&mut hash); +        let bucket = hash.finish().to_string(); +        trace!("Request bucket is {}", bucket); -        (hash.finish().to_string(), uri_string) +        (bucket, uri_string)      }; + +    REQUESTS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]); + +    let ticket_start = SystemTime::now(); +    TICKET_CALLS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]);      // waits for the request to be authorized -    ratelimiter -        .ticket(hash.clone()) +    match ratelimiter +        .ticket(bucket.clone())          .instrument(debug_span!("ticket validation request")) -        .await?; +        .then(|v| async { +            TICKET_TIMES.record( +                &cx, +                ticket_start.elapsed()?.as_secs_f64(), +                &[KeyValue::new("bucket", bucket.clone())], +            ); +            v +        }) +        .await +    { +        Ok(_) => {} +        Err(e) => { +            error!("Error when requesting the ratelimiter: {:?}", e); +            bail!("failed to request the ratelimiter"); +        } +    }      request          .headers_mut() @@ -121,7 +207,21 @@ pub async fn handle_request(      };      *request.uri_mut() = uri;      let span = debug_span!("upstream request to discord"); -    let resp = match client.request(request).instrument(span).await { +    let upstream_start = SystemTime::now(); +    UPSTREAM_CALLS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]); +    let resp = match client +        .request(request) +        .instrument(span) +        .then(|v| async { +            UPSTREAM_TIMES.record( +                &cx, +                upstream_start.elapsed()?.as_secs_f64(), +                &[KeyValue::new("bucket", bucket.clone())], +            ); +            v.context("") +        }) +        .await +    {          Ok(response) => response,          Err(e) => {              error!("Error when requesting the Discord API: {:?}", e); @@ -132,14 +232,28 @@ pub async fn handle_request(      let headers = resp          .headers()          .into_iter() -        .map(|(k, v)| (k.to_string(), v.to_str().map(std::string::ToString::to_string))) +        .map(|(k, v)| { +            ( +                k.to_string(), +                v.to_str().map(std::string::ToString::to_string), +            ) +        })          .filter(|f| f.1.is_ok())          .map(|f| (f.0, f.1.expect("errors should be filtered")))          .collect(); +    HEADERS_SUBMIT_CALLS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]);      let _submit_headers = ratelimiter -        .submit_headers(hash, headers) +        .submit_headers(bucket.clone(), headers)          .instrument(info_span!("submitting headers")) +        .then(|v| async { +            HEADERS_SUBMIT_TIMES.record( +                &cx, +                upstream_start.elapsed()?.as_secs_f64(), +                &[KeyValue::new("bucket", bucket.clone())], +            ); +            v +        })          .await;      Ok(resp) diff --git a/exes/rest/src/lib.rs b/exes/rest/src/lib.rs index 174bea0..917f524 100644 --- a/exes/rest/src/lib.rs +++ b/exes/rest/src/lib.rs @@ -53,11 +53,12 @@ impl Component for ReverseProxyServer {              let client: Client<_, hyper::Body> = Client::builder().build(https);              let token = settings.config.discord.token.clone(); - +            let config = settings.config.clone();              let service_fn = make_service_fn(move |_: &AddrStream| {                  let client = client.clone();                  let ratelimiter = ratelimiter.clone();                  let token = token.clone(); +                let config = config.clone();                  async move {                      Ok::<_, Infallible>(service_fn(move |request: Request<Body>| {                          let token = token.clone(); @@ -71,10 +72,11 @@ impl Component for ReverseProxyServer {                          let client = client.clone();                          let ratelimiter = ratelimiter.clone(); +                        let config = config.clone();                          async move {                              let token = token.clone();                              let ratelimiter = ratelimiter.clone(); -                            handle_request(client, ratelimiter, token, request).await +                            handle_request(client, ratelimiter, config, token, request).await                          }                      }))                  } diff --git a/exes/rest/src/main.rs b/exes/rest/src/main.rs index fe8ada7..643bc89 100644 --- a/exes/rest/src/main.rs +++ b/exes/rest/src/main.rs @@ -1,4 +1,11 @@  use leash::ignite;  use rest::ReverseProxyServer; +#[cfg(not(target_env = "msvc"))] +use tikv_jemallocator::Jemalloc; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; +  ignite!(ReverseProxyServer);  | 
