diff options
| author | MatthieuCoder <matthieu@matthieu-dev.xyz> | 2023-01-05 18:33:53 +0400 |
|---|---|---|
| committer | MatthieuCoder <matthieu@matthieu-dev.xyz> | 2023-01-05 18:33:53 +0400 |
| commit | 038add4d5e8465f8bb36f1a1fa5817a02cab833b (patch) | |
| tree | 2bcab259fc3b7a57ff9de4b043fa0c5571c85622 /exes | |
| parent | 63565094f480154556be69a6b3625e47c3b28f04 (diff) | |
base for tracing
Diffstat (limited to 'exes')
40 files changed, 385 insertions, 283 deletions
diff --git a/exes/all/Cargo.toml b/exes/all/Cargo.toml index 41396fc..26e8d04 100644 --- a/exes/all/Cargo.toml +++ b/exes/all/Cargo.toml @@ -16,13 +16,19 @@ ratelimit = { path = "../ratelimit" } rest = { path = "../rest" } webhook = { path = "../webhook" } -tokio = { version = "1.23.0", features = ["full"] } +tokio = { version = "1.23.0", features = ["rt"] } serde = "1.0.152" serde_json = "1.0.91" anyhow = "1.0.68" +tracing = "0.1.37" + config = "0.13.3" -pretty_env_logger = "0.4.0" + +tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } +tracing-opentelemetry = "0.18.0" +opentelemetry = { version ="0.18.0", features = ["rt-tokio"] } +opentelemetry-otlp = { version = "0.11.0" } [lib] crate-type = ["staticlib"] diff --git a/exes/all/build.rs b/exes/all/build.rs index 192dffd..00d7afc 100644 --- a/exes/all/build.rs +++ b/exes/all/build.rs @@ -1,9 +1,8 @@ extern crate cbindgen; +use cbindgen::{Config, Language}; use std::env; use std::path::PathBuf; -use cbindgen::{Config, Language}; - fn main() { let crate_dir = env::var("CARGO_MANIFEST_DIR").unwrap(); @@ -20,6 +19,6 @@ fn main() { }; cbindgen::generate_with_config(crate_dir, config) - .unwrap() - .write_to_file(output_file); + .unwrap() + .write_to_file(output_file); } diff --git a/exes/all/src/lib.rs b/exes/all/src/lib.rs index de83243..1c99109 100644 --- a/exes/all/src/lib.rs +++ b/exes/all/src/lib.rs @@ -5,14 +5,21 @@ use anyhow::Result; use config::{Config, Environment, File}; use gateway::GatewayServer; use leash::Component; +use opentelemetry::{ + global, + sdk::{propagation::TraceContextPropagator, trace, Resource}, + KeyValue, +}; +use opentelemetry_otlp::WithExportConfig; use ratelimit::RatelimiterServerComponent; use rest::ReverseProxyServer; use serde::de::DeserializeOwned; use serde_json::Value; -use shared::{config::Settings, log::info}; +use shared::config::Settings; use std::{ env, ffi::{CStr, CString}, + str::FromStr, time::Duration, }; use tokio::{ @@ -20,6 +27,11 @@ use tokio::{ sync::oneshot::{self, Sender}, task::JoinHandle, }; +use tracing::info; +use tracing_subscriber::{ + filter::Directive, fmt, prelude::__tracing_subscriber_SubscriberExt, util::SubscriberInitExt, + EnvFilter, +}; use webhook::WebhookServer; pub struct AllInOneInstance { @@ -83,9 +95,7 @@ pub extern "C" fn load_config() -> *const libc::c_char { #[no_mangle] /// Initialise les logs des composants de nova /// Utilise la crate `pretty_log_env` -pub extern "C" fn init_logs() { - pretty_env_logger::init(); -} +pub extern "C" fn init_logs() {} #[no_mangle] /// Stops a nova instance @@ -108,6 +118,29 @@ pub unsafe extern "C" fn start_instance(config: *const libc::c_char) -> *mut All // Initialize a tokio runtime let rt = Runtime::new().unwrap(); rt.block_on(async move { + global::set_text_map_propagator(TraceContextPropagator::new()); + let tracer = + opentelemetry_otlp::new_pipeline() + .tracing() + .with_trace_config(trace::config().with_resource(Resource::new(vec![ + KeyValue::new("service.name", "all-in-one"), + ]))) + .with_exporter(opentelemetry_otlp::new_exporter().tonic().with_env()) + .install_batch(opentelemetry::runtime::Tokio) + .unwrap(); + + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + + tracing_subscriber::registry() + .with(fmt::layer()) + .with(telemetry) + .with( + EnvFilter::builder() + .with_default_directive(Directive::from_str("info").unwrap()) + .from_env() + .unwrap(), + ) + .init(); // Start the gateway server let mut aio = vec![]; diff --git a/exes/cache/Cargo.toml b/exes/cache/Cargo.toml index 9a7f9ee..349deaa 100644 --- a/exes/cache/Cargo.toml +++ b/exes/cache/Cargo.toml @@ -8,12 +8,15 @@ edition = "2018" [dependencies] shared = { path = "../../libs/shared" } proto = { path = "../../libs/proto" } -async-nats = "0.25.1" -tokio = { version = "1", features = ["full"] } + +tokio = { version = "1", features = ["rt"] } +tokio-stream = "0.1.11" + serde = { version = "1.0.8", features = ["derive"] } -log = { version = "0.4", features = ["std"] } serde_json = { version = "1.0" } -redis = "*" -futures-util = "*" + +async-nats = "0.25.1" twilight-model = "0.14" -anyhow = "1.0.68"
\ No newline at end of file +anyhow = "1.0.68" + +tracing = "0.1.37"
\ No newline at end of file diff --git a/exes/cache/src/config.rs b/exes/cache/src/config.rs index 3d9f5e2..3136c1a 100644 --- a/exes/cache/src/config.rs +++ b/exes/cache/src/config.rs @@ -1,5 +1,5 @@ use serde::Deserialize; #[derive(Debug, Deserialize, Clone, Default)] pub struct CacheConfiguration { - pub toggles: Vec<String> + pub toggles: Vec<String>, } diff --git a/exes/cache/src/main.rs b/exes/cache/src/main.rs index 5240a6a..bc13cd5 100644 --- a/exes/cache/src/main.rs +++ b/exes/cache/src/main.rs @@ -1,8 +1,6 @@ -use std::{error::Error, pin::Pin}; +use std::{error::Error, future::Future, pin::Pin}; use async_nats::{Client, Subscriber}; -use futures_util::{stream::StreamExt, Future}; -use log::info; use managers::{ automoderation::Automoderation, bans::Bans, channels::Channels, guild_schedules::GuildSchedules, guilds::Guilds, integrations::Integrations, invites::Invites, @@ -10,6 +8,8 @@ use managers::{ stage_instances::StageInstances, threads::Threads, CacheManager, }; use shared::{config::Settings, payloads::CachePayload}; +use tokio_stream::StreamExt; +use tracing::info; use twilight_model::gateway::event::DispatchEvent; use crate::config::CacheConfiguration; @@ -43,8 +43,8 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> { let settings: Settings<CacheConfiguration> = Settings::new("cache").unwrap(); info!("loaded configuration: {:?}", settings); let nats = - Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>> + Send>>>::into(settings.nats).await?; - // let redis: redis::Client = settings.redis.into(); + Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>> + Send>>>::into(settings.nats) + .await?; let mut cache = Cache::default(); diff --git a/exes/cache/src/managers/channels.rs b/exes/cache/src/managers/channels.rs index fe34acc..c645420 100644 --- a/exes/cache/src/managers/channels.rs +++ b/exes/cache/src/managers/channels.rs @@ -5,7 +5,6 @@ use crate::CacheSourcedEvents; use super::CacheManager; use std::future::Future; - #[derive(Default)] pub struct Channels {} impl CacheManager for Channels { diff --git a/exes/cache/src/managers/guild_schedules.rs b/exes/cache/src/managers/guild_schedules.rs index bcc79c5..0c565f1 100644 --- a/exes/cache/src/managers/guild_schedules.rs +++ b/exes/cache/src/managers/guild_schedules.rs @@ -5,7 +5,6 @@ use crate::CacheSourcedEvents; use super::CacheManager; use std::future::Future; - #[derive(Default)] pub struct GuildSchedules {} impl CacheManager for GuildSchedules { diff --git a/exes/cache/src/managers/guilds.rs b/exes/cache/src/managers/guilds.rs index 3f5f4c4..6e142c8 100644 --- a/exes/cache/src/managers/guilds.rs +++ b/exes/cache/src/managers/guilds.rs @@ -5,7 +5,6 @@ use crate::CacheSourcedEvents; use super::CacheManager; use std::future::Future; - #[derive(Default)] pub struct Guilds {} impl CacheManager for Guilds { @@ -15,16 +14,16 @@ impl CacheManager for Guilds { ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> { Box::pin(async move { match event { - DispatchEvent::GuildCreate(_) => {}, - DispatchEvent::GuildDelete(_) => {}, - DispatchEvent::UnavailableGuild(_) => {}, - DispatchEvent::GuildUpdate(_) => {}, - DispatchEvent::WebhooksUpdate(_) => {}, - DispatchEvent::GuildStickersUpdate(_) => {}, - DispatchEvent::GuildEmojisUpdate(_) => {}, - DispatchEvent::VoiceServerUpdate(_) => {}, - DispatchEvent::GuildIntegrationsUpdate(_) => {}, - DispatchEvent::CommandPermissionsUpdate(_) => {}, + DispatchEvent::GuildCreate(_) => {} + DispatchEvent::GuildDelete(_) => {} + DispatchEvent::UnavailableGuild(_) => {} + DispatchEvent::GuildUpdate(_) => {} + DispatchEvent::WebhooksUpdate(_) => {} + DispatchEvent::GuildStickersUpdate(_) => {} + DispatchEvent::GuildEmojisUpdate(_) => {} + DispatchEvent::VoiceServerUpdate(_) => {} + DispatchEvent::GuildIntegrationsUpdate(_) => {} + DispatchEvent::CommandPermissionsUpdate(_) => {} _ => unreachable!(), }; diff --git a/exes/cache/src/managers/integrations.rs b/exes/cache/src/managers/integrations.rs index 99d292e..8e71724 100644 --- a/exes/cache/src/managers/integrations.rs +++ b/exes/cache/src/managers/integrations.rs @@ -5,7 +5,6 @@ use crate::CacheSourcedEvents; use super::CacheManager; use std::future::Future; - #[derive(Default)] pub struct Integrations {} impl CacheManager for Integrations { diff --git a/exes/cache/src/managers/invites.rs b/exes/cache/src/managers/invites.rs index 21da64f..6168e82 100644 --- a/exes/cache/src/managers/invites.rs +++ b/exes/cache/src/managers/invites.rs @@ -5,7 +5,6 @@ use crate::CacheSourcedEvents; use super::CacheManager; use std::future::Future; - #[derive(Default)] pub struct Invites {} impl CacheManager for Invites { diff --git a/exes/cache/src/managers/members.rs b/exes/cache/src/managers/members.rs index 3a483f1..21e6cde 100644 --- a/exes/cache/src/managers/members.rs +++ b/exes/cache/src/managers/members.rs @@ -5,7 +5,6 @@ use crate::CacheSourcedEvents; use super::CacheManager; use std::future::Future; - #[derive(Default)] pub struct Members {} impl CacheManager for Members { @@ -15,11 +14,11 @@ impl CacheManager for Members { ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> { Box::pin(async move { match event { - DispatchEvent::MemberAdd(_) => {}, - DispatchEvent::MemberRemove(_) => {}, - DispatchEvent::MemberUpdate(_) => {}, - DispatchEvent::MemberChunk(_) => {}, - DispatchEvent::UserUpdate(_) => {}, + DispatchEvent::MemberAdd(_) => {} + DispatchEvent::MemberRemove(_) => {} + DispatchEvent::MemberUpdate(_) => {} + DispatchEvent::MemberChunk(_) => {} + DispatchEvent::UserUpdate(_) => {} _ => unreachable!(), }; diff --git a/exes/cache/src/managers/messages.rs b/exes/cache/src/managers/messages.rs index 7b06ae7..1725781 100644 --- a/exes/cache/src/managers/messages.rs +++ b/exes/cache/src/managers/messages.rs @@ -5,7 +5,6 @@ use crate::CacheSourcedEvents; use super::CacheManager; use std::future::Future; - #[derive(Default)] pub struct Messages {} impl CacheManager for Messages { @@ -15,10 +14,10 @@ impl CacheManager for Messages { ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> { Box::pin(async move { match event { - DispatchEvent::MessageCreate(_) => {}, - DispatchEvent::MessageDelete(_) => {}, - DispatchEvent::MessageDeleteBulk(_) => {}, - DispatchEvent::MessageUpdate(_) => {}, + DispatchEvent::MessageCreate(_) => {} + DispatchEvent::MessageDelete(_) => {} + DispatchEvent::MessageDeleteBulk(_) => {} + DispatchEvent::MessageUpdate(_) => {} _ => unreachable!(), }; diff --git a/exes/cache/src/managers/mod.rs b/exes/cache/src/managers/mod.rs index 370cda6..9768a86 100644 --- a/exes/cache/src/managers/mod.rs +++ b/exes/cache/src/managers/mod.rs @@ -1,22 +1,22 @@ +use std::future::Future; use std::pin::Pin; use twilight_model::gateway::event::DispatchEvent; -use std::future::Future; use crate::CacheSourcedEvents; +pub mod automoderation; +pub mod bans; pub mod channels; -pub mod guilds; pub mod guild_schedules; -pub mod stage_instances; +pub mod guilds; pub mod integrations; +pub mod invites; pub mod members; -pub mod bans; -pub mod reactions; pub mod messages; -pub mod threads; -pub mod invites; +pub mod reactions; pub mod roles; -pub mod automoderation; +pub mod stage_instances; +pub mod threads; pub trait CacheManager { fn handle(&self, event: DispatchEvent) -> Pin<Box<dyn Future<Output = CacheSourcedEvents>>>; diff --git a/exes/cache/src/managers/reactions.rs b/exes/cache/src/managers/reactions.rs index 5d21e0b..b154638 100644 --- a/exes/cache/src/managers/reactions.rs +++ b/exes/cache/src/managers/reactions.rs @@ -5,7 +5,6 @@ use crate::CacheSourcedEvents; use super::CacheManager; use std::future::Future; - #[derive(Default)] pub struct Reactions {} impl CacheManager for Reactions { @@ -15,10 +14,10 @@ impl CacheManager for Reactions { ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> { Box::pin(async move { match event { - DispatchEvent::ReactionAdd(_) => {}, - DispatchEvent::ReactionRemove(_) => {}, - DispatchEvent::ReactionRemoveAll(_) => {}, - DispatchEvent::ReactionRemoveEmoji(_) => {}, + DispatchEvent::ReactionAdd(_) => {} + DispatchEvent::ReactionRemove(_) => {} + DispatchEvent::ReactionRemoveAll(_) => {} + DispatchEvent::ReactionRemoveEmoji(_) => {} _ => unreachable!(), }; diff --git a/exes/cache/src/managers/roles.rs b/exes/cache/src/managers/roles.rs index 5fa0f22..c69526f 100644 --- a/exes/cache/src/managers/roles.rs +++ b/exes/cache/src/managers/roles.rs @@ -5,7 +5,6 @@ use crate::CacheSourcedEvents; use super::CacheManager; use std::future::Future; - #[derive(Default)] pub struct Roles {} impl CacheManager for Roles { @@ -15,9 +14,9 @@ impl CacheManager for Roles { ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> { Box::pin(async move { match event { - DispatchEvent::RoleCreate(_) => {}, - DispatchEvent::RoleDelete(_) => {}, - DispatchEvent::RoleUpdate(_) => {}, + DispatchEvent::RoleCreate(_) => {} + DispatchEvent::RoleDelete(_) => {} + DispatchEvent::RoleUpdate(_) => {} _ => unreachable!(), }; diff --git a/exes/cache/src/managers/stage_instances.rs b/exes/cache/src/managers/stage_instances.rs index 314d089..baeabc8 100644 --- a/exes/cache/src/managers/stage_instances.rs +++ b/exes/cache/src/managers/stage_instances.rs @@ -5,7 +5,6 @@ use crate::CacheSourcedEvents; use super::CacheManager; use std::future::Future; - #[derive(Default)] pub struct StageInstances {} impl CacheManager for StageInstances { @@ -15,9 +14,9 @@ impl CacheManager for StageInstances { ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> { Box::pin(async move { match event { - DispatchEvent::StageInstanceCreate(_) => {}, - DispatchEvent::StageInstanceDelete(_) => {}, - DispatchEvent::StageInstanceUpdate(_) => {}, + DispatchEvent::StageInstanceCreate(_) => {} + DispatchEvent::StageInstanceDelete(_) => {} + DispatchEvent::StageInstanceUpdate(_) => {} _ => unreachable!(), }; diff --git a/exes/cache/src/managers/threads.rs b/exes/cache/src/managers/threads.rs index d4efc2e..8956616 100644 --- a/exes/cache/src/managers/threads.rs +++ b/exes/cache/src/managers/threads.rs @@ -14,12 +14,12 @@ impl CacheManager for Threads { ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> { Box::pin(async move { match event { - DispatchEvent::ThreadCreate(_) => {}, - DispatchEvent::ThreadDelete(_) => {}, - DispatchEvent::ThreadListSync(_) => {}, - DispatchEvent::ThreadMemberUpdate(_) => {}, - DispatchEvent::ThreadMembersUpdate(_) => {}, - DispatchEvent::ThreadUpdate(_) => {}, + DispatchEvent::ThreadCreate(_) => {} + DispatchEvent::ThreadDelete(_) => {} + DispatchEvent::ThreadListSync(_) => {} + DispatchEvent::ThreadMemberUpdate(_) => {} + DispatchEvent::ThreadMembersUpdate(_) => {} + DispatchEvent::ThreadUpdate(_) => {} _ => unreachable!(), }; diff --git a/exes/gateway/Cargo.toml b/exes/gateway/Cargo.toml index d71ed4a..a174710 100644 --- a/exes/gateway/Cargo.toml +++ b/exes/gateway/Cargo.toml @@ -7,11 +7,24 @@ edition = "2018" shared = { path = "../../libs/shared" } proto = { path = "../../libs/proto" } leash = { path = "../../libs/leash" } -tokio = { version = "1", features = ["full"] } + +tokio = { version = "1", features = ["rt", "signal"] } +tokio-stream = "0.1.11" + twilight-gateway = { version = "0.14" } twilight-model = "0.14" + +bytes = "1.3.0" +anyhow = "1.0.68" + serde = { version = "1.0.8", features = ["derive"] } -futures = "0.3" serde_json = { version = "1.0" } -bytes = "*" -anyhow = "*"
\ No newline at end of file + +tracing = "0.1.37" +tracing-futures = "0.2.5" + +async-nats = "0.25.1" + +tracing-opentelemetry = "0.18.0" +opentelemetry = "0.18.0" +opentelemetry-http = "0.7.0"
\ No newline at end of file diff --git a/exes/gateway/src/config.rs b/exes/gateway/src/config.rs index 923ab30..7b12bfe 100644 --- a/exes/gateway/src/config.rs +++ b/exes/gateway/src/config.rs @@ -1,4 +1,4 @@ -use shared::serde::{Deserialize, Serialize}; +use serde::{Deserialize, Serialize}; use twilight_gateway::Intents; #[derive(Serialize, Deserialize, Clone)] diff --git a/exes/gateway/src/lib.rs b/exes/gateway/src/lib.rs index d7a4cee..014b72a 100644 --- a/exes/gateway/src/lib.rs +++ b/exes/gateway/src/lib.rs @@ -1,19 +1,28 @@ +use async_nats::{Client, HeaderMap, HeaderValue}; use config::GatewayConfig; use leash::{AnyhowResultFuture, Component}; +use opentelemetry::{global, propagation::Injector}; use shared::{ config::Settings, - log::{debug, info}, - nats_crate::Client, - payloads::{CachePayload, DispatchEventTagged, Tracing}, + payloads::{CachePayload, DispatchEventTagged}, }; -use std::{convert::TryFrom, pin::Pin}; -use tokio::sync::oneshot; +use std::{convert::TryFrom, future::Future, pin::Pin, str::FromStr}; +use tokio::{select, sync::oneshot}; +use tokio_stream::StreamExt; +use tracing_opentelemetry::OpenTelemetrySpanExt; use twilight_gateway::{Event, Shard}; pub mod config; -use futures::FutureExt; -use futures::{select, Future, StreamExt}; +use tracing::{debug, info, trace_span}; use twilight_model::gateway::event::DispatchEvent; +struct MetadataMap<'a>(&'a mut HeaderMap); + +impl<'a> Injector for MetadataMap<'a> { + fn set(&mut self, key: &str, value: String) { + self.0.insert(key, HeaderValue::from_str(&value).unwrap()) + } +} + pub struct GatewayServer {} impl Component for GatewayServer { type Config = GatewayConfig; @@ -22,7 +31,7 @@ impl Component for GatewayServer { fn start( &self, settings: Settings<Self::Config>, - stop: oneshot::Receiver<()>, + mut stop: oneshot::Receiver<()>, ) -> AnyhowResultFuture<()> { Box::pin(async move { let (shard, mut events) = Shard::builder(settings.token.to_owned(), settings.intents) @@ -33,35 +42,41 @@ impl Component for GatewayServer { settings.nats, ) .await?; - shard.start().await?; - let mut stop = stop.fuse(); loop { select! { - event = events.next().fuse() => { + event = events.next() => { + if let Some(event) = event { match event { Event::Ready(ready) => { info!("Logged in as {}", ready.user.name); - } + }, _ => { + let name = event.kind().name(); if let Ok(dispatch_event) = DispatchEvent::try_from(event) { + debug!("handling event {}", name.unwrap()); + let data = CachePayload { - tracing: Tracing { - node_id: "".to_string(), - span: None, - }, data: DispatchEventTagged { data: dispatch_event, }, }; let value = serde_json::to_string(&data)?; - debug!("nats send: {}", value); let bytes = bytes::Bytes::from(value); - nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes) + + let span = trace_span!("nats send"); + + let mut header_map = HeaderMap::new(); + let context = span.context(); + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&context, &mut MetadataMap(&mut header_map)) + }); + + nats.publish_with_headers(format!("nova.cache.dispatch.{}", name.unwrap()), header_map, bytes) .await?; } } @@ -70,7 +85,7 @@ impl Component for GatewayServer { break } }, - _ = stop => break + _ = (&mut stop) => break }; } diff --git a/exes/gateway/src/main.rs b/exes/gateway/src/main.rs index 2e18f9c..f1b0298 100644 --- a/exes/gateway/src/main.rs +++ b/exes/gateway/src/main.rs @@ -1,4 +1,4 @@ -use leash::ignite; use gateway::GatewayServer; +use leash::ignite; ignite!(GatewayServer); diff --git a/exes/ratelimit/Cargo.toml b/exes/ratelimit/Cargo.toml index 82ca9f6..d82d8c9 100644 --- a/exes/ratelimit/Cargo.toml +++ b/exes/ratelimit/Cargo.toml @@ -9,13 +9,21 @@ edition = "2021" shared = { path = "../../libs/shared" } proto = { path = "../../libs/proto" } leash = { path = "../../libs/leash" } -hyper = { version = "0.14", features = ["full"] } -tokio = { version = "1", features = ["full"] } + +hyper = "0.14" +tokio = { version = "1", features = ["rt"] } + serde = { version = "1.0.8", features = ["derive"] } + twilight-http-ratelimiting = { git = "https://github.com/MatthieuCoder/twilight.git" } anyhow = "*" -futures-util = "0.3.17" tracing = "*" -serde_json = { version = "1.0" } +tracing-opentelemetry = "0.18.0" +opentelemetry = "0.18.0" +opentelemetry-http = "0.7.0" + tonic = "0.8.3" tokio-stream = "0.1.11" + + +redis = { version = "0.22.1", features = ["cluster", "connection-manager", "tokio-comp"] }
\ No newline at end of file diff --git a/exes/ratelimit/src/grpc.rs b/exes/ratelimit/src/grpc.rs index a75c329..fbcf3b7 100644 --- a/exes/ratelimit/src/grpc.rs +++ b/exes/ratelimit/src/grpc.rs @@ -1,11 +1,13 @@ - +use opentelemetry::{global, propagation::Extractor}; +use proto::nova::ratelimit::ratelimiter::{ + ratelimiter_server::Ratelimiter, BucketSubmitTicketRequest, BucketSubmitTicketResponse, +}; use std::pin::Pin; - -use futures_util::Stream; -use proto::nova::ratelimit::ratelimiter::{ratelimiter_server::Ratelimiter, BucketSubmitTicketResponse, BucketSubmitTicketRequest}; use tokio::sync::mpsc; -use tokio_stream::{wrappers::ReceiverStream, StreamExt}; +use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tonic::{Request, Response, Status, Streaming}; +use tracing::{debug, debug_span, info, Instrument}; +use tracing_opentelemetry::OpenTelemetrySpanExt; use twilight_http_ratelimiting::{ticket::TicketReceiver, RatelimitHeaders}; use crate::redis_global_local_bucket_ratelimiter::RedisGlobalLocalBucketRatelimiter; @@ -20,9 +22,28 @@ impl RLServer { } } +struct MetadataMap<'a>(&'a tonic::metadata::MetadataMap); + +impl<'a> Extractor for MetadataMap<'a> { + /// Get a value for a key from the MetadataMap. If the value can't be converted to &str, returns None + fn get(&self, key: &str) -> Option<&str> { + self.0.get(key).and_then(|metadata| metadata.to_str().ok()) + } + + /// Collect all the keys from the MetadataMap. + fn keys(&self) -> Vec<&str> { + self.0 + .keys() + .map(|key| match key { + tonic::metadata::KeyRef::Ascii(v) => v.as_str(), + tonic::metadata::KeyRef::Binary(v) => v.as_str(), + }) + .collect::<Vec<_>>() + } +} + #[tonic::async_trait] impl Ratelimiter for RLServer { - type SubmitTicketStream = Pin<Box<dyn Stream<Item = Result<BucketSubmitTicketResponse, Status>> + Send>>; @@ -30,6 +51,14 @@ impl Ratelimiter for RLServer { &self, req: Request<Streaming<BucketSubmitTicketRequest>>, ) -> Result<Response<Self::SubmitTicketStream>, Status> { + let parent_cx = + global::get_text_map_propagator(|prop| prop.extract(&MetadataMap(req.metadata()))); + // Generate a tracing span as usual + let span = tracing::span!(tracing::Level::INFO, "request process"); + + // Assign parent trace from external context + span.set_parent(parent_cx); + let mut in_stream = req.into_inner(); let (tx, rx) = mpsc::channel(128); let imrl = self.ratelimiter.clone(); @@ -45,29 +74,30 @@ impl Ratelimiter for RLServer { match result.data.unwrap() { proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::Data::Path(path) => { - let a = imrl.ticket(path).await.unwrap(); + let span = debug_span!("requesting ticket"); + let a = imrl.ticket(path).instrument(span).await.unwrap(); receiver = Some(a); - tx.send(Ok(BucketSubmitTicketResponse { accepted: 1 })).await.unwrap(); - }, proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::Data::Headers(b) => { if let Some(recv) = receiver { - let recv = recv.await.unwrap(); + let span = debug_span!("waiting for headers data"); + let recv = recv.instrument(span).await.unwrap(); let rheaders = RatelimitHeaders::from_pairs(b.headers.iter().map(|f| (f.0.as_str(), f.1.as_bytes()))).unwrap(); - - recv.headers(Some(rheaders)).unwrap(); + recv.headers(Some(rheaders)).unwrap(); break; } }, } } - println!("\tstream ended"); - }); + + debug!("\tstream ended"); + info!("request terminated"); + }.instrument(span)); // echo just write the same data that was received let out_stream = ReceiverStream::new(rx); @@ -76,4 +106,4 @@ impl Ratelimiter for RLServer { Box::pin(out_stream) as Self::SubmitTicketStream )) } -}
\ No newline at end of file +} diff --git a/exes/ratelimit/src/lib.rs b/exes/ratelimit/src/lib.rs index 345c37a..7a1f98c 100644 --- a/exes/ratelimit/src/lib.rs +++ b/exes/ratelimit/src/lib.rs @@ -1,9 +1,9 @@ -use futures_util::FutureExt; use grpc::RLServer; use leash::{AnyhowResultFuture, Component}; use proto::nova::ratelimit::ratelimiter::ratelimiter_server::RatelimiterServer; +use redis::aio::MultiplexedConnection; use redis_global_local_bucket_ratelimiter::RedisGlobalLocalBucketRatelimiter; -use shared::{config::Settings, redis_crate::aio::MultiplexedConnection}; +use shared::config::Settings; use std::future::Future; use std::{net::ToSocketAddrs, pin::Pin}; use tokio::sync::oneshot; @@ -34,7 +34,9 @@ impl Component for RatelimiterServerComponent { .add_service(RatelimiterServer::new(server)) .serve_with_shutdown( "0.0.0.0:8093".to_socket_addrs().unwrap().next().unwrap(), - stop.map(|_| ()), + async move { + let _ = stop.await; + }, ) .await?; diff --git a/exes/ratelimit/src/redis_global_local_bucket_ratelimiter/bucket.rs b/exes/ratelimit/src/redis_global_local_bucket_ratelimiter/bucket.rs index d739acf..b35dc45 100644 --- a/exes/ratelimit/src/redis_global_local_bucket_ratelimiter/bucket.rs +++ b/exes/ratelimit/src/redis_global_local_bucket_ratelimiter/bucket.rs @@ -4,7 +4,6 @@ //! and respects the global ratelimit. use super::RedisLockPair; -use twilight_http_ratelimiting::{headers::RatelimitHeaders, ticket::TicketNotifier}; use std::{ collections::HashMap, mem, @@ -21,6 +20,7 @@ use tokio::{ }, time::{sleep, timeout}, }; +use twilight_http_ratelimiting::{headers::RatelimitHeaders, ticket::TicketNotifier}; /// Time remaining until a bucket will reset. #[derive(Clone, Debug)] @@ -265,8 +265,8 @@ impl BucketQueueTask { RatelimitHeaders::None => return, RatelimitHeaders::Present(present) => { Some((present.limit(), present.remaining(), present.reset_after())) - }, - _=> unreachable!() + } + _ => unreachable!(), }; tracing::debug!(path=?self.path, "updating bucket"); diff --git a/exes/ratelimit/src/redis_global_local_bucket_ratelimiter/mod.rs b/exes/ratelimit/src/redis_global_local_bucket_ratelimiter/mod.rs index a97d5a3..a055b04 100644 --- a/exes/ratelimit/src/redis_global_local_bucket_ratelimiter/mod.rs +++ b/exes/ratelimit/src/redis_global_local_bucket_ratelimiter/mod.rs @@ -1,12 +1,11 @@ use self::bucket::{Bucket, BucketQueueTask}; -use shared::redis_crate::aio::MultiplexedConnection; -use shared::redis_crate::{AsyncCommands}; +use redis::aio::MultiplexedConnection; +use redis::AsyncCommands; use tokio::sync::Mutex; use twilight_http_ratelimiting::ticket::{self, TicketNotifier}; use twilight_http_ratelimiting::GetTicketFuture; mod bucket; - -use futures_util::future; +use std::future; use std::{ collections::hash_map::{Entry, HashMap}, sync::Arc, @@ -97,6 +96,6 @@ impl RedisGlobalLocalBucketRatelimiter { ); } - Box::pin(future::ok(rx)) + Box::pin(future::ready(Ok(rx))) } } diff --git a/exes/rest/Cargo.toml b/exes/rest/Cargo.toml index f4c5ecc..39e9798 100644 --- a/exes/rest/Cargo.toml +++ b/exes/rest/Cargo.toml @@ -10,20 +10,23 @@ shared = { path = "../../libs/shared" } proto = { path = "../../libs/proto" } leash = { path = "../../libs/leash" } -hyper = { version = "0.14", features = ["full"] } -tokio = { version = "1", features = ["full"] } +hyper= "0.14" +http = "0.2.8" + +tokio = { version = "1", features = ["rt"] } serde = { version = "1.0.8", features = ["derive"] } -futures-util = "0.3.17" + hyper-tls = "0.5.0" -lazy_static = "1.4.0" -xxhash-rust = { version = "0.8.2", features = ["xxh32"] } + +# todo(MatthieuCoder): Move to the real twilight when patch is merged twilight-http-ratelimiting = { git = "https://github.com/MatthieuCoder/twilight.git" } + tracing = "0.1.37" +anyhow = "1.0.68" hashring = "0.3.0" -anyhow = "*" tonic = "0.8.3" -serde_json = { version = "1.0" } -http = "0.2.8" tokio-stream = "0.1.11" dns-lookup = "1.0.8" -tokio-scoped = "0.2.0"
\ No newline at end of file +opentelemetry = "0.18.0" +opentelemetry-http = "0.7.0" +tracing-opentelemetry = "0.18.0"
\ No newline at end of file diff --git a/exes/rest/src/config.rs b/exes/rest/src/config.rs index 4e27a30..3bfe8db 100644 --- a/exes/rest/src/config.rs +++ b/exes/rest/src/config.rs @@ -1,5 +1,5 @@ -use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use serde::Deserialize; +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; fn default_listening_address() -> SocketAddr { SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8090)) @@ -7,8 +7,7 @@ fn default_listening_address() -> SocketAddr { #[derive(Debug, Deserialize, Clone)] pub struct ServerSettings { - #[serde(default = "default_listening_address")] - pub listening_adress: SocketAddr + pub listening_adress: SocketAddr, } impl Default for ServerSettings { fn default() -> Self { @@ -20,7 +19,7 @@ impl Default for ServerSettings { #[derive(Debug, Deserialize, Clone, Default)] pub struct Discord { - pub token: String + pub token: String, } #[derive(Debug, Deserialize, Clone, Default)] diff --git a/exes/rest/src/handler.rs b/exes/rest/src/handler.rs index 004f763..3ad4cea 100644 --- a/exes/rest/src/handler.rs +++ b/exes/rest/src/handler.rs @@ -1,11 +1,3 @@ -use std::{ - collections::hash_map::DefaultHasher, - convert::TryFrom, - hash::{Hash, Hasher}, - str::FromStr, - time::Instant, -}; - use anyhow::bail; use http::{ header::{AUTHORIZATION, CONNECTION, HOST, TRANSFER_ENCODING, UPGRADE}, @@ -13,7 +5,13 @@ use http::{ }; use hyper::{client::HttpConnector, Body, Client}; use hyper_tls::HttpsConnector; -use shared::log::error; +use std::{ + collections::hash_map::DefaultHasher, + convert::TryFrom, + hash::{Hash, Hasher}, + str::FromStr, +}; +use tracing::{debug_span, error, instrument, Instrument}; use twilight_http_ratelimiting::{Method, Path}; use crate::ratelimit_client::RemoteRatelimiter; @@ -36,6 +34,7 @@ fn normalize_path(request_path: &str) -> (&str, &str) { } } +#[instrument] pub async fn handle_request( client: Client<HttpsConnector<HttpConnector>, Body>, ratelimiter: RemoteRatelimiter, @@ -72,7 +71,7 @@ pub async fn handle_request( "Failed to parse path for {:?} {}: {:?}", method, trimmed_path, e ); - bail!("failed o parse"); + bail!("failed to parse"); } } .hash(&mut hash); @@ -80,21 +79,18 @@ pub async fn handle_request( (hash.finish().to_string(), uri_string) }; - let start_ticket_request = Instant::now(); - let header_sender = match ratelimiter.ticket(hash).await { + let span = debug_span!("ticket validation request"); + let header_sender = match span + .in_scope(|| ratelimiter.ticket(hash)) + .await + { Ok(sender) => sender, Err(e) => { error!("Failed to receive ticket for ratelimiting: {:?}", e); bail!("failed to reteive ticket"); } }; - let time_took_ticket = Instant::now() - start_ticket_request; - - request.headers_mut().insert( - AUTHORIZATION, - HeaderValue::from_bytes(token.as_bytes()) - .expect("strings are guaranteed to be valid utf-8"), - ); + request .headers_mut() .insert(HOST, HeaderValue::from_static("discord.com")); @@ -106,7 +102,7 @@ pub async fn handle_request( request.headers_mut().remove("proxy-connection"); request.headers_mut().remove(TRANSFER_ENCODING); request.headers_mut().remove(UPGRADE); - + if let Some(auth) = request.headers_mut().get_mut(AUTHORIZATION) { if auth .to_str() @@ -130,25 +126,14 @@ pub async fn handle_request( } }; *request.uri_mut() = uri; - - let start_upstream_req = Instant::now(); - let mut resp = match client.request(request).await { + let span = debug_span!("upstream request to discord"); + let resp = match client.request(request).instrument(span).await { Ok(response) => response, Err(e) => { error!("Error when requesting the Discord API: {:?}", e); bail!("failed to request the discord api"); } }; - let upstream_time_took = Instant::now() - start_upstream_req; - - resp.headers_mut().append( - "X-TicketRequest-Ms", - HeaderValue::from_str(&time_took_ticket.as_millis().to_string()).unwrap(), - ); - resp.headers_mut().append( - "X-Upstream-Ms", - HeaderValue::from_str(&upstream_time_took.as_millis().to_string()).unwrap(), - ); let ratelimit_headers = resp .headers() diff --git a/exes/rest/src/lib.rs b/exes/rest/src/lib.rs index 02721cc..158fc97 100644 --- a/exes/rest/src/lib.rs +++ b/exes/rest/src/lib.rs @@ -8,6 +8,8 @@ use hyper::{ }; use hyper_tls::HttpsConnector; use leash::{AnyhowResultFuture, Component}; +use opentelemetry::{global, trace::{Tracer}}; +use opentelemetry_http::HeaderExtractor; use shared::config::Settings; use std::{convert::Infallible, sync::Arc}; use tokio::sync::oneshot; @@ -38,6 +40,12 @@ impl Component for ReverseProxyServer { let token = token.clone(); async move { Ok::<_, Infallible>(service_fn(move |request: Request<Body>| { + let parent_cx = global::get_text_map_propagator(|propagator| { + propagator.extract(&HeaderExtractor(request.headers())) + }); + let _span = global::tracer("") + .start_with_context("handle_request", &parent_cx); + let client = client.clone(); let ratelimiter = ratelimiter.clone(); let token = token.clone(); @@ -64,4 +72,4 @@ impl Component for ReverseProxyServer { fn new() -> Self { Self {} } -}
\ No newline at end of file +} diff --git a/exes/rest/src/ratelimit_client/mod.rs b/exes/rest/src/ratelimit_client/mod.rs index ea34ad9..7493af9 100644 --- a/exes/rest/src/ratelimit_client/mod.rs +++ b/exes/rest/src/ratelimit_client/mod.rs @@ -1,10 +1,10 @@ -use self::remote_hashring::{HashRingWrapper, VNode}; -use futures_util::Future; +use self::remote_hashring::{HashRingWrapper, MetadataMap, VNode}; +use opentelemetry::global; use proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::{Data, Headers}; use proto::nova::ratelimit::ratelimiter::BucketSubmitTicketRequest; -use shared::log::debug; use std::collections::HashMap; use std::fmt::Debug; +use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::time::UNIX_EPOCH; @@ -12,6 +12,9 @@ use std::time::{Duration, SystemTime}; use tokio::sync::oneshot::{self}; use tokio::sync::{broadcast, mpsc, RwLock}; use tokio_stream::wrappers::ReceiverStream; +use tonic::Request; +use tracing::{debug, debug_span, Instrument, Span, instrument}; +use tracing_opentelemetry::OpenTelemetrySpanExt; mod remote_hashring; @@ -45,7 +48,7 @@ impl RemoteRatelimiter { let mut write = self.remotes.write().await; - for ip in ["localhost"] { + for ip in ["ratelimit"] { let a = VNode::new(ip.into()).await?; write.add(a.clone()); } @@ -82,55 +85,80 @@ impl RemoteRatelimiter { obj } + #[instrument(name = "ticket task")] pub fn ticket(&self, path: String) -> IssueTicket { let remotes = self.remotes.clone(); let (tx, rx) = oneshot::channel::<HashMap<String, String>>(); - - Box::pin(async move { - // Get node managing this path - let mut node = (*remotes.read().await.get(&path).unwrap()).clone(); - - // Buffers for the gRPC streaming channel. - let (send, remote) = mpsc::channel(5); - let (do_request, wait) = oneshot::channel(); - // Tonic requires a stream to be used; Since we use a mpsc channel, we can create a stream from it - let stream = ReceiverStream::new(remote); - - // Start the grpc streaming - let ticket = node.submit_ticket(stream).await?; - - // First, send the request - send.send(BucketSubmitTicketRequest { - data: Some(Data::Path(path)), - }) - .await?; - - // We continuously listen for events in the channel. - tokio::spawn(async move { - let message = ticket.into_inner().message().await.unwrap().unwrap(); - - if message.accepted == 1 { - do_request.send(()).unwrap(); - let headers = rx.await.unwrap(); - - send.send(BucketSubmitTicketRequest { - data: Some(Data::Headers(Headers { - precise_time: SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("time went backwards") - .as_millis() as u64, - headers, - })), - }) - .await - .unwrap(); - } - }); - - // Wait for the message to be sent - wait.await?; - - Ok(tx) - }) + Box::pin( + async move { + // Get node managing this path + let mut node = (*remotes.read().await.get(&path).unwrap()).clone(); + + // Buffers for the gRPC streaming channel. + let (send, remote) = mpsc::channel(5); + let (do_request, wait) = oneshot::channel(); + // Tonic requires a stream to be used; Since we use a mpsc channel, we can create a stream from it + let stream = ReceiverStream::new(remote); + + let mut request = Request::new(stream); + + let span = debug_span!("remote request"); + let context = span.context(); + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&context, &mut MetadataMap(request.metadata_mut())) + }); + + // Start the grpc streaming + let ticket = node.submit_ticket(request).await?; + + // First, send the request + send.send(BucketSubmitTicketRequest { + data: Some(Data::Path(path)), + }) + .await?; + + // We continuously listen for events in the channel. + let span = debug_span!("stream worker"); + tokio::spawn( + async move { + let span = debug_span!("waiting for ticket upstream"); + let message = ticket + .into_inner() + .message() + .instrument(span) + .await + .unwrap() + .unwrap(); + + if message.accepted == 1 { + debug!("request ticket was accepted"); + do_request.send(()).unwrap(); + let span = debug_span!("waiting for response headers"); + let headers = rx.instrument(span).await.unwrap(); + + send.send(BucketSubmitTicketRequest { + data: Some(Data::Headers(Headers { + precise_time: SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("time went backwards") + .as_millis() + as u64, + headers, + })), + }) + .await + .unwrap(); + } + } + .instrument(span), + ); + + // Wait for the message to be sent + wait.await?; + + Ok(tx) + } + .instrument(Span::current()), + ) } } diff --git a/exes/rest/src/ratelimit_client/remote_hashring.rs b/exes/rest/src/ratelimit_client/remote_hashring.rs index 4e3fa06..d1c1702 100644 --- a/exes/rest/src/ratelimit_client/remote_hashring.rs +++ b/exes/rest/src/ratelimit_client/remote_hashring.rs @@ -1,4 +1,6 @@ use core::fmt::Debug; +use std::convert::TryFrom; +use opentelemetry::propagation::Injector; use proto::nova::ratelimit::ratelimiter::ratelimiter_client::RatelimiterClient; use std::hash::Hash; use std::ops::Deref; @@ -32,6 +34,20 @@ impl Hash for VNode { } } +pub struct MetadataMap<'a>(pub &'a mut tonic::metadata::MetadataMap); + +impl<'a> Injector for MetadataMap<'a> { + /// Set a key and value in the MetadataMap. Does nothing if the key or value are not valid inputs + fn set(&mut self, key: &str, value: String) { + if let Ok(key) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) { + if let Ok(val) = tonic::metadata::MetadataValue::try_from(&value) { + self.0.insert(key, val); + } + } + } +} + + impl VNode { pub async fn new(address: String) -> Result<Self, tonic::transport::Error> { let client = RatelimiterClient::connect(format!("http://{}:8093", address.clone())).await?; diff --git a/exes/webhook/Cargo.toml b/exes/webhook/Cargo.toml index 589b5bd..0c50009 100644 --- a/exes/webhook/Cargo.toml +++ b/exes/webhook/Cargo.toml @@ -4,21 +4,19 @@ version = "0.1.0" edition = "2018" [dependencies] -hyper = { version = "0.14", features = ["full"] } -tokio = { version = "1", features = ["full"] } +hyper = "0.14" +tokio = { version = "1", features = ["rt"] } shared = { path = "../../libs/shared" } proto = { path = "../../libs/proto" } leash = { path = "../../libs/leash" } +tracing = "0.1.37" serde = { version = "1.0.8", features = ["derive"] } -hex = "0.4.3" serde_json = { version = "1.0" } -lazy_static = "1.4.0" + +hex = "0.4.3" ed25519-dalek = "1" twilight-model = { version = "0.14" } anyhow = "1.0.68" -futures-util = "0.3.25" -[[bin]] -name = "webhook" -path = "src/main.rs" +async-nats = "0.25.1" diff --git a/exes/webhook/src/config.rs b/exes/webhook/src/config.rs index d1b3fb6..02543e6 100644 --- a/exes/webhook/src/config.rs +++ b/exes/webhook/src/config.rs @@ -9,7 +9,6 @@ fn default_listening_address() -> SocketAddr { #[derive(Debug, Deserialize, Clone, Copy)] pub struct ServerSettings { - #[serde(default = "default_listening_address")] pub listening_adress: SocketAddr, } impl Default for ServerSettings { diff --git a/exes/webhook/src/handler/mod.rs b/exes/webhook/src/handler/mod.rs index 3ef859e..594919b 100644 --- a/exes/webhook/src/handler/mod.rs +++ b/exes/webhook/src/handler/mod.rs @@ -1,4 +1,5 @@ use crate::config::WebhookConfig; +use async_nats::Client; use ed25519_dalek::PublicKey; use error::WebhookError; use hyper::{ @@ -6,11 +7,7 @@ use hyper::{ service::Service, Body, Method, Request, Response, StatusCode, }; -use shared::nats_crate::Client; -use shared::{ - log::{debug, error}, - payloads::{CachePayload, DispatchEventTagged, Tracing}, -}; +use shared::payloads::{CachePayload, DispatchEventTagged}; use signature::validate_signature; use std::{ future::Future, @@ -18,6 +15,7 @@ use std::{ str::from_utf8, task::{Context, Poll}, }; +use tracing::{debug, error}; use twilight_model::gateway::event::DispatchEvent; use twilight_model::{ application::interaction::{Interaction, InteractionType}, @@ -98,10 +96,6 @@ impl WebhookService { // this should hopefully not fail ? let data = CachePayload { - tracing: Tracing { - node_id: "".to_string(), - span: None, - }, data: DispatchEventTagged { data: DispatchEvent::InteractionCreate(Box::new( InteractionCreate(value), diff --git a/exes/webhook/src/handler/signature.rs b/exes/webhook/src/handler/signature.rs index fc5555f..ece7b85 100644 --- a/exes/webhook/src/handler/signature.rs +++ b/exes/webhook/src/handler/signature.rs @@ -1,41 +1,13 @@ -use shared::prometheus::{Counter, HistogramVec, labels, opts, register_counter, register_histogram_vec}; -use ed25519_dalek::PublicKey; -use ed25519_dalek::Verifier; -use ed25519_dalek::Signature; -use std::convert::TryInto; - -lazy_static::lazy_static! { - static ref SIGNATURE_TIME_HISTOGRAM: HistogramVec = register_histogram_vec!( - "nova_webhook_signature_time", - "The time taken by the signature verification", - &["signature"] - ).unwrap(); - - static ref SIGNATURE_COUNTER: Counter = register_counter!(opts!( - "nova_webhook_signatures_verify", - "number of signatures verification issued by the service", - labels! {"handler" => "webhook_main"} - )).unwrap(); -} - -fn demo<T, const N: usize>(v: Vec<T>) -> [T; N] { - v.try_into() - .unwrap_or_else(|v: Vec<T>| panic!("Expected a Vec of length {} but it was {}", N, v.len())) -} +use ed25519_dalek::{PublicKey, Signature, Verifier}; pub fn validate_signature(public_key: &PublicKey, data: &[u8], hex_signature: &str) -> bool { - SIGNATURE_COUNTER.inc(); - let timer = SIGNATURE_TIME_HISTOGRAM.with_label_values(&["webhook_main"]).start_timer(); - - let signature_result = hex::decode(hex_signature); + let mut slice: [u8; Signature::BYTE_SIZE] = [0; Signature::BYTE_SIZE]; + let signature_result = hex::decode_to_slice(hex_signature, &mut slice); let mut result = false; - if let Ok(signature) = signature_result { - let sig = Signature::from(demo(signature)); - - result = public_key.verify(data, &sig).is_ok(); + if signature_result.is_ok() { + result = public_key.verify(data, &Signature::from(slice)).is_ok(); } - timer.observe_duration(); result } diff --git a/exes/webhook/src/handler/tests/handler.rs b/exes/webhook/src/handler/tests/handler.rs index e69de29..8b13789 100644 --- a/exes/webhook/src/handler/tests/handler.rs +++ b/exes/webhook/src/handler/tests/handler.rs @@ -0,0 +1 @@ + diff --git a/exes/webhook/src/handler/tests/mod.rs b/exes/webhook/src/handler/tests/mod.rs index cf7f558..60ae6d3 100644 --- a/exes/webhook/src/handler/tests/mod.rs +++ b/exes/webhook/src/handler/tests/mod.rs @@ -1,2 +1,2 @@ -pub mod signature; pub mod handler; +pub mod signature; diff --git a/exes/webhook/src/lib.rs b/exes/webhook/src/lib.rs index 43ab9c4..057e70f 100644 --- a/exes/webhook/src/lib.rs +++ b/exes/webhook/src/lib.rs @@ -6,11 +6,12 @@ use crate::{ config::WebhookConfig, handler::{make_service::MakeSvc, WebhookService}, }; +use async_nats::Client; use hyper::Server; use leash::{AnyhowResultFuture, Component}; -use shared::{config::Settings, log::info, nats_crate::Client}; +use shared::config::Settings; use tokio::sync::oneshot; - +use tracing::info; #[derive(Clone, Copy)] pub struct WebhookServer {} @@ -27,7 +28,7 @@ impl Component for WebhookServer { info!("Starting server on {}", settings.server.listening_adress); let bind = settings.server.listening_adress; - info!("NAts connected!"); + info!("Nats connected!"); let nats = Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>> + Send>>>::into( settings.nats, ) |
