diff options
| author | MatthieuCoder <matthieu@matthieu-dev.xyz> | 2023-01-15 17:03:29 +0400 | 
|---|---|---|
| committer | MatthieuCoder <matthieu@matthieu-dev.xyz> | 2023-01-15 17:03:29 +0400 | 
| commit | cbcaa3c01ec4d9ed95dc5af8232de1d10191bc44 (patch) | |
| tree | 02cd56bb6eba762024c33f2df3cbec4f7a1a27e1 /libs | |
| parent | c06cf52e887cc560f1fd4fa72713b239fe0102a6 (diff) | |
add metrics and jemalloc
Diffstat (limited to 'libs')
| -rw-r--r-- | libs/leash/Cargo.toml | 2 | ||||
| -rw-r--r-- | libs/leash/src/lib.rs | 134 | ||||
| -rw-r--r-- | libs/shared/Cargo.toml | 6 | ||||
| -rw-r--r-- | libs/shared/src/config.rs | 1 | ||||
| -rw-r--r-- | libs/shared/src/lib.rs | 1 | ||||
| -rw-r--r-- | libs/shared/src/opentelemetry.rs | 91 | 
6 files changed, 195 insertions, 40 deletions
diff --git a/libs/leash/Cargo.toml b/libs/leash/Cargo.toml index ca110d0..b96f384 100644 --- a/libs/leash/Cargo.toml +++ b/libs/leash/Cargo.toml @@ -16,4 +16,4 @@ tracing = "0.1.37"  env_logger = "0.10.0"  tracing-opentelemetry = "0.18.0"  opentelemetry = { version ="0.18.0", features = ["rt-tokio"] } -opentelemetry-otlp = { version = "0.11.0" } +opentelemetry-otlp = { version = "0.11.0", features = ["metrics"] } diff --git a/libs/leash/src/lib.rs b/libs/leash/src/lib.rs index 91bfaf5..f3c9472 100644 --- a/libs/leash/src/lib.rs +++ b/libs/leash/src/lib.rs @@ -6,10 +6,13 @@      clippy::complexity,      clippy::perf,      clippy::pedantic, -    clippy::nursery, +    clippy::nursery  )]  use anyhow::Result; +use opentelemetry::global::shutdown_tracer_provider; +use opentelemetry::sdk::export::metrics::aggregation::stateless_temporality_selector; +use opentelemetry::sdk::metrics::selectors;  use opentelemetry::sdk::propagation::TraceContextPropagator;  use opentelemetry::sdk::trace::{self};  use opentelemetry::sdk::Resource; @@ -18,8 +21,10 @@ use opentelemetry_otlp::WithExportConfig;  use serde::de::DeserializeOwned;  use shared::config::Settings;  use std::str::FromStr; +use std::time::Duration;  use std::{future::Future, pin::Pin};  use tokio::sync::oneshot; +use tracing::log::error;  use tracing::{info, log::trace};  use tracing_subscriber::filter::Directive;  use tracing_subscriber::{fmt, prelude::*, EnvFilter}; @@ -35,54 +40,111 @@ pub trait Component: Send + Sync + 'static + Sized {          stop: oneshot::Receiver<()>,      ) -> AnyhowResultFuture<()>;      fn new() -> Self; +} -    fn _internal_start(self) -> AnyhowResultFuture<()> { -        Box::pin(async move { -            global::set_text_map_propagator(TraceContextPropagator::new()); +/// # Panics +/// Panics in case of an invalid `RUST_LOG` variable. +pub fn start_component<Y, C>(component: Y) -> AnyhowResultFuture<()> +where +    Y: Component<Config = C>, +    C: Default + Clone + DeserializeOwned + Send, +{ +    Box::pin(async move { +        let settings = Settings::<Y::Config>::new(Y::SERVICE_NAME)?; + +        if let Some(meter_config) = settings +            .opentelemetry +            .as_ref() +            .and_then(|f| f.metrics.clone()) +        { +            let meter = opentelemetry_otlp::new_pipeline() +                .metrics( +                    selectors::simple::histogram([1.0, 2.0, 5.0, 10.0, 20.0, 50.0]), +                    stateless_temporality_selector(), +                    opentelemetry::runtime::Tokio, +                ) +                .with_exporter( +                    opentelemetry_otlp::new_exporter() +                        .tonic() +                        .with_export_config(meter_config.into()), +                ) +                .with_period(Duration::from_secs(3)) +                .with_timeout(Duration::from_secs(10)) +                .build()?; +            // Using the opentelemetry_otlp meter +            global::set_meter_provider(meter); +        } +        // Use the text propagator +        global::set_text_map_propagator(TraceContextPropagator::new()); +        // Print debug errors +        global::set_error_handler(|error| { +            error!("OpenTelemetry error: {}", error); +        })?; + +        if let Some(tracer_config) = settings +            .opentelemetry +            .as_ref() +            .and_then(|f| f.traces.clone()) +        {              let tracer = opentelemetry_otlp::new_pipeline()                  .tracing()                  .with_trace_config(trace::config().with_resource(Resource::new(vec![ -                    KeyValue::new("service.name", Self::SERVICE_NAME), +                    KeyValue::new("service.name", Y::SERVICE_NAME),                  ]))) -                .with_exporter(opentelemetry_otlp::new_exporter().tonic().with_env()) +                .with_exporter( +                    opentelemetry_otlp::new_exporter() +                        .tonic() +                        .with_export_config(tracer_config.into()), +                )                  .install_batch(opentelemetry::runtime::Tokio)?; +            let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer); -            let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); - +            tracing_subscriber::registry() +                .with(otel_layer) +                .with( +                    // Use the info level as default +                    EnvFilter::builder() +                        .with_default_directive(Directive::from_str("info").unwrap()) +                        .from_env()?, +                ) +                .init(); +        } else { +            // Setup tracing              tracing_subscriber::registry()                  .with(fmt::layer()) -                .with(telemetry)                  .with( +                    // Use the info level as default                      EnvFilter::builder()                          .with_default_directive(Directive::from_str("info").unwrap())                          .from_env()?,                  )                  .init(); +        } -            info!("Starting nova"); -            let settings = Settings::<Self::Config>::new(Self::SERVICE_NAME); -            let (stop, stop_channel) = oneshot::channel(); - -            tokio::spawn(async move { -                trace!("started signal watching"); -                #[cfg(unix)] -                tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) -                    .unwrap() -                    .recv() -                    .await; -                #[cfg(not(unix))] -                return tokio::signal::ctrl_c().await.unwrap(); - -                stop.send(()).unwrap(); -            }); - -            trace!( -                "Starting component {component}", -                component = Self::SERVICE_NAME -            ); -            self.start(settings?, stop_channel).await -        }) -    } +        // Finally starting nova +        info!("Starting nova component {}", Y::SERVICE_NAME); +        let (stop, stop_channel) = oneshot::channel(); + +        tokio::spawn(async move { +            trace!("started signal watching"); +            #[cfg(unix)] +            tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) +                .unwrap() +                .recv() +                .await; +            #[cfg(not(unix))] +            return tokio::signal::ctrl_c().await.unwrap(); + +            stop.send(()).unwrap(); +            shutdown_tracer_provider(); +        }); + +        trace!( +            "Starting component {component}", +            component = Y::SERVICE_NAME +        ); +        component.start(settings, stop_channel).await +    })  }  #[macro_export] @@ -90,9 +152,9 @@ macro_rules! ignite {      ($c:ty) => {          #[allow(dead_code)]          fn main() -> anyhow::Result<()> { -            use leash::Component; +            use $crate::Component;              let rt = tokio::runtime::Runtime::new()?; -            rt.block_on(<$c as Component>::new()._internal_start())?; +            rt.block_on($crate::start_component(<$c as Component>::new()))?;              Ok(())          }      }; @@ -128,7 +190,5 @@ mod test {          }      } -     -      ignite!(TestComponent);  } diff --git a/libs/shared/Cargo.toml b/libs/shared/Cargo.toml index 419ad0c..9b29bf2 100644 --- a/libs/shared/Cargo.toml +++ b/libs/shared/Cargo.toml @@ -10,7 +10,7 @@ serde_repr = "0.1"  config = { version = "0.13", default-features = false, features = ["json", "yaml-rust", "ini"] } -async-nats = "0.25.1" +async-nats = "0.26.0"  redis = { version = "0.22.1", features = ["cluster", "connection-manager", "tokio-comp"] }  tokio = { version = "1", features = ["signal", "rt"] } @@ -21,4 +21,6 @@ anyhow = "1.0.68"  serde_test = "1.0.152" -tracing = "0.1.37"
\ No newline at end of file +tracing = "0.1.37" +opentelemetry-otlp = "0.11.0" +opentelemetry = "0.18.0"
\ No newline at end of file diff --git a/libs/shared/src/config.rs b/libs/shared/src/config.rs index cdf0bd3..4967e3c 100644 --- a/libs/shared/src/config.rs +++ b/libs/shared/src/config.rs @@ -10,6 +10,7 @@ pub struct Settings<T: Clone + DeserializeOwned> {      pub config: T,      pub nats: crate::nats::Configuration,      pub redis: crate::redis::Configuration, +    pub opentelemetry: Option<crate::opentelemetry::Configuration>,  }  impl<T: Clone + DeserializeOwned + Default> Settings<T> { diff --git a/libs/shared/src/lib.rs b/libs/shared/src/lib.rs index 92e7a05..33a6a13 100644 --- a/libs/shared/src/lib.rs +++ b/libs/shared/src/lib.rs @@ -13,3 +13,4 @@ pub mod config;  pub mod nats;  pub mod payloads;  pub mod redis; +pub mod opentelemetry;
\ No newline at end of file diff --git a/libs/shared/src/opentelemetry.rs b/libs/shared/src/opentelemetry.rs new file mode 100644 index 0000000..ca6542d --- /dev/null +++ b/libs/shared/src/opentelemetry.rs @@ -0,0 +1,91 @@ +use std::ops::{Deref, DerefMut}; + +use opentelemetry_otlp::{ExportConfig, Protocol}; +use serde::{de::Visitor, Deserialize}; + +#[derive(Debug, Default)] +#[repr(transparent)] +pub struct ExportConfigDeserialize(ExportConfig); +impl Clone for ExportConfigDeserialize { +    fn clone(&self) -> Self { +        Self(ExportConfig { +            endpoint: self.0.endpoint.clone(), +            protocol: self.0.protocol, +            timeout: self.0.timeout, +        }) +    } +} + +impl From<ExportConfigDeserialize> for ExportConfig { +    fn from(val: ExportConfigDeserialize) -> Self { +        val.0 +    } +} + +impl Deref for ExportConfigDeserialize { +    type Target = ExportConfig; + +    fn deref(&self) -> &Self::Target { +        &self.0 +    } +} + +impl DerefMut for ExportConfigDeserialize { +    fn deref_mut(&mut self) -> &mut Self::Target { +        &mut self.0 +    } +} + +impl<'de> Deserialize<'de> for ExportConfigDeserialize { +    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error> +    where +        D: serde::Deserializer<'de>, +    { +        #[derive(Deserialize, Debug)] +        #[serde(field_identifier, rename_all = "lowercase")] +        enum Fields { +            Endpoint, +            Timeout, +        } + +        struct OpenTelemetryExportConfigVisitor; +        impl<'de> Visitor<'de> for OpenTelemetryExportConfigVisitor { +            type Value = ExportConfigDeserialize; +            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { +                formatter.write_str("struct OpenTelemetryExportConfig") +            } + +            fn visit_map<A>(self, mut map: A) -> std::result::Result<Self::Value, A::Error> +            where +                A: serde::de::MapAccess<'de>, +            { +                let mut export_config = ExportConfigDeserialize::default(); +                export_config.0.protocol = Protocol::Grpc; +                while let Some(name) = map.next_key::<Fields>()? { +                    match name { +                        Fields::Endpoint => { +                            export_config.0.endpoint = map.next_value()?; +                        } +                        Fields::Timeout => { +                            export_config.0.timeout = map.next_value()?; +                        } +                    } +                } + +                Ok(export_config) +            } +        } + +        deserializer.deserialize_struct( +            "OpenTelemetryExportConfig", +            &["endpoint", "protocol", "timeout"], +            OpenTelemetryExportConfigVisitor, +        ) +    } +} + +#[derive(Debug, Clone, Deserialize)] +pub struct Configuration { +    pub traces: Option<ExportConfigDeserialize>, +    pub metrics: Option<ExportConfigDeserialize>, +}  | 
