diff options
| author | MatthieuCoder <matthieu@matthieu-dev.xyz> | 2023-01-15 17:03:29 +0400 | 
|---|---|---|
| committer | MatthieuCoder <matthieu@matthieu-dev.xyz> | 2023-01-15 17:03:29 +0400 | 
| commit | cbcaa3c01ec4d9ed95dc5af8232de1d10191bc44 (patch) | |
| tree | 02cd56bb6eba762024c33f2df3cbec4f7a1a27e1 /libs/leash/src/lib.rs | |
| parent | c06cf52e887cc560f1fd4fa72713b239fe0102a6 (diff) | |
add metrics and jemalloc
Diffstat (limited to 'libs/leash/src/lib.rs')
| -rw-r--r-- | libs/leash/src/lib.rs | 134 | 
1 files changed, 97 insertions, 37 deletions
diff --git a/libs/leash/src/lib.rs b/libs/leash/src/lib.rs index 91bfaf5..f3c9472 100644 --- a/libs/leash/src/lib.rs +++ b/libs/leash/src/lib.rs @@ -6,10 +6,13 @@      clippy::complexity,      clippy::perf,      clippy::pedantic, -    clippy::nursery, +    clippy::nursery  )]  use anyhow::Result; +use opentelemetry::global::shutdown_tracer_provider; +use opentelemetry::sdk::export::metrics::aggregation::stateless_temporality_selector; +use opentelemetry::sdk::metrics::selectors;  use opentelemetry::sdk::propagation::TraceContextPropagator;  use opentelemetry::sdk::trace::{self};  use opentelemetry::sdk::Resource; @@ -18,8 +21,10 @@ use opentelemetry_otlp::WithExportConfig;  use serde::de::DeserializeOwned;  use shared::config::Settings;  use std::str::FromStr; +use std::time::Duration;  use std::{future::Future, pin::Pin};  use tokio::sync::oneshot; +use tracing::log::error;  use tracing::{info, log::trace};  use tracing_subscriber::filter::Directive;  use tracing_subscriber::{fmt, prelude::*, EnvFilter}; @@ -35,54 +40,111 @@ pub trait Component: Send + Sync + 'static + Sized {          stop: oneshot::Receiver<()>,      ) -> AnyhowResultFuture<()>;      fn new() -> Self; +} -    fn _internal_start(self) -> AnyhowResultFuture<()> { -        Box::pin(async move { -            global::set_text_map_propagator(TraceContextPropagator::new()); +/// # Panics +/// Panics in case of an invalid `RUST_LOG` variable. +pub fn start_component<Y, C>(component: Y) -> AnyhowResultFuture<()> +where +    Y: Component<Config = C>, +    C: Default + Clone + DeserializeOwned + Send, +{ +    Box::pin(async move { +        let settings = Settings::<Y::Config>::new(Y::SERVICE_NAME)?; + +        if let Some(meter_config) = settings +            .opentelemetry +            .as_ref() +            .and_then(|f| f.metrics.clone()) +        { +            let meter = opentelemetry_otlp::new_pipeline() +                .metrics( +                    selectors::simple::histogram([1.0, 2.0, 5.0, 10.0, 20.0, 50.0]), +                    stateless_temporality_selector(), +                    opentelemetry::runtime::Tokio, +                ) +                .with_exporter( +                    opentelemetry_otlp::new_exporter() +                        .tonic() +                        .with_export_config(meter_config.into()), +                ) +                .with_period(Duration::from_secs(3)) +                .with_timeout(Duration::from_secs(10)) +                .build()?; +            // Using the opentelemetry_otlp meter +            global::set_meter_provider(meter); +        } +        // Use the text propagator +        global::set_text_map_propagator(TraceContextPropagator::new()); +        // Print debug errors +        global::set_error_handler(|error| { +            error!("OpenTelemetry error: {}", error); +        })?; + +        if let Some(tracer_config) = settings +            .opentelemetry +            .as_ref() +            .and_then(|f| f.traces.clone()) +        {              let tracer = opentelemetry_otlp::new_pipeline()                  .tracing()                  .with_trace_config(trace::config().with_resource(Resource::new(vec![ -                    KeyValue::new("service.name", Self::SERVICE_NAME), +                    KeyValue::new("service.name", Y::SERVICE_NAME),                  ]))) -                .with_exporter(opentelemetry_otlp::new_exporter().tonic().with_env()) +                .with_exporter( +                    opentelemetry_otlp::new_exporter() +                        .tonic() +                        .with_export_config(tracer_config.into()), +                )                  .install_batch(opentelemetry::runtime::Tokio)?; +            let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer); -            let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); - +            tracing_subscriber::registry() +                .with(otel_layer) +                .with( +                    // Use the info level as default +                    EnvFilter::builder() +                        .with_default_directive(Directive::from_str("info").unwrap()) +                        .from_env()?, +                ) +                .init(); +        } else { +            // Setup tracing              tracing_subscriber::registry()                  .with(fmt::layer()) -                .with(telemetry)                  .with( +                    // Use the info level as default                      EnvFilter::builder()                          .with_default_directive(Directive::from_str("info").unwrap())                          .from_env()?,                  )                  .init(); +        } -            info!("Starting nova"); -            let settings = Settings::<Self::Config>::new(Self::SERVICE_NAME); -            let (stop, stop_channel) = oneshot::channel(); - -            tokio::spawn(async move { -                trace!("started signal watching"); -                #[cfg(unix)] -                tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) -                    .unwrap() -                    .recv() -                    .await; -                #[cfg(not(unix))] -                return tokio::signal::ctrl_c().await.unwrap(); - -                stop.send(()).unwrap(); -            }); - -            trace!( -                "Starting component {component}", -                component = Self::SERVICE_NAME -            ); -            self.start(settings?, stop_channel).await -        }) -    } +        // Finally starting nova +        info!("Starting nova component {}", Y::SERVICE_NAME); +        let (stop, stop_channel) = oneshot::channel(); + +        tokio::spawn(async move { +            trace!("started signal watching"); +            #[cfg(unix)] +            tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) +                .unwrap() +                .recv() +                .await; +            #[cfg(not(unix))] +            return tokio::signal::ctrl_c().await.unwrap(); + +            stop.send(()).unwrap(); +            shutdown_tracer_provider(); +        }); + +        trace!( +            "Starting component {component}", +            component = Y::SERVICE_NAME +        ); +        component.start(settings, stop_channel).await +    })  }  #[macro_export] @@ -90,9 +152,9 @@ macro_rules! ignite {      ($c:ty) => {          #[allow(dead_code)]          fn main() -> anyhow::Result<()> { -            use leash::Component; +            use $crate::Component;              let rt = tokio::runtime::Runtime::new()?; -            rt.block_on(<$c as Component>::new()._internal_start())?; +            rt.block_on($crate::start_component(<$c as Component>::new()))?;              Ok(())          }      }; @@ -128,7 +190,5 @@ mod test {          }      } -     -      ignite!(TestComponent);  }  | 
