From: MatthieuCoder Date: Sun, 15 Jan 2023 13:03:29 +0000 (+0400) Subject: add metrics and jemalloc X-Git-Tag: v0.1.1~24 X-Git-Url: https://git.puffer.fish/?a=commitdiff_plain;h=cbcaa3c01ec4d9ed95dc5af8232de1d10191bc44;p=matthieu%2Fnova.git add metrics and jemalloc --- diff --git a/Cargo.lock b/Cargo.lock index 4588509..d47e07f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -86,9 +86,9 @@ checksum = "983cd8b9d4b02a6dc6ffa557262eb5858a27a0038ffffe21a0f133eaa819a164" [[package]] name = "async-nats" -version = "0.25.1" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f69bf051b7d96b3275cdea9a4abbe2e937ce6de66c742c57050c5c98b4a6db32" +checksum = "0b0e90b3cd41350d89a242b981fb888f0eb8e3cb81c0fcb4563338f7e96a1084" dependencies = [ "base64", "base64-url", @@ -770,6 +770,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs_extra" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394" + [[package]] name = "futures" version = "0.3.25" @@ -873,6 +879,7 @@ dependencies = [ "serde", "serde_json", "shared", + "tikv-jemallocator", "tokio", "tokio-stream", "tracing", @@ -1930,7 +1937,7 @@ version = "0.1.0" dependencies = [ "anyhow", "criterion", - "env_logger 0.7.1", + "env_logger 0.10.0", "hyper", "leash", "opentelemetry", @@ -1940,6 +1947,7 @@ dependencies = [ "serde", "shared", "test-log", + "tikv-jemallocator", "tokio", "tokio-stream", "tokio-test", @@ -2059,6 +2067,7 @@ dependencies = [ "proto", "serde", "shared", + "tikv-jemallocator", "tokio", "tokio-stream", "tonic", @@ -2354,6 +2363,8 @@ dependencies = [ "anyhow", "async-nats", "config", + "opentelemetry", + "opentelemetry-otlp", "redis", "serde", "serde_json", @@ -2563,6 +2574,27 @@ dependencies = [ "once_cell", ] +[[package]] +name = "tikv-jemalloc-sys" +version = "0.5.2+5.3.0-patched" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec45c14da997d0925c7835883e4d5c181f196fa142f8c19d7643d1e9af2592c3" +dependencies = [ + "cc", + "fs_extra", + "libc", +] + +[[package]] +name = "tikv-jemallocator" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20612db8a13a6c06d57ec83953694185a367e16945f66565e8028d2c0bd76979" +dependencies = [ + "libc", + "tikv-jemalloc-sys", +] + [[package]] name = "time" version = "0.3.17" @@ -3264,6 +3296,7 @@ dependencies = [ "serde", "serde_json", "shared", + "tikv-jemallocator", "tokio", "tracing", "twilight-model", diff --git a/Dockerfile b/Dockerfile index 4b82766..4e54bbf 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,7 @@ FROM rust AS chef USER root RUN cargo install cargo-chef -RUN apt-get update && apt-get install -y protobuf-compiler +RUN apt-get update && apt-get install -y protobuf-compiler WORKDIR /app # Planning install diff --git a/docker-compose.yaml b/docker-compose.yaml index 7126433..ac1fb12 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,7 +1,7 @@ version: "3.3" services: nats: - image: bitnami/nats + image: nats restart: always ports: - 4222:4222 @@ -11,7 +11,8 @@ services: image: redis ports: - 6379:6379 - + mock: + image: nginx cache: image: ghcr.io/discordnova/nova/cache restart: always @@ -23,7 +24,6 @@ services: - ./config/default.yml:/config/default.yml environment: - RUST_LOG=debug - - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317 depends_on: - nats - redis @@ -40,7 +40,6 @@ services: - ./config/default.yml:/config/default.yml environment: - RUST_LOG=debug - - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317 depends_on: - nats - otelcol @@ -56,7 +55,6 @@ services: - ./config/default.yml:/config/default.yml environment: - RUST_LOG=debug - - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317 depends_on: - ratelimit - otelcol @@ -76,7 +74,6 @@ services: - ./config/default.yml:/config/default.yml environment: - RUST_LOG=debug - - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317 depends_on: - nats - otelcol @@ -94,7 +91,6 @@ services: - ./config/default.yml:/config/default.yml environment: - RUST_LOG=debug - - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317 depends_on: - nats - redis @@ -108,8 +104,6 @@ services: image: jaegertracing/all-in-one container_name: jaeger command: - - "--memory.max-debugs" - - "10000" - "--query.base-path" - "/jaeger/ui" - "--prometheus.server-url" diff --git a/exes/cache/Cargo.toml b/exes/cache/Cargo.toml index 70cb6a3..4dd7c97 100644 --- a/exes/cache/Cargo.toml +++ b/exes/cache/Cargo.toml @@ -19,7 +19,7 @@ tokio-stream = "0.1.11" serde = { version = "1.0.8", features = ["derive"] } serde_json = { version = "1.0" } -async-nats = "0.25.1" +async-nats = "0.26.0" twilight-model = "0.14" anyhow = "1.0.68" diff --git a/exes/gateway/Cargo.toml b/exes/gateway/Cargo.toml index a174710..98a1ae8 100644 --- a/exes/gateway/Cargo.toml +++ b/exes/gateway/Cargo.toml @@ -23,8 +23,11 @@ serde_json = { version = "1.0" } tracing = "0.1.37" tracing-futures = "0.2.5" -async-nats = "0.25.1" +async-nats = "0.26.0" tracing-opentelemetry = "0.18.0" opentelemetry = "0.18.0" -opentelemetry-http = "0.7.0" \ No newline at end of file +opentelemetry-http = "0.7.0" + +[target.'cfg(not(target_env = "msvc"))'.dependencies] +tikv-jemallocator = "0.5" \ No newline at end of file diff --git a/exes/gateway/src/lib.rs b/exes/gateway/src/lib.rs index c4da8f4..ddc4bbd 100644 --- a/exes/gateway/src/lib.rs +++ b/exes/gateway/src/lib.rs @@ -24,7 +24,7 @@ use tokio_stream::StreamExt; use tracing_opentelemetry::OpenTelemetrySpanExt; use twilight_gateway::{Event, Shard}; pub mod config; -use tracing::{debug, error, info, trace_span}; +use tracing::{debug, error, info, info_span, instrument, Instrument}; use twilight_model::gateway::event::DispatchEvent; struct MetadataMap<'a>(&'a mut HeaderMap); @@ -84,13 +84,15 @@ impl Component for GatewayServer { } } +#[instrument] async fn handle_event(event: Event, nats: &Client) -> anyhow::Result<()> { if let Event::Ready(ready) = event { - info!("Logged in as {}", ready.user.name); + info!(username = ready.user.name, "logged in"); } else { let name = event.kind().name(); if let Ok(dispatch_event) = DispatchEvent::try_from(event) { - debug!("handling event {}", name.unwrap()); + let name = name.unwrap(); + debug!(event_name = name, "handling dispatch event"); let data = CachePayload { data: DispatchEventTagged(dispatch_event), @@ -98,7 +100,7 @@ async fn handle_event(event: Event, nats: &Client) -> anyhow::Result<()> { let value = serde_json::to_string(&data)?; let bytes = bytes::Bytes::from(value); - let span = trace_span!("nats send"); + let span = info_span!("nats send"); let mut header_map = HeaderMap::new(); let context = span.context(); @@ -106,12 +108,9 @@ async fn handle_event(event: Event, nats: &Client) -> anyhow::Result<()> { propagator.inject_context(&context, &mut MetadataMap(&mut header_map)); }); - nats.publish_with_headers( - format!("nova.cache.dispatch.{}", name.unwrap()), - header_map, - bytes, - ) - .await?; + nats.publish_with_headers(format!("nova.cache.dispatch.{name}"), header_map, bytes) + .instrument(info_span!("sending to nats")) + .await?; } } diff --git a/exes/gateway/src/main.rs b/exes/gateway/src/main.rs index f1b0298..6cac7e1 100644 --- a/exes/gateway/src/main.rs +++ b/exes/gateway/src/main.rs @@ -1,4 +1,11 @@ use gateway::GatewayServer; use leash::ignite; +#[cfg(not(target_env = "msvc"))] +use tikv_jemallocator::Jemalloc; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + ignite!(GatewayServer); diff --git a/exes/ratelimit/Cargo.toml b/exes/ratelimit/Cargo.toml index 3989735..b011d38 100644 --- a/exes/ratelimit/Cargo.toml +++ b/exes/ratelimit/Cargo.toml @@ -39,3 +39,6 @@ test-log = { version = "0.2.11", features = ["log", "trace"] } [[bench]] name = "bucket" harness = false + +[target.'cfg(not(target_env = "msvc"))'.dependencies] +tikv-jemallocator = "0.5" \ No newline at end of file diff --git a/exes/ratelimit/src/main.rs b/exes/ratelimit/src/main.rs index 2de812b..d16e7d3 100644 --- a/exes/ratelimit/src/main.rs +++ b/exes/ratelimit/src/main.rs @@ -1,4 +1,11 @@ use leash::ignite; use ratelimit::RatelimiterServerComponent; +#[cfg(not(target_env = "msvc"))] +use tikv_jemallocator::Jemalloc; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + ignite!(RatelimiterServerComponent); diff --git a/exes/rest/Cargo.toml b/exes/rest/Cargo.toml index 5583416..3d362b2 100644 --- a/exes/rest/Cargo.toml +++ b/exes/rest/Cargo.toml @@ -29,4 +29,7 @@ tokio-stream = "0.1.11" dns-lookup = "1.0.8" opentelemetry = "0.18.0" opentelemetry-http = "0.7.0" -tracing-opentelemetry = "0.18.0" \ No newline at end of file +tracing-opentelemetry = "0.18.0" + +[target.'cfg(not(target_env = "msvc"))'.dependencies] +tikv-jemallocator = "0.5" \ No newline at end of file diff --git a/exes/rest/src/config.rs b/exes/rest/src/config.rs index 9593ac8..900d3fb 100644 --- a/exes/rest/src/config.rs +++ b/exes/rest/src/config.rs @@ -28,4 +28,11 @@ pub struct ReverseProxy { pub discord: Discord, pub ratelimiter_address: String, pub ratelimiter_port: u16, + #[serde(default = "default_upstream")] + pub upstream: Option, +} + +#[allow(clippy::unnecessary_wraps)] +fn default_upstream() -> Option { + Some("https://discord.com".to_string()) } diff --git a/exes/rest/src/handler.rs b/exes/rest/src/handler.rs index f053af7..3d47935 100644 --- a/exes/rest/src/handler.rs +++ b/exes/rest/src/handler.rs @@ -1,23 +1,78 @@ -use anyhow::bail; +use anyhow::{bail, Context}; +use futures_util::FutureExt; use http::{ header::{AUTHORIZATION, CONNECTION, HOST, TRANSFER_ENCODING, UPGRADE}, HeaderValue, Method as HttpMethod, Request, Response, Uri, }; use hyper::{client::HttpConnector, Body, Client}; use hyper_rustls::HttpsConnector; +use opentelemetry::{ + global, + metrics::{Counter, Histogram}, + Context as OpenTelemetryContext, KeyValue, +}; use std::{ collections::hash_map::DefaultHasher, convert::TryFrom, hash::{Hash, Hasher}, str::FromStr, sync::Arc, + time::SystemTime, }; -use tracing::{debug_span, error, info_span, instrument, Instrument}; +use tracing::{debug_span, error, info_span, log::trace, Instrument}; use twilight_http_ratelimiting::{Method, Path}; -use crate::ratelimit_client::RemoteRatelimiter; +use crate::{config::ReverseProxy, ratelimit_client::RemoteRatelimiter}; +use lazy_static::lazy_static; + +lazy_static! { + static ref METER_NAME: &'static str = ""; + static ref REQUESTS: Counter = { + global::meter(&METER_NAME) + .u64_counter("rest.http_requests_total") + .with_description("Amount of requests processed by the rest reverse proxy") + .init() + }; + static ref UPSTREAM_CALLS: Counter = { + global::meter(&METER_NAME) + .u64_counter("rest.upstream_http_requests_total") + .with_description("Amount of requests sent to discord") + .init() + }; + static ref TICKET_CALLS: Counter = { + global::meter(&METER_NAME) + .u64_counter("rest.ticket_http_requests_total") + .with_description("Amount of requests sent to the ratelimiter") + .init() + }; + static ref HEADERS_SUBMIT_CALLS: Counter = { + global::meter(&METER_NAME) + .u64_counter("rest.header_submit_http_requests_total") + .with_description("Amount of requests sent to the ratelimiter") + .init() + }; + static ref UPSTREAM_TIMES: Histogram = { + global::meter(&METER_NAME) + .f64_histogram("rest.upstream_http_request_duration_seconds") + .with_description("Time took to request discord") + .init() + }; + static ref TICKET_TIMES: Histogram = { + global::meter(&METER_NAME) + .f64_histogram("rest.ticket_http_request_duration_seconds") + .with_description("Time took to get a ticket from the ratelimiter") + .init() + }; + static ref HEADERS_SUBMIT_TIMES: Histogram = { + global::meter(&METER_NAME) + .f64_histogram("rest.header_submit_http_request_duration_seconds") + .with_description("Time took to get a ticket from the ratelimiter") + .init() + }; +} /// Normalizes the path +#[inline] fn normalize_path(request_path: &str) -> (&str, &str) { if let Some(trimmed_path) = request_path.strip_prefix("/api") { if let Some(maybe_api_version) = trimmed_path.split('/').nth(1) { @@ -35,14 +90,18 @@ fn normalize_path(request_path: &str) -> (&str, &str) { } } -#[instrument] +#[inline] +#[allow(clippy::too_many_lines)] pub async fn handle_request( client: Client, Body>, ratelimiter: Arc, + config: ReverseProxy, token: String, mut request: Request, ) -> Result, anyhow::Error> { - let (hash, uri_string) = { + let cx = OpenTelemetryContext::current(); + + let (bucket, uri_string) = { let method = match *request.method() { HttpMethod::DELETE => Method::Delete, HttpMethod::GET => Method::Get, @@ -50,20 +109,25 @@ pub async fn handle_request( HttpMethod::POST => Method::Post, HttpMethod::PUT => Method::Put, _ => { - error!("Unsupported HTTP method in request, {}", request.method()); + error!(method =? request.method(), "unsupported HTTP method in request"); bail!("unsupported method"); } }; - let request_path = request.uri().path(); let (api_path, trimmed_path) = normalize_path(request_path); + trace!("normalized path to {trimmed_path}"); - let mut uri_string = format!("https://discord.com{api_path}{trimmed_path}"); + let mut uri_string = format!( + "{}{api_path}{trimmed_path}", + config.upstream.expect("no upstream") + ); if let Some(query) = request.uri().query() { uri_string.push('?'); uri_string.push_str(query); } + trace!("full request uri is {uri_string}"); + let mut hash = DefaultHasher::new(); match Path::try_from((method, trimmed_path)) { Ok(path) => path, @@ -76,14 +140,36 @@ pub async fn handle_request( } } .hash(&mut hash); + let bucket = hash.finish().to_string(); + trace!("Request bucket is {}", bucket); - (hash.finish().to_string(), uri_string) + (bucket, uri_string) }; + + REQUESTS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]); + + let ticket_start = SystemTime::now(); + TICKET_CALLS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]); // waits for the request to be authorized - ratelimiter - .ticket(hash.clone()) + match ratelimiter + .ticket(bucket.clone()) .instrument(debug_span!("ticket validation request")) - .await?; + .then(|v| async { + TICKET_TIMES.record( + &cx, + ticket_start.elapsed()?.as_secs_f64(), + &[KeyValue::new("bucket", bucket.clone())], + ); + v + }) + .await + { + Ok(_) => {} + Err(e) => { + error!("Error when requesting the ratelimiter: {:?}", e); + bail!("failed to request the ratelimiter"); + } + } request .headers_mut() @@ -121,7 +207,21 @@ pub async fn handle_request( }; *request.uri_mut() = uri; let span = debug_span!("upstream request to discord"); - let resp = match client.request(request).instrument(span).await { + let upstream_start = SystemTime::now(); + UPSTREAM_CALLS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]); + let resp = match client + .request(request) + .instrument(span) + .then(|v| async { + UPSTREAM_TIMES.record( + &cx, + upstream_start.elapsed()?.as_secs_f64(), + &[KeyValue::new("bucket", bucket.clone())], + ); + v.context("") + }) + .await + { Ok(response) => response, Err(e) => { error!("Error when requesting the Discord API: {:?}", e); @@ -132,14 +232,28 @@ pub async fn handle_request( let headers = resp .headers() .into_iter() - .map(|(k, v)| (k.to_string(), v.to_str().map(std::string::ToString::to_string))) + .map(|(k, v)| { + ( + k.to_string(), + v.to_str().map(std::string::ToString::to_string), + ) + }) .filter(|f| f.1.is_ok()) .map(|f| (f.0, f.1.expect("errors should be filtered"))) .collect(); + HEADERS_SUBMIT_CALLS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]); let _submit_headers = ratelimiter - .submit_headers(hash, headers) + .submit_headers(bucket.clone(), headers) .instrument(info_span!("submitting headers")) + .then(|v| async { + HEADERS_SUBMIT_TIMES.record( + &cx, + upstream_start.elapsed()?.as_secs_f64(), + &[KeyValue::new("bucket", bucket.clone())], + ); + v + }) .await; Ok(resp) diff --git a/exes/rest/src/lib.rs b/exes/rest/src/lib.rs index 174bea0..917f524 100644 --- a/exes/rest/src/lib.rs +++ b/exes/rest/src/lib.rs @@ -53,11 +53,12 @@ impl Component for ReverseProxyServer { let client: Client<_, hyper::Body> = Client::builder().build(https); let token = settings.config.discord.token.clone(); - + let config = settings.config.clone(); let service_fn = make_service_fn(move |_: &AddrStream| { let client = client.clone(); let ratelimiter = ratelimiter.clone(); let token = token.clone(); + let config = config.clone(); async move { Ok::<_, Infallible>(service_fn(move |request: Request| { let token = token.clone(); @@ -71,10 +72,11 @@ impl Component for ReverseProxyServer { let client = client.clone(); let ratelimiter = ratelimiter.clone(); + let config = config.clone(); async move { let token = token.clone(); let ratelimiter = ratelimiter.clone(); - handle_request(client, ratelimiter, token, request).await + handle_request(client, ratelimiter, config, token, request).await } })) } diff --git a/exes/rest/src/main.rs b/exes/rest/src/main.rs index fe8ada7..643bc89 100644 --- a/exes/rest/src/main.rs +++ b/exes/rest/src/main.rs @@ -1,4 +1,11 @@ use leash::ignite; use rest::ReverseProxyServer; +#[cfg(not(target_env = "msvc"))] +use tikv_jemallocator::Jemalloc; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + ignite!(ReverseProxyServer); diff --git a/exes/webhook/Cargo.toml b/exes/webhook/Cargo.toml index 0c50009..af256c8 100644 --- a/exes/webhook/Cargo.toml +++ b/exes/webhook/Cargo.toml @@ -19,4 +19,7 @@ ed25519-dalek = "1" twilight-model = { version = "0.14" } anyhow = "1.0.68" -async-nats = "0.25.1" +async-nats = "0.26.0" + +[target.'cfg(not(target_env = "msvc"))'.dependencies] +tikv-jemallocator = "0.5" \ No newline at end of file diff --git a/exes/webhook/src/main.rs b/exes/webhook/src/main.rs index f531725..9432dbd 100644 --- a/exes/webhook/src/main.rs +++ b/exes/webhook/src/main.rs @@ -1,4 +1,11 @@ use leash::ignite; use webhook::WebhookServer; +#[cfg(not(target_env = "msvc"))] +use tikv_jemallocator::Jemalloc; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + ignite!(WebhookServer); diff --git a/libs/leash/Cargo.toml b/libs/leash/Cargo.toml index ca110d0..b96f384 100644 --- a/libs/leash/Cargo.toml +++ b/libs/leash/Cargo.toml @@ -16,4 +16,4 @@ tracing = "0.1.37" env_logger = "0.10.0" tracing-opentelemetry = "0.18.0" opentelemetry = { version ="0.18.0", features = ["rt-tokio"] } -opentelemetry-otlp = { version = "0.11.0" } +opentelemetry-otlp = { version = "0.11.0", features = ["metrics"] } diff --git a/libs/leash/src/lib.rs b/libs/leash/src/lib.rs index 91bfaf5..f3c9472 100644 --- a/libs/leash/src/lib.rs +++ b/libs/leash/src/lib.rs @@ -6,10 +6,13 @@ clippy::complexity, clippy::perf, clippy::pedantic, - clippy::nursery, + clippy::nursery )] use anyhow::Result; +use opentelemetry::global::shutdown_tracer_provider; +use opentelemetry::sdk::export::metrics::aggregation::stateless_temporality_selector; +use opentelemetry::sdk::metrics::selectors; use opentelemetry::sdk::propagation::TraceContextPropagator; use opentelemetry::sdk::trace::{self}; use opentelemetry::sdk::Resource; @@ -18,8 +21,10 @@ use opentelemetry_otlp::WithExportConfig; use serde::de::DeserializeOwned; use shared::config::Settings; use std::str::FromStr; +use std::time::Duration; use std::{future::Future, pin::Pin}; use tokio::sync::oneshot; +use tracing::log::error; use tracing::{info, log::trace}; use tracing_subscriber::filter::Directive; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; @@ -35,54 +40,111 @@ pub trait Component: Send + Sync + 'static + Sized { stop: oneshot::Receiver<()>, ) -> AnyhowResultFuture<()>; fn new() -> Self; +} - fn _internal_start(self) -> AnyhowResultFuture<()> { - Box::pin(async move { - global::set_text_map_propagator(TraceContextPropagator::new()); +/// # Panics +/// Panics in case of an invalid `RUST_LOG` variable. +pub fn start_component(component: Y) -> AnyhowResultFuture<()> +where + Y: Component, + C: Default + Clone + DeserializeOwned + Send, +{ + Box::pin(async move { + let settings = Settings::::new(Y::SERVICE_NAME)?; + + if let Some(meter_config) = settings + .opentelemetry + .as_ref() + .and_then(|f| f.metrics.clone()) + { + let meter = opentelemetry_otlp::new_pipeline() + .metrics( + selectors::simple::histogram([1.0, 2.0, 5.0, 10.0, 20.0, 50.0]), + stateless_temporality_selector(), + opentelemetry::runtime::Tokio, + ) + .with_exporter( + opentelemetry_otlp::new_exporter() + .tonic() + .with_export_config(meter_config.into()), + ) + .with_period(Duration::from_secs(3)) + .with_timeout(Duration::from_secs(10)) + .build()?; + // Using the opentelemetry_otlp meter + global::set_meter_provider(meter); + } + // Use the text propagator + global::set_text_map_propagator(TraceContextPropagator::new()); + // Print debug errors + global::set_error_handler(|error| { + error!("OpenTelemetry error: {}", error); + })?; + + if let Some(tracer_config) = settings + .opentelemetry + .as_ref() + .and_then(|f| f.traces.clone()) + { let tracer = opentelemetry_otlp::new_pipeline() .tracing() .with_trace_config(trace::config().with_resource(Resource::new(vec![ - KeyValue::new("service.name", Self::SERVICE_NAME), + KeyValue::new("service.name", Y::SERVICE_NAME), ]))) - .with_exporter(opentelemetry_otlp::new_exporter().tonic().with_env()) + .with_exporter( + opentelemetry_otlp::new_exporter() + .tonic() + .with_export_config(tracer_config.into()), + ) .install_batch(opentelemetry::runtime::Tokio)?; + let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer); - let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); - + tracing_subscriber::registry() + .with(otel_layer) + .with( + // Use the info level as default + EnvFilter::builder() + .with_default_directive(Directive::from_str("info").unwrap()) + .from_env()?, + ) + .init(); + } else { + // Setup tracing tracing_subscriber::registry() .with(fmt::layer()) - .with(telemetry) .with( + // Use the info level as default EnvFilter::builder() .with_default_directive(Directive::from_str("info").unwrap()) .from_env()?, ) .init(); + } - info!("Starting nova"); - let settings = Settings::::new(Self::SERVICE_NAME); - let (stop, stop_channel) = oneshot::channel(); - - tokio::spawn(async move { - trace!("started signal watching"); - #[cfg(unix)] - tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) - .unwrap() - .recv() - .await; - #[cfg(not(unix))] - return tokio::signal::ctrl_c().await.unwrap(); - - stop.send(()).unwrap(); - }); - - trace!( - "Starting component {component}", - component = Self::SERVICE_NAME - ); - self.start(settings?, stop_channel).await - }) - } + // Finally starting nova + info!("Starting nova component {}", Y::SERVICE_NAME); + let (stop, stop_channel) = oneshot::channel(); + + tokio::spawn(async move { + trace!("started signal watching"); + #[cfg(unix)] + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) + .unwrap() + .recv() + .await; + #[cfg(not(unix))] + return tokio::signal::ctrl_c().await.unwrap(); + + stop.send(()).unwrap(); + shutdown_tracer_provider(); + }); + + trace!( + "Starting component {component}", + component = Y::SERVICE_NAME + ); + component.start(settings, stop_channel).await + }) } #[macro_export] @@ -90,9 +152,9 @@ macro_rules! ignite { ($c:ty) => { #[allow(dead_code)] fn main() -> anyhow::Result<()> { - use leash::Component; + use $crate::Component; let rt = tokio::runtime::Runtime::new()?; - rt.block_on(<$c as Component>::new()._internal_start())?; + rt.block_on($crate::start_component(<$c as Component>::new()))?; Ok(()) } }; @@ -128,7 +190,5 @@ mod test { } } - - ignite!(TestComponent); } diff --git a/libs/shared/Cargo.toml b/libs/shared/Cargo.toml index 419ad0c..9b29bf2 100644 --- a/libs/shared/Cargo.toml +++ b/libs/shared/Cargo.toml @@ -10,7 +10,7 @@ serde_repr = "0.1" config = { version = "0.13", default-features = false, features = ["json", "yaml-rust", "ini"] } -async-nats = "0.25.1" +async-nats = "0.26.0" redis = { version = "0.22.1", features = ["cluster", "connection-manager", "tokio-comp"] } tokio = { version = "1", features = ["signal", "rt"] } @@ -21,4 +21,6 @@ anyhow = "1.0.68" serde_test = "1.0.152" -tracing = "0.1.37" \ No newline at end of file +tracing = "0.1.37" +opentelemetry-otlp = "0.11.0" +opentelemetry = "0.18.0" \ No newline at end of file diff --git a/libs/shared/src/config.rs b/libs/shared/src/config.rs index cdf0bd3..4967e3c 100644 --- a/libs/shared/src/config.rs +++ b/libs/shared/src/config.rs @@ -10,6 +10,7 @@ pub struct Settings { pub config: T, pub nats: crate::nats::Configuration, pub redis: crate::redis::Configuration, + pub opentelemetry: Option, } impl Settings { diff --git a/libs/shared/src/lib.rs b/libs/shared/src/lib.rs index 92e7a05..33a6a13 100644 --- a/libs/shared/src/lib.rs +++ b/libs/shared/src/lib.rs @@ -13,3 +13,4 @@ pub mod config; pub mod nats; pub mod payloads; pub mod redis; +pub mod opentelemetry; \ No newline at end of file diff --git a/libs/shared/src/opentelemetry.rs b/libs/shared/src/opentelemetry.rs new file mode 100644 index 0000000..ca6542d --- /dev/null +++ b/libs/shared/src/opentelemetry.rs @@ -0,0 +1,91 @@ +use std::ops::{Deref, DerefMut}; + +use opentelemetry_otlp::{ExportConfig, Protocol}; +use serde::{de::Visitor, Deserialize}; + +#[derive(Debug, Default)] +#[repr(transparent)] +pub struct ExportConfigDeserialize(ExportConfig); +impl Clone for ExportConfigDeserialize { + fn clone(&self) -> Self { + Self(ExportConfig { + endpoint: self.0.endpoint.clone(), + protocol: self.0.protocol, + timeout: self.0.timeout, + }) + } +} + +impl From for ExportConfig { + fn from(val: ExportConfigDeserialize) -> Self { + val.0 + } +} + +impl Deref for ExportConfigDeserialize { + type Target = ExportConfig; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for ExportConfigDeserialize { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl<'de> Deserialize<'de> for ExportConfigDeserialize { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + #[derive(Deserialize, Debug)] + #[serde(field_identifier, rename_all = "lowercase")] + enum Fields { + Endpoint, + Timeout, + } + + struct OpenTelemetryExportConfigVisitor; + impl<'de> Visitor<'de> for OpenTelemetryExportConfigVisitor { + type Value = ExportConfigDeserialize; + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("struct OpenTelemetryExportConfig") + } + + fn visit_map(self, mut map: A) -> std::result::Result + where + A: serde::de::MapAccess<'de>, + { + let mut export_config = ExportConfigDeserialize::default(); + export_config.0.protocol = Protocol::Grpc; + while let Some(name) = map.next_key::()? { + match name { + Fields::Endpoint => { + export_config.0.endpoint = map.next_value()?; + } + Fields::Timeout => { + export_config.0.timeout = map.next_value()?; + } + } + } + + Ok(export_config) + } + } + + deserializer.deserialize_struct( + "OpenTelemetryExportConfig", + &["endpoint", "protocol", "timeout"], + OpenTelemetryExportConfigVisitor, + ) + } +} + +#[derive(Debug, Clone, Deserialize)] +pub struct Configuration { + pub traces: Option, + pub metrics: Option, +}