[[package]]
name = "async-nats"
-version = "0.25.1"
+version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f69bf051b7d96b3275cdea9a4abbe2e937ce6de66c742c57050c5c98b4a6db32"
+checksum = "0b0e90b3cd41350d89a242b981fb888f0eb8e3cb81c0fcb4563338f7e96a1084"
dependencies = [
"base64",
"base64-url",
"percent-encoding",
]
+[[package]]
+name = "fs_extra"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394"
+
[[package]]
name = "futures"
version = "0.3.25"
"serde",
"serde_json",
"shared",
+ "tikv-jemallocator",
"tokio",
"tokio-stream",
"tracing",
dependencies = [
"anyhow",
"criterion",
- "env_logger 0.7.1",
+ "env_logger 0.10.0",
"hyper",
"leash",
"opentelemetry",
"serde",
"shared",
"test-log",
+ "tikv-jemallocator",
"tokio",
"tokio-stream",
"tokio-test",
"proto",
"serde",
"shared",
+ "tikv-jemallocator",
"tokio",
"tokio-stream",
"tonic",
"anyhow",
"async-nats",
"config",
+ "opentelemetry",
+ "opentelemetry-otlp",
"redis",
"serde",
"serde_json",
"once_cell",
]
+[[package]]
+name = "tikv-jemalloc-sys"
+version = "0.5.2+5.3.0-patched"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ec45c14da997d0925c7835883e4d5c181f196fa142f8c19d7643d1e9af2592c3"
+dependencies = [
+ "cc",
+ "fs_extra",
+ "libc",
+]
+
+[[package]]
+name = "tikv-jemallocator"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "20612db8a13a6c06d57ec83953694185a367e16945f66565e8028d2c0bd76979"
+dependencies = [
+ "libc",
+ "tikv-jemalloc-sys",
+]
+
[[package]]
name = "time"
version = "0.3.17"
"serde",
"serde_json",
"shared",
+ "tikv-jemallocator",
"tokio",
"tracing",
"twilight-model",
FROM rust AS chef
USER root
RUN cargo install cargo-chef
-RUN apt-get update && apt-get install -y protobuf-compiler
+RUN apt-get update && apt-get install -y protobuf-compiler
WORKDIR /app
# Planning install
version: "3.3"
services:
nats:
- image: bitnami/nats
+ image: nats
restart: always
ports:
- 4222:4222
image: redis
ports:
- 6379:6379
-
+ mock:
+ image: nginx
cache:
image: ghcr.io/discordnova/nova/cache
restart: always
- ./config/default.yml:/config/default.yml
environment:
- RUST_LOG=debug
- - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317
depends_on:
- nats
- redis
- ./config/default.yml:/config/default.yml
environment:
- RUST_LOG=debug
- - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317
depends_on:
- nats
- otelcol
- ./config/default.yml:/config/default.yml
environment:
- RUST_LOG=debug
- - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317
depends_on:
- ratelimit
- otelcol
- ./config/default.yml:/config/default.yml
environment:
- RUST_LOG=debug
- - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317
depends_on:
- nats
- otelcol
- ./config/default.yml:/config/default.yml
environment:
- RUST_LOG=debug
- - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317
depends_on:
- nats
- redis
image: jaegertracing/all-in-one
container_name: jaeger
command:
- - "--memory.max-debugs"
- - "10000"
- "--query.base-path"
- "/jaeger/ui"
- "--prometheus.server-url"
serde = { version = "1.0.8", features = ["derive"] }
serde_json = { version = "1.0" }
-async-nats = "0.25.1"
+async-nats = "0.26.0"
twilight-model = "0.14"
anyhow = "1.0.68"
tracing = "0.1.37"
tracing-futures = "0.2.5"
-async-nats = "0.25.1"
+async-nats = "0.26.0"
tracing-opentelemetry = "0.18.0"
opentelemetry = "0.18.0"
-opentelemetry-http = "0.7.0"
\ No newline at end of file
+opentelemetry-http = "0.7.0"
+
+[target.'cfg(not(target_env = "msvc"))'.dependencies]
+tikv-jemallocator = "0.5"
\ No newline at end of file
use tracing_opentelemetry::OpenTelemetrySpanExt;
use twilight_gateway::{Event, Shard};
pub mod config;
-use tracing::{debug, error, info, trace_span};
+use tracing::{debug, error, info, info_span, instrument, Instrument};
use twilight_model::gateway::event::DispatchEvent;
struct MetadataMap<'a>(&'a mut HeaderMap);
}
}
+#[instrument]
async fn handle_event(event: Event, nats: &Client) -> anyhow::Result<()> {
if let Event::Ready(ready) = event {
- info!("Logged in as {}", ready.user.name);
+ info!(username = ready.user.name, "logged in");
} else {
let name = event.kind().name();
if let Ok(dispatch_event) = DispatchEvent::try_from(event) {
- debug!("handling event {}", name.unwrap());
+ let name = name.unwrap();
+ debug!(event_name = name, "handling dispatch event");
let data = CachePayload {
data: DispatchEventTagged(dispatch_event),
let value = serde_json::to_string(&data)?;
let bytes = bytes::Bytes::from(value);
- let span = trace_span!("nats send");
+ let span = info_span!("nats send");
let mut header_map = HeaderMap::new();
let context = span.context();
propagator.inject_context(&context, &mut MetadataMap(&mut header_map));
});
- nats.publish_with_headers(
- format!("nova.cache.dispatch.{}", name.unwrap()),
- header_map,
- bytes,
- )
- .await?;
+ nats.publish_with_headers(format!("nova.cache.dispatch.{name}"), header_map, bytes)
+ .instrument(info_span!("sending to nats"))
+ .await?;
}
}
use gateway::GatewayServer;
use leash::ignite;
+#[cfg(not(target_env = "msvc"))]
+use tikv_jemallocator::Jemalloc;
+
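+// Use jemalloc as the global allocator; tikv-jemallocator does not support the MSVC
+// toolchain, hence the cfg gate.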
+#[cfg(not(target_env = "msvc"))]
+#[global_allocator]
+static GLOBAL: Jemalloc = Jemalloc;
+
ignite!(GatewayServer);
[[bench]]
name = "bucket"
harness = false
+
+[target.'cfg(not(target_env = "msvc"))'.dependencies]
+tikv-jemallocator = "0.5"
\ No newline at end of file
use leash::ignite;
use ratelimit::RatelimiterServerComponent;
+#[cfg(not(target_env = "msvc"))]
+use tikv_jemallocator::Jemalloc;
+
+#[cfg(not(target_env = "msvc"))]
+#[global_allocator]
+static GLOBAL: Jemalloc = Jemalloc;
+
ignite!(RatelimiterServerComponent);
dns-lookup = "1.0.8"
opentelemetry = "0.18.0"
opentelemetry-http = "0.7.0"
-tracing-opentelemetry = "0.18.0"
\ No newline at end of file
+tracing-opentelemetry = "0.18.0"
+
+[target.'cfg(not(target_env = "msvc"))'.dependencies]
+tikv-jemallocator = "0.5"
\ No newline at end of file
pub discord: Discord,
pub ratelimiter_address: String,
pub ratelimiter_port: u16,
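+ /// Base URL that proxied requests are forwarded to; defaults to the official Discord API.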
+ #[serde(default = "default_upstream")]
+ pub upstream: Option<String>,
+}
+
+#[allow(clippy::unnecessary_wraps)]
+fn default_upstream() -> Option<String> {
+ Some("https://discord.com".to_string())
}
-use anyhow::bail;
+use anyhow::{bail, Context};
+use futures_util::FutureExt;
use http::{
header::{AUTHORIZATION, CONNECTION, HOST, TRANSFER_ENCODING, UPGRADE},
HeaderValue, Method as HttpMethod, Request, Response, Uri,
};
use hyper::{client::HttpConnector, Body, Client};
use hyper_rustls::HttpsConnector;
+use opentelemetry::{
+ global,
+ metrics::{Counter, Histogram},
+ Context as OpenTelemetryContext, KeyValue,
+};
use std::{
collections::hash_map::DefaultHasher,
convert::TryFrom,
hash::{Hash, Hasher},
str::FromStr,
sync::Arc,
+ time::SystemTime,
};
-use tracing::{debug_span, error, info_span, instrument, Instrument};
+use tracing::{debug_span, error, info_span, log::trace, Instrument};
use twilight_http_ratelimiting::{Method, Path};
-use crate::ratelimit_client::RemoteRatelimiter;
+use crate::{config::ReverseProxy, ratelimit_client::RemoteRatelimiter};
+use lazy_static::lazy_static;
+
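+// OpenTelemetry instruments for the reverse proxy: request, upstream and ratelimiter
+// counters plus latency histograms, all labelled with the computed ratelimit bucket.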
+lazy_static! {
+ static ref METER_NAME: &'static str = "";
+ static ref REQUESTS: Counter<u64> = {
+ global::meter(&METER_NAME)
+ .u64_counter("rest.http_requests_total")
+ .with_description("Amount of requests processed by the rest reverse proxy")
+ .init()
+ };
+ static ref UPSTREAM_CALLS: Counter<u64> = {
+ global::meter(&METER_NAME)
+ .u64_counter("rest.upstream_http_requests_total")
+ .with_description("Amount of requests sent to discord")
+ .init()
+ };
+ static ref TICKET_CALLS: Counter<u64> = {
+ global::meter(&METER_NAME)
+ .u64_counter("rest.ticket_http_requests_total")
+ .with_description("Amount of requests sent to the ratelimiter")
+ .init()
+ };
+ static ref HEADERS_SUBMIT_CALLS: Counter<u64> = {
+ global::meter(&METER_NAME)
+ .u64_counter("rest.header_submit_http_requests_total")
+ .with_description("Amount of requests sent to the ratelimiter")
+ .init()
+ };
+ static ref UPSTREAM_TIMES: Histogram<f64> = {
+ global::meter(&METER_NAME)
+ .f64_histogram("rest.upstream_http_request_duration_seconds")
+ .with_description("Time took to request discord")
+ .init()
+ };
+ static ref TICKET_TIMES: Histogram<f64> = {
+ global::meter(&METER_NAME)
+ .f64_histogram("rest.ticket_http_request_duration_seconds")
+ .with_description("Time took to get a ticket from the ratelimiter")
+ .init()
+ };
+ static ref HEADERS_SUBMIT_TIMES: Histogram<f64> = {
+ global::meter(&METER_NAME)
+ .f64_histogram("rest.header_submit_http_request_duration_seconds")
+ .with_description("Time took to get a ticket from the ratelimiter")
+ .init()
+ };
+}
/// Normalizes the path
+#[inline]
fn normalize_path(request_path: &str) -> (&str, &str) {
if let Some(trimmed_path) = request_path.strip_prefix("/api") {
if let Some(maybe_api_version) = trimmed_path.split('/').nth(1) {
}
}
-#[instrument]
+#[inline]
+#[allow(clippy::too_many_lines)]
pub async fn handle_request(
client: Client<HttpsConnector<HttpConnector>, Body>,
ratelimiter: Arc<RemoteRatelimiter>,
+ config: ReverseProxy,
token: String,
mut request: Request<Body>,
) -> Result<Response<Body>, anyhow::Error> {
- let (hash, uri_string) = {
+ let cx = OpenTelemetryContext::current();
+
+ let (bucket, uri_string) = {
let method = match *request.method() {
HttpMethod::DELETE => Method::Delete,
HttpMethod::GET => Method::Get,
HttpMethod::POST => Method::Post,
HttpMethod::PUT => Method::Put,
_ => {
- error!("Unsupported HTTP method in request, {}", request.method());
+ error!(method = ?request.method(), "unsupported HTTP method in request");
bail!("unsupported method");
}
};
-
let request_path = request.uri().path();
let (api_path, trimmed_path) = normalize_path(request_path);
+ trace!("normalized path to {trimmed_path}");
- let mut uri_string = format!("https://discord.com{api_path}{trimmed_path}");
+ let mut uri_string = format!(
+ "{}{api_path}{trimmed_path}",
+ config.upstream.expect("no upstream")
+ );
if let Some(query) = request.uri().query() {
uri_string.push('?');
uri_string.push_str(query);
}
+ trace!("full request uri is {uri_string}");
+
let mut hash = DefaultHasher::new();
match Path::try_from((method, trimmed_path)) {
Ok(path) => path,
}
}
.hash(&mut hash);
+ let bucket = hash.finish().to_string();
+ trace!("Request bucket is {}", bucket);
- (hash.finish().to_string(), uri_string)
+ (bucket, uri_string)
};
+
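+ // Count the incoming request against its ratelimit bucket.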
+ REQUESTS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]);
+
+ let ticket_start = SystemTime::now();
+ TICKET_CALLS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]);
// waits for the request to be authorized
- ratelimiter
- .ticket(hash.clone())
+ match ratelimiter
+ .ticket(bucket.clone())
.instrument(debug_span!("ticket validation request"))
- .await?;
+ .then(|v| async {
+ TICKET_TIMES.record(
+ &cx,
+ ticket_start.elapsed()?.as_secs_f64(),
+ &[KeyValue::new("bucket", bucket.clone())],
+ );
+ v
+ })
+ .await
+ {
+ Ok(_) => {}
+ Err(e) => {
+ error!("Error when requesting the ratelimiter: {:?}", e);
+ bail!("failed to request the ratelimiter");
+ }
+ }
request
.headers_mut()
};
*request.uri_mut() = uri;
let span = debug_span!("upstream request to discord");
- let resp = match client.request(request).instrument(span).await {
+ let upstream_start = SystemTime::now();
+ UPSTREAM_CALLS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]);
+ let resp = match client
+ .request(request)
+ .instrument(span)
+ .then(|v| async {
+ UPSTREAM_TIMES.record(
+ &cx,
+ upstream_start.elapsed()?.as_secs_f64(),
+ &[KeyValue::new("bucket", bucket.clone())],
+ );
+ v.context("")
+ })
+ .await
+ {
Ok(response) => response,
Err(e) => {
error!("Error when requesting the Discord API: {:?}", e);
let headers = resp
.headers()
.into_iter()
- .map(|(k, v)| (k.to_string(), v.to_str().map(std::string::ToString::to_string)))
+ .map(|(k, v)| {
+ (
+ k.to_string(),
+ v.to_str().map(std::string::ToString::to_string),
+ )
+ })
.filter(|f| f.1.is_ok())
.map(|f| (f.0, f.1.expect("errors should be filtered")))
.collect();
+ HEADERS_SUBMIT_CALLS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]);
let _submit_headers = ratelimiter
- .submit_headers(hash, headers)
+ .submit_headers(bucket.clone(), headers)
.instrument(info_span!("submitting headers"))
+ .then(|v| async {
+ HEADERS_SUBMIT_TIMES.record(
+ &cx,
+ upstream_start.elapsed()?.as_secs_f64(),
+ &[KeyValue::new("bucket", bucket.clone())],
+ );
+ v
+ })
.await;
Ok(resp)
let client: Client<_, hyper::Body> = Client::builder().build(https);
let token = settings.config.discord.token.clone();
-
+ let config = settings.config.clone();
let service_fn = make_service_fn(move |_: &AddrStream| {
let client = client.clone();
let ratelimiter = ratelimiter.clone();
let token = token.clone();
+ let config = config.clone();
async move {
Ok::<_, Infallible>(service_fn(move |request: Request<Body>| {
let token = token.clone();
let client = client.clone();
let ratelimiter = ratelimiter.clone();
+ let config = config.clone();
async move {
let token = token.clone();
let ratelimiter = ratelimiter.clone();
- handle_request(client, ratelimiter, token, request).await
+ handle_request(client, ratelimiter, config, token, request).await
}
}))
}
use leash::ignite;
use rest::ReverseProxyServer;
+#[cfg(not(target_env = "msvc"))]
+use tikv_jemallocator::Jemalloc;
+
+#[cfg(not(target_env = "msvc"))]
+#[global_allocator]
+static GLOBAL: Jemalloc = Jemalloc;
+
ignite!(ReverseProxyServer);
twilight-model = { version = "0.14" }
anyhow = "1.0.68"
-async-nats = "0.25.1"
+async-nats = "0.26.0"
+
+[target.'cfg(not(target_env = "msvc"))'.dependencies]
+tikv-jemallocator = "0.5"
\ No newline at end of file
use leash::ignite;
use webhook::WebhookServer;
+#[cfg(not(target_env = "msvc"))]
+use tikv_jemallocator::Jemalloc;
+
+#[cfg(not(target_env = "msvc"))]
+#[global_allocator]
+static GLOBAL: Jemalloc = Jemalloc;
+
ignite!(WebhookServer);
env_logger = "0.10.0"
tracing-opentelemetry = "0.18.0"
opentelemetry = { version ="0.18.0", features = ["rt-tokio"] }
-opentelemetry-otlp = { version = "0.11.0" }
+opentelemetry-otlp = { version = "0.11.0", features = ["metrics"] }
clippy::complexity,
clippy::perf,
clippy::pedantic,
- clippy::nursery,
+ clippy::nursery
)]
use anyhow::Result;
+use opentelemetry::global::shutdown_tracer_provider;
+use opentelemetry::sdk::export::metrics::aggregation::stateless_temporality_selector;
+use opentelemetry::sdk::metrics::selectors;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::sdk::trace::{self};
use opentelemetry::sdk::Resource;
use serde::de::DeserializeOwned;
use shared::config::Settings;
use std::str::FromStr;
+use std::time::Duration;
use std::{future::Future, pin::Pin};
use tokio::sync::oneshot;
+use tracing::log::error;
use tracing::{info, log::trace};
use tracing_subscriber::filter::Directive;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
stop: oneshot::Receiver<()>,
) -> AnyhowResultFuture<()>;
fn new() -> Self;
+}
- fn _internal_start(self) -> AnyhowResultFuture<()> {
- Box::pin(async move {
- global::set_text_map_propagator(TraceContextPropagator::new());
+/// # Panics
+/// Panics in case of an invalid `RUST_LOG` variable.
+pub fn start_component<Y, C>(component: Y) -> AnyhowResultFuture<()>
+where
+ Y: Component<Config = C>,
+ C: Default + Clone + DeserializeOwned + Send,
+{
+ Box::pin(async move {
+ let settings = Settings::<Y::Config>::new(Y::SERVICE_NAME)?;
+
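+ // Build an OTLP metrics pipeline only when a metrics exporter is configured and
+ // register it as the global meter provider.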
+ if let Some(meter_config) = settings
+ .opentelemetry
+ .as_ref()
+ .and_then(|f| f.metrics.clone())
+ {
+ let meter = opentelemetry_otlp::new_pipeline()
+ .metrics(
+ selectors::simple::histogram([1.0, 2.0, 5.0, 10.0, 20.0, 50.0]),
+ stateless_temporality_selector(),
+ opentelemetry::runtime::Tokio,
+ )
+ .with_exporter(
+ opentelemetry_otlp::new_exporter()
+ .tonic()
+ .with_export_config(meter_config.into()),
+ )
+ .with_period(Duration::from_secs(3))
+ .with_timeout(Duration::from_secs(10))
+ .build()?;
+ // Register the OTLP meter pipeline as the global meter provider
+ global::set_meter_provider(meter);
+ }
+ // Propagate trace context using the W3C TraceContext format
+ global::set_text_map_propagator(TraceContextPropagator::new());
+ // Route OpenTelemetry internal errors through the error log
+ global::set_error_handler(|error| {
+ error!("OpenTelemetry error: {}", error);
+ })?;
+
+ if let Some(tracer_config) = settings
+ .opentelemetry
+ .as_ref()
+ .and_then(|f| f.traces.clone())
+ {
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_trace_config(trace::config().with_resource(Resource::new(vec![
- KeyValue::new("service.name", Self::SERVICE_NAME),
+ KeyValue::new("service.name", Y::SERVICE_NAME),
])))
- .with_exporter(opentelemetry_otlp::new_exporter().tonic().with_env())
+ .with_exporter(
+ opentelemetry_otlp::new_exporter()
+ .tonic()
+ .with_export_config(tracer_config.into()),
+ )
.install_batch(opentelemetry::runtime::Tokio)?;
+ let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
- let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
-
+ tracing_subscriber::registry()
+ .with(otel_layer)
+ .with(
+ // Use the info level as default
+ EnvFilter::builder()
+ .with_default_directive(Directive::from_str("info").unwrap())
+ .from_env()?,
+ )
+ .init();
+ } else {
+ // Setup tracing
tracing_subscriber::registry()
.with(fmt::layer())
- .with(telemetry)
.with(
+ // Use the info level as default
EnvFilter::builder()
.with_default_directive(Directive::from_str("info").unwrap())
.from_env()?,
)
.init();
+ }
- info!("Starting nova");
- let settings = Settings::<Self::Config>::new(Self::SERVICE_NAME);
- let (stop, stop_channel) = oneshot::channel();
-
- tokio::spawn(async move {
- trace!("started signal watching");
- #[cfg(unix)]
- tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
- .unwrap()
- .recv()
- .await;
- #[cfg(not(unix))]
- return tokio::signal::ctrl_c().await.unwrap();
-
- stop.send(()).unwrap();
- });
-
- trace!(
- "Starting component {component}",
- component = Self::SERVICE_NAME
- );
- self.start(settings?, stop_channel).await
- })
- }
+ // Finally, start the nova component
+ info!("Starting nova component {}", Y::SERVICE_NAME);
+ let (stop, stop_channel) = oneshot::channel();
+
+ tokio::spawn(async move {
+ trace!("started signal watching");
+ #[cfg(unix)]
+ tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
+ .unwrap()
+ .recv()
+ .await;
+ #[cfg(not(unix))]
+ return tokio::signal::ctrl_c().await.unwrap();
+
+ stop.send(()).unwrap();
+ shutdown_tracer_provider();
+ });
+
+ trace!(
+ "Starting component {component}",
+ component = Y::SERVICE_NAME
+ );
+ component.start(settings, stop_channel).await
+ })
}
#[macro_export]
($c:ty) => {
#[allow(dead_code)]
fn main() -> anyhow::Result<()> {
- use leash::Component;
+ use $crate::Component;
let rt = tokio::runtime::Runtime::new()?;
- rt.block_on(<$c as Component>::new()._internal_start())?;
+ rt.block_on($crate::start_component(<$c as Component>::new()))?;
Ok(())
}
};
}
}
-
-
ignite!(TestComponent);
}
config = { version = "0.13", default-features = false, features = ["json", "yaml-rust", "ini"] }
-async-nats = "0.25.1"
+async-nats = "0.26.0"
redis = { version = "0.22.1", features = ["cluster", "connection-manager", "tokio-comp"] }
tokio = { version = "1", features = ["signal", "rt"] }
serde_test = "1.0.152"
-tracing = "0.1.37"
\ No newline at end of file
+tracing = "0.1.37"
+opentelemetry-otlp = "0.11.0"
+opentelemetry = "0.18.0"
\ No newline at end of file
pub config: T,
pub nats: crate::nats::Configuration,
pub redis: crate::redis::Configuration,
+ pub opentelemetry: Option<crate::opentelemetry::Configuration>,
}
impl<T: Clone + DeserializeOwned + Default> Settings<T> {
pub mod nats;
pub mod payloads;
pub mod redis;
+pub mod opentelemetry;
\ No newline at end of file
--- /dev/null
+use std::ops::{Deref, DerefMut};
+
+use opentelemetry_otlp::{ExportConfig, Protocol};
+use serde::{de::Visitor, Deserialize};
+
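+/// Newtype around `opentelemetry_otlp::ExportConfig` so it can be read from the settings
+/// file; `Clone` and `Deserialize` are implemented by hand below since the wrapped type
+/// does not derive them.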
+#[derive(Debug, Default)]
+#[repr(transparent)]
+pub struct ExportConfigDeserialize(ExportConfig);
+impl Clone for ExportConfigDeserialize {
+ fn clone(&self) -> Self {
+ Self(ExportConfig {
+ endpoint: self.0.endpoint.clone(),
+ protocol: self.0.protocol,
+ timeout: self.0.timeout,
+ })
+ }
+}
+
+impl From<ExportConfigDeserialize> for ExportConfig {
+ fn from(val: ExportConfigDeserialize) -> Self {
+ val.0
+ }
+}
+
+impl Deref for ExportConfigDeserialize {
+ type Target = ExportConfig;
+
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+impl DerefMut for ExportConfigDeserialize {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.0
+ }
+}
+
+impl<'de> Deserialize<'de> for ExportConfigDeserialize {
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ #[derive(Deserialize, Debug)]
+ #[serde(field_identifier, rename_all = "lowercase")]
+ enum Fields {
+ Endpoint,
+ Timeout,
+ }
+
+ struct OpenTelemetryExportConfigVisitor;
+ impl<'de> Visitor<'de> for OpenTelemetryExportConfigVisitor {
+ type Value = ExportConfigDeserialize;
+ fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
+ formatter.write_str("struct OpenTelemetryExportConfig")
+ }
+
+ fn visit_map<A>(self, mut map: A) -> std::result::Result<Self::Value, A::Error>
+ where
+ A: serde::de::MapAccess<'de>,
+ {
+ let mut export_config = ExportConfigDeserialize::default();
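+ // The collector is always reached over gRPC (the tonic exporter), so the protocol
+ // is hard-coded instead of being read from the configuration file.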
+ export_config.0.protocol = Protocol::Grpc;
+ while let Some(name) = map.next_key::<Fields>()? {
+ match name {
+ Fields::Endpoint => {
+ export_config.0.endpoint = map.next_value()?;
+ }
+ Fields::Timeout => {
+ export_config.0.timeout = map.next_value()?;
+ }
+ }
+ }
+
+ Ok(export_config)
+ }
+ }
+
+ deserializer.deserialize_struct(
+ "OpenTelemetryExportConfig",
+ &["endpoint", "protocol", "timeout"],
+ OpenTelemetryExportConfigVisitor,
+ )
+ }
+}
+
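+/// OpenTelemetry export settings, read from the service configuration, for example:
+///
+/// ```yaml
+/// opentelemetry:
+///   traces:
+///     endpoint: http://otelcol:4317
+///   metrics:
+///     endpoint: http://otelcol:4317
+/// ```
+///
+/// Both sections are optional; traces and metrics are only exported when configured.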
+#[derive(Debug, Clone, Deserialize)]
+pub struct Configuration {
+ pub traces: Option<ExportConfigDeserialize>,
+ pub metrics: Option<ExportConfigDeserialize>,
+}