diff options
Diffstat (limited to 'libs/leash/src/lib.rs')
| -rw-r--r-- | libs/leash/src/lib.rs | 134 |
1 files changed, 97 insertions, 37 deletions
diff --git a/libs/leash/src/lib.rs b/libs/leash/src/lib.rs index 91bfaf5..f3c9472 100644 --- a/libs/leash/src/lib.rs +++ b/libs/leash/src/lib.rs @@ -6,10 +6,13 @@ clippy::complexity, clippy::perf, clippy::pedantic, - clippy::nursery, + clippy::nursery )] use anyhow::Result; +use opentelemetry::global::shutdown_tracer_provider; +use opentelemetry::sdk::export::metrics::aggregation::stateless_temporality_selector; +use opentelemetry::sdk::metrics::selectors; use opentelemetry::sdk::propagation::TraceContextPropagator; use opentelemetry::sdk::trace::{self}; use opentelemetry::sdk::Resource; @@ -18,8 +21,10 @@ use opentelemetry_otlp::WithExportConfig; use serde::de::DeserializeOwned; use shared::config::Settings; use std::str::FromStr; +use std::time::Duration; use std::{future::Future, pin::Pin}; use tokio::sync::oneshot; +use tracing::log::error; use tracing::{info, log::trace}; use tracing_subscriber::filter::Directive; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; @@ -35,54 +40,111 @@ pub trait Component: Send + Sync + 'static + Sized { stop: oneshot::Receiver<()>, ) -> AnyhowResultFuture<()>; fn new() -> Self; +} - fn _internal_start(self) -> AnyhowResultFuture<()> { - Box::pin(async move { - global::set_text_map_propagator(TraceContextPropagator::new()); +/// # Panics +/// Panics in case of an invalid `RUST_LOG` variable. +pub fn start_component<Y, C>(component: Y) -> AnyhowResultFuture<()> +where + Y: Component<Config = C>, + C: Default + Clone + DeserializeOwned + Send, +{ + Box::pin(async move { + let settings = Settings::<Y::Config>::new(Y::SERVICE_NAME)?; + + if let Some(meter_config) = settings + .opentelemetry + .as_ref() + .and_then(|f| f.metrics.clone()) + { + let meter = opentelemetry_otlp::new_pipeline() + .metrics( + selectors::simple::histogram([1.0, 2.0, 5.0, 10.0, 20.0, 50.0]), + stateless_temporality_selector(), + opentelemetry::runtime::Tokio, + ) + .with_exporter( + opentelemetry_otlp::new_exporter() + .tonic() + .with_export_config(meter_config.into()), + ) + .with_period(Duration::from_secs(3)) + .with_timeout(Duration::from_secs(10)) + .build()?; + // Using the opentelemetry_otlp meter + global::set_meter_provider(meter); + } + // Use the text propagator + global::set_text_map_propagator(TraceContextPropagator::new()); + // Print debug errors + global::set_error_handler(|error| { + error!("OpenTelemetry error: {}", error); + })?; + + if let Some(tracer_config) = settings + .opentelemetry + .as_ref() + .and_then(|f| f.traces.clone()) + { let tracer = opentelemetry_otlp::new_pipeline() .tracing() .with_trace_config(trace::config().with_resource(Resource::new(vec![ - KeyValue::new("service.name", Self::SERVICE_NAME), + KeyValue::new("service.name", Y::SERVICE_NAME), ]))) - .with_exporter(opentelemetry_otlp::new_exporter().tonic().with_env()) + .with_exporter( + opentelemetry_otlp::new_exporter() + .tonic() + .with_export_config(tracer_config.into()), + ) .install_batch(opentelemetry::runtime::Tokio)?; + let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer); - let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); - + tracing_subscriber::registry() + .with(otel_layer) + .with( + // Use the info level as default + EnvFilter::builder() + .with_default_directive(Directive::from_str("info").unwrap()) + .from_env()?, + ) + .init(); + } else { + // Setup tracing tracing_subscriber::registry() .with(fmt::layer()) - .with(telemetry) .with( + // Use the info level as default EnvFilter::builder() .with_default_directive(Directive::from_str("info").unwrap()) .from_env()?, ) .init(); + } - info!("Starting nova"); - let settings = Settings::<Self::Config>::new(Self::SERVICE_NAME); - let (stop, stop_channel) = oneshot::channel(); - - tokio::spawn(async move { - trace!("started signal watching"); - #[cfg(unix)] - tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) - .unwrap() - .recv() - .await; - #[cfg(not(unix))] - return tokio::signal::ctrl_c().await.unwrap(); - - stop.send(()).unwrap(); - }); - - trace!( - "Starting component {component}", - component = Self::SERVICE_NAME - ); - self.start(settings?, stop_channel).await - }) - } + // Finally starting nova + info!("Starting nova component {}", Y::SERVICE_NAME); + let (stop, stop_channel) = oneshot::channel(); + + tokio::spawn(async move { + trace!("started signal watching"); + #[cfg(unix)] + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) + .unwrap() + .recv() + .await; + #[cfg(not(unix))] + return tokio::signal::ctrl_c().await.unwrap(); + + stop.send(()).unwrap(); + shutdown_tracer_provider(); + }); + + trace!( + "Starting component {component}", + component = Y::SERVICE_NAME + ); + component.start(settings, stop_channel).await + }) } #[macro_export] @@ -90,9 +152,9 @@ macro_rules! ignite { ($c:ty) => { #[allow(dead_code)] fn main() -> anyhow::Result<()> { - use leash::Component; + use $crate::Component; let rt = tokio::runtime::Runtime::new()?; - rt.block_on(<$c as Component>::new()._internal_start())?; + rt.block_on($crate::start_component(<$c as Component>::new()))?; Ok(()) } }; @@ -128,7 +190,5 @@ mod test { } } - - ignite!(TestComponent); } |
