git.puffer.fish Git - matthieu/nova.git/commitdiff
add metrics and jemalloc
author MatthieuCoder <matthieu@matthieu-dev.xyz>
Sun, 15 Jan 2023 13:03:29 +0000 (17:03 +0400)
committer MatthieuCoder <matthieu@matthieu-dev.xyz>
Sun, 15 Jan 2023 13:03:29 +0000 (17:03 +0400)
22 files changed:
Cargo.lock
Dockerfile
docker-compose.yaml
exes/cache/Cargo.toml
exes/gateway/Cargo.toml
exes/gateway/src/lib.rs
exes/gateway/src/main.rs
exes/ratelimit/Cargo.toml
exes/ratelimit/src/main.rs
exes/rest/Cargo.toml
exes/rest/src/config.rs
exes/rest/src/handler.rs
exes/rest/src/lib.rs
exes/rest/src/main.rs
exes/webhook/Cargo.toml
exes/webhook/src/main.rs
libs/leash/Cargo.toml
libs/leash/src/lib.rs
libs/shared/Cargo.toml
libs/shared/src/config.rs
libs/shared/src/lib.rs
libs/shared/src/opentelemetry.rs [new file with mode: 0644]

index 45885095514cf643142726ead7f5a139ca453f43..d47e07f6d490cf0ab539288f1ed32acd43434932 100644 (file)
@@ -86,9 +86,9 @@ checksum = "983cd8b9d4b02a6dc6ffa557262eb5858a27a0038ffffe21a0f133eaa819a164"
 
 [[package]]
 name = "async-nats"
