diff options
| -rw-r--r-- | Cargo.lock | 39 | ||||
| -rw-r--r-- | Dockerfile | 2 | ||||
| -rw-r--r-- | docker-compose.yaml | 12 | ||||
| -rw-r--r-- | exes/cache/Cargo.toml | 2 | ||||
| -rw-r--r-- | exes/gateway/Cargo.toml | 7 | ||||
| -rw-r--r-- | exes/gateway/src/lib.rs | 19 | ||||
| -rw-r--r-- | exes/gateway/src/main.rs | 7 | ||||
| -rw-r--r-- | exes/ratelimit/Cargo.toml | 3 | ||||
| -rw-r--r-- | exes/ratelimit/src/main.rs | 7 | ||||
| -rw-r--r-- | exes/rest/Cargo.toml | 5 | ||||
| -rw-r--r-- | exes/rest/src/config.rs | 7 | ||||
| -rw-r--r-- | exes/rest/src/handler.rs | 144 | ||||
| -rw-r--r-- | exes/rest/src/lib.rs | 6 | ||||
| -rw-r--r-- | exes/rest/src/main.rs | 7 | ||||
| -rw-r--r-- | exes/webhook/Cargo.toml | 5 | ||||
| -rw-r--r-- | exes/webhook/src/main.rs | 7 | ||||
| -rw-r--r-- | libs/leash/Cargo.toml | 2 | ||||
| -rw-r--r-- | libs/leash/src/lib.rs | 134 | ||||
| -rw-r--r-- | libs/shared/Cargo.toml | 6 | ||||
| -rw-r--r-- | libs/shared/src/config.rs | 1 | ||||
| -rw-r--r-- | libs/shared/src/lib.rs | 1 | ||||
| -rw-r--r-- | libs/shared/src/opentelemetry.rs | 91 | 
22 files changed, 429 insertions, 85 deletions
@@ -86,9 +86,9 @@ checksum = "983cd8b9d4b02a6dc6ffa557262eb5858a27a0038ffffe21a0f133eaa819a164"  [[package]]  name = "async-nats" -version = "0.25.1" +version = "0.26.0"  source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f69bf051b7d96b3275cdea9a4abbe2e937ce6de66c742c57050c5c98b4a6db32" +checksum = "0b0e90b3cd41350d89a242b981fb888f0eb8e3cb81c0fcb4563338f7e96a1084"  dependencies = [   "base64",   "base64-url", @@ -771,6 +771,12 @@ dependencies = [  ]  [[package]] +name = "fs_extra" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394" + +[[package]]  name = "futures"  version = "0.3.25"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -873,6 +879,7 @@ dependencies = [   "serde",   "serde_json",   "shared", + "tikv-jemallocator",   "tokio",   "tokio-stream",   "tracing", @@ -1930,7 +1937,7 @@ version = "0.1.0"  dependencies = [   "anyhow",   "criterion", - "env_logger 0.7.1", + "env_logger 0.10.0",   "hyper",   "leash",   "opentelemetry", @@ -1940,6 +1947,7 @@ dependencies = [   "serde",   "shared",   "test-log", + "tikv-jemallocator",   "tokio",   "tokio-stream",   "tokio-test", @@ -2059,6 +2067,7 @@ dependencies = [   "proto",   "serde",   "shared", + "tikv-jemallocator",   "tokio",   "tokio-stream",   "tonic", @@ -2354,6 +2363,8 @@ dependencies = [   "anyhow",   "async-nats",   "config", + "opentelemetry", + "opentelemetry-otlp",   "redis",   "serde",   "serde_json", @@ -2564,6 +2575,27 @@ dependencies = [  ]  [[package]] +name = "tikv-jemalloc-sys" +version = "0.5.2+5.3.0-patched" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec45c14da997d0925c7835883e4d5c181f196fa142f8c19d7643d1e9af2592c3" +dependencies = [ + "cc", + "fs_extra", + "libc", +] + +[[package]] +name = "tikv-jemallocator" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20612db8a13a6c06d57ec83953694185a367e16945f66565e8028d2c0bd76979" +dependencies = [ + "libc", + "tikv-jemalloc-sys", +] + +[[package]]  name = "time"  version = "0.3.17"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3264,6 +3296,7 @@ dependencies = [   "serde",   "serde_json",   "shared", + "tikv-jemallocator",   "tokio",   "tracing",   "twilight-model", @@ -1,7 +1,7 @@  FROM rust AS chef  USER root  RUN cargo install cargo-chef -RUN apt-get update && apt-get install -y protobuf-compiler  +RUN apt-get update && apt-get install -y protobuf-compiler  WORKDIR /app  # Planning install diff --git a/docker-compose.yaml b/docker-compose.yaml index 7126433..ac1fb12 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,7 +1,7 @@  version: "3.3"  services:    nats: -    image: bitnami/nats +    image: nats      restart: always      ports:        - 4222:4222 @@ -11,7 +11,8 @@ services:      image: redis      ports:        - 6379:6379 - +  mock: +    image: nginx    cache:      image: ghcr.io/discordnova/nova/cache      restart: always @@ -23,7 +24,6 @@ services:        - ./config/default.yml:/config/default.yml      environment:        - RUST_LOG=debug -      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317      depends_on:        - nats        - redis @@ -40,7 +40,6 @@ services:        - ./config/default.yml:/config/default.yml      environment:        - RUST_LOG=debug -      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317      depends_on:        - nats        - otelcol @@ -56,7 +55,6 @@ services:        - ./config/default.yml:/config/default.yml      environment:        - RUST_LOG=debug -      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317      depends_on:        - ratelimit        - otelcol @@ -76,7 +74,6 @@ services:        - ./config/default.yml:/config/default.yml      environment:        - RUST_LOG=debug -      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317      depends_on:        - nats        - otelcol @@ -94,7 +91,6 @@ services:        - ./config/default.yml:/config/default.yml      environment:        - RUST_LOG=debug -      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317      depends_on:        - nats        - redis @@ -108,8 +104,6 @@ services:      image: jaegertracing/all-in-one      container_name: jaeger      command: -      - "--memory.max-debugs" -      - "10000"        - "--query.base-path"        - "/jaeger/ui"        - "--prometheus.server-url" diff --git a/exes/cache/Cargo.toml b/exes/cache/Cargo.toml index 70cb6a3..4dd7c97 100644 --- a/exes/cache/Cargo.toml +++ b/exes/cache/Cargo.toml @@ -19,7 +19,7 @@ tokio-stream = "0.1.11"  serde = { version = "1.0.8", features = ["derive"] }  serde_json = { version = "1.0" } -async-nats = "0.25.1" +async-nats = "0.26.0"  twilight-model = "0.14"  anyhow = "1.0.68" diff --git a/exes/gateway/Cargo.toml b/exes/gateway/Cargo.toml index a174710..98a1ae8 100644 --- a/exes/gateway/Cargo.toml +++ b/exes/gateway/Cargo.toml @@ -23,8 +23,11 @@ serde_json = { version = "1.0" }  tracing = "0.1.37"  tracing-futures = "0.2.5" -async-nats = "0.25.1" +async-nats = "0.26.0"  tracing-opentelemetry = "0.18.0"  opentelemetry = "0.18.0" -opentelemetry-http = "0.7.0"
\ No newline at end of file +opentelemetry-http = "0.7.0" + +[target.'cfg(not(target_env = "msvc"))'.dependencies] +tikv-jemallocator = "0.5"
\ No newline at end of file diff --git a/exes/gateway/src/lib.rs b/exes/gateway/src/lib.rs index c4da8f4..ddc4bbd 100644 --- a/exes/gateway/src/lib.rs +++ b/exes/gateway/src/lib.rs @@ -24,7 +24,7 @@ use tokio_stream::StreamExt;  use tracing_opentelemetry::OpenTelemetrySpanExt;  use twilight_gateway::{Event, Shard};  pub mod config; -use tracing::{debug, error, info, trace_span}; +use tracing::{debug, error, info, info_span, instrument, Instrument};  use twilight_model::gateway::event::DispatchEvent;  struct MetadataMap<'a>(&'a mut HeaderMap); @@ -84,13 +84,15 @@ impl Component for GatewayServer {      }  } +#[instrument]  async fn handle_event(event: Event, nats: &Client) -> anyhow::Result<()> {      if let Event::Ready(ready) = event { -        info!("Logged in as {}", ready.user.name); +        info!(username = ready.user.name, "logged in");      } else {          let name = event.kind().name();          if let Ok(dispatch_event) = DispatchEvent::try_from(event) { -            debug!("handling event {}", name.unwrap()); +            let name = name.unwrap(); +            debug!(event_name = name, "handling dispatch event");              let data = CachePayload {                  data: DispatchEventTagged(dispatch_event), @@ -98,7 +100,7 @@ async fn handle_event(event: Event, nats: &Client) -> anyhow::Result<()> {              let value = serde_json::to_string(&data)?;              let bytes = bytes::Bytes::from(value); -            let span = trace_span!("nats send"); +            let span = info_span!("nats send");              let mut header_map = HeaderMap::new();              let context = span.context(); @@ -106,12 +108,9 @@ async fn handle_event(event: Event, nats: &Client) -> anyhow::Result<()> {                  propagator.inject_context(&context, &mut MetadataMap(&mut header_map));              }); -            nats.publish_with_headers( -                format!("nova.cache.dispatch.{}", name.unwrap()), -                header_map, -                bytes, -            ) -            .await?; +            nats.publish_with_headers(format!("nova.cache.dispatch.{name}"), header_map, bytes) +                .instrument(info_span!("sending to nats")) +                .await?;          }      } diff --git a/exes/gateway/src/main.rs b/exes/gateway/src/main.rs index f1b0298..6cac7e1 100644 --- a/exes/gateway/src/main.rs +++ b/exes/gateway/src/main.rs @@ -1,4 +1,11 @@  use gateway::GatewayServer;  use leash::ignite; +#[cfg(not(target_env = "msvc"))] +use tikv_jemallocator::Jemalloc; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; +  ignite!(GatewayServer); diff --git a/exes/ratelimit/Cargo.toml b/exes/ratelimit/Cargo.toml index 3989735..b011d38 100644 --- a/exes/ratelimit/Cargo.toml +++ b/exes/ratelimit/Cargo.toml @@ -39,3 +39,6 @@ test-log = { version = "0.2.11", features = ["log", "trace"] }  [[bench]]  name = "bucket"  harness = false + +[target.'cfg(not(target_env = "msvc"))'.dependencies] +tikv-jemallocator = "0.5"
\ No newline at end of file diff --git a/exes/ratelimit/src/main.rs b/exes/ratelimit/src/main.rs index 2de812b..d16e7d3 100644 --- a/exes/ratelimit/src/main.rs +++ b/exes/ratelimit/src/main.rs @@ -1,4 +1,11 @@  use leash::ignite;  use ratelimit::RatelimiterServerComponent; +#[cfg(not(target_env = "msvc"))] +use tikv_jemallocator::Jemalloc; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; +  ignite!(RatelimiterServerComponent); diff --git a/exes/rest/Cargo.toml b/exes/rest/Cargo.toml index 5583416..3d362b2 100644 --- a/exes/rest/Cargo.toml +++ b/exes/rest/Cargo.toml @@ -29,4 +29,7 @@ tokio-stream = "0.1.11"  dns-lookup = "1.0.8"  opentelemetry = "0.18.0"  opentelemetry-http = "0.7.0" -tracing-opentelemetry = "0.18.0"
\ No newline at end of file +tracing-opentelemetry = "0.18.0" + +[target.'cfg(not(target_env = "msvc"))'.dependencies] +tikv-jemallocator = "0.5"
\ No newline at end of file diff --git a/exes/rest/src/config.rs b/exes/rest/src/config.rs index 9593ac8..900d3fb 100644 --- a/exes/rest/src/config.rs +++ b/exes/rest/src/config.rs @@ -28,4 +28,11 @@ pub struct ReverseProxy {      pub discord: Discord,      pub ratelimiter_address: String,      pub ratelimiter_port: u16, +    #[serde(default = "default_upstream")] +    pub upstream: Option<String>, +} + +#[allow(clippy::unnecessary_wraps)] +fn default_upstream() -> Option<String> { +    Some("https://discord.com".to_string())  } diff --git a/exes/rest/src/handler.rs b/exes/rest/src/handler.rs index f053af7..3d47935 100644 --- a/exes/rest/src/handler.rs +++ b/exes/rest/src/handler.rs @@ -1,23 +1,78 @@ -use anyhow::bail; +use anyhow::{bail, Context}; +use futures_util::FutureExt;  use http::{      header::{AUTHORIZATION, CONNECTION, HOST, TRANSFER_ENCODING, UPGRADE},      HeaderValue, Method as HttpMethod, Request, Response, Uri,  };  use hyper::{client::HttpConnector, Body, Client};  use hyper_rustls::HttpsConnector; +use opentelemetry::{ +    global, +    metrics::{Counter, Histogram}, +    Context as OpenTelemetryContext, KeyValue, +};  use std::{      collections::hash_map::DefaultHasher,      convert::TryFrom,      hash::{Hash, Hasher},      str::FromStr,      sync::Arc, +    time::SystemTime,  }; -use tracing::{debug_span, error, info_span, instrument, Instrument}; +use tracing::{debug_span, error, info_span, log::trace, Instrument};  use twilight_http_ratelimiting::{Method, Path}; -use crate::ratelimit_client::RemoteRatelimiter; +use crate::{config::ReverseProxy, ratelimit_client::RemoteRatelimiter}; +use lazy_static::lazy_static; + +lazy_static! { +    static ref METER_NAME: &'static str = ""; +    static ref REQUESTS: Counter<u64> = { +        global::meter(&METER_NAME) +            .u64_counter("rest.http_requests_total") +            .with_description("Amount of requests processed by the rest reverse proxy") +            .init() +    }; +    static ref UPSTREAM_CALLS: Counter<u64> = { +        global::meter(&METER_NAME) +            .u64_counter("rest.upstream_http_requests_total") +            .with_description("Amount of requests sent to discord") +            .init() +    }; +    static ref TICKET_CALLS: Counter<u64> = { +        global::meter(&METER_NAME) +            .u64_counter("rest.ticket_http_requests_total") +            .with_description("Amount of requests sent to the ratelimiter") +            .init() +    }; +    static ref HEADERS_SUBMIT_CALLS: Counter<u64> = { +        global::meter(&METER_NAME) +            .u64_counter("rest.header_submit_http_requests_total") +            .with_description("Amount of requests sent to the ratelimiter") +            .init() +    }; +    static ref UPSTREAM_TIMES: Histogram<f64> = { +        global::meter(&METER_NAME) +            .f64_histogram("rest.upstream_http_request_duration_seconds") +            .with_description("Time took to request discord") +            .init() +    }; +    static ref TICKET_TIMES: Histogram<f64> = { +        global::meter(&METER_NAME) +            .f64_histogram("rest.ticket_http_request_duration_seconds") +            .with_description("Time took to get a ticket from the ratelimiter") +            .init() +    }; +    static ref HEADERS_SUBMIT_TIMES: Histogram<f64> = { +        global::meter(&METER_NAME) +            .f64_histogram("rest.header_submit_http_request_duration_seconds") +            .with_description("Time took to get a ticket from the ratelimiter") +            .init() +    }; +}  /// Normalizes the path +#[inline]  fn normalize_path(request_path: &str) -> (&str, &str) {      if let Some(trimmed_path) = request_path.strip_prefix("/api") {          if let Some(maybe_api_version) = trimmed_path.split('/').nth(1) { @@ -35,14 +90,18 @@ fn normalize_path(request_path: &str) -> (&str, &str) {      }  } -#[instrument] +#[inline] +#[allow(clippy::too_many_lines)]  pub async fn handle_request(      client: Client<HttpsConnector<HttpConnector>, Body>,      ratelimiter: Arc<RemoteRatelimiter>, +    config: ReverseProxy,      token: String,      mut request: Request<Body>,  ) -> Result<Response<Body>, anyhow::Error> { -    let (hash, uri_string) = { +    let cx = OpenTelemetryContext::current(); + +    let (bucket, uri_string) = {          let method = match *request.method() {              HttpMethod::DELETE => Method::Delete,              HttpMethod::GET => Method::Get, @@ -50,20 +109,25 @@ pub async fn handle_request(              HttpMethod::POST => Method::Post,              HttpMethod::PUT => Method::Put,              _ => { -                error!("Unsupported HTTP method in request, {}", request.method()); +                error!(method =? request.method(), "unsupported HTTP method in request");                  bail!("unsupported method");              }          }; -          let request_path = request.uri().path();          let (api_path, trimmed_path) = normalize_path(request_path); +        trace!("normalized path to {trimmed_path}"); -        let mut uri_string = format!("https://discord.com{api_path}{trimmed_path}"); +        let mut uri_string = format!( +            "{}{api_path}{trimmed_path}", +            config.upstream.expect("no upstream") +        );          if let Some(query) = request.uri().query() {              uri_string.push('?');              uri_string.push_str(query);          } +        trace!("full request uri is {uri_string}"); +          let mut hash = DefaultHasher::new();          match Path::try_from((method, trimmed_path)) {              Ok(path) => path, @@ -76,14 +140,36 @@ pub async fn handle_request(              }          }          .hash(&mut hash); +        let bucket = hash.finish().to_string(); +        trace!("Request bucket is {}", bucket); -        (hash.finish().to_string(), uri_string) +        (bucket, uri_string)      }; + +    REQUESTS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]); + +    let ticket_start = SystemTime::now(); +    TICKET_CALLS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]);      // waits for the request to be authorized -    ratelimiter -        .ticket(hash.clone()) +    match ratelimiter +        .ticket(bucket.clone())          .instrument(debug_span!("ticket validation request")) -        .await?; +        .then(|v| async { +            TICKET_TIMES.record( +                &cx, +                ticket_start.elapsed()?.as_secs_f64(), +                &[KeyValue::new("bucket", bucket.clone())], +            ); +            v +        }) +        .await +    { +        Ok(_) => {} +        Err(e) => { +            error!("Error when requesting the ratelimiter: {:?}", e); +            bail!("failed to request the ratelimiter"); +        } +    }      request          .headers_mut() @@ -121,7 +207,21 @@ pub async fn handle_request(      };      *request.uri_mut() = uri;      let span = debug_span!("upstream request to discord"); -    let resp = match client.request(request).instrument(span).await { +    let upstream_start = SystemTime::now(); +    UPSTREAM_CALLS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]); +    let resp = match client +        .request(request) +        .instrument(span) +        .then(|v| async { +            UPSTREAM_TIMES.record( +                &cx, +                upstream_start.elapsed()?.as_secs_f64(), +                &[KeyValue::new("bucket", bucket.clone())], +            ); +            v.context("") +        }) +        .await +    {          Ok(response) => response,          Err(e) => {              error!("Error when requesting the Discord API: {:?}", e); @@ -132,14 +232,28 @@ pub async fn handle_request(      let headers = resp          .headers()          .into_iter() -        .map(|(k, v)| (k.to_string(), v.to_str().map(std::string::ToString::to_string))) +        .map(|(k, v)| { +            ( +                k.to_string(), +                v.to_str().map(std::string::ToString::to_string), +            ) +        })          .filter(|f| f.1.is_ok())          .map(|f| (f.0, f.1.expect("errors should be filtered")))          .collect(); +    HEADERS_SUBMIT_CALLS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]);      let _submit_headers = ratelimiter -        .submit_headers(hash, headers) +        .submit_headers(bucket.clone(), headers)          .instrument(info_span!("submitting headers")) +        .then(|v| async { +            HEADERS_SUBMIT_TIMES.record( +                &cx, +                upstream_start.elapsed()?.as_secs_f64(), +                &[KeyValue::new("bucket", bucket.clone())], +            ); +            v +        })          .await;      Ok(resp) diff --git a/exes/rest/src/lib.rs b/exes/rest/src/lib.rs index 174bea0..917f524 100644 --- a/exes/rest/src/lib.rs +++ b/exes/rest/src/lib.rs @@ -53,11 +53,12 @@ impl Component for ReverseProxyServer {              let client: Client<_, hyper::Body> = Client::builder().build(https);              let token = settings.config.discord.token.clone(); - +            let config = settings.config.clone();              let service_fn = make_service_fn(move |_: &AddrStream| {                  let client = client.clone();                  let ratelimiter = ratelimiter.clone();                  let token = token.clone(); +                let config = config.clone();                  async move {                      Ok::<_, Infallible>(service_fn(move |request: Request<Body>| {                          let token = token.clone(); @@ -71,10 +72,11 @@ impl Component for ReverseProxyServer {                          let client = client.clone();                          let ratelimiter = ratelimiter.clone(); +                        let config = config.clone();                          async move {                              let token = token.clone();                              let ratelimiter = ratelimiter.clone(); -                            handle_request(client, ratelimiter, token, request).await +                            handle_request(client, ratelimiter, config, token, request).await                          }                      }))                  } diff --git a/exes/rest/src/main.rs b/exes/rest/src/main.rs index fe8ada7..643bc89 100644 --- a/exes/rest/src/main.rs +++ b/exes/rest/src/main.rs @@ -1,4 +1,11 @@  use leash::ignite;  use rest::ReverseProxyServer; +#[cfg(not(target_env = "msvc"))] +use tikv_jemallocator::Jemalloc; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; +  ignite!(ReverseProxyServer); diff --git a/exes/webhook/Cargo.toml b/exes/webhook/Cargo.toml index 0c50009..af256c8 100644 --- a/exes/webhook/Cargo.toml +++ b/exes/webhook/Cargo.toml @@ -19,4 +19,7 @@ ed25519-dalek = "1"  twilight-model = { version = "0.14" }  anyhow = "1.0.68" -async-nats = "0.25.1" +async-nats = "0.26.0" + +[target.'cfg(not(target_env = "msvc"))'.dependencies] +tikv-jemallocator = "0.5"
\ No newline at end of file diff --git a/exes/webhook/src/main.rs b/exes/webhook/src/main.rs index f531725..9432dbd 100644 --- a/exes/webhook/src/main.rs +++ b/exes/webhook/src/main.rs @@ -1,4 +1,11 @@  use leash::ignite;  use webhook::WebhookServer; +#[cfg(not(target_env = "msvc"))] +use tikv_jemallocator::Jemalloc; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; +  ignite!(WebhookServer); diff --git a/libs/leash/Cargo.toml b/libs/leash/Cargo.toml index ca110d0..b96f384 100644 --- a/libs/leash/Cargo.toml +++ b/libs/leash/Cargo.toml @@ -16,4 +16,4 @@ tracing = "0.1.37"  env_logger = "0.10.0"  tracing-opentelemetry = "0.18.0"  opentelemetry = { version ="0.18.0", features = ["rt-tokio"] } -opentelemetry-otlp = { version = "0.11.0" } +opentelemetry-otlp = { version = "0.11.0", features = ["metrics"] } diff --git a/libs/leash/src/lib.rs b/libs/leash/src/lib.rs index 91bfaf5..f3c9472 100644 --- a/libs/leash/src/lib.rs +++ b/libs/leash/src/lib.rs @@ -6,10 +6,13 @@      clippy::complexity,      clippy::perf,      clippy::pedantic, -    clippy::nursery, +    clippy::nursery  )]  use anyhow::Result; +use opentelemetry::global::shutdown_tracer_provider; +use opentelemetry::sdk::export::metrics::aggregation::stateless_temporality_selector; +use opentelemetry::sdk::metrics::selectors;  use opentelemetry::sdk::propagation::TraceContextPropagator;  use opentelemetry::sdk::trace::{self};  use opentelemetry::sdk::Resource; @@ -18,8 +21,10 @@ use opentelemetry_otlp::WithExportConfig;  use serde::de::DeserializeOwned;  use shared::config::Settings;  use std::str::FromStr; +use std::time::Duration;  use std::{future::Future, pin::Pin};  use tokio::sync::oneshot; +use tracing::log::error;  use tracing::{info, log::trace};  use tracing_subscriber::filter::Directive;  use tracing_subscriber::{fmt, prelude::*, EnvFilter}; @@ -35,54 +40,111 @@ pub trait Component: Send + Sync + 'static + Sized {          stop: oneshot::Receiver<()>,      ) -> AnyhowResultFuture<()>;      fn new() -> Self; +} -    fn _internal_start(self) -> AnyhowResultFuture<()> { -        Box::pin(async move { -            global::set_text_map_propagator(TraceContextPropagator::new()); +/// # Panics +/// Panics in case of an invalid `RUST_LOG` variable. +pub fn start_component<Y, C>(component: Y) -> AnyhowResultFuture<()> +where +    Y: Component<Config = C>, +    C: Default + Clone + DeserializeOwned + Send, +{ +    Box::pin(async move { +        let settings = Settings::<Y::Config>::new(Y::SERVICE_NAME)?; + +        if let Some(meter_config) = settings +            .opentelemetry +            .as_ref() +            .and_then(|f| f.metrics.clone()) +        { +            let meter = opentelemetry_otlp::new_pipeline() +                .metrics( +                    selectors::simple::histogram([1.0, 2.0, 5.0, 10.0, 20.0, 50.0]), +                    stateless_temporality_selector(), +                    opentelemetry::runtime::Tokio, +                ) +                .with_exporter( +                    opentelemetry_otlp::new_exporter() +                        .tonic() +                        .with_export_config(meter_config.into()), +                ) +                .with_period(Duration::from_secs(3)) +                .with_timeout(Duration::from_secs(10)) +                .build()?; +            // Using the opentelemetry_otlp meter +            global::set_meter_provider(meter); +        } +        // Use the text propagator +        global::set_text_map_propagator(TraceContextPropagator::new()); +        // Print debug errors +        global::set_error_handler(|error| { +            error!("OpenTelemetry error: {}", error); +        })?; + +        if let Some(tracer_config) = settings +            .opentelemetry +            .as_ref() +            .and_then(|f| f.traces.clone()) +        {              let tracer = opentelemetry_otlp::new_pipeline()                  .tracing()                  .with_trace_config(trace::config().with_resource(Resource::new(vec![ -                    KeyValue::new("service.name", Self::SERVICE_NAME), +                    KeyValue::new("service.name", Y::SERVICE_NAME),                  ]))) -                .with_exporter(opentelemetry_otlp::new_exporter().tonic().with_env()) +                .with_exporter( +                    opentelemetry_otlp::new_exporter() +                        .tonic() +                        .with_export_config(tracer_config.into()), +                )                  .install_batch(opentelemetry::runtime::Tokio)?; +            let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer); -            let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); - +            tracing_subscriber::registry() +                .with(otel_layer) +                .with( +                    // Use the info level as default +                    EnvFilter::builder() +                        .with_default_directive(Directive::from_str("info").unwrap()) +                        .from_env()?, +                ) +                .init(); +        } else { +            // Setup tracing              tracing_subscriber::registry()                  .with(fmt::layer()) -                .with(telemetry)                  .with( +                    // Use the info level as default                      EnvFilter::builder()                          .with_default_directive(Directive::from_str("info").unwrap())                          .from_env()?,                  )                  .init(); +        } -            info!("Starting nova"); -            let settings = Settings::<Self::Config>::new(Self::SERVICE_NAME); -            let (stop, stop_channel) = oneshot::channel(); - -            tokio::spawn(async move { -                trace!("started signal watching"); -                #[cfg(unix)] -                tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) -                    .unwrap() -                    .recv() -                    .await; -                #[cfg(not(unix))] -                return tokio::signal::ctrl_c().await.unwrap(); - -                stop.send(()).unwrap(); -            }); - -            trace!( -                "Starting component {component}", -                component = Self::SERVICE_NAME -            ); -            self.start(settings?, stop_channel).await -        }) -    } +        // Finally starting nova +        info!("Starting nova component {}", Y::SERVICE_NAME); +        let (stop, stop_channel) = oneshot::channel(); + +        tokio::spawn(async move { +            trace!("started signal watching"); +            #[cfg(unix)] +            tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) +                .unwrap() +                .recv() +                .await; +            #[cfg(not(unix))] +            return tokio::signal::ctrl_c().await.unwrap(); + +            stop.send(()).unwrap(); +            shutdown_tracer_provider(); +        }); + +        trace!( +            "Starting component {component}", +            component = Y::SERVICE_NAME +        ); +        component.start(settings, stop_channel).await +    })  }  #[macro_export] @@ -90,9 +152,9 @@ macro_rules! ignite {      ($c:ty) => {          #[allow(dead_code)]          fn main() -> anyhow::Result<()> { -            use leash::Component; +            use $crate::Component;              let rt = tokio::runtime::Runtime::new()?; -            rt.block_on(<$c as Component>::new()._internal_start())?; +            rt.block_on($crate::start_component(<$c as Component>::new()))?;              Ok(())          }      }; @@ -128,7 +190,5 @@ mod test {          }      } -     -      ignite!(TestComponent);  } diff --git a/libs/shared/Cargo.toml b/libs/shared/Cargo.toml index 419ad0c..9b29bf2 100644 --- a/libs/shared/Cargo.toml +++ b/libs/shared/Cargo.toml @@ -10,7 +10,7 @@ serde_repr = "0.1"  config = { version = "0.13", default-features = false, features = ["json", "yaml-rust", "ini"] } -async-nats = "0.25.1" +async-nats = "0.26.0"  redis = { version = "0.22.1", features = ["cluster", "connection-manager", "tokio-comp"] }  tokio = { version = "1", features = ["signal", "rt"] } @@ -21,4 +21,6 @@ anyhow = "1.0.68"  serde_test = "1.0.152" -tracing = "0.1.37"
\ No newline at end of file +tracing = "0.1.37" +opentelemetry-otlp = "0.11.0" +opentelemetry = "0.18.0"
\ No newline at end of file diff --git a/libs/shared/src/config.rs b/libs/shared/src/config.rs index cdf0bd3..4967e3c 100644 --- a/libs/shared/src/config.rs +++ b/libs/shared/src/config.rs @@ -10,6 +10,7 @@ pub struct Settings<T: Clone + DeserializeOwned> {      pub config: T,      pub nats: crate::nats::Configuration,      pub redis: crate::redis::Configuration, +    pub opentelemetry: Option<crate::opentelemetry::Configuration>,  }  impl<T: Clone + DeserializeOwned + Default> Settings<T> { diff --git a/libs/shared/src/lib.rs b/libs/shared/src/lib.rs index 92e7a05..33a6a13 100644 --- a/libs/shared/src/lib.rs +++ b/libs/shared/src/lib.rs @@ -13,3 +13,4 @@ pub mod config;  pub mod nats;  pub mod payloads;  pub mod redis; +pub mod opentelemetry;
\ No newline at end of file diff --git a/libs/shared/src/opentelemetry.rs b/libs/shared/src/opentelemetry.rs new file mode 100644 index 0000000..ca6542d --- /dev/null +++ b/libs/shared/src/opentelemetry.rs @@ -0,0 +1,91 @@ +use std::ops::{Deref, DerefMut}; + +use opentelemetry_otlp::{ExportConfig, Protocol}; +use serde::{de::Visitor, Deserialize}; + +#[derive(Debug, Default)] +#[repr(transparent)] +pub struct ExportConfigDeserialize(ExportConfig); +impl Clone for ExportConfigDeserialize { +    fn clone(&self) -> Self { +        Self(ExportConfig { +            endpoint: self.0.endpoint.clone(), +            protocol: self.0.protocol, +            timeout: self.0.timeout, +        }) +    } +} + +impl From<ExportConfigDeserialize> for ExportConfig { +    fn from(val: ExportConfigDeserialize) -> Self { +        val.0 +    } +} + +impl Deref for ExportConfigDeserialize { +    type Target = ExportConfig; + +    fn deref(&self) -> &Self::Target { +        &self.0 +    } +} + +impl DerefMut for ExportConfigDeserialize { +    fn deref_mut(&mut self) -> &mut Self::Target { +        &mut self.0 +    } +} + +impl<'de> Deserialize<'de> for ExportConfigDeserialize { +    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error> +    where +        D: serde::Deserializer<'de>, +    { +        #[derive(Deserialize, Debug)] +        #[serde(field_identifier, rename_all = "lowercase")] +        enum Fields { +            Endpoint, +            Timeout, +        } + +        struct OpenTelemetryExportConfigVisitor; +        impl<'de> Visitor<'de> for OpenTelemetryExportConfigVisitor { +            type Value = ExportConfigDeserialize; +            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { +                formatter.write_str("struct OpenTelemetryExportConfig") +            } + +            fn visit_map<A>(self, mut map: A) -> std::result::Result<Self::Value, A::Error> +            where +                A: serde::de::MapAccess<'de>, +            { +                let mut export_config = ExportConfigDeserialize::default(); +                export_config.0.protocol = Protocol::Grpc; +                while let Some(name) = map.next_key::<Fields>()? { +                    match name { +                        Fields::Endpoint => { +                            export_config.0.endpoint = map.next_value()?; +                        } +                        Fields::Timeout => { +                            export_config.0.timeout = map.next_value()?; +                        } +                    } +                } + +                Ok(export_config) +            } +        } + +        deserializer.deserialize_struct( +            "OpenTelemetryExportConfig", +            &["endpoint", "protocol", "timeout"], +            OpenTelemetryExportConfigVisitor, +        ) +    } +} + +#[derive(Debug, Clone, Deserialize)] +pub struct Configuration { +    pub traces: Option<ExportConfigDeserialize>, +    pub metrics: Option<ExportConfigDeserialize>, +}  | 
