diff options
| author | MatthieuCoder <matthieu@matthieu-dev.xyz> | 2023-01-15 17:03:29 +0400 |
|---|---|---|
| committer | MatthieuCoder <matthieu@matthieu-dev.xyz> | 2023-01-15 17:03:29 +0400 |
| commit | cbcaa3c01ec4d9ed95dc5af8232de1d10191bc44 (patch) | |
| tree | 02cd56bb6eba762024c33f2df3cbec4f7a1a27e1 /libs | |
| parent | c06cf52e887cc560f1fd4fa72713b239fe0102a6 (diff) | |
add metrics and jemalloc
Diffstat (limited to 'libs')
| -rw-r--r-- | libs/leash/Cargo.toml | 2 | ||||
| -rw-r--r-- | libs/leash/src/lib.rs | 134 | ||||
| -rw-r--r-- | libs/shared/Cargo.toml | 6 | ||||
| -rw-r--r-- | libs/shared/src/config.rs | 1 | ||||
| -rw-r--r-- | libs/shared/src/lib.rs | 1 | ||||
| -rw-r--r-- | libs/shared/src/opentelemetry.rs | 91 |
6 files changed, 195 insertions, 40 deletions
diff --git a/libs/leash/Cargo.toml b/libs/leash/Cargo.toml index ca110d0..b96f384 100644 --- a/libs/leash/Cargo.toml +++ b/libs/leash/Cargo.toml @@ -16,4 +16,4 @@ tracing = "0.1.37" env_logger = "0.10.0" tracing-opentelemetry = "0.18.0" opentelemetry = { version ="0.18.0", features = ["rt-tokio"] } -opentelemetry-otlp = { version = "0.11.0" } +opentelemetry-otlp = { version = "0.11.0", features = ["metrics"] } diff --git a/libs/leash/src/lib.rs b/libs/leash/src/lib.rs index 91bfaf5..f3c9472 100644 --- a/libs/leash/src/lib.rs +++ b/libs/leash/src/lib.rs @@ -6,10 +6,13 @@ clippy::complexity, clippy::perf, clippy::pedantic, - clippy::nursery, + clippy::nursery )] use anyhow::Result; +use opentelemetry::global::shutdown_tracer_provider; +use opentelemetry::sdk::export::metrics::aggregation::stateless_temporality_selector; +use opentelemetry::sdk::metrics::selectors; use opentelemetry::sdk::propagation::TraceContextPropagator; use opentelemetry::sdk::trace::{self}; use opentelemetry::sdk::Resource; @@ -18,8 +21,10 @@ use opentelemetry_otlp::WithExportConfig; use serde::de::DeserializeOwned; use shared::config::Settings; use std::str::FromStr; +use std::time::Duration; use std::{future::Future, pin::Pin}; use tokio::sync::oneshot; +use tracing::log::error; use tracing::{info, log::trace}; use tracing_subscriber::filter::Directive; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; @@ -35,54 +40,111 @@ pub trait Component: Send + Sync + 'static + Sized { stop: oneshot::Receiver<()>, ) -> AnyhowResultFuture<()>; fn new() -> Self; +} - fn _internal_start(self) -> AnyhowResultFuture<()> { - Box::pin(async move { - global::set_text_map_propagator(TraceContextPropagator::new()); +/// # Panics +/// Panics in case of an invalid `RUST_LOG` variable. +pub fn start_component<Y, C>(component: Y) -> AnyhowResultFuture<()> +where + Y: Component<Config = C>, + C: Default + Clone + DeserializeOwned + Send, +{ + Box::pin(async move { + let settings = Settings::<Y::Config>::new(Y::SERVICE_NAME)?; + + if let Some(meter_config) = settings + .opentelemetry + .as_ref() + .and_then(|f| f.metrics.clone()) + { + let meter = opentelemetry_otlp::new_pipeline() + .metrics( + selectors::simple::histogram([1.0, 2.0, 5.0, 10.0, 20.0, 50.0]), + stateless_temporality_selector(), + opentelemetry::runtime::Tokio, + ) + .with_exporter( + opentelemetry_otlp::new_exporter() + .tonic() + .with_export_config(meter_config.into()), + ) + .with_period(Duration::from_secs(3)) + .with_timeout(Duration::from_secs(10)) + .build()?; + // Using the opentelemetry_otlp meter + global::set_meter_provider(meter); + } + // Use the text propagator + global::set_text_map_propagator(TraceContextPropagator::new()); + // Print debug errors + global::set_error_handler(|error| { + error!("OpenTelemetry error: {}", error); + })?; + + if let Some(tracer_config) = settings + .opentelemetry + .as_ref() + .and_then(|f| f.traces.clone()) + { let tracer = opentelemetry_otlp::new_pipeline() .tracing() .with_trace_config(trace::config().with_resource(Resource::new(vec![ - KeyValue::new("service.name", Self::SERVICE_NAME), + KeyValue::new("service.name", Y::SERVICE_NAME), ]))) - .with_exporter(opentelemetry_otlp::new_exporter().tonic().with_env()) + .with_exporter( + opentelemetry_otlp::new_exporter() + .tonic() + .with_export_config(tracer_config.into()), + ) .install_batch(opentelemetry::runtime::Tokio)?; + let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer); - let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); - + tracing_subscriber::registry() + .with(otel_layer) + .with( + // Use the info level as default + EnvFilter::builder() + .with_default_directive(Directive::from_str("info").unwrap()) + .from_env()?, + ) + .init(); + } else { + // Setup tracing tracing_subscriber::registry() .with(fmt::layer()) - .with(telemetry) .with( + // Use the info level as default EnvFilter::builder() .with_default_directive(Directive::from_str("info").unwrap()) .from_env()?, ) .init(); + } - info!("Starting nova"); - let settings = Settings::<Self::Config>::new(Self::SERVICE_NAME); - let (stop, stop_channel) = oneshot::channel(); - - tokio::spawn(async move { - trace!("started signal watching"); - #[cfg(unix)] - tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) - .unwrap() - .recv() - .await; - #[cfg(not(unix))] - return tokio::signal::ctrl_c().await.unwrap(); - - stop.send(()).unwrap(); - }); - - trace!( - "Starting component {component}", - component = Self::SERVICE_NAME - ); - self.start(settings?, stop_channel).await - }) - } + // Finally starting nova + info!("Starting nova component {}", Y::SERVICE_NAME); + let (stop, stop_channel) = oneshot::channel(); + + tokio::spawn(async move { + trace!("started signal watching"); + #[cfg(unix)] + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) + .unwrap() + .recv() + .await; + #[cfg(not(unix))] + return tokio::signal::ctrl_c().await.unwrap(); + + stop.send(()).unwrap(); + shutdown_tracer_provider(); + }); + + trace!( + "Starting component {component}", + component = Y::SERVICE_NAME + ); + component.start(settings, stop_channel).await + }) } #[macro_export] @@ -90,9 +152,9 @@ macro_rules! ignite { ($c:ty) => { #[allow(dead_code)] fn main() -> anyhow::Result<()> { - use leash::Component; + use $crate::Component; let rt = tokio::runtime::Runtime::new()?; - rt.block_on(<$c as Component>::new()._internal_start())?; + rt.block_on($crate::start_component(<$c as Component>::new()))?; Ok(()) } }; @@ -128,7 +190,5 @@ mod test { } } - - ignite!(TestComponent); } diff --git a/libs/shared/Cargo.toml b/libs/shared/Cargo.toml index 419ad0c..9b29bf2 100644 --- a/libs/shared/Cargo.toml +++ b/libs/shared/Cargo.toml @@ -10,7 +10,7 @@ serde_repr = "0.1" config = { version = "0.13", default-features = false, features = ["json", "yaml-rust", "ini"] } -async-nats = "0.25.1" +async-nats = "0.26.0" redis = { version = "0.22.1", features = ["cluster", "connection-manager", "tokio-comp"] } tokio = { version = "1", features = ["signal", "rt"] } @@ -21,4 +21,6 @@ anyhow = "1.0.68" serde_test = "1.0.152" -tracing = "0.1.37"
\ No newline at end of file +tracing = "0.1.37" +opentelemetry-otlp = "0.11.0" +opentelemetry = "0.18.0"
\ No newline at end of file diff --git a/libs/shared/src/config.rs b/libs/shared/src/config.rs index cdf0bd3..4967e3c 100644 --- a/libs/shared/src/config.rs +++ b/libs/shared/src/config.rs @@ -10,6 +10,7 @@ pub struct Settings<T: Clone + DeserializeOwned> { pub config: T, pub nats: crate::nats::Configuration, pub redis: crate::redis::Configuration, + pub opentelemetry: Option<crate::opentelemetry::Configuration>, } impl<T: Clone + DeserializeOwned + Default> Settings<T> { diff --git a/libs/shared/src/lib.rs b/libs/shared/src/lib.rs index 92e7a05..33a6a13 100644 --- a/libs/shared/src/lib.rs +++ b/libs/shared/src/lib.rs @@ -13,3 +13,4 @@ pub mod config; pub mod nats; pub mod payloads; pub mod redis; +pub mod opentelemetry;
\ No newline at end of file diff --git a/libs/shared/src/opentelemetry.rs b/libs/shared/src/opentelemetry.rs new file mode 100644 index 0000000..ca6542d --- /dev/null +++ b/libs/shared/src/opentelemetry.rs @@ -0,0 +1,91 @@ +use std::ops::{Deref, DerefMut}; + +use opentelemetry_otlp::{ExportConfig, Protocol}; +use serde::{de::Visitor, Deserialize}; + +#[derive(Debug, Default)] +#[repr(transparent)] +pub struct ExportConfigDeserialize(ExportConfig); +impl Clone for ExportConfigDeserialize { + fn clone(&self) -> Self { + Self(ExportConfig { + endpoint: self.0.endpoint.clone(), + protocol: self.0.protocol, + timeout: self.0.timeout, + }) + } +} + +impl From<ExportConfigDeserialize> for ExportConfig { + fn from(val: ExportConfigDeserialize) -> Self { + val.0 + } +} + +impl Deref for ExportConfigDeserialize { + type Target = ExportConfig; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for ExportConfigDeserialize { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl<'de> Deserialize<'de> for ExportConfigDeserialize { + fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + #[derive(Deserialize, Debug)] + #[serde(field_identifier, rename_all = "lowercase")] + enum Fields { + Endpoint, + Timeout, + } + + struct OpenTelemetryExportConfigVisitor; + impl<'de> Visitor<'de> for OpenTelemetryExportConfigVisitor { + type Value = ExportConfigDeserialize; + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("struct OpenTelemetryExportConfig") + } + + fn visit_map<A>(self, mut map: A) -> std::result::Result<Self::Value, A::Error> + where + A: serde::de::MapAccess<'de>, + { + let mut export_config = ExportConfigDeserialize::default(); + export_config.0.protocol = Protocol::Grpc; + while let Some(name) = map.next_key::<Fields>()? { + match name { + Fields::Endpoint => { + export_config.0.endpoint = map.next_value()?; + } + Fields::Timeout => { + export_config.0.timeout = map.next_value()?; + } + } + } + + Ok(export_config) + } + } + + deserializer.deserialize_struct( + "OpenTelemetryExportConfig", + &["endpoint", "protocol", "timeout"], + OpenTelemetryExportConfigVisitor, + ) + } +} + +#[derive(Debug, Clone, Deserialize)] +pub struct Configuration { + pub traces: Option<ExportConfigDeserialize>, + pub metrics: Option<ExportConfigDeserialize>, +} |
