diff options
Diffstat (limited to 'exes')
| -rw-r--r-- | exes/cache/Cargo.toml | 7 | ||||
| -rw-r--r-- | exes/cache/src/config.rs | 4 | ||||
| -rw-r--r-- | exes/cache/src/main.rs | 193 | ||||
| -rw-r--r-- | exes/cache/src/managers/automoderation.rs | 26 | ||||
| -rw-r--r-- | exes/cache/src/managers/bans.rs | 26 | ||||
| -rw-r--r-- | exes/cache/src/managers/channels.rs | 28 | ||||
| -rw-r--r-- | exes/cache/src/managers/guild_schedules.rs | 29 | ||||
| -rw-r--r-- | exes/cache/src/managers/guilds.rs | 34 | ||||
| -rw-r--r-- | exes/cache/src/managers/integrations.rs | 28 | ||||
| -rw-r--r-- | exes/cache/src/managers/invites.rs | 26 | ||||
| -rw-r--r-- | exes/cache/src/managers/members.rs | 29 | ||||
| -rw-r--r-- | exes/cache/src/managers/messages.rs | 28 | ||||
| -rw-r--r-- | exes/cache/src/managers/mod.rs | 23 | ||||
| -rw-r--r-- | exes/cache/src/managers/reactions.rs | 28 | ||||
| -rw-r--r-- | exes/cache/src/managers/roles.rs | 27 | ||||
| -rw-r--r-- | exes/cache/src/managers/stage_instances.rs | 27 | ||||
| -rw-r--r-- | exes/cache/src/managers/threads.rs | 29 | ||||
| -rw-r--r-- | exes/gateway/Cargo.toml | 3 | ||||
| -rw-r--r-- | exes/gateway/src/main.rs | 10 | ||||
| -rw-r--r-- | exes/webhook/src/handler/handler.rs | 16 | ||||
| -rw-r--r-- | exes/webhook/src/handler/make_service.rs | 4 | ||||
| -rw-r--r-- | exes/webhook/src/main.rs | 6 |
22 files changed, 606 insertions, 25 deletions
diff --git a/exes/cache/Cargo.toml b/exes/cache/Cargo.toml index 853903b..61bb449 100644 --- a/exes/cache/Cargo.toml +++ b/exes/cache/Cargo.toml @@ -7,8 +7,11 @@ edition = "2018" [dependencies] shared = { path = "../../libs/shared" } -nats = "0.23.1" +async-nats = "0.25.1" +tokio = { version = "1", features = ["full"] } serde = { version = "1.0.8", features = ["derive"] } log = { version = "0.4", features = ["std"] } serde_json = { version = "1.0" } -redis = "*"
\ No newline at end of file +redis = "*" +futures-util = "*" +twilight-model = "0.14"
\ No newline at end of file diff --git a/exes/cache/src/config.rs b/exes/cache/src/config.rs index 37b1e73..2aa71a8 100644 --- a/exes/cache/src/config.rs +++ b/exes/cache/src/config.rs @@ -1,4 +1,6 @@ use serde::Deserialize; #[derive(Debug, Deserialize, Clone, Default)] -pub struct CacheConfiguration {}
\ No newline at end of file +pub struct CacheConfiguration { + pub toggles: Vec<String> +}
\ No newline at end of file diff --git a/exes/cache/src/main.rs b/exes/cache/src/main.rs index ab79dc9..a74cfa6 100644 --- a/exes/cache/src/main.rs +++ b/exes/cache/src/main.rs @@ -1,14 +1,201 @@ -use shared::config::Settings; +use std::error::Error; + +use async_nats::{Client, Subscriber}; +use futures_util::stream::StreamExt; use log::info; +use managers::{ + automoderation::Automoderation, bans::Bans, channels::Channels, + guild_schedules::GuildSchedules, guilds::Guilds, integrations::Integrations, invites::Invites, + members::Members, messages::Messages, reactions::Reactions, roles::Roles, + stage_instances::StageInstances, threads::Threads, CacheManager, +}; +use shared::{config::Settings, payloads::CachePayload}; +use twilight_model::gateway::event::DispatchEvent; use crate::config::CacheConfiguration; mod config; +mod managers; + +pub enum CacheSourcedEvents { + None, +} +#[derive(Default)] +struct MegaCache { + automoderation: Automoderation, + channels: Channels, + bans: Bans, + guild_schedules: GuildSchedules, + guilds: Guilds, + integrations: Integrations, + invites: Invites, + members: Members, + messages: Messages, + reactions: Reactions, + roles: Roles, + stage_instances: StageInstances, + threads: Threads, +} -fn main() { +#[tokio::main] +async fn main() -> Result<(), Box<dyn Error + Send + Sync>> { let settings: Settings<CacheConfiguration> = Settings::new("cache").unwrap(); info!("loaded configuration: {:?}", settings); - + let nats: Client = settings.nats.to_client().await?; + // let redis: redis::Client = settings.redis.into(); + + let mut cache = MegaCache::default(); + + let mut sub = nats.subscribe("nova.cache.dispatch.*".to_string()).await?; + listen(&mut sub, &mut cache, settings.config.toggles).await; + Ok(()) +} + +async fn listen(sub: &mut Subscriber, cache: &mut MegaCache, features: Vec<String>) { + while let Some(data) = sub.next().await { + let cp: CachePayload = serde_json::from_slice(&data.payload).unwrap(); + let event = cp.data.data; + match event { + // Channel events + DispatchEvent::ChannelCreate(_) + | DispatchEvent::ChannelDelete(_) + | DispatchEvent::ChannelPinsUpdate(_) + | DispatchEvent::ChannelUpdate(_) + if features.contains(&"channels_cache".to_string()) => + { + cache.channels.handle(event); + } + + // Guild Cache + DispatchEvent::GuildCreate(_) + | DispatchEvent::GuildDelete(_) + | DispatchEvent::UnavailableGuild(_) + | DispatchEvent::GuildUpdate(_) + | DispatchEvent::WebhooksUpdate(_) + | DispatchEvent::GuildStickersUpdate(_) + | DispatchEvent::GuildEmojisUpdate(_) + | DispatchEvent::VoiceServerUpdate(_) + | DispatchEvent::GuildIntegrationsUpdate(_) + | DispatchEvent::CommandPermissionsUpdate(_) + if features.contains(&"guilds_cache".to_string()) => + { + cache.guilds.handle(event); + } + + // Guild Scheduled event + DispatchEvent::GuildScheduledEventCreate(_) + | DispatchEvent::GuildScheduledEventDelete(_) + | DispatchEvent::GuildScheduledEventUpdate(_) + | DispatchEvent::GuildScheduledEventUserAdd(_) + | DispatchEvent::GuildScheduledEventUserRemove(_) + if features.contains(&"guild_schedules_cache".to_string()) => + { + cache.guild_schedules.handle(event); + } + + // Stage events + DispatchEvent::StageInstanceCreate(_) + | DispatchEvent::StageInstanceDelete(_) + | DispatchEvent::StageInstanceUpdate(_) + if features.contains(&"stage_instances_cache".to_string()) => + { + cache.stage_instances.handle(event); + } + + // Integration events + DispatchEvent::IntegrationCreate(_) + | DispatchEvent::IntegrationDelete(_) + | DispatchEvent::IntegrationUpdate(_) + | DispatchEvent::InteractionCreate(_) + if features.contains(&"integrations_cache".to_string()) => + { + cache.integrations.handle(event); + } + + // Member events + DispatchEvent::MemberAdd(_) + | DispatchEvent::MemberRemove(_) + | DispatchEvent::MemberUpdate(_) + | DispatchEvent::MemberChunk(_) + | DispatchEvent::UserUpdate(_) + if features.contains(&"members_cache".to_string()) => + { + cache.members.handle(event); + } + + // Ban cache + DispatchEvent::BanAdd(_) | DispatchEvent::BanRemove(_) + if features.contains(&"bans_cache".to_string()) => + { + cache.bans.handle(event); + } + + // Reaction cache + DispatchEvent::ReactionAdd(_) + | DispatchEvent::ReactionRemove(_) + | DispatchEvent::ReactionRemoveAll(_) + | DispatchEvent::ReactionRemoveEmoji(_) + if features.contains(&"reactions_cache".to_string()) => + { + cache.reactions.handle(event); + } + + // Message cache + DispatchEvent::MessageCreate(_) + | DispatchEvent::MessageDelete(_) + | DispatchEvent::MessageDeleteBulk(_) + | DispatchEvent::MessageUpdate(_) + if features.contains(&"messages_cache".to_string()) => + { + cache.messages.handle(event); + } + + // Thread cache + DispatchEvent::ThreadCreate(_) + | DispatchEvent::ThreadDelete(_) + | DispatchEvent::ThreadListSync(_) + | DispatchEvent::ThreadMemberUpdate(_) + | DispatchEvent::ThreadMembersUpdate(_) + | DispatchEvent::ThreadUpdate(_) + if features.contains(&"threads_cache".to_string()) => + { + cache.threads.handle(event); + } + + // Invite cache + DispatchEvent::InviteCreate(_) | DispatchEvent::InviteDelete(_) + if features.contains(&"invites_cache".to_string()) => + { + cache.invites.handle(event); + } + + // Roles cache + DispatchEvent::RoleCreate(_) + | DispatchEvent::RoleDelete(_) + | DispatchEvent::RoleUpdate(_) + if features.contains(&"roles_cache".to_string()) => + { + cache.roles.handle(event); + } + + // Automod rules + DispatchEvent::AutoModerationRuleCreate(_) + | DispatchEvent::AutoModerationRuleDelete(_) + | DispatchEvent::AutoModerationRuleUpdate(_) + if features.contains(&"automoderation_cache".to_string()) => + { + cache.automoderation.handle(event); + } + + // Voice State + DispatchEvent::VoiceStateUpdate(_) + if features.contains(&"voice_states_cache".to_string()) => {} + + _ => { + // just forward + } + } + } } diff --git a/exes/cache/src/managers/automoderation.rs b/exes/cache/src/managers/automoderation.rs new file mode 100644 index 0000000..4a1b119 --- /dev/null +++ b/exes/cache/src/managers/automoderation.rs @@ -0,0 +1,26 @@ +use twilight_model::gateway::event::DispatchEvent; + +use crate::CacheSourcedEvents; + +use super::CacheManager; +use std::future::Future; + +#[derive(Default)] +pub struct Automoderation {} +impl CacheManager for Automoderation { + fn handle( + &self, + event: twilight_model::gateway::event::DispatchEvent, + ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> { + Box::pin(async move { + match event { + DispatchEvent::AutoModerationRuleCreate(_) => {} + DispatchEvent::AutoModerationRuleDelete(_) => {} + DispatchEvent::AutoModerationRuleUpdate(_) => {} + _ => unreachable!(), + }; + + CacheSourcedEvents::None + }) + } +} diff --git a/exes/cache/src/managers/bans.rs b/exes/cache/src/managers/bans.rs new file mode 100644 index 0000000..27e6a34 --- /dev/null +++ b/exes/cache/src/managers/bans.rs @@ -0,0 +1,26 @@ +use twilight_model::gateway::event::DispatchEvent; + +use crate::CacheSourcedEvents; + +use super::CacheManager; +use std::future::Future; + + +#[derive(Default)] +pub struct Bans {} +impl CacheManager for Bans { + fn handle( + &self, + event: twilight_model::gateway::event::DispatchEvent, + ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> { + Box::pin(async move { + match event { + DispatchEvent::BanAdd(_) => {} + DispatchEvent::BanRemove(_) => {} + _ => unreachable!(), + }; + + CacheSourcedEvents::None + }) + } +} diff --git a/exes/cache/src/managers/channels.rs b/exes/cache/src/managers/channels.rs new file mode 100644 index 0000000..fe34acc --- /dev/null +++ b/exes/cache/src/managers/channels.rs @@ -0,0 +1,28 @@ +use twilight_model::gateway::event::DispatchEvent; + +use crate::CacheSourcedEvents; + +use super::CacheManager; +use std::future::Future; + + +#[derive(Default)] +pub struct Channels {} +impl CacheManager for Channels { + fn handle( + &self, + event: twilight_model::gateway::event::DispatchEvent, + ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> { + Box::pin(async move { + match event { + DispatchEvent::ChannelCreate(_) => {} + DispatchEvent::ChannelDelete(_) => {} + DispatchEvent::ChannelPinsUpdate(_) => {} + DispatchEvent::ChannelUpdate(_) => {} + _ => unreachable!(), + }; + + CacheSourcedEvents::None + }) + } +} diff --git a/exes/cache/src/managers/guild_schedules.rs b/exes/cache/src/managers/guild_schedules.rs new file mode 100644 index 0000000..bcc79c5 --- /dev/null +++ b/exes/cache/src/managers/guild_schedules.rs @@ -0,0 +1,29 @@ +use twilight_model::gateway::event::DispatchEvent; + +use crate::CacheSourcedEvents; + +use super::CacheManager; +use std::future::Future; + + +#[derive(Default)] +pub struct GuildSchedules {} +impl CacheManager for GuildSchedules { + fn handle( + &self, + event: twilight_model::gateway::event::DispatchEvent, + ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> { + Box::pin(async move { + match event { + DispatchEvent::GuildScheduledEventCreate(_) => {} + DispatchEvent::GuildScheduledEventDelete(_) => {} + DispatchEvent::GuildScheduledEventUpdate(_) => {} + DispatchEvent::GuildScheduledEventUserAdd(_) => {} + DispatchEvent::GuildScheduledEventUserRemove(_) => {} + _ => unreachable!(), + }; + + CacheSourcedEvents::None + }) + } +} diff --git a/exes/cache/src/managers/guilds.rs b/exes/cache/src/managers/guilds.rs new file mode 100644 index 0000000..3f5f4c4 --- /dev/null +++ b/exes/cache/src/managers/guilds.rs @@ -0,0 +1,34 @@ +use twilight_model::gateway::event::DispatchEvent; + +use crate::CacheSourcedEvents; + +use super::CacheManager; +use std::future::Future; + + +#[derive(Default)] +pub struct Guilds {} +impl CacheManager for Guilds { + fn handle( + &self, + event: twilight_model::gateway::event::DispatchEvent, + ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> { + Box::pin(async move { + match event { + DispatchEvent::GuildCreate(_) => {}, + DispatchEvent::GuildDelete(_) => {}, + DispatchEvent::UnavailableGuild(_) => {}, + DispatchEvent::GuildUpdate(_) => {}, + DispatchEvent::WebhooksUpdate(_) => {}, + DispatchEvent::GuildStickersUpdate(_) => {}, + DispatchEvent::GuildEmojisUpdate(_) => {}, + DispatchEvent::VoiceServerUpdate(_) => {}, + DispatchEvent::GuildIntegrationsUpdate(_) => {}, + DispatchEvent::CommandPermissionsUpdate(_) => {}, + _ => unreachable!(), + }; + + CacheSourcedEvents::None + }) + } +} diff --git a/exes/cache/src/managers/integrations.rs b/exes/cache/src/managers/integrations.rs new file mode 100644 index 0000000..99d292e --- /dev/null +++ b/exes/cache/src/managers/integrations.rs @@ -0,0 +1,28 @@ +use twilight_model::gateway::event::DispatchEvent; + +use crate::CacheSourcedEvents; + +use super::CacheManager; +use std::future::Future; + + +#[derive(Default)] +pub struct Integrations {} +impl CacheManager for Integrations { + fn handle( + &self, + event: twilight_model::gateway::event::DispatchEvent, + ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> { + Box::pin(async move { + match event { + DispatchEvent::IntegrationCreate(_) => {} + DispatchEvent::IntegrationDelete(_) => {} + DispatchEvent::IntegrationUpdate(_) => {} + DispatchEvent::InteractionCreate(_) => {} + _ => unreachable!(), + }; + + CacheSourcedEvents::None + }) + } +} diff --git a/exes/cache/src/managers/invites.rs b/exes/cache/src/managers/invites.rs new file mode 100644 index 0000000..21da64f --- /dev/null +++ b/exes/cache/src/managers/invites.rs @@ -0,0 +1,26 @@ +use twilight_model::gateway::event::DispatchEvent; + +use crate::CacheSourcedEvents; + +use super::CacheManager; +use std::future::Future; + + +#[derive(Default)] +pub struct Invites {} +impl CacheManager for Invites { + fn handle( + &self, + event: twilight_model::gateway::event::DispatchEvent, + ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> { + Box::pin(async move { + match event { + DispatchEvent::InviteCreate(_) => {} + DispatchEvent::InviteDelete(_) => {} + _ => unreachable!(), + }; + + CacheSourcedEvents::None + }) + } +} diff --git a/exes/cache/src/managers/members.rs b/exes/cache/src/managers/members.rs new file mode 100644 index 0000000..3a483f1 --- /dev/null +++ b/exes/cache/src/managers/members.rs @@ -0,0 +1,29 @@ +use twilight_model::gateway::event::DispatchEvent; + +use crate::CacheSourcedEvents; + +use super::CacheManager; +use std::future::Future; + + +#[derive(Default)] +pub struct Members {} +impl CacheManager for Members { + fn handle( + &self, + event: twilight_model::gateway::event::DispatchEvent, + ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> { + Box::pin(async move { + match event { + DispatchEvent::MemberAdd(_) => {}, + DispatchEvent::MemberRemove(_) => {}, + DispatchEvent::MemberUpdate(_) => {}, + DispatchEvent::MemberChunk(_) => {}, + DispatchEvent::UserUpdate(_) => {}, + _ => unreachable!(), + }; + + CacheSourcedEvents::None + }) + } +} diff --git a/exes/cache/src/managers/messages.rs b/exes/cache/src/managers/messages.rs new file mode 100644 index 0000000..7b06ae7 --- /dev/null +++ b/exes/cache/src/managers/messages.rs @@ -0,0 +1,28 @@ +use twilight_model::gateway::event::DispatchEvent; + +use crate::CacheSourcedEvents; + +use super::CacheManager; +use std::future::Future; + + +#[derive(Default)] +pub struct Messages {} +impl CacheManager for Messages { + fn handle( + &self, + event: twilight_model::gateway::event::DispatchEvent, + ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> { + Box::pin(async move { + match event { + DispatchEvent::MessageCreate(_) => {}, + DispatchEvent::MessageDelete(_) => {}, + DispatchEvent::MessageDeleteBulk(_) => {}, + DispatchEvent::MessageUpdate(_) => {}, + _ => unreachable!(), + }; + + CacheSourcedEvents::None + }) + } +} diff --git a/exes/cache/src/managers/mod.rs b/exes/cache/src/managers/mod.rs new file mode 100644 index 0000000..370cda6 --- /dev/null +++ b/exes/cache/src/managers/mod.rs @@ -0,0 +1,23 @@ +use std::pin::Pin; +use twilight_model::gateway::event::DispatchEvent; +use std::future::Future; + +use crate::CacheSourcedEvents; + +pub mod channels; +pub mod guilds; +pub mod guild_schedules; +pub mod stage_instances; +pub mod integrations; +pub mod members; +pub mod bans; +pub mod reactions; +pub mod messages; +pub mod threads; +pub mod invites; +pub mod roles; +pub mod automoderation; + +pub trait CacheManager { + fn handle(&self, event: DispatchEvent) -> Pin<Box<dyn Future<Output = CacheSourcedEvents>>>; +} diff --git a/exes/cache/src/managers/reactions.rs b/exes/cache/src/managers/reactions.rs new file mode 100644 index 0000000..5d21e0b --- /dev/null +++ b/exes/cache/src/managers/reactions.rs @@ -0,0 +1,28 @@ +use twilight_model::gateway::event::DispatchEvent; + +use crate::CacheSourcedEvents; + +use super::CacheManager; +use std::future::Future; + + +#[derive(Default)] +pub struct Reactions {} +impl CacheManager for Reactions { + fn handle( + &self, + event: twilight_model::gateway::event::DispatchEvent, + ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> { + Box::pin(async move { + match event { + DispatchEvent::ReactionAdd(_) => {}, + DispatchEvent::ReactionRemove(_) => {}, + DispatchEvent::ReactionRemoveAll(_) => {}, + DispatchEvent::ReactionRemoveEmoji(_) => {}, + _ => unreachable!(), + }; + + CacheSourcedEvents::None + }) + } +} diff --git a/exes/cache/src/managers/roles.rs b/exes/cache/src/managers/roles.rs new file mode 100644 index 0000000..5fa0f22 --- /dev/null +++ b/exes/cache/src/managers/roles.rs @@ -0,0 +1,27 @@ +use twilight_model::gateway::event::DispatchEvent; + +use crate::CacheSourcedEvents; + +use super::CacheManager; +use std::future::Future; + + +#[derive(Default)] +pub struct Roles {} +impl CacheManager for Roles { + fn handle( + &self, + event: twilight_model::gateway::event::DispatchEvent, + ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> { + Box::pin(async move { + match event { + DispatchEvent::RoleCreate(_) => {}, + DispatchEvent::RoleDelete(_) => {}, + DispatchEvent::RoleUpdate(_) => {}, + _ => unreachable!(), + }; + + CacheSourcedEvents::None + }) + } +} diff --git a/exes/cache/src/managers/stage_instances.rs b/exes/cache/src/managers/stage_instances.rs new file mode 100644 index 0000000..314d089 --- /dev/null +++ b/exes/cache/src/managers/stage_instances.rs @@ -0,0 +1,27 @@ +use twilight_model::gateway::event::DispatchEvent; + +use crate::CacheSourcedEvents; + +use super::CacheManager; +use std::future::Future; + + +#[derive(Default)] +pub struct StageInstances {} +impl CacheManager for StageInstances { + fn handle( + &self, + event: twilight_model::gateway::event::DispatchEvent, + ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> { + Box::pin(async move { + match event { + DispatchEvent::StageInstanceCreate(_) => {}, + DispatchEvent::StageInstanceDelete(_) => {}, + DispatchEvent::StageInstanceUpdate(_) => {}, + _ => unreachable!(), + }; + + CacheSourcedEvents::None + }) + } +} diff --git a/exes/cache/src/managers/threads.rs b/exes/cache/src/managers/threads.rs new file mode 100644 index 0000000..d4efc2e --- /dev/null +++ b/exes/cache/src/managers/threads.rs @@ -0,0 +1,29 @@ +use twilight_model::gateway::event::DispatchEvent; + +use crate::CacheSourcedEvents; + +use super::CacheManager; +use std::future::Future; + +#[derive(Default)] +pub struct Threads {} +impl CacheManager for Threads { + fn handle( + &self, + event: twilight_model::gateway::event::DispatchEvent, + ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> { + Box::pin(async move { + match event { + DispatchEvent::ThreadCreate(_) => {}, + DispatchEvent::ThreadDelete(_) => {}, + DispatchEvent::ThreadListSync(_) => {}, + DispatchEvent::ThreadMemberUpdate(_) => {}, + DispatchEvent::ThreadMembersUpdate(_) => {}, + DispatchEvent::ThreadUpdate(_) => {}, + _ => unreachable!(), + }; + + CacheSourcedEvents::None + }) + } +} diff --git a/exes/gateway/Cargo.toml b/exes/gateway/Cargo.toml index 27269c1..7ef3d98 100644 --- a/exes/gateway/Cargo.toml +++ b/exes/gateway/Cargo.toml @@ -10,4 +10,5 @@ twilight-gateway = { version = "0.14" } twilight-model = "0.14" serde = { version = "1.0.8", features = ["derive"] } futures = "0.3" -serde_json = { version = "1.0" }
\ No newline at end of file +serde_json = { version = "1.0" } +bytes = "*"
\ No newline at end of file diff --git a/exes/gateway/src/main.rs b/exes/gateway/src/main.rs index 93ca022..6968fe4 100644 --- a/exes/gateway/src/main.rs +++ b/exes/gateway/src/main.rs @@ -2,7 +2,7 @@ use config::Config; use shared::{ config::Settings, log::{debug, info}, - nats_crate::Connection, + nats_crate::Client, payloads::{CachePayload, DispatchEventTagged, Tracing}, }; use std::{convert::TryFrom, error::Error}; @@ -15,7 +15,7 @@ use twilight_model::gateway::event::DispatchEvent; async fn main() -> Result<(), Box<dyn Error + Send + Sync>> { let settings: Settings<Config> = Settings::new("gateway").unwrap(); let (shard, mut events) = Shard::new(settings.config.token, settings.config.intents); - let nats: Connection = settings.nats.into(); + let nats: Client = settings.nats.to_client().await?; shard.start().await?; @@ -26,7 +26,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> { } _ => { - let name = event.kind().name().unwrap(); + let name = event.kind().name(); if let Ok(dispatch_event) = DispatchEvent::try_from(event) { let data = CachePayload { tracing: Tracing { @@ -39,7 +39,9 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> { }; let value = serde_json::to_string(&data)?; debug!("nats send: {}", value); - nats.publish(&format!("nova.cache.dispatch.{}", name), value)?; + let bytes = bytes::Bytes::from(value); + nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes) + .await?; } } } diff --git a/exes/webhook/src/handler/handler.rs b/exes/webhook/src/handler/handler.rs index 3294c22..b2ef44c 100644 --- a/exes/webhook/src/handler/handler.rs +++ b/exes/webhook/src/handler/handler.rs @@ -8,7 +8,7 @@ use hyper::{ Body, Method, Request, Response, StatusCode, }; use serde::{Deserialize, Serialize}; -use shared::nats_crate::Connection; +use shared::nats_crate::Client; use shared::{ log::{debug, error}, payloads::{CachePayload, DispatchEventTagged, Tracing}, @@ -19,7 +19,6 @@ use std::{ str::from_utf8, sync::Arc, task::{Context, Poll}, - time::Duration, }; use twilight_model::gateway::event::{DispatchEvent}; use twilight_model::{ @@ -31,7 +30,7 @@ use twilight_model::{ #[derive(Clone)] pub struct HandlerService { pub config: Arc<Config>, - pub nats: Arc<Connection>, + pub nats: Arc<Client>, pub public_key: Arc<PublicKey>, } @@ -107,14 +106,13 @@ impl HandlerService { let payload = serde_json::to_string(&data).unwrap(); - match self.nats.request_timeout( - "nova.cache.dispatch.INTERACTION_CREATE", - payload, - Duration::from_secs(2), - ) { + match self.nats.request( + "nova.cache.dispatch.INTERACTION_CREATE".to_string(), + Bytes::from(payload), + ).await { Ok(response) => Ok(Response::builder() .header("Content-Type", "application/json") - .body(Body::from(response.data)) + .body(Body::from(response.reply.unwrap())) .unwrap()), Err(error) => { diff --git a/exes/webhook/src/handler/make_service.rs b/exes/webhook/src/handler/make_service.rs index 2774917..48672a1 100644 --- a/exes/webhook/src/handler/make_service.rs +++ b/exes/webhook/src/handler/make_service.rs @@ -1,7 +1,7 @@ use super::handler::HandlerService; use crate::config::Config; use hyper::service::Service; -use shared::nats_crate::Connection; +use shared::nats_crate::Client; use std::{ future::{ready, Ready}, sync::Arc, @@ -11,7 +11,7 @@ use ed25519_dalek::PublicKey; pub struct MakeSvc { pub settings: Arc<Config>, - pub nats: Arc<Connection>, + pub nats: Arc<Client>, pub public_key: Arc<PublicKey> } diff --git a/exes/webhook/src/main.rs b/exes/webhook/src/main.rs index 9527d0f..336dd82 100644 --- a/exes/webhook/src/main.rs +++ b/exes/webhook/src/main.rs @@ -4,10 +4,10 @@ mod handler; use crate::handler::make_service::MakeSvc; use crate::config::Config; -use shared::config::Settings; -use shared::log::{error, info}; use ed25519_dalek::PublicKey; use hyper::Server; +use shared::config::Settings; +use shared::log::{error, info}; #[tokio::main] async fn main() { @@ -35,7 +35,7 @@ async fn start(settings: Settings<Config>) { Arc::new(PublicKey::from_bytes(&hex::decode(&config.discord.public_key).unwrap()).unwrap()); let server = Server::bind(&addr).serve(MakeSvc { settings: config, - nats: Arc::new(settings.nats.into()), + nats: Arc::new(settings.nats.to_client().await.unwrap()), public_key: public_key, }); |
