summaryrefslogtreecommitdiff
path: root/libs
diff options
context:
space:
mode:
authorMatthieuCoder <matthieu@matthieu-dev.xyz>2023-01-15 17:03:29 +0400
committerMatthieuCoder <matthieu@matthieu-dev.xyz>2023-01-15 17:03:29 +0400
commitcbcaa3c01ec4d9ed95dc5af8232de1d10191bc44 (patch)
tree02cd56bb6eba762024c33f2df3cbec4f7a1a27e1 /libs
parentc06cf52e887cc560f1fd4fa72713b239fe0102a6 (diff)
add metrics and jemalloc
Diffstat (limited to 'libs')
-rw-r--r--libs/leash/Cargo.toml2
-rw-r--r--libs/leash/src/lib.rs134
-rw-r--r--libs/shared/Cargo.toml6
-rw-r--r--libs/shared/src/config.rs1
-rw-r--r--libs/shared/src/lib.rs1
-rw-r--r--libs/shared/src/opentelemetry.rs91
6 files changed, 195 insertions, 40 deletions
diff --git a/libs/leash/Cargo.toml b/libs/leash/Cargo.toml
index ca110d0..b96f384 100644
--- a/libs/leash/Cargo.toml
+++ b/libs/leash/Cargo.toml
@@ -16,4 +16,4 @@ tracing = "0.1.37"
env_logger = "0.10.0"
tracing-opentelemetry = "0.18.0"
opentelemetry = { version ="0.18.0", features = ["rt-tokio"] }
-opentelemetry-otlp = { version = "0.11.0" }
+opentelemetry-otlp = { version = "0.11.0", features = ["metrics"] }
diff --git a/libs/leash/src/lib.rs b/libs/leash/src/lib.rs
index 91bfaf5..f3c9472 100644
--- a/libs/leash/src/lib.rs
+++ b/libs/leash/src/lib.rs
@@ -6,10 +6,13 @@
clippy::complexity,
clippy::perf,
clippy::pedantic,
- clippy::nursery,
+ clippy::nursery
)]
use anyhow::Result;
+use opentelemetry::global::shutdown_tracer_provider;
+use opentelemetry::sdk::export::metrics::aggregation::stateless_temporality_selector;
+use opentelemetry::sdk::metrics::selectors;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::sdk::trace::{self};
use opentelemetry::sdk::Resource;
@@ -18,8 +21,10 @@ use opentelemetry_otlp::WithExportConfig;
use serde::de::DeserializeOwned;
use shared::config::Settings;
use std::str::FromStr;
+use std::time::Duration;
use std::{future::Future, pin::Pin};
use tokio::sync::oneshot;
+use tracing::log::error;
use tracing::{info, log::trace};
use tracing_subscriber::filter::Directive;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
@@ -35,54 +40,111 @@ pub trait Component: Send + Sync + 'static + Sized {
stop: oneshot::Receiver<()>,
) -> AnyhowResultFuture<()>;
fn new() -> Self;
+}
- fn _internal_start(self) -> AnyhowResultFuture<()> {
- Box::pin(async move {
- global::set_text_map_propagator(TraceContextPropagator::new());
+/// # Panics
+/// Panics in case of an invalid `RUST_LOG` variable.
+pub fn start_component<Y, C>(component: Y) -> AnyhowResultFuture<()>
+where
+ Y: Component<Config = C>,
+ C: Default + Clone + DeserializeOwned + Send,
+{
+ Box::pin(async move {
+ let settings = Settings::<Y::Config>::new(Y::SERVICE_NAME)?;
+
+ if let Some(meter_config) = settings
+ .opentelemetry
+ .as_ref()
+ .and_then(|f| f.metrics.clone())
+ {
+ let meter = opentelemetry_otlp::new_pipeline()
+ .metrics(
+ selectors::simple::histogram([1.0, 2.0, 5.0, 10.0, 20.0, 50.0]),
+ stateless_temporality_selector(),
+ opentelemetry::runtime::Tokio,
+ )
+ .with_exporter(
+ opentelemetry_otlp::new_exporter()
+ .tonic()
+ .with_export_config(meter_config.into()),
+ )
+ .with_period(Duration::from_secs(3))
+ .with_timeout(Duration::from_secs(10))
+ .build()?;
+ // Using the opentelemetry_otlp meter
+ global::set_meter_provider(meter);
+ }
+ // Use the text propagator
+ global::set_text_map_propagator(TraceContextPropagator::new());
+ // Print debug errors
+ global::set_error_handler(|error| {
+ error!("OpenTelemetry error: {}", error);
+ })?;
+
+ if let Some(tracer_config) = settings
+ .opentelemetry
+ .as_ref()
+ .and_then(|f| f.traces.clone())
+ {
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_trace_config(trace::config().with_resource(Resource::new(vec![
- KeyValue::new("service.name", Self::SERVICE_NAME),
+ KeyValue::new("service.name", Y::SERVICE_NAME),
])))
- .with_exporter(opentelemetry_otlp::new_exporter().tonic().with_env())
+ .with_exporter(
+ opentelemetry_otlp::new_exporter()
+ .tonic()
+ .with_export_config(tracer_config.into()),
+ )
.install_batch(opentelemetry::runtime::Tokio)?;
+ let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
- let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
-
+ tracing_subscriber::registry()
+ .with(otel_layer)
+ .with(
+ // Use the info level as default
+ EnvFilter::builder()
+ .with_default_directive(Directive::from_str("info").unwrap())
+ .from_env()?,
+ )
+ .init();
+ } else {
+ // Setup tracing
tracing_subscriber::registry()
.with(fmt::layer())
- .with(telemetry)
.with(
+ // Use the info level as default
EnvFilter::builder()
.with_default_directive(Directive::from_str("info").unwrap())
.from_env()?,
)
.init();
+ }
- info!("Starting nova");
- let settings = Settings::<Self::Config>::new(Self::SERVICE_NAME);
- let (stop, stop_channel) = oneshot::channel();
-
- tokio::spawn(async move {
- trace!("started signal watching");
- #[cfg(unix)]
- tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
- .unwrap()
- .recv()
- .await;
- #[cfg(not(unix))]
- return tokio::signal::ctrl_c().await.unwrap();
-
- stop.send(()).unwrap();
- });
-
- trace!(
- "Starting component {component}",
- component = Self::SERVICE_NAME
- );
- self.start(settings?, stop_channel).await
- })
- }
+ // Finally starting nova
+ info!("Starting nova component {}", Y::SERVICE_NAME);
+ let (stop, stop_channel) = oneshot::channel();
+
+ tokio::spawn(async move {
+ trace!("started signal watching");
+ #[cfg(unix)]
+ tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
+ .unwrap()
+ .recv()
+ .await;
+ #[cfg(not(unix))]
+ return tokio::signal::ctrl_c().await.unwrap();
+
+ stop.send(()).unwrap();
+ shutdown_tracer_provider();
+ });
+
+ trace!(
+ "Starting component {component}",
+ component = Y::SERVICE_NAME
+ );
+ component.start(settings, stop_channel).await
+ })
}
#[macro_export]
@@ -90,9 +152,9 @@ macro_rules! ignite {
($c:ty) => {
#[allow(dead_code)]
fn main() -> anyhow::Result<()> {
- use leash::Component;
+ use $crate::Component;
let rt = tokio::runtime::Runtime::new()?;
- rt.block_on(<$c as Component>::new()._internal_start())?;
+ rt.block_on($crate::start_component(<$c as Component>::new()))?;
Ok(())
}
};
@@ -128,7 +190,5 @@ mod test {
}
}
-
-
ignite!(TestComponent);
}
diff --git a/libs/shared/Cargo.toml b/libs/shared/Cargo.toml
index 419ad0c..9b29bf2 100644
--- a/libs/shared/Cargo.toml
+++ b/libs/shared/Cargo.toml
@@ -10,7 +10,7 @@ serde_repr = "0.1"
config = { version = "0.13", default-features = false, features = ["json", "yaml-rust", "ini"] }
-async-nats = "0.25.1"
+async-nats = "0.26.0"
redis = { version = "0.22.1", features = ["cluster", "connection-manager", "tokio-comp"] }
tokio = { version = "1", features = ["signal", "rt"] }
@@ -21,4 +21,6 @@ anyhow = "1.0.68"
serde_test = "1.0.152"
-tracing = "0.1.37" \ No newline at end of file
+tracing = "0.1.37"
+opentelemetry-otlp = "0.11.0"
+opentelemetry = "0.18.0" \ No newline at end of file
diff --git a/libs/shared/src/config.rs b/libs/shared/src/config.rs
index cdf0bd3..4967e3c 100644
--- a/libs/shared/src/config.rs
+++ b/libs/shared/src/config.rs
@@ -10,6 +10,7 @@ pub struct Settings<T: Clone + DeserializeOwned> {
pub config: T,
pub nats: crate::nats::Configuration,
pub redis: crate::redis::Configuration,
+ pub opentelemetry: Option<crate::opentelemetry::Configuration>,
}
impl<T: Clone + DeserializeOwned + Default> Settings<T> {
diff --git a/libs/shared/src/lib.rs b/libs/shared/src/lib.rs
index 92e7a05..33a6a13 100644
--- a/libs/shared/src/lib.rs
+++ b/libs/shared/src/lib.rs
@@ -13,3 +13,4 @@ pub mod config;
pub mod nats;
pub mod payloads;
pub mod redis;
+pub mod opentelemetry; \ No newline at end of file
diff --git a/libs/shared/src/opentelemetry.rs b/libs/shared/src/opentelemetry.rs
new file mode 100644
index 0000000..ca6542d
--- /dev/null
+++ b/libs/shared/src/opentelemetry.rs
@@ -0,0 +1,91 @@
+use std::ops::{Deref, DerefMut};
+
+use opentelemetry_otlp::{ExportConfig, Protocol};
+use serde::{de::Visitor, Deserialize};
+
+#[derive(Debug, Default)]
+#[repr(transparent)]
+pub struct ExportConfigDeserialize(ExportConfig);
+impl Clone for ExportConfigDeserialize {
+ fn clone(&self) -> Self {
+ Self(ExportConfig {
+ endpoint: self.0.endpoint.clone(),
+ protocol: self.0.protocol,
+ timeout: self.0.timeout,
+ })
+ }
+}
+
+impl From<ExportConfigDeserialize> for ExportConfig {
+ fn from(val: ExportConfigDeserialize) -> Self {
+ val.0
+ }
+}
+
+impl Deref for ExportConfigDeserialize {
+ type Target = ExportConfig;
+
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+impl DerefMut for ExportConfigDeserialize {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.0
+ }
+}
+
+impl<'de> Deserialize<'de> for ExportConfigDeserialize {
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ #[derive(Deserialize, Debug)]
+ #[serde(field_identifier, rename_all = "lowercase")]
+ enum Fields {
+ Endpoint,
+ Timeout,
+ }
+
+ struct OpenTelemetryExportConfigVisitor;
+ impl<'de> Visitor<'de> for OpenTelemetryExportConfigVisitor {
+ type Value = ExportConfigDeserialize;
+ fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
+ formatter.write_str("struct OpenTelemetryExportConfig")
+ }
+
+ fn visit_map<A>(self, mut map: A) -> std::result::Result<Self::Value, A::Error>
+ where
+ A: serde::de::MapAccess<'de>,
+ {
+ let mut export_config = ExportConfigDeserialize::default();
+ export_config.0.protocol = Protocol::Grpc;
+ while let Some(name) = map.next_key::<Fields>()? {
+ match name {
+ Fields::Endpoint => {
+ export_config.0.endpoint = map.next_value()?;
+ }
+ Fields::Timeout => {
+ export_config.0.timeout = map.next_value()?;
+ }
+ }
+ }
+
+ Ok(export_config)
+ }
+ }
+
+ deserializer.deserialize_struct(
+ "OpenTelemetryExportConfig",
+ &["endpoint", "protocol", "timeout"],
+ OpenTelemetryExportConfigVisitor,
+ )
+ }
+}
+
+#[derive(Debug, Clone, Deserialize)]
+pub struct Configuration {
+ pub traces: Option<ExportConfigDeserialize>,
+ pub metrics: Option<ExportConfigDeserialize>,
+}