summaryrefslogtreecommitdiff
path: root/exes
diff options
context:
space:
mode:
authorMatthieuCoder <matthieu@matthieu-dev.xyz>2023-01-05 18:33:53 +0400
committerMatthieuCoder <matthieu@matthieu-dev.xyz>2023-01-05 18:33:53 +0400
commit038add4d5e8465f8bb36f1a1fa5817a02cab833b (patch)
tree2bcab259fc3b7a57ff9de4b043fa0c5571c85622 /exes
parent63565094f480154556be69a6b3625e47c3b28f04 (diff)
base for tracing
Diffstat (limited to 'exes')
-rw-r--r--exes/all/Cargo.toml10
-rw-r--r--exes/all/build.rs7
-rw-r--r--exes/all/src/lib.rs41
-rw-r--r--exes/cache/Cargo.toml15
-rw-r--r--exes/cache/src/config.rs2
-rw-r--r--exes/cache/src/main.rs10
-rw-r--r--exes/cache/src/managers/channels.rs1
-rw-r--r--exes/cache/src/managers/guild_schedules.rs1
-rw-r--r--exes/cache/src/managers/guilds.rs21
-rw-r--r--exes/cache/src/managers/integrations.rs1
-rw-r--r--exes/cache/src/managers/invites.rs1
-rw-r--r--exes/cache/src/managers/members.rs11
-rw-r--r--exes/cache/src/managers/messages.rs9
-rw-r--r--exes/cache/src/managers/mod.rs16
-rw-r--r--exes/cache/src/managers/reactions.rs9
-rw-r--r--exes/cache/src/managers/roles.rs7
-rw-r--r--exes/cache/src/managers/stage_instances.rs7
-rw-r--r--exes/cache/src/managers/threads.rs12
-rw-r--r--exes/gateway/Cargo.toml21
-rw-r--r--exes/gateway/src/config.rs2
-rw-r--r--exes/gateway/src/lib.rs53
-rw-r--r--exes/gateway/src/main.rs2
-rw-r--r--exes/ratelimit/Cargo.toml16
-rw-r--r--exes/ratelimit/src/grpc.rs60
-rw-r--r--exes/ratelimit/src/lib.rs8
-rw-r--r--exes/ratelimit/src/redis_global_local_bucket_ratelimiter/bucket.rs6
-rw-r--r--exes/ratelimit/src/redis_global_local_bucket_ratelimiter/mod.rs9
-rw-r--r--exes/rest/Cargo.toml21
-rw-r--r--exes/rest/src/config.rs7
-rw-r--r--exes/rest/src/handler.rs51
-rw-r--r--exes/rest/src/lib.rs10
-rw-r--r--exes/rest/src/ratelimit_client/mod.rs130
-rw-r--r--exes/rest/src/ratelimit_client/remote_hashring.rs16
-rw-r--r--exes/webhook/Cargo.toml14
-rw-r--r--exes/webhook/src/config.rs1
-rw-r--r--exes/webhook/src/handler/mod.rs12
-rw-r--r--exes/webhook/src/handler/signature.rs38
-rw-r--r--exes/webhook/src/handler/tests/handler.rs1
-rw-r--r--exes/webhook/src/handler/tests/mod.rs2
-rw-r--r--exes/webhook/src/lib.rs7
40 files changed, 385 insertions, 283 deletions
diff --git a/exes/all/Cargo.toml b/exes/all/Cargo.toml
index 41396fc..26e8d04 100644
--- a/exes/all/Cargo.toml
+++ b/exes/all/Cargo.toml
@@ -16,13 +16,19 @@ ratelimit = { path = "../ratelimit" }
rest = { path = "../rest" }
webhook = { path = "../webhook" }
-tokio = { version = "1.23.0", features = ["full"] }
+tokio = { version = "1.23.0", features = ["rt"] }
serde = "1.0.152"
serde_json = "1.0.91"
anyhow = "1.0.68"
+tracing = "0.1.37"
+
config = "0.13.3"
-pretty_env_logger = "0.4.0"
+
+tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
+tracing-opentelemetry = "0.18.0"
+opentelemetry = { version ="0.18.0", features = ["rt-tokio"] }
+opentelemetry-otlp = { version = "0.11.0" }
[lib]
crate-type = ["staticlib"]
diff --git a/exes/all/build.rs b/exes/all/build.rs
index 192dffd..00d7afc 100644
--- a/exes/all/build.rs
+++ b/exes/all/build.rs
@@ -1,9 +1,8 @@
extern crate cbindgen;
+use cbindgen::{Config, Language};
use std::env;
use std::path::PathBuf;
-use cbindgen::{Config, Language};
-
fn main() {
let crate_dir = env::var("CARGO_MANIFEST_DIR").unwrap();
@@ -20,6 +19,6 @@ fn main() {
};
cbindgen::generate_with_config(crate_dir, config)
- .unwrap()
- .write_to_file(output_file);
+ .unwrap()
+ .write_to_file(output_file);
}
diff --git a/exes/all/src/lib.rs b/exes/all/src/lib.rs
index de83243..1c99109 100644
--- a/exes/all/src/lib.rs
+++ b/exes/all/src/lib.rs
@@ -5,14 +5,21 @@ use anyhow::Result;
use config::{Config, Environment, File};
use gateway::GatewayServer;
use leash::Component;
+use opentelemetry::{
+ global,
+ sdk::{propagation::TraceContextPropagator, trace, Resource},
+ KeyValue,
+};
+use opentelemetry_otlp::WithExportConfig;
use ratelimit::RatelimiterServerComponent;
use rest::ReverseProxyServer;
use serde::de::DeserializeOwned;
use serde_json::Value;
-use shared::{config::Settings, log::info};
+use shared::config::Settings;
use std::{
env,
ffi::{CStr, CString},
+ str::FromStr,
time::Duration,
};
use tokio::{
@@ -20,6 +27,11 @@ use tokio::{
sync::oneshot::{self, Sender},
task::JoinHandle,
};
+use tracing::info;
+use tracing_subscriber::{
+ filter::Directive, fmt, prelude::__tracing_subscriber_SubscriberExt, util::SubscriberInitExt,
+ EnvFilter,
+};
use webhook::WebhookServer;
pub struct AllInOneInstance {
@@ -83,9 +95,7 @@ pub extern "C" fn load_config() -> *const libc::c_char {
#[no_mangle]
/// Initialise les logs des composants de nova
/// Utilise la crate `pretty_log_env`
-pub extern "C" fn init_logs() {
- pretty_env_logger::init();
-}
+pub extern "C" fn init_logs() {}
#[no_mangle]
/// Stops a nova instance
@@ -108,6 +118,29 @@ pub unsafe extern "C" fn start_instance(config: *const libc::c_char) -> *mut All
// Initialize a tokio runtime
let rt = Runtime::new().unwrap();
rt.block_on(async move {
+ global::set_text_map_propagator(TraceContextPropagator::new());
+ let tracer =
+ opentelemetry_otlp::new_pipeline()
+ .tracing()
+ .with_trace_config(trace::config().with_resource(Resource::new(vec![
+ KeyValue::new("service.name", "all-in-one"),
+ ])))
+ .with_exporter(opentelemetry_otlp::new_exporter().tonic().with_env())
+ .install_batch(opentelemetry::runtime::Tokio)
+ .unwrap();
+
+ let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
+
+ tracing_subscriber::registry()
+ .with(fmt::layer())
+ .with(telemetry)
+ .with(
+ EnvFilter::builder()
+ .with_default_directive(Directive::from_str("info").unwrap())
+ .from_env()
+ .unwrap(),
+ )
+ .init();
// Start the gateway server
let mut aio = vec![];
diff --git a/exes/cache/Cargo.toml b/exes/cache/Cargo.toml
index 9a7f9ee..349deaa 100644
--- a/exes/cache/Cargo.toml
+++ b/exes/cache/Cargo.toml
@@ -8,12 +8,15 @@ edition = "2018"
[dependencies]
shared = { path = "../../libs/shared" }
proto = { path = "../../libs/proto" }
-async-nats = "0.25.1"
-tokio = { version = "1", features = ["full"] }
+
+tokio = { version = "1", features = ["rt"] }
+tokio-stream = "0.1.11"
+
serde = { version = "1.0.8", features = ["derive"] }
-log = { version = "0.4", features = ["std"] }
serde_json = { version = "1.0" }
-redis = "*"
-futures-util = "*"
+
+async-nats = "0.25.1"
twilight-model = "0.14"
-anyhow = "1.0.68" \ No newline at end of file
+anyhow = "1.0.68"
+
+tracing = "0.1.37" \ No newline at end of file
diff --git a/exes/cache/src/config.rs b/exes/cache/src/config.rs
index 3d9f5e2..3136c1a 100644
--- a/exes/cache/src/config.rs
+++ b/exes/cache/src/config.rs
@@ -1,5 +1,5 @@
use serde::Deserialize;
#[derive(Debug, Deserialize, Clone, Default)]
pub struct CacheConfiguration {
- pub toggles: Vec<String>
+ pub toggles: Vec<String>,
}
diff --git a/exes/cache/src/main.rs b/exes/cache/src/main.rs
index 5240a6a..bc13cd5 100644
--- a/exes/cache/src/main.rs
+++ b/exes/cache/src/main.rs
@@ -1,8 +1,6 @@
-use std::{error::Error, pin::Pin};
+use std::{error::Error, future::Future, pin::Pin};
use async_nats::{Client, Subscriber};
-use futures_util::{stream::StreamExt, Future};
-use log::info;
use managers::{
automoderation::Automoderation, bans::Bans, channels::Channels,
guild_schedules::GuildSchedules, guilds::Guilds, integrations::Integrations, invites::Invites,
@@ -10,6 +8,8 @@ use managers::{
stage_instances::StageInstances, threads::Threads, CacheManager,
};
use shared::{config::Settings, payloads::CachePayload};
+use tokio_stream::StreamExt;
+use tracing::info;
use twilight_model::gateway::event::DispatchEvent;
use crate::config::CacheConfiguration;
@@ -43,8 +43,8 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let settings: Settings<CacheConfiguration> = Settings::new("cache").unwrap();
info!("loaded configuration: {:?}", settings);
let nats =
- Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>> + Send>>>::into(settings.nats).await?;
- // let redis: redis::Client = settings.redis.into();
+ Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>> + Send>>>::into(settings.nats)
+ .await?;
let mut cache = Cache::default();
diff --git a/exes/cache/src/managers/channels.rs b/exes/cache/src/managers/channels.rs
index fe34acc..c645420 100644
--- a/exes/cache/src/managers/channels.rs
+++ b/exes/cache/src/managers/channels.rs
@@ -5,7 +5,6 @@ use crate::CacheSourcedEvents;
use super::CacheManager;
use std::future::Future;
-
#[derive(Default)]
pub struct Channels {}
impl CacheManager for Channels {
diff --git a/exes/cache/src/managers/guild_schedules.rs b/exes/cache/src/managers/guild_schedules.rs
index bcc79c5..0c565f1 100644
--- a/exes/cache/src/managers/guild_schedules.rs
+++ b/exes/cache/src/managers/guild_schedules.rs
@@ -5,7 +5,6 @@ use crate::CacheSourcedEvents;
use super::CacheManager;
use std::future::Future;
-
#[derive(Default)]
pub struct GuildSchedules {}
impl CacheManager for GuildSchedules {
diff --git a/exes/cache/src/managers/guilds.rs b/exes/cache/src/managers/guilds.rs
index 3f5f4c4..6e142c8 100644
--- a/exes/cache/src/managers/guilds.rs
+++ b/exes/cache/src/managers/guilds.rs
@@ -5,7 +5,6 @@ use crate::CacheSourcedEvents;
use super::CacheManager;
use std::future::Future;
-
#[derive(Default)]
pub struct Guilds {}
impl CacheManager for Guilds {
@@ -15,16 +14,16 @@ impl CacheManager for Guilds {
) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
Box::pin(async move {
match event {
- DispatchEvent::GuildCreate(_) => {},
- DispatchEvent::GuildDelete(_) => {},
- DispatchEvent::UnavailableGuild(_) => {},
- DispatchEvent::GuildUpdate(_) => {},
- DispatchEvent::WebhooksUpdate(_) => {},
- DispatchEvent::GuildStickersUpdate(_) => {},
- DispatchEvent::GuildEmojisUpdate(_) => {},
- DispatchEvent::VoiceServerUpdate(_) => {},
- DispatchEvent::GuildIntegrationsUpdate(_) => {},
- DispatchEvent::CommandPermissionsUpdate(_) => {},
+ DispatchEvent::GuildCreate(_) => {}
+ DispatchEvent::GuildDelete(_) => {}
+ DispatchEvent::UnavailableGuild(_) => {}
+ DispatchEvent::GuildUpdate(_) => {}
+ DispatchEvent::WebhooksUpdate(_) => {}
+ DispatchEvent::GuildStickersUpdate(_) => {}
+ DispatchEvent::GuildEmojisUpdate(_) => {}
+ DispatchEvent::VoiceServerUpdate(_) => {}
+ DispatchEvent::GuildIntegrationsUpdate(_) => {}
+ DispatchEvent::CommandPermissionsUpdate(_) => {}
_ => unreachable!(),
};
diff --git a/exes/cache/src/managers/integrations.rs b/exes/cache/src/managers/integrations.rs
index 99d292e..8e71724 100644
--- a/exes/cache/src/managers/integrations.rs
+++ b/exes/cache/src/managers/integrations.rs
@@ -5,7 +5,6 @@ use crate::CacheSourcedEvents;
use super::CacheManager;
use std::future::Future;
-
#[derive(Default)]
pub struct Integrations {}
impl CacheManager for Integrations {
diff --git a/exes/cache/src/managers/invites.rs b/exes/cache/src/managers/invites.rs
index 21da64f..6168e82 100644
--- a/exes/cache/src/managers/invites.rs
+++ b/exes/cache/src/managers/invites.rs
@@ -5,7 +5,6 @@ use crate::CacheSourcedEvents;
use super::CacheManager;
use std::future::Future;
-
#[derive(Default)]
pub struct Invites {}
impl CacheManager for Invites {
diff --git a/exes/cache/src/managers/members.rs b/exes/cache/src/managers/members.rs
index 3a483f1..21e6cde 100644
--- a/exes/cache/src/managers/members.rs
+++ b/exes/cache/src/managers/members.rs
@@ -5,7 +5,6 @@ use crate::CacheSourcedEvents;
use super::CacheManager;
use std::future::Future;
-
#[derive(Default)]
pub struct Members {}
impl CacheManager for Members {
@@ -15,11 +14,11 @@ impl CacheManager for Members {
) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
Box::pin(async move {
match event {
- DispatchEvent::MemberAdd(_) => {},
- DispatchEvent::MemberRemove(_) => {},
- DispatchEvent::MemberUpdate(_) => {},
- DispatchEvent::MemberChunk(_) => {},
- DispatchEvent::UserUpdate(_) => {},
+ DispatchEvent::MemberAdd(_) => {}
+ DispatchEvent::MemberRemove(_) => {}
+ DispatchEvent::MemberUpdate(_) => {}
+ DispatchEvent::MemberChunk(_) => {}
+ DispatchEvent::UserUpdate(_) => {}
_ => unreachable!(),
};
diff --git a/exes/cache/src/managers/messages.rs b/exes/cache/src/managers/messages.rs
index 7b06ae7..1725781 100644
--- a/exes/cache/src/managers/messages.rs
+++ b/exes/cache/src/managers/messages.rs
@@ -5,7 +5,6 @@ use crate::CacheSourcedEvents;
use super::CacheManager;
use std::future::Future;
-
#[derive(Default)]
pub struct Messages {}
impl CacheManager for Messages {
@@ -15,10 +14,10 @@ impl CacheManager for Messages {
) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
Box::pin(async move {
match event {
- DispatchEvent::MessageCreate(_) => {},
- DispatchEvent::MessageDelete(_) => {},
- DispatchEvent::MessageDeleteBulk(_) => {},
- DispatchEvent::MessageUpdate(_) => {},
+ DispatchEvent::MessageCreate(_) => {}
+ DispatchEvent::MessageDelete(_) => {}
+ DispatchEvent::MessageDeleteBulk(_) => {}
+ DispatchEvent::MessageUpdate(_) => {}
_ => unreachable!(),
};
diff --git a/exes/cache/src/managers/mod.rs b/exes/cache/src/managers/mod.rs
index 370cda6..9768a86 100644
--- a/exes/cache/src/managers/mod.rs
+++ b/exes/cache/src/managers/mod.rs
@@ -1,22 +1,22 @@
+use std::future::Future;
use std::pin::Pin;
use twilight_model::gateway::event::DispatchEvent;
-use std::future::Future;
use crate::CacheSourcedEvents;
+pub mod automoderation;
+pub mod bans;
pub mod channels;
-pub mod guilds;
pub mod guild_schedules;
-pub mod stage_instances;
+pub mod guilds;
pub mod integrations;
+pub mod invites;
pub mod members;
-pub mod bans;
-pub mod reactions;
pub mod messages;
-pub mod threads;
-pub mod invites;
+pub mod reactions;
pub mod roles;
-pub mod automoderation;
+pub mod stage_instances;
+pub mod threads;
pub trait CacheManager {
fn handle(&self, event: DispatchEvent) -> Pin<Box<dyn Future<Output = CacheSourcedEvents>>>;
diff --git a/exes/cache/src/managers/reactions.rs b/exes/cache/src/managers/reactions.rs
index 5d21e0b..b154638 100644
--- a/exes/cache/src/managers/reactions.rs
+++ b/exes/cache/src/managers/reactions.rs
@@ -5,7 +5,6 @@ use crate::CacheSourcedEvents;
use super::CacheManager;
use std::future::Future;
-
#[derive(Default)]
pub struct Reactions {}
impl CacheManager for Reactions {
@@ -15,10 +14,10 @@ impl CacheManager for Reactions {
) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
Box::pin(async move {
match event {
- DispatchEvent::ReactionAdd(_) => {},
- DispatchEvent::ReactionRemove(_) => {},
- DispatchEvent::ReactionRemoveAll(_) => {},
- DispatchEvent::ReactionRemoveEmoji(_) => {},
+ DispatchEvent::ReactionAdd(_) => {}
+ DispatchEvent::ReactionRemove(_) => {}
+ DispatchEvent::ReactionRemoveAll(_) => {}
+ DispatchEvent::ReactionRemoveEmoji(_) => {}
_ => unreachable!(),
};
diff --git a/exes/cache/src/managers/roles.rs b/exes/cache/src/managers/roles.rs
index 5fa0f22..c69526f 100644
--- a/exes/cache/src/managers/roles.rs
+++ b/exes/cache/src/managers/roles.rs
@@ -5,7 +5,6 @@ use crate::CacheSourcedEvents;
use super::CacheManager;
use std::future::Future;
-
#[derive(Default)]
pub struct Roles {}
impl CacheManager for Roles {
@@ -15,9 +14,9 @@ impl CacheManager for Roles {
) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
Box::pin(async move {
match event {
- DispatchEvent::RoleCreate(_) => {},
- DispatchEvent::RoleDelete(_) => {},
- DispatchEvent::RoleUpdate(_) => {},
+ DispatchEvent::RoleCreate(_) => {}
+ DispatchEvent::RoleDelete(_) => {}
+ DispatchEvent::RoleUpdate(_) => {}
_ => unreachable!(),
};
diff --git a/exes/cache/src/managers/stage_instances.rs b/exes/cache/src/managers/stage_instances.rs
index 314d089..baeabc8 100644
--- a/exes/cache/src/managers/stage_instances.rs
+++ b/exes/cache/src/managers/stage_instances.rs
@@ -5,7 +5,6 @@ use crate::CacheSourcedEvents;
use super::CacheManager;
use std::future::Future;
-
#[derive(Default)]
pub struct StageInstances {}
impl CacheManager for StageInstances {
@@ -15,9 +14,9 @@ impl CacheManager for StageInstances {
) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
Box::pin(async move {
match event {
- DispatchEvent::StageInstanceCreate(_) => {},
- DispatchEvent::StageInstanceDelete(_) => {},
- DispatchEvent::StageInstanceUpdate(_) => {},
+ DispatchEvent::StageInstanceCreate(_) => {}
+ DispatchEvent::StageInstanceDelete(_) => {}
+ DispatchEvent::StageInstanceUpdate(_) => {}
_ => unreachable!(),
};
diff --git a/exes/cache/src/managers/threads.rs b/exes/cache/src/managers/threads.rs
index d4efc2e..8956616 100644
--- a/exes/cache/src/managers/threads.rs
+++ b/exes/cache/src/managers/threads.rs
@@ -14,12 +14,12 @@ impl CacheManager for Threads {
) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
Box::pin(async move {
match event {
- DispatchEvent::ThreadCreate(_) => {},
- DispatchEvent::ThreadDelete(_) => {},
- DispatchEvent::ThreadListSync(_) => {},
- DispatchEvent::ThreadMemberUpdate(_) => {},
- DispatchEvent::ThreadMembersUpdate(_) => {},
- DispatchEvent::ThreadUpdate(_) => {},
+ DispatchEvent::ThreadCreate(_) => {}
+ DispatchEvent::ThreadDelete(_) => {}
+ DispatchEvent::ThreadListSync(_) => {}
+ DispatchEvent::ThreadMemberUpdate(_) => {}
+ DispatchEvent::ThreadMembersUpdate(_) => {}
+ DispatchEvent::ThreadUpdate(_) => {}
_ => unreachable!(),
};
diff --git a/exes/gateway/Cargo.toml b/exes/gateway/Cargo.toml
index d71ed4a..a174710 100644
--- a/exes/gateway/Cargo.toml
+++ b/exes/gateway/Cargo.toml
@@ -7,11 +7,24 @@ edition = "2018"
shared = { path = "../../libs/shared" }
proto = { path = "../../libs/proto" }
leash = { path = "../../libs/leash" }
-tokio = { version = "1", features = ["full"] }
+
+tokio = { version = "1", features = ["rt", "signal"] }
+tokio-stream = "0.1.11"
+
twilight-gateway = { version = "0.14" }
twilight-model = "0.14"
+
+bytes = "1.3.0"
+anyhow = "1.0.68"
+
serde = { version = "1.0.8", features = ["derive"] }
-futures = "0.3"
serde_json = { version = "1.0" }
-bytes = "*"
-anyhow = "*" \ No newline at end of file
+
+tracing = "0.1.37"
+tracing-futures = "0.2.5"
+
+async-nats = "0.25.1"
+
+tracing-opentelemetry = "0.18.0"
+opentelemetry = "0.18.0"
+opentelemetry-http = "0.7.0" \ No newline at end of file
diff --git a/exes/gateway/src/config.rs b/exes/gateway/src/config.rs
index 923ab30..7b12bfe 100644
--- a/exes/gateway/src/config.rs
+++ b/exes/gateway/src/config.rs
@@ -1,4 +1,4 @@
-use shared::serde::{Deserialize, Serialize};
+use serde::{Deserialize, Serialize};
use twilight_gateway::Intents;
#[derive(Serialize, Deserialize, Clone)]
diff --git a/exes/gateway/src/lib.rs b/exes/gateway/src/lib.rs
index d7a4cee..014b72a 100644
--- a/exes/gateway/src/lib.rs
+++ b/exes/gateway/src/lib.rs
@@ -1,19 +1,28 @@
+use async_nats::{Client, HeaderMap, HeaderValue};
use config::GatewayConfig;
use leash::{AnyhowResultFuture, Component};
+use opentelemetry::{global, propagation::Injector};
use shared::{
config::Settings,
- log::{debug, info},
- nats_crate::Client,
- payloads::{CachePayload, DispatchEventTagged, Tracing},
+ payloads::{CachePayload, DispatchEventTagged},
};
-use std::{convert::TryFrom, pin::Pin};
-use tokio::sync::oneshot;
+use std::{convert::TryFrom, future::Future, pin::Pin, str::FromStr};
+use tokio::{select, sync::oneshot};
+use tokio_stream::StreamExt;
+use tracing_opentelemetry::OpenTelemetrySpanExt;
use twilight_gateway::{Event, Shard};
pub mod config;
-use futures::FutureExt;
-use futures::{select, Future, StreamExt};
+use tracing::{debug, info, trace_span};
use twilight_model::gateway::event::DispatchEvent;
+struct MetadataMap<'a>(&'a mut HeaderMap);
+
+impl<'a> Injector for MetadataMap<'a> {
+ fn set(&mut self, key: &str, value: String) {
+ self.0.insert(key, HeaderValue::from_str(&value).unwrap())
+ }
+}
+
pub struct GatewayServer {}
impl Component for GatewayServer {
type Config = GatewayConfig;
@@ -22,7 +31,7 @@ impl Component for GatewayServer {
fn start(
&self,
settings: Settings<Self::Config>,
- stop: oneshot::Receiver<()>,
+ mut stop: oneshot::Receiver<()>,
) -> AnyhowResultFuture<()> {
Box::pin(async move {
let (shard, mut events) = Shard::builder(settings.token.to_owned(), settings.intents)
@@ -33,35 +42,41 @@ impl Component for GatewayServer {
settings.nats,
)
.await?;
-
shard.start().await?;
- let mut stop = stop.fuse();
loop {
select! {
- event = events.next().fuse() => {
+ event = events.next() => {
+
if let Some(event) = event {
match event {
Event::Ready(ready) => {
info!("Logged in as {}", ready.user.name);
- }
+ },
_ => {
+
let name = event.kind().name();
if let Ok(dispatch_event) = DispatchEvent::try_from(event) {
+ debug!("handling event {}", name.unwrap());
+
let data = CachePayload {
- tracing: Tracing {
- node_id: "".to_string(),
- span: None,
- },
data: DispatchEventTagged {
data: dispatch_event,
},
};
let value = serde_json::to_string(&data)?;
- debug!("nats send: {}", value);
let bytes = bytes::Bytes::from(value);
- nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes)
+
+ let span = trace_span!("nats send");
+
+ let mut header_map = HeaderMap::new();
+ let context = span.context();
+ global::get_text_map_propagator(|propagator| {
+ propagator.inject_context(&context, &mut MetadataMap(&mut header_map))
+ });
+
+ nats.publish_with_headers(format!("nova.cache.dispatch.{}", name.unwrap()), header_map, bytes)
.await?;
}
}
@@ -70,7 +85,7 @@ impl Component for GatewayServer {
break
}
},
- _ = stop => break
+ _ = (&mut stop) => break
};
}
diff --git a/exes/gateway/src/main.rs b/exes/gateway/src/main.rs
index 2e18f9c..f1b0298 100644
--- a/exes/gateway/src/main.rs
+++ b/exes/gateway/src/main.rs
@@ -1,4 +1,4 @@
-use leash::ignite;
use gateway::GatewayServer;
+use leash::ignite;
ignite!(GatewayServer);
diff --git a/exes/ratelimit/Cargo.toml b/exes/ratelimit/Cargo.toml
index 82ca9f6..d82d8c9 100644
--- a/exes/ratelimit/Cargo.toml
+++ b/exes/ratelimit/Cargo.toml
@@ -9,13 +9,21 @@ edition = "2021"
shared = { path = "../../libs/shared" }
proto = { path = "../../libs/proto" }
leash = { path = "../../libs/leash" }
-hyper = { version = "0.14", features = ["full"] }
-tokio = { version = "1", features = ["full"] }
+
+hyper = "0.14"
+tokio = { version = "1", features = ["rt"] }
+
serde = { version = "1.0.8", features = ["derive"] }
+
twilight-http-ratelimiting = { git = "https://github.com/MatthieuCoder/twilight.git" }
anyhow = "*"
-futures-util = "0.3.17"
tracing = "*"
-serde_json = { version = "1.0" }
+tracing-opentelemetry = "0.18.0"
+opentelemetry = "0.18.0"
+opentelemetry-http = "0.7.0"
+
tonic = "0.8.3"
tokio-stream = "0.1.11"
+
+
+redis = { version = "0.22.1", features = ["cluster", "connection-manager", "tokio-comp"] } \ No newline at end of file
diff --git a/exes/ratelimit/src/grpc.rs b/exes/ratelimit/src/grpc.rs
index a75c329..fbcf3b7 100644
--- a/exes/ratelimit/src/grpc.rs
+++ b/exes/ratelimit/src/grpc.rs
@@ -1,11 +1,13 @@
-
+use opentelemetry::{global, propagation::Extractor};
+use proto::nova::ratelimit::ratelimiter::{
+ ratelimiter_server::Ratelimiter, BucketSubmitTicketRequest, BucketSubmitTicketResponse,
+};
use std::pin::Pin;
-
-use futures_util::Stream;
-use proto::nova::ratelimit::ratelimiter::{ratelimiter_server::Ratelimiter, BucketSubmitTicketResponse, BucketSubmitTicketRequest};
use tokio::sync::mpsc;
-use tokio_stream::{wrappers::ReceiverStream, StreamExt};
+use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{Request, Response, Status, Streaming};
+use tracing::{debug, debug_span, info, Instrument};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
use twilight_http_ratelimiting::{ticket::TicketReceiver, RatelimitHeaders};
use crate::redis_global_local_bucket_ratelimiter::RedisGlobalLocalBucketRatelimiter;
@@ -20,9 +22,28 @@ impl RLServer {
}
}
+struct MetadataMap<'a>(&'a tonic::metadata::MetadataMap);
+
+impl<'a> Extractor for MetadataMap<'a> {
+ /// Get a value for a key from the MetadataMap. If the value can't be converted to &str, returns None
+ fn get(&self, key: &str) -> Option<&str> {
+ self.0.get(key).and_then(|metadata| metadata.to_str().ok())
+ }
+
+ /// Collect all the keys from the MetadataMap.
+ fn keys(&self) -> Vec<&str> {
+ self.0
+ .keys()
+ .map(|key| match key {
+ tonic::metadata::KeyRef::Ascii(v) => v.as_str(),
+ tonic::metadata::KeyRef::Binary(v) => v.as_str(),
+ })
+ .collect::<Vec<_>>()
+ }
+}
+
#[tonic::async_trait]
impl Ratelimiter for RLServer {
-
type SubmitTicketStream =
Pin<Box<dyn Stream<Item = Result<BucketSubmitTicketResponse, Status>> + Send>>;
@@ -30,6 +51,14 @@ impl Ratelimiter for RLServer {
&self,
req: Request<Streaming<BucketSubmitTicketRequest>>,
) -> Result<Response<Self::SubmitTicketStream>, Status> {
+ let parent_cx =
+ global::get_text_map_propagator(|prop| prop.extract(&MetadataMap(req.metadata())));
+ // Generate a tracing span as usual
+ let span = tracing::span!(tracing::Level::INFO, "request process");
+
+ // Assign parent trace from external context
+ span.set_parent(parent_cx);
+
let mut in_stream = req.into_inner();
let (tx, rx) = mpsc::channel(128);
let imrl = self.ratelimiter.clone();
@@ -45,29 +74,30 @@ impl Ratelimiter for RLServer {
match result.data.unwrap() {
proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::Data::Path(path) => {
- let a = imrl.ticket(path).await.unwrap();
+ let span = debug_span!("requesting ticket");
+ let a = imrl.ticket(path).instrument(span).await.unwrap();
receiver = Some(a);
-
tx.send(Ok(BucketSubmitTicketResponse {
accepted: 1
})).await.unwrap();
-
},
proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::Data::Headers(b) => {
if let Some(recv) = receiver {
- let recv = recv.await.unwrap();
+ let span = debug_span!("waiting for headers data");
+ let recv = recv.instrument(span).await.unwrap();
let rheaders = RatelimitHeaders::from_pairs(b.headers.iter().map(|f| (f.0.as_str(), f.1.as_bytes()))).unwrap();
-
- recv.headers(Some(rheaders)).unwrap();
+ recv.headers(Some(rheaders)).unwrap();
break;
}
},
}
}
- println!("\tstream ended");
- });
+
+ debug!("\tstream ended");
+ info!("request terminated");
+ }.instrument(span));
// echo just write the same data that was received
let out_stream = ReceiverStream::new(rx);
@@ -76,4 +106,4 @@ impl Ratelimiter for RLServer {
Box::pin(out_stream) as Self::SubmitTicketStream
))
}
-} \ No newline at end of file
+}
diff --git a/exes/ratelimit/src/lib.rs b/exes/ratelimit/src/lib.rs
index 345c37a..7a1f98c 100644
--- a/exes/ratelimit/src/lib.rs
+++ b/exes/ratelimit/src/lib.rs
@@ -1,9 +1,9 @@
-use futures_util::FutureExt;
use grpc::RLServer;
use leash::{AnyhowResultFuture, Component};
use proto::nova::ratelimit::ratelimiter::ratelimiter_server::RatelimiterServer;
+use redis::aio::MultiplexedConnection;
use redis_global_local_bucket_ratelimiter::RedisGlobalLocalBucketRatelimiter;
-use shared::{config::Settings, redis_crate::aio::MultiplexedConnection};
+use shared::config::Settings;
use std::future::Future;
use std::{net::ToSocketAddrs, pin::Pin};
use tokio::sync::oneshot;
@@ -34,7 +34,9 @@ impl Component for RatelimiterServerComponent {
.add_service(RatelimiterServer::new(server))
.serve_with_shutdown(
"0.0.0.0:8093".to_socket_addrs().unwrap().next().unwrap(),
- stop.map(|_| ()),
+ async move {
+ let _ = stop.await;
+ },
)
.await?;
diff --git a/exes/ratelimit/src/redis_global_local_bucket_ratelimiter/bucket.rs b/exes/ratelimit/src/redis_global_local_bucket_ratelimiter/bucket.rs
index d739acf..b35dc45 100644
--- a/exes/ratelimit/src/redis_global_local_bucket_ratelimiter/bucket.rs
+++ b/exes/ratelimit/src/redis_global_local_bucket_ratelimiter/bucket.rs
@@ -4,7 +4,6 @@
//! and respects the global ratelimit.
use super::RedisLockPair;
-use twilight_http_ratelimiting::{headers::RatelimitHeaders, ticket::TicketNotifier};
use std::{
collections::HashMap,
mem,
@@ -21,6 +20,7 @@ use tokio::{
},
time::{sleep, timeout},
};
+use twilight_http_ratelimiting::{headers::RatelimitHeaders, ticket::TicketNotifier};
/// Time remaining until a bucket will reset.
#[derive(Clone, Debug)]
@@ -265,8 +265,8 @@ impl BucketQueueTask {
RatelimitHeaders::None => return,
RatelimitHeaders::Present(present) => {
Some((present.limit(), present.remaining(), present.reset_after()))
- },
- _=> unreachable!()
+ }
+ _ => unreachable!(),
};
tracing::debug!(path=?self.path, "updating bucket");
diff --git a/exes/ratelimit/src/redis_global_local_bucket_ratelimiter/mod.rs b/exes/ratelimit/src/redis_global_local_bucket_ratelimiter/mod.rs
index a97d5a3..a055b04 100644
--- a/exes/ratelimit/src/redis_global_local_bucket_ratelimiter/mod.rs
+++ b/exes/ratelimit/src/redis_global_local_bucket_ratelimiter/mod.rs
@@ -1,12 +1,11 @@
use self::bucket::{Bucket, BucketQueueTask};
-use shared::redis_crate::aio::MultiplexedConnection;
-use shared::redis_crate::{AsyncCommands};
+use redis::aio::MultiplexedConnection;
+use redis::AsyncCommands;
use tokio::sync::Mutex;
use twilight_http_ratelimiting::ticket::{self, TicketNotifier};
use twilight_http_ratelimiting::GetTicketFuture;
mod bucket;
-
-use futures_util::future;
+use std::future;
use std::{
collections::hash_map::{Entry, HashMap},
sync::Arc,
@@ -97,6 +96,6 @@ impl RedisGlobalLocalBucketRatelimiter {
);
}
- Box::pin(future::ok(rx))
+ Box::pin(future::ready(Ok(rx)))
}
}
diff --git a/exes/rest/Cargo.toml b/exes/rest/Cargo.toml
index f4c5ecc..39e9798 100644
--- a/exes/rest/Cargo.toml
+++ b/exes/rest/Cargo.toml
@@ -10,20 +10,23 @@ shared = { path = "../../libs/shared" }
proto = { path = "../../libs/proto" }
leash = { path = "../../libs/leash" }
-hyper = { version = "0.14", features = ["full"] }
-tokio = { version = "1", features = ["full"] }
+hyper= "0.14"
+http = "0.2.8"
+
+tokio = { version = "1", features = ["rt"] }
serde = { version = "1.0.8", features = ["derive"] }
-futures-util = "0.3.17"
+
hyper-tls = "0.5.0"
-lazy_static = "1.4.0"
-xxhash-rust = { version = "0.8.2", features = ["xxh32"] }
+
+# todo(MatthieuCoder): Move to the real twilight when patch is merged
twilight-http-ratelimiting = { git = "https://github.com/MatthieuCoder/twilight.git" }
+
tracing = "0.1.37"
+anyhow = "1.0.68"
hashring = "0.3.0"
-anyhow = "*"
tonic = "0.8.3"
-serde_json = { version = "1.0" }
-http = "0.2.8"
tokio-stream = "0.1.11"
dns-lookup = "1.0.8"
-tokio-scoped = "0.2.0" \ No newline at end of file
+opentelemetry = "0.18.0"
+opentelemetry-http = "0.7.0"
+tracing-opentelemetry = "0.18.0" \ No newline at end of file
diff --git a/exes/rest/src/config.rs b/exes/rest/src/config.rs
index 4e27a30..3bfe8db 100644
--- a/exes/rest/src/config.rs
+++ b/exes/rest/src/config.rs
@@ -1,5 +1,5 @@
-use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use serde::Deserialize;
+use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
fn default_listening_address() -> SocketAddr {
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8090))
@@ -7,8 +7,7 @@ fn default_listening_address() -> SocketAddr {
#[derive(Debug, Deserialize, Clone)]
pub struct ServerSettings {
- #[serde(default = "default_listening_address")]
- pub listening_adress: SocketAddr
+ pub listening_adress: SocketAddr,
}
impl Default for ServerSettings {
fn default() -> Self {
@@ -20,7 +19,7 @@ impl Default for ServerSettings {
#[derive(Debug, Deserialize, Clone, Default)]
pub struct Discord {
- pub token: String
+ pub token: String,
}
#[derive(Debug, Deserialize, Clone, Default)]
diff --git a/exes/rest/src/handler.rs b/exes/rest/src/handler.rs
index 004f763..3ad4cea 100644
--- a/exes/rest/src/handler.rs
+++ b/exes/rest/src/handler.rs
@@ -1,11 +1,3 @@
-use std::{
- collections::hash_map::DefaultHasher,
- convert::TryFrom,
- hash::{Hash, Hasher},
- str::FromStr,
- time::Instant,
-};
-
use anyhow::bail;
use http::{
header::{AUTHORIZATION, CONNECTION, HOST, TRANSFER_ENCODING, UPGRADE},
@@ -13,7 +5,13 @@ use http::{
};
use hyper::{client::HttpConnector, Body, Client};
use hyper_tls::HttpsConnector;
-use shared::log::error;
+use std::{
+ collections::hash_map::DefaultHasher,
+ convert::TryFrom,
+ hash::{Hash, Hasher},
+ str::FromStr,
+};
+use tracing::{debug_span, error, instrument, Instrument};
use twilight_http_ratelimiting::{Method, Path};
use crate::ratelimit_client::RemoteRatelimiter;
@@ -36,6 +34,7 @@ fn normalize_path(request_path: &str) -> (&str, &str) {
}
}
+#[instrument]
pub async fn handle_request(
client: Client<HttpsConnector<HttpConnector>, Body>,
ratelimiter: RemoteRatelimiter,
@@ -72,7 +71,7 @@ pub async fn handle_request(
"Failed to parse path for {:?} {}: {:?}",
method, trimmed_path, e
);
- bail!("failed o parse");
+ bail!("failed to parse");
}
}
.hash(&mut hash);
@@ -80,21 +79,18 @@ pub async fn handle_request(
(hash.finish().to_string(), uri_string)
};
- let start_ticket_request = Instant::now();
- let header_sender = match ratelimiter.ticket(hash).await {
+ let span = debug_span!("ticket validation request");
+ let header_sender = match span
+ .in_scope(|| ratelimiter.ticket(hash))
+ .await
+ {
Ok(sender) => sender,
Err(e) => {
error!("Failed to receive ticket for ratelimiting: {:?}", e);
bail!("failed to reteive ticket");
}
};
- let time_took_ticket = Instant::now() - start_ticket_request;
-
- request.headers_mut().insert(
- AUTHORIZATION,
- HeaderValue::from_bytes(token.as_bytes())
- .expect("strings are guaranteed to be valid utf-8"),
- );
+
request
.headers_mut()
.insert(HOST, HeaderValue::from_static("discord.com"));
@@ -106,7 +102,7 @@ pub async fn handle_request(
request.headers_mut().remove("proxy-connection");
request.headers_mut().remove(TRANSFER_ENCODING);
request.headers_mut().remove(UPGRADE);
-
+
if let Some(auth) = request.headers_mut().get_mut(AUTHORIZATION) {
if auth
.to_str()
@@ -130,25 +126,14 @@ pub async fn handle_request(
}
};
*request.uri_mut() = uri;
-
- let start_upstream_req = Instant::now();
- let mut resp = match client.request(request).await {
+ let span = debug_span!("upstream request to discord");
+ let resp = match client.request(request).instrument(span).await {
Ok(response) => response,
Err(e) => {
error!("Error when requesting the Discord API: {:?}", e);
bail!("failed to request the discord api");
}
};
- let upstream_time_took = Instant::now() - start_upstream_req;
-
- resp.headers_mut().append(
- "X-TicketRequest-Ms",
- HeaderValue::from_str(&time_took_ticket.as_millis().to_string()).unwrap(),
- );
- resp.headers_mut().append(
- "X-Upstream-Ms",
- HeaderValue::from_str(&upstream_time_took.as_millis().to_string()).unwrap(),
- );
let ratelimit_headers = resp
.headers()
diff --git a/exes/rest/src/lib.rs b/exes/rest/src/lib.rs
index 02721cc..158fc97 100644
--- a/exes/rest/src/lib.rs
+++ b/exes/rest/src/lib.rs
@@ -8,6 +8,8 @@ use hyper::{
};
use hyper_tls::HttpsConnector;
use leash::{AnyhowResultFuture, Component};
+use opentelemetry::{global, trace::{Tracer}};
+use opentelemetry_http::HeaderExtractor;
use shared::config::Settings;
use std::{convert::Infallible, sync::Arc};
use tokio::sync::oneshot;
@@ -38,6 +40,12 @@ impl Component for ReverseProxyServer {
let token = token.clone();
async move {
Ok::<_, Infallible>(service_fn(move |request: Request<Body>| {
+ let parent_cx = global::get_text_map_propagator(|propagator| {
+ propagator.extract(&HeaderExtractor(request.headers()))
+ });
+ let _span = global::tracer("")
+ .start_with_context("handle_request", &parent_cx);
+
let client = client.clone();
let ratelimiter = ratelimiter.clone();
let token = token.clone();
@@ -64,4 +72,4 @@ impl Component for ReverseProxyServer {
fn new() -> Self {
Self {}
}
-} \ No newline at end of file
+}
diff --git a/exes/rest/src/ratelimit_client/mod.rs b/exes/rest/src/ratelimit_client/mod.rs
index ea34ad9..7493af9 100644
--- a/exes/rest/src/ratelimit_client/mod.rs
+++ b/exes/rest/src/ratelimit_client/mod.rs
@@ -1,10 +1,10 @@
-use self::remote_hashring::{HashRingWrapper, VNode};
-use futures_util::Future;
+use self::remote_hashring::{HashRingWrapper, MetadataMap, VNode};
+use opentelemetry::global;
use proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::{Data, Headers};
use proto::nova::ratelimit::ratelimiter::BucketSubmitTicketRequest;
-use shared::log::debug;
use std::collections::HashMap;
use std::fmt::Debug;
+use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::UNIX_EPOCH;
@@ -12,6 +12,9 @@ use std::time::{Duration, SystemTime};
use tokio::sync::oneshot::{self};
use tokio::sync::{broadcast, mpsc, RwLock};
use tokio_stream::wrappers::ReceiverStream;
+use tonic::Request;
+use tracing::{debug, debug_span, Instrument, Span, instrument};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
mod remote_hashring;
@@ -45,7 +48,7 @@ impl RemoteRatelimiter {
let mut write = self.remotes.write().await;
- for ip in ["localhost"] {
+ for ip in ["ratelimit"] {
let a = VNode::new(ip.into()).await?;
write.add(a.clone());
}
@@ -82,55 +85,80 @@ impl RemoteRatelimiter {
obj
}
+ #[instrument(name = "ticket task")]
pub fn ticket(&self, path: String) -> IssueTicket {
let remotes = self.remotes.clone();
let (tx, rx) = oneshot::channel::<HashMap<String, String>>();
-
- Box::pin(async move {
- // Get node managing this path
- let mut node = (*remotes.read().await.get(&path).unwrap()).clone();
-
- // Buffers for the gRPC streaming channel.
- let (send, remote) = mpsc::channel(5);
- let (do_request, wait) = oneshot::channel();
- // Tonic requires a stream to be used; Since we use a mpsc channel, we can create a stream from it
- let stream = ReceiverStream::new(remote);
-
- // Start the grpc streaming
- let ticket = node.submit_ticket(stream).await?;
-
- // First, send the request
- send.send(BucketSubmitTicketRequest {
- data: Some(Data::Path(path)),
- })
- .await?;
-
- // We continuously listen for events in the channel.
- tokio::spawn(async move {
- let message = ticket.into_inner().message().await.unwrap().unwrap();
-
- if message.accepted == 1 {
- do_request.send(()).unwrap();
- let headers = rx.await.unwrap();
-
- send.send(BucketSubmitTicketRequest {
- data: Some(Data::Headers(Headers {
- precise_time: SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("time went backwards")
- .as_millis() as u64,
- headers,
- })),
- })
- .await
- .unwrap();
- }
- });
-
- // Wait for the message to be sent
- wait.await?;
-
- Ok(tx)
- })
+ Box::pin(
+ async move {
+ // Get node managing this path
+ let mut node = (*remotes.read().await.get(&path).unwrap()).clone();
+
+ // Buffers for the gRPC streaming channel.
+ let (send, remote) = mpsc::channel(5);
+ let (do_request, wait) = oneshot::channel();
+ // Tonic requires a stream to be used; Since we use a mpsc channel, we can create a stream from it
+ let stream = ReceiverStream::new(remote);
+
+ let mut request = Request::new(stream);
+
+ let span = debug_span!("remote request");
+ let context = span.context();
+ global::get_text_map_propagator(|propagator| {
+ propagator.inject_context(&context, &mut MetadataMap(request.metadata_mut()))
+ });
+
+ // Start the grpc streaming
+ let ticket = node.submit_ticket(request).await?;
+
+ // First, send the request
+ send.send(BucketSubmitTicketRequest {
+ data: Some(Data::Path(path)),
+ })
+ .await?;
+
+ // We continuously listen for events in the channel.
+ let span = debug_span!("stream worker");
+ tokio::spawn(
+ async move {
+ let span = debug_span!("waiting for ticket upstream");
+ let message = ticket
+ .into_inner()
+ .message()
+ .instrument(span)
+ .await
+ .unwrap()
+ .unwrap();
+
+ if message.accepted == 1 {
+ debug!("request ticket was accepted");
+ do_request.send(()).unwrap();
+ let span = debug_span!("waiting for response headers");
+ let headers = rx.instrument(span).await.unwrap();
+
+ send.send(BucketSubmitTicketRequest {
+ data: Some(Data::Headers(Headers {
+ precise_time: SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("time went backwards")
+ .as_millis()
+ as u64,
+ headers,
+ })),
+ })
+ .await
+ .unwrap();
+ }
+ }
+ .instrument(span),
+ );
+
+ // Wait for the message to be sent
+ wait.await?;
+
+ Ok(tx)
+ }
+ .instrument(Span::current()),
+ )
}
}
diff --git a/exes/rest/src/ratelimit_client/remote_hashring.rs b/exes/rest/src/ratelimit_client/remote_hashring.rs
index 4e3fa06..d1c1702 100644
--- a/exes/rest/src/ratelimit_client/remote_hashring.rs
+++ b/exes/rest/src/ratelimit_client/remote_hashring.rs
@@ -1,4 +1,6 @@
use core::fmt::Debug;
+use std::convert::TryFrom;
+use opentelemetry::propagation::Injector;
use proto::nova::ratelimit::ratelimiter::ratelimiter_client::RatelimiterClient;
use std::hash::Hash;
use std::ops::Deref;
@@ -32,6 +34,20 @@ impl Hash for VNode {
}
}
+pub struct MetadataMap<'a>(pub &'a mut tonic::metadata::MetadataMap);
+
+impl<'a> Injector for MetadataMap<'a> {
+ /// Set a key and value in the MetadataMap. Does nothing if the key or value are not valid inputs
+ fn set(&mut self, key: &str, value: String) {
+ if let Ok(key) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) {
+ if let Ok(val) = tonic::metadata::MetadataValue::try_from(&value) {
+ self.0.insert(key, val);
+ }
+ }
+ }
+}
+
+
impl VNode {
pub async fn new(address: String) -> Result<Self, tonic::transport::Error> {
let client = RatelimiterClient::connect(format!("http://{}:8093", address.clone())).await?;
diff --git a/exes/webhook/Cargo.toml b/exes/webhook/Cargo.toml
index 589b5bd..0c50009 100644
--- a/exes/webhook/Cargo.toml
+++ b/exes/webhook/Cargo.toml
@@ -4,21 +4,19 @@ version = "0.1.0"
edition = "2018"
[dependencies]
-hyper = { version = "0.14", features = ["full"] }
-tokio = { version = "1", features = ["full"] }
+hyper = "0.14"
+tokio = { version = "1", features = ["rt"] }
shared = { path = "../../libs/shared" }
proto = { path = "../../libs/proto" }
leash = { path = "../../libs/leash" }
+tracing = "0.1.37"
serde = { version = "1.0.8", features = ["derive"] }
-hex = "0.4.3"
serde_json = { version = "1.0" }
-lazy_static = "1.4.0"
+
+hex = "0.4.3"
ed25519-dalek = "1"
twilight-model = { version = "0.14" }
anyhow = "1.0.68"
-futures-util = "0.3.25"
-[[bin]]
-name = "webhook"
-path = "src/main.rs"
+async-nats = "0.25.1"
diff --git a/exes/webhook/src/config.rs b/exes/webhook/src/config.rs
index d1b3fb6..02543e6 100644
--- a/exes/webhook/src/config.rs
+++ b/exes/webhook/src/config.rs
@@ -9,7 +9,6 @@ fn default_listening_address() -> SocketAddr {
#[derive(Debug, Deserialize, Clone, Copy)]
pub struct ServerSettings {
- #[serde(default = "default_listening_address")]
pub listening_adress: SocketAddr,
}
impl Default for ServerSettings {
diff --git a/exes/webhook/src/handler/mod.rs b/exes/webhook/src/handler/mod.rs
index 3ef859e..594919b 100644
--- a/exes/webhook/src/handler/mod.rs
+++ b/exes/webhook/src/handler/mod.rs
@@ -1,4 +1,5 @@
use crate::config::WebhookConfig;
+use async_nats::Client;
use ed25519_dalek::PublicKey;
use error::WebhookError;
use hyper::{
@@ -6,11 +7,7 @@ use hyper::{
service::Service,
Body, Method, Request, Response, StatusCode,
};
-use shared::nats_crate::Client;
-use shared::{
- log::{debug, error},
- payloads::{CachePayload, DispatchEventTagged, Tracing},
-};
+use shared::payloads::{CachePayload, DispatchEventTagged};
use signature::validate_signature;
use std::{
future::Future,
@@ -18,6 +15,7 @@ use std::{
str::from_utf8,
task::{Context, Poll},
};
+use tracing::{debug, error};
use twilight_model::gateway::event::DispatchEvent;
use twilight_model::{
application::interaction::{Interaction, InteractionType},
@@ -98,10 +96,6 @@ impl WebhookService {
// this should hopefully not fail ?
let data = CachePayload {
- tracing: Tracing {
- node_id: "".to_string(),
- span: None,
- },
data: DispatchEventTagged {
data: DispatchEvent::InteractionCreate(Box::new(
InteractionCreate(value),
diff --git a/exes/webhook/src/handler/signature.rs b/exes/webhook/src/handler/signature.rs
index fc5555f..ece7b85 100644
--- a/exes/webhook/src/handler/signature.rs
+++ b/exes/webhook/src/handler/signature.rs
@@ -1,41 +1,13 @@
-use shared::prometheus::{Counter, HistogramVec, labels, opts, register_counter, register_histogram_vec};
-use ed25519_dalek::PublicKey;
-use ed25519_dalek::Verifier;
-use ed25519_dalek::Signature;
-use std::convert::TryInto;
-
-lazy_static::lazy_static! {
- static ref SIGNATURE_TIME_HISTOGRAM: HistogramVec = register_histogram_vec!(
- "nova_webhook_signature_time",
- "The time taken by the signature verification",
- &["signature"]
- ).unwrap();
-
- static ref SIGNATURE_COUNTER: Counter = register_counter!(opts!(
- "nova_webhook_signatures_verify",
- "number of signatures verification issued by the service",
- labels! {"handler" => "webhook_main"}
- )).unwrap();
-}
-
-fn demo<T, const N: usize>(v: Vec<T>) -> [T; N] {
- v.try_into()
- .unwrap_or_else(|v: Vec<T>| panic!("Expected a Vec of length {} but it was {}", N, v.len()))
-}
+use ed25519_dalek::{PublicKey, Signature, Verifier};
pub fn validate_signature(public_key: &PublicKey, data: &[u8], hex_signature: &str) -> bool {
- SIGNATURE_COUNTER.inc();
- let timer = SIGNATURE_TIME_HISTOGRAM.with_label_values(&["webhook_main"]).start_timer();
-
- let signature_result = hex::decode(hex_signature);
+ let mut slice: [u8; Signature::BYTE_SIZE] = [0; Signature::BYTE_SIZE];
+ let signature_result = hex::decode_to_slice(hex_signature, &mut slice);
let mut result = false;
- if let Ok(signature) = signature_result {
- let sig = Signature::from(demo(signature));
-
- result = public_key.verify(data, &sig).is_ok();
+ if signature_result.is_ok() {
+ result = public_key.verify(data, &Signature::from(slice)).is_ok();
}
- timer.observe_duration();
result
}
diff --git a/exes/webhook/src/handler/tests/handler.rs b/exes/webhook/src/handler/tests/handler.rs
index e69de29..8b13789 100644
--- a/exes/webhook/src/handler/tests/handler.rs
+++ b/exes/webhook/src/handler/tests/handler.rs
@@ -0,0 +1 @@
+
diff --git a/exes/webhook/src/handler/tests/mod.rs b/exes/webhook/src/handler/tests/mod.rs
index cf7f558..60ae6d3 100644
--- a/exes/webhook/src/handler/tests/mod.rs
+++ b/exes/webhook/src/handler/tests/mod.rs
@@ -1,2 +1,2 @@
-pub mod signature;
pub mod handler;
+pub mod signature;
diff --git a/exes/webhook/src/lib.rs b/exes/webhook/src/lib.rs
index 43ab9c4..057e70f 100644
--- a/exes/webhook/src/lib.rs
+++ b/exes/webhook/src/lib.rs
@@ -6,11 +6,12 @@ use crate::{
config::WebhookConfig,
handler::{make_service::MakeSvc, WebhookService},
};
+use async_nats::Client;
use hyper::Server;
use leash::{AnyhowResultFuture, Component};
-use shared::{config::Settings, log::info, nats_crate::Client};
+use shared::config::Settings;
use tokio::sync::oneshot;
-
+use tracing::info;
#[derive(Clone, Copy)]
pub struct WebhookServer {}
@@ -27,7 +28,7 @@ impl Component for WebhookServer {
info!("Starting server on {}", settings.server.listening_adress);
let bind = settings.server.listening_adress;
- info!("NAts connected!");
+ info!("Nats connected!");
let nats = Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>> + Send>>>::into(
settings.nats,
)