-version = "0.25.1"
+version = "0.26.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f69bf051b7d96b3275cdea9a4abbe2e937ce6de66c742c57050c5c98b4a6db32"
+checksum = "0b0e90b3cd41350d89a242b981fb888f0eb8e3cb81c0fcb4563338f7e96a1084"
 dependencies = [
  "base64",
  "base64-url",
@@ -770,6 +770,12 @@ dependencies = [
  "percent-encoding",
 ]
 
+[[package]]
+name = "fs_extra"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394"
+
 [[package]]
 name = "futures"
 version = "0.3.25"
@@ -873,6 +879,7 @@ dependencies = [
  "serde",
  "serde_json",
  "shared",
+ "tikv-jemallocator",
  "tokio",
  "tokio-stream",
  "tracing",
@@ -1930,7 +1937,7 @@ version = "0.1.0"
 dependencies = [
  "anyhow",
  "criterion",
- "env_logger 0.7.1",
+ "env_logger 0.10.0",
  "hyper",
  "leash",
  "opentelemetry",
@@ -1940,6 +1947,7 @@ dependencies = [
  "serde",
  "shared",
  "test-log",
+ "tikv-jemallocator",
  "tokio",
  "tokio-stream",
  "tokio-test",
@@ -2059,6 +2067,7 @@ dependencies = [
  "proto",
  "serde",
  "shared",
+ "tikv-jemallocator",
  "tokio",
  "tokio-stream",
  "tonic",
@@ -2354,6 +2363,8 @@ dependencies = [
  "anyhow",
  "async-nats",
  "config",
+ "opentelemetry",
+ "opentelemetry-otlp",
  "redis",
  "serde",
  "serde_json",
@@ -2563,6 +2574,27 @@ dependencies = [
  "once_cell",
 ]
 
+[[package]]
+name = "tikv-jemalloc-sys"
+version = "0.5.2+5.3.0-patched"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ec45c14da997d0925c7835883e4d5c181f196fa142f8c19d7643d1e9af2592c3"
+dependencies = [
+ "cc",
+ "fs_extra",
+ "libc",
+]
+
+[[package]]
+name = "tikv-jemallocator"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "20612db8a13a6c06d57ec83953694185a367e16945f66565e8028d2c0bd76979"
+dependencies = [
+ "libc",
+ "tikv-jemalloc-sys",
+]
+
 [[package]]
 name = "time"
 version = "0.3.17"
@@ -3264,6 +3296,7 @@ dependencies = [
  "serde",
  "serde_json",
  "shared",
+ "tikv-jemallocator",
  "tokio",
  "tracing",
  "twilight-model",
index 4b82766e17e479c0a56661568c426b604cf0f79f..4e54bbfcdd36ce5a8df0ca86f3577af0f73e7355 100644 (file)
@@ -1,7 +1,7 @@
 FROM rust AS chef
 USER root
 RUN cargo install cargo-chef
-RUN apt-get update && apt-get install -y protobuf-compiler 
+RUN apt-get update && apt-get install -y protobuf-compiler
 WORKDIR /app
 
 # Planning install
index 712643319b5f9e11ae3265b6b7b9e33fcddebbfd..ac1fb12b4c53899c8b5d09fe8cb4e8c28cfb65d3 100644 (file)
@@ -1,7 +1,7 @@
 version: "3.3"
 services:
   nats:
-    image: bitnami/nats
+    image: nats
     restart: always
     ports:
       - 4222:4222
@@ -11,7 +11,8 @@ services:
     image: redis
     ports:
       - 6379:6379
-
+  mock:
+    image: nginx
   cache:
     image: ghcr.io/discordnova/nova/cache
     restart: always
@@ -23,7 +24,6 @@ services:
       - ./config/default.yml:/config/default.yml
     environment:
       - RUST_LOG=debug
-      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317
     depends_on:
       - nats
       - redis
@@ -40,7 +40,6 @@ services:
       - ./config/default.yml:/config/default.yml
     environment:
       - RUST_LOG=debug
-      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317
     depends_on:
       - nats
       - otelcol
@@ -56,7 +55,6 @@ services:
       - ./config/default.yml:/config/default.yml
     environment:
       - RUST_LOG=debug
-      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317
     depends_on:
       - ratelimit
       - otelcol
@@ -76,7 +74,6 @@ services:
       - ./config/default.yml:/config/default.yml
     environment:
       - RUST_LOG=debug
-      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317
     depends_on:
       - nats
       - otelcol
@@ -94,7 +91,6 @@ services:
       - ./config/default.yml:/config/default.yml
     environment:
       - RUST_LOG=debug
-      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317
     depends_on:
       - nats
       - redis
@@ -108,8 +104,6 @@ services:
     image: jaegertracing/all-in-one
     container_name: jaeger
     command:
-      - "--memory.max-debugs"
-      - "10000"
       - "--query.base-path"
       - "/jaeger/ui"
       - "--prometheus.server-url"
index 70cb6a3903c3b96d7097ed778c33e7b07c7e35a2..4dd7c9712ad1d2149df6eacf39b2a6c07e82eb84 100644 (file)
@@ -19,7 +19,7 @@ tokio-stream = "0.1.11"
 serde = { version = "1.0.8", features = ["derive"] }
 serde_json = { version = "1.0" }
 
-async-nats = "0.25.1"
+async-nats = "0.26.0"
 twilight-model = "0.14"
 anyhow = "1.0.68"
 
index a17471013f01d9376a1ae5772219cc78937e2f49..98a1ae83e2dc3a8e1fadbadba0bb680e6b0cb072 100644 (file)
@@ -23,8 +23,11 @@ serde_json = { version = "1.0" }
 tracing = "0.1.37"
 tracing-futures = "0.2.5"
 
-async-nats = "0.25.1"
+async-nats = "0.26.0"
 
 tracing-opentelemetry = "0.18.0"
 opentelemetry = "0.18.0"
-opentelemetry-http = "0.7.0"
\ No newline at end of file
+opentelemetry-http = "0.7.0"
+
+[target.'cfg(not(target_env = "msvc"))'.dependencies]
+tikv-jemallocator = "0.5"
\ No newline at end of file
index c4da8f44ef4487af4fa608c887c087e05788c79c..ddc4bbd36c78c0a150253aec23f9e2debf253730 100644 (file)
@@ -24,7 +24,7 @@ use tokio_stream::StreamExt;
 use tracing_opentelemetry::OpenTelemetrySpanExt;
 use twilight_gateway::{Event, Shard};
 pub mod config;
-use tracing::{debug, error, info, trace_span};
+use tracing::{debug, error, info, info_span, instrument, Instrument};
 use twilight_model::gateway::event::DispatchEvent;
 
 struct MetadataMap<'a>(&'a mut HeaderMap);
@@ -84,13 +84,15 @@ impl Component for GatewayServer {
     }
 }
 
+#[instrument]
 async fn handle_event(event: Event, nats: &Client) -> anyhow::Result<()> {
     if let Event::Ready(ready) = event {
-        info!("Logged in as {}", ready.user.name);
+        info!(username = ready.user.name, "logged in");
     } else {
         let name = event.kind().name();
         if let Ok(dispatch_event) = DispatchEvent::try_from(event) {
-            debug!("handling event {}", name.unwrap());
+            let name = name.unwrap();
+            debug!(event_name = name, "handling dispatch event");
 
             let data = CachePayload {
                 data: DispatchEventTagged(dispatch_event),
@@ -98,7 +100,7 @@ async fn handle_event(event: Event, nats: &Client) -> anyhow::Result<()> {
             let value = serde_json::to_string(&data)?;
             let bytes = bytes::Bytes::from(value);
 
-            let span = trace_span!("nats send");
+            let span = info_span!("nats send");
 
             let mut header_map = HeaderMap::new();
             let context = span.context();
@@ -106,12 +108,9 @@ async fn handle_event(event: Event, nats: &Client) -> anyhow::Result<()> {
                 propagator.inject_context(&context, &mut MetadataMap(&mut header_map));
             });
 
-            nats.publish_with_headers(
-                format!("nova.cache.dispatch.{}", name.unwrap()),
-                header_map,
-                bytes,
-            )
-            .await?;
+            nats.publish_with_headers(format!("nova.cache.dispatch.{name}"), header_map, bytes)
+                .instrument(info_span!("sending to nats"))
+                .await?;
         }
     }
 
index f1b029891f77ca4d838971c4692c0c72cfe3f778..6cac7e1c1b9f70c600c2730b98995e7fbc27c999 100644 (file)
@@ -1,4 +1,11 @@
 use gateway::GatewayServer;
 use leash::ignite;
 
+#[cfg(not(target_env = "msvc"))]
+use tikv_jemallocator::Jemalloc;
+
+#[cfg(not(target_env = "msvc"))]
+#[global_allocator]
+static GLOBAL: Jemalloc = Jemalloc;
+
 ignite!(GatewayServer);
index 3989735e146e2af72b2e640e165d59ee0fbdb40e..b011d381ee975da2469d3a61489a9b13ae99fd72 100644 (file)
@@ -39,3 +39,6 @@ test-log = { version = "0.2.11", features = ["log", "trace"] }
 [[bench]]
 name = "bucket"
 harness = false
+
+[target.'cfg(not(target_env = "msvc"))'.dependencies]
+tikv-jemallocator = "0.5"
\ No newline at end of file
index 2de812b531d439e6050b3b483277db0f42267f3c..d16e7d371009c384849ae8141b70081e75357e73 100644 (file)
@@ -1,4 +1,11 @@
 use leash::ignite;
 use ratelimit::RatelimiterServerComponent;
 
+#[cfg(not(target_env = "msvc"))]
+use tikv_jemallocator::Jemalloc;
+
+#[cfg(not(target_env = "msvc"))]
+#[global_allocator]
+static GLOBAL: Jemalloc = Jemalloc;
+
 ignite!(RatelimiterServerComponent);
index 55834166bf4e8ebd71b5ed4c222a928f40375608..3d362b21dfccfa5f9b8fcec9759ae103169ba54c 100644 (file)
@@ -29,4 +29,7 @@ tokio-stream = "0.1.11"
 dns-lookup = "1.0.8"
 opentelemetry = "0.18.0"
 opentelemetry-http = "0.7.0"
-tracing-opentelemetry = "0.18.0"
\ No newline at end of file
+tracing-opentelemetry = "0.18.0"
+
+[target.'cfg(not(target_env = "msvc"))'.dependencies]
+tikv-jemallocator = "0.5"
\ No newline at end of file
index 9593ac8f66f67b2bbc852f2f01b64254d3c23689..900d3fb5dfee20b7f8770b3c5c8e2a013c58c258 100644 (file)
@@ -28,4 +28,11 @@ pub struct ReverseProxy {
     pub discord: Discord,
     pub ratelimiter_address: String,
     pub ratelimiter_port: u16,
+    #[serde(default = "default_upstream")]
+    pub upstream: Option<String>,
+}
+
+#[allow(clippy::unnecessary_wraps)]
+fn default_upstream() -> Option<String> {
+    Some("https://discord.com".to_string())
 }
index f053af73c3793442523385ca944543ff35a9b2bd..3d4793514fecd18bf834d5ca2285060becb1a3dc 100644 (file)
@@ -1,23 +1,78 @@
-use anyhow::bail;
+use anyhow::{bail, Context};
+use futures_util::FutureExt;
 use http::{
     header::{AUTHORIZATION, CONNECTION, HOST, TRANSFER_ENCODING, UPGRADE},
     HeaderValue, Method as HttpMethod, Request, Response, Uri,
 };
 use hyper::{client::HttpConnector, Body, Client};
 use hyper_rustls::HttpsConnector;
+use opentelemetry::{
+    global,
+    metrics::{Counter, Histogram},
+    Context as OpenTelemetryContext, KeyValue,
+};
 use std::{
     collections::hash_map::DefaultHasher,
     convert::TryFrom,
     hash::{Hash, Hasher},
     str::FromStr,
     sync::Arc,
+    time::SystemTime,
 };
-use tracing::{debug_span, error, info_span, instrument, Instrument};
+use tracing::{debug_span, error, info_span, log::trace, Instrument};
 use twilight_http_ratelimiting::{Method, Path};
 
-use crate::ratelimit_client::RemoteRatelimiter;
+use crate::{config::ReverseProxy, ratelimit_client::RemoteRatelimiter};
+use lazy_static::lazy_static;
+
+lazy_static! {
+    static ref METER_NAME: &'static str = "";
+    static ref REQUESTS: Counter<u64> = {
+        global::meter(&METER_NAME)
+            .u64_counter("rest.http_requests_total")
+            .with_description("Amount of requests processed by the rest reverse proxy")
+            .init()
+    };
+    static ref UPSTREAM_CALLS: Counter<u64> = {
+        global::meter(&METER_NAME)
+            .u64_counter("rest.upstream_http_requests_total")
+            .with_description("Amount of requests sent to discord")
+            .init()
+    };
+    static ref TICKET_CALLS: Counter<u64> = {
+        global::meter(&METER_NAME)
+            .u64_counter("rest.ticket_http_requests_total")
+            .with_description("Amount of requests sent to the ratelimiter")
+            .init()
+    };
+    static ref HEADERS_SUBMIT_CALLS: Counter<u64> = {
+        global::meter(&METER_NAME)
+            .u64_counter("rest.header_submit_http_requests_total")
+            .with_description("Amount of requests sent to the ratelimiter")
+            .init()
+    };
+    static ref UPSTREAM_TIMES: Histogram<f64> = {
+        global::meter(&METER_NAME)
+            .f64_histogram("rest.upstream_http_request_duration_seconds")
+            .with_description("Time took to request discord")
+            .init()
+    };
+    static ref TICKET_TIMES: Histogram<f64> = {
+        global::meter(&METER_NAME)
+            .f64_histogram("rest.ticket_http_request_duration_seconds")
+            .with_description("Time took to get a ticket from the ratelimiter")
+            .init()
+    };
+    static ref HEADERS_SUBMIT_TIMES: Histogram<f64> = {
+        global::meter(&METER_NAME)
+            .f64_histogram("rest.header_submit_http_request_duration_seconds")
+            .with_description("Time took to get a ticket from the ratelimiter")
+            .init()
+    };
+}
 
 /// Normalizes the path
+#[inline]
 fn normalize_path(request_path: &str) -> (&str, &str) {
     if let Some(trimmed_path) = request_path.strip_prefix("/api") {
         if let Some(maybe_api_version) = trimmed_path.split('/').nth(1) {
@@ -35,14 +90,18 @@ fn normalize_path(request_path: &str) -> (&str, &str) {
     }
 }
 
-#[instrument]
+#[inline]
+#[allow(clippy::too_many_lines)]
 pub async fn handle_request(
     client: Client<HttpsConnector<HttpConnector>, Body>,
     ratelimiter: Arc<RemoteRatelimiter>,
+    config: ReverseProxy,
     token: String,
     mut request: Request<Body>,
 ) -> Result<Response<Body>, anyhow::Error> {
-    let (hash, uri_string) = {
+    let cx = OpenTelemetryContext::current();
+
+    let (bucket, uri_string) = {
         let method = match *request.method() {
             HttpMethod::DELETE => Method::Delete,
             HttpMethod::GET => Method::Get,
@@ -50,20 +109,25 @@ pub async fn handle_request(
             HttpMethod::POST => Method::Post,
             HttpMethod::PUT => Method::Put,
             _ => {
-                error!("Unsupported HTTP method in request, {}", request.method());
+                error!(method =? request.method(), "unsupported HTTP method in request");
                 bail!("unsupported method");
             }
         };
-
         let request_path = request.uri().path();
         let (api_path, trimmed_path) = normalize_path(request_path);
+        trace!("normalized path to {trimmed_path}");
 
-        let mut uri_string = format!("https://discord.com{api_path}{trimmed_path}");
+        let mut uri_string = format!(
+            "{}{api_path}{trimmed_path}",
+            config.upstream.expect("no upstream")
+        );
         if let Some(query) = request.uri().query() {
             uri_string.push('?');
             uri_string.push_str(query);
         }
 
+        trace!("full request uri is {uri_string}");
+
         let mut hash = DefaultHasher::new();
         match Path::try_from((method, trimmed_path)) {
             Ok(path) => path,
@@ -76,14 +140,36 @@ pub async fn handle_request(
             }
         }
         .hash(&mut hash);
+        let bucket = hash.finish().to_string();
+        trace!("Request bucket is {}", bucket);
 
-        (hash.finish().to_string(), uri_string)
+        (bucket, uri_string)
     };
+
+    REQUESTS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]);
+
+    let ticket_start = SystemTime::now();
+    TICKET_CALLS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]);
     // waits for the request to be authorized
-    ratelimiter
-        .ticket(hash.clone())
+    match ratelimiter
+        .ticket(bucket.clone())
         .instrument(debug_span!("ticket validation request"))
-        .await?;
+        .then(|v| async {
+            TICKET_TIMES.record(
+                &cx,
+                ticket_start.elapsed()?.as_secs_f64(),
+                &[KeyValue::new("bucket", bucket.clone())],
+            );
+            v
+        })
+        .await
+    {
+        Ok(_) => {}
+        Err(e) => {
+            error!("Error when requesting the ratelimiter: {:?}", e);
+            bail!("failed to request the ratelimiter");
+        }
+    }
 
     request
         .headers_mut()
@@ -121,7 +207,21 @@ pub async fn handle_request(
     };
     *request.uri_mut() = uri;
     let span = debug_span!("upstream request to discord");
-    let resp = match client.request(request).instrument(span).await {
+    let upstream_start = SystemTime::now();
+    UPSTREAM_CALLS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]);
+    let resp = match client
+        .request(request)
+        .instrument(span)
+        .then(|v| async {
+            UPSTREAM_TIMES.record(
+                &cx,
+                upstream_start.elapsed()?.as_secs_f64(),
+                &[KeyValue::new("bucket", bucket.clone())],
+            );
+            v.context("")
+        })
+        .await
+    {
         Ok(response) => response,
         Err(e) => {
             error!("Error when requesting the Discord API: {:?}", e);
@@ -132,14 +232,28 @@ pub async fn handle_request(
     let headers = resp
         .headers()
         .into_iter()
-        .map(|(k, v)| (k.to_string(), v.to_str().map(std::string::ToString::to_string)))
+        .map(|(k, v)| {
+            (
+                k.to_string(),
+                v.to_str().map(std::string::ToString::to_string),
+            )
+        })
         .filter(|f| f.1.is_ok())
         .map(|f| (f.0, f.1.expect("errors should be filtered")))
         .collect();
 
+    HEADERS_SUBMIT_CALLS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]);
     let _submit_headers = ratelimiter
-        .submit_headers(hash, headers)
+        .submit_headers(bucket.clone(), headers)
         .instrument(info_span!("submitting headers"))
+        .then(|v| async {
+            HEADERS_SUBMIT_TIMES.record(
+                &cx,
+                upstream_start.elapsed()?.as_secs_f64(),
+                &[KeyValue::new("bucket", bucket.clone())],
+            );
+            v
+        })
         .await;
 
     Ok(resp)
index 174bea0c4b13a0bba3d29b97b7debfd753735d1c..917f5247c43e4242ccccc88663580784c906ef46 100644 (file)
@@ -53,11 +53,12 @@ impl Component for ReverseProxyServer {
 
             let client: Client<_, hyper::Body> = Client::builder().build(https);
             let token = settings.config.discord.token.clone();
-
+            let config = settings.config.clone();
             let service_fn = make_service_fn(move |_: &AddrStream| {
                 let client = client.clone();
                 let ratelimiter = ratelimiter.clone();
                 let token = token.clone();
+                let config = config.clone();
                 async move {
                     Ok::<_, Infallible>(service_fn(move |request: Request<Body>| {
                         let token = token.clone();
@@ -71,10 +72,11 @@ impl Component for ReverseProxyServer {
                         let client = client.clone();
                         let ratelimiter = ratelimiter.clone();
 
+                        let config = config.clone();
                         async move {
                             let token = token.clone();
                             let ratelimiter = ratelimiter.clone();
-                            handle_request(client, ratelimiter, token, request).await
+                            handle_request(client, ratelimiter, config, token, request).await
                         }
                     }))
                 }
index fe8ada759ab1ec131bd00d7f134ab2aed31d7770..643bc89cf45289a004da6ea0d587b89d07e87197 100644 (file)
@@ -1,4 +1,11 @@
 use leash::ignite;
 use rest::ReverseProxyServer;
 
+#[cfg(not(target_env = "msvc"))]
+use tikv_jemallocator::Jemalloc;
+
+#[cfg(not(target_env = "msvc"))]
+#[global_allocator]
+static GLOBAL: Jemalloc = Jemalloc;
+
 ignite!(ReverseProxyServer);
index 0c50009fbbbf194f56acdbde2e9252a7f621798e..af256c8d33e4dd97e333ed9bac35419c5694f04a 100644 (file)
@@ -19,4 +19,7 @@ ed25519-dalek = "1"
 twilight-model = { version = "0.14" }
 anyhow = "1.0.68"
 
-async-nats = "0.25.1"
+async-nats = "0.26.0"
+
+[target.'cfg(not(target_env = "msvc"))'.dependencies]
+tikv-jemallocator = "0.5"
\ No newline at end of file
index f53172558ca50d64363e1f3cbe1a0f1315e34c06..9432dbdbeca4d6021c163ef3d51538caa7c3da54 100644 (file)
@@ -1,4 +1,11 @@
 use leash::ignite;
 use webhook::WebhookServer;
 
+#[cfg(not(target_env = "msvc"))]
+use tikv_jemallocator::Jemalloc;
+
+#[cfg(not(target_env = "msvc"))]
+#[global_allocator]
+static GLOBAL: Jemalloc = Jemalloc;
+
 ignite!(WebhookServer);
index ca110d0555cac1206f83d04f93c3fd87f3de09a5..b96f3849fba4b575cac8aa5088e0ea2291008535 100644 (file)
@@ -16,4 +16,4 @@ tracing = "0.1.37"
 env_logger = "0.10.0"
 tracing-opentelemetry = "0.18.0"
 opentelemetry = { version ="0.18.0", features = ["rt-tokio"] }
-opentelemetry-otlp = { version = "0.11.0" }
+opentelemetry-otlp = { version = "0.11.0", features = ["metrics"] }
index 91bfaf59257e8db14704e9a3b1cf6a9f2312b278..f3c947292b1717c9f1301bf1a2086c907f1eb930 100644 (file)
@@ -6,10 +6,13 @@
     clippy::complexity,
     clippy::perf,
     clippy::pedantic,
-    clippy::nursery,
+    clippy::nursery
 )]
 
 use anyhow::Result;
+use opentelemetry::global::shutdown_tracer_provider;
+use opentelemetry::sdk::export::metrics::aggregation::stateless_temporality_selector;
+use opentelemetry::sdk::metrics::selectors;
 use opentelemetry::sdk::propagation::TraceContextPropagator;
 use opentelemetry::sdk::trace::{self};
 use opentelemetry::sdk::Resource;
@@ -18,8 +21,10 @@ use opentelemetry_otlp::WithExportConfig;
 use serde::de::DeserializeOwned;
 use shared::config::Settings;
 use std::str::FromStr;
+use std::time::Duration;
 use std::{future::Future, pin::Pin};
 use tokio::sync::oneshot;
+use tracing::log::error;
 use tracing::{info, log::trace};
 use tracing_subscriber::filter::Directive;
 use tracing_subscriber::{fmt, prelude::*, EnvFilter};
@@ -35,54 +40,111 @@ pub trait Component: Send + Sync + 'static + Sized {
         stop: oneshot::Receiver<()>,
     ) -> AnyhowResultFuture<()>;
     fn new() -> Self;
+}
 
-    fn _internal_start(self) -> AnyhowResultFuture<()> {
-        Box::pin(async move {
-            global::set_text_map_propagator(TraceContextPropagator::new());
+/// # Panics
+/// Panics in case of an invalid `RUST_LOG` variable.
+pub fn start_component<Y, C>(component: Y) -> AnyhowResultFuture<()>
+where
+    Y: Component<Config = C>,
+    C: Default + Clone + DeserializeOwned + Send,
+{
+    Box::pin(async move {
+        let settings = Settings::<Y::Config>::new(Y::SERVICE_NAME)?;
+
+        if let Some(meter_config) = settings
+            .opentelemetry
+            .as_ref()
+            .and_then(|f| f.metrics.clone())
+        {
+            let meter = opentelemetry_otlp::new_pipeline()
+                .metrics(
+                    selectors::simple::histogram([1.0, 2.0, 5.0, 10.0, 20.0, 50.0]),
+                    stateless_temporality_selector(),
+                    opentelemetry::runtime::Tokio,
+                )
+                .with_exporter(
+                    opentelemetry_otlp::new_exporter()
+                        .tonic()
+                        .with_export_config(meter_config.into()),
+                )
+                .with_period(Duration::from_secs(3))
+                .with_timeout(Duration::from_secs(10))
+                .build()?;
+            // Using the opentelemetry_otlp meter
+            global::set_meter_provider(meter);
+        }
+        // Use the text propagator
+        global::set_text_map_propagator(TraceContextPropagator::new());
+        // Print debug errors
+        global::set_error_handler(|error| {
+            error!("OpenTelemetry error: {}", error);
+        })?;
+
+        if let Some(tracer_config) = settings
+            .opentelemetry
+            .as_ref()
+            .and_then(|f| f.traces.clone())
+        {
             let tracer = opentelemetry_otlp::new_pipeline()
                 .tracing()
                 .with_trace_config(trace::config().with_resource(Resource::new(vec![
-                    KeyValue::new("service.name", Self::SERVICE_NAME),
+                    KeyValue::new("service.name", Y::SERVICE_NAME),
                 ])))
-                .with_exporter(opentelemetry_otlp::new_exporter().tonic().with_env())
+                .with_exporter(
+                    opentelemetry_otlp::new_exporter()
+                        .tonic()
+                        .with_export_config(tracer_config.into()),
+                )
                 .install_batch(opentelemetry::runtime::Tokio)?;
+            let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
 
-            let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
-
+            tracing_subscriber::registry()
+                .with(otel_layer)
+                .with(
+                    // Use the info level as default
+                    EnvFilter::builder()
+                        .with_default_directive(Directive::from_str("info").unwrap())
+                        .from_env()?,
+                )
+                .init();
+        } else {
+            // Setup tracing
             tracing_subscriber::registry()
                 .with(fmt::layer())
-                .with(telemetry)
                 .with(
+                    // Use the info level as default
                     EnvFilter::builder()
                         .with_default_directive(Directive::from_str("info").unwrap())
                         .from_env()?,
                 )
                 .init();
+        }
 
-            info!("Starting nova");
-            let settings = Settings::<Self::Config>::new(Self::SERVICE_NAME);
-            let (stop, stop_channel) = oneshot::channel();
-
-            tokio::spawn(async move {
-                trace!("started signal watching");
-                #[cfg(unix)]
-                tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
-                    .unwrap()
-                    .recv()
-                    .await;
-                #[cfg(not(unix))]
-                return tokio::signal::ctrl_c().await.unwrap();
-
-                stop.send(()).unwrap();
-            });
-
-            trace!(
-                "Starting component {component}",
-                component = Self::SERVICE_NAME
-            );
-            self.start(settings?, stop_channel).await
-        })
-    }
+        // Finally starting nova
+        info!("Starting nova component {}", Y::SERVICE_NAME);
+        let (stop, stop_channel) = oneshot::channel();
+
+        tokio::spawn(async move {
+            trace!("started signal watching");
+            #[cfg(unix)]
+            tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
+                .unwrap()
+                .recv()
+                .await;
+            #[cfg(not(unix))]
+            return tokio::signal::ctrl_c().await.unwrap();
+
+            stop.send(()).unwrap();
+            shutdown_tracer_provider();
+        });
+
+        trace!(
+            "Starting component {component}",
+            component = Y::SERVICE_NAME
+        );
+        component.start(settings, stop_channel).await
+    })
 }
 
 #[macro_export]
@@ -90,9 +152,9 @@ macro_rules! ignite {
     ($c:ty) => {
         #[allow(dead_code)]
         fn main() -> anyhow::Result<()> {
-            use leash::Component;
+            use $crate::Component;
             let rt = tokio::runtime::Runtime::new()?;
-            rt.block_on(<$c as Component>::new()._internal_start())?;
+            rt.block_on($crate::start_component(<$c as Component>::new()))?;
             Ok(())
         }
     };
@@ -128,7 +190,5 @@ mod test {
         }
     }
 
-    
-
     ignite!(TestComponent);
 }
index 419ad0cb2519463e550ddc74e0e5a2c740b9c249..9b29bf2d357f3afe86b7296249ee535319c0d172 100644 (file)
@@ -10,7 +10,7 @@ serde_repr = "0.1"
 
 config = { version = "0.13", default-features = false, features = ["json", "yaml-rust", "ini"] }
 
-async-nats = "0.25.1"
+async-nats = "0.26.0"
 redis = { version = "0.22.1", features = ["cluster", "connection-manager", "tokio-comp"] }
 
 tokio = { version = "1", features = ["signal", "rt"] }
@@ -21,4 +21,6 @@ anyhow = "1.0.68"
 
 serde_test = "1.0.152"
 
-tracing = "0.1.37"
\ No newline at end of file
+tracing = "0.1.37"
+opentelemetry-otlp = "0.11.0"
+opentelemetry = "0.18.0"
\ No newline at end of file
index cdf0bd36db4b8fa4f4cb70617e616ddb3b89688c..4967e3cc4f0ea339b042de5802ac7bdeafd00b7e 100644 (file)
@@ -10,6 +10,7 @@ pub struct Settings<T: Clone + DeserializeOwned> {
     pub config: T,
     pub nats: crate::nats::Configuration,
     pub redis: crate::redis::Configuration,
+    pub opentelemetry: Option<crate::opentelemetry::Configuration>,
 }
 
 impl<T: Clone + DeserializeOwned + Default> Settings<T> {
index 92e7a0591cdb0e93e3879dcf3a292d1bb48ed3a1..33a6a136a75b9ff9bb49e173394101ca341552d6 100644 (file)
@@ -13,3 +13,4 @@ pub mod config;
 pub mod nats;
 pub mod payloads;
 pub mod redis;
+pub mod opentelemetry;
\ No newline at end of file
diff --git a/libs/shared/src/opentelemetry.rs b/libs/shared/src/opentelemetry.rs
new file mode 100644 (file)
index 0000000..ca6542d
--- /dev/null
@@ -0,0 +1,91 @@
+use std::ops::{Deref, DerefMut};
+
+use opentelemetry_otlp::{ExportConfig, Protocol};
+use serde::{de::Visitor, Deserialize};
+
+#[derive(Debug, Default)]
+#[repr(transparent)]
+pub struct ExportConfigDeserialize(ExportConfig);
+impl Clone for ExportConfigDeserialize {
+    fn clone(&self) -> Self {
+        Self(ExportConfig {
+            endpoint: self.0.endpoint.clone(),
+            protocol: self.0.protocol,
+            timeout: self.0.timeout,
+        })
+    }
+}
+
+impl From<ExportConfigDeserialize> for ExportConfig {
+    fn from(val: ExportConfigDeserialize) -> Self {
+        val.0
+    }
+}
+
+impl Deref for ExportConfigDeserialize {
+    type Target = ExportConfig;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl DerefMut for ExportConfigDeserialize {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.0
+    }
+}
+
+impl<'de> Deserialize<'de> for ExportConfigDeserialize {
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        #[derive(Deserialize, Debug)]
+        #[serde(field_identifier, rename_all = "lowercase")]
+        enum Fields {
+            Endpoint,
+            Timeout,
+        }
+
+        struct OpenTelemetryExportConfigVisitor;
+        impl<'de> Visitor<'de> for OpenTelemetryExportConfigVisitor {
+            type Value = ExportConfigDeserialize;
+            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
+                formatter.write_str("struct OpenTelemetryExportConfig")
+            }
+
+            fn visit_map<A>(self, mut map: A) -> std::result::Result<Self::Value, A::Error>
+            where
+                A: serde::de::MapAccess<'de>,
+            {
+                let mut export_config = ExportConfigDeserialize::default();
+                export_config.0.protocol = Protocol::Grpc;
+                while let Some(name) = map.next_key::<Fields>()? {
+                    match name {
+                        Fields::Endpoint => {
+                            export_config.0.endpoint = map.next_value()?;
+                        }
+                        Fields::Timeout => {
+                            export_config.0.timeout = map.next_value()?;
+                        }
+                    }
+                }
+
+                Ok(export_config)
+            }
+        }
+
+        deserializer.deserialize_struct(
+            "OpenTelemetryExportConfig",
+            &["endpoint", "protocol", "timeout"],
+            OpenTelemetryExportConfigVisitor,
+        )
+    }
+}
+
+#[derive(Debug, Clone, Deserialize)]
+pub struct Configuration {
+    pub traces: Option<ExportConfigDeserialize>,
+    pub metrics: Option<ExportConfigDeserialize>,
+}