From: MatthieuCoder Date: Sat, 31 Dec 2022 18:48:40 +0000 (+0400) Subject: add base of cache component X-Git-Tag: v0.1~33 X-Git-Url: https://git.puffer.fish/?a=commitdiff_plain;h=46fd26962ef55f8b557f7e36d3aee915a819c88c;p=matthieu%2Fnova.git add base of cache component --- diff --git a/Cargo.lock b/Cargo.lock index c1ba53e..943bb09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -44,32 +44,39 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "983cd8b9d4b02a6dc6ffa557262eb5858a27a0038ffffe21a0f133eaa819a164" [[package]] -name = "async-channel" -version = "1.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833" -dependencies = [ - "concurrent-queue", - "event-listener", - "futures-core", -] - -[[package]] -name = "async-lock" -version = "2.6.0" +name = "async-nats" +version = "0.25.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8101efe8695a6c17e02911402145357e718ac92d3ff88ae8419e84b1707b685" +checksum = "f69bf051b7d96b3275cdea9a4abbe2e937ce6de66c742c57050c5c98b4a6db32" dependencies = [ - "event-listener", - "futures-lite", + "base64", + "base64-url", + "bytes", + "futures", + "http", + "itertools", + "itoa", + "lazy_static", + "nkeys", + "nuid", + "once_cell", + "regex", + "ring", + "rustls-native-certs", + "rustls-pemfile", + "serde", + "serde_json", + "serde_nanos", + "serde_repr", + "subslice", + "time 0.3.17", + "tokio", + "tokio-retry", + "tokio-rustls", + "tracing", + "url", ] -[[package]] -name = "async-task" -version = "4.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a40729d2133846d9ed0ea60a8b9541bccddab49cd30f0715a1da672fe9a2524" - [[package]] name = "async-trait" version = "0.1.60" @@ -81,12 +88,6 @@ dependencies = [ "syn", ] -[[package]] -name = "atomic-waker" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a" - [[package]] name = "atty" version = "0.2.14" @@ -149,20 +150,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "blocking" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c67b173a56acffd6d2326fb7ab938ba0b00a71480e14902b2591c87bc5741e8" -dependencies = [ - "async-channel", - "async-lock", - "async-task", - "atomic-waker", - "fastrand", - "futures-lite", -] - [[package]] name = "bollard-stubs" version = "1.41.0" @@ -196,12 +183,15 @@ checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c" name = "cache" version = "0.1.0" dependencies = [ + "async-nats", + "futures-util", "log", - "nats", "redis", "serde", "serde_json", "shared", + "tokio", + "twilight-model", ] [[package]] @@ -256,15 +246,6 @@ dependencies = [ "tokio-util", ] -[[package]] -name = "concurrent-queue" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd7bef69dc86e3c610e4e7aed41035e2a7ed12e72dd7530f61327a6579a4390b" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "config" version = "0.13.3" @@ -330,25 +311,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "crossbeam-channel" -version = "0.5.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521" -dependencies = [ - "cfg-if", - "crossbeam-utils", -] - -[[package]] -name = "crossbeam-utils" -version = "0.8.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fb766fa798726286dbbb842f174001dab8abc7b627a1dd86e0b7222a95d929f" -dependencies = [ - "cfg-if", -] - [[package]] name = "crypto-common" version = "0.1.6" @@ -525,6 +487,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "either" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797" + [[package]] name = "enumflags2" version = "0.7.5" @@ -580,12 +548,6 @@ dependencies = [ "libc", ] -[[package]] -name = "event-listener" -version = "2.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" - [[package]] name = "fastrand" version = "1.8.0" @@ -684,21 +646,6 @@ version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb" -[[package]] -name = "futures-lite" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48" -dependencies = [ - "fastrand", - "futures-core", - "futures-io", - "memchr", - "parking", - "pin-project-lite", - "waker-fn", -] - [[package]] name = "futures-macro" version = "0.3.25" @@ -744,6 +691,7 @@ dependencies = [ name = "gateway" version = "0.1.0" dependencies = [ + "bytes", "futures", "serde", "serde_json", @@ -921,8 +869,8 @@ checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c" dependencies = [ "http", "hyper", - "rustls 0.20.7", - "rustls-native-certs 0.6.2", + "rustls", + "rustls-native-certs", "tokio", "tokio-rustls", ] @@ -1015,6 +963,15 @@ dependencies = [ "windows-sys 0.42.0", ] +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.5" @@ -1030,12 +987,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "json" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd" - [[package]] name = "json5" version = "0.4.1" @@ -1170,42 +1121,6 @@ dependencies = [ "tempfile", ] -[[package]] -name = "nats" -version = "0.23.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3d877cd2e71146efa7065300fc5f5da967f938694b4d65e8bc64cc4a409092c" -dependencies = [ - "base64", - "base64-url", - "blocking", - "crossbeam-channel", - "fastrand", - "itoa", - "json", - "lazy_static", - "libc", - "log", - "memchr", - "nkeys", - "nuid", - "once_cell", - "parking_lot", - "regex", - "ring", - "rustls 0.19.1", - "rustls-native-certs 0.5.0", - "rustls-pemfile 0.2.1", - "serde", - "serde_json", - "serde_nanos", - "serde_repr", - "time 0.3.17", - "url", - "webpki 0.21.4", - "winapi", -] - [[package]] name = "nkeys" version = "0.2.0" @@ -1346,12 +1261,6 @@ dependencies = [ "hashbrown", ] -[[package]] -name = "parking" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" - [[package]] name = "parking_lot" version = "0.12.1" @@ -1440,6 +1349,26 @@ dependencies = [ "sha1", ] +[[package]] +name = "pin-project" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.9" @@ -1744,19 +1673,6 @@ dependencies = [ "windows-sys 0.42.0", ] -[[package]] -name = "rustls" -version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7" -dependencies = [ - "base64", - "log", - "ring", - "sct 0.6.1", - "webpki 0.21.4", -] - [[package]] name = "rustls" version = "0.20.7" @@ -1765,20 +1681,8 @@ checksum = "539a2bfe908f471bfa933876bd1eb6a19cf2176d375f82ef7f99530a40e48c2c" dependencies = [ "log", "ring", - "sct 0.7.0", - "webpki 0.22.0", -] - -[[package]] -name = "rustls-native-certs" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a07b7c1885bd8ed3831c289b7870b13ef46fe0e856d288c30d9cc17d75a2092" -dependencies = [ - "openssl-probe", - "rustls 0.19.1", - "schannel", - "security-framework", + "sct", + "webpki", ] [[package]] @@ -1788,20 +1692,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50" dependencies = [ "openssl-probe", - "rustls-pemfile 1.0.1", + "rustls-pemfile", "schannel", "security-framework", ] -[[package]] -name = "rustls-pemfile" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9" -dependencies = [ - "base64", -] - [[package]] name = "rustls-pemfile" version = "1.0.1" @@ -1839,16 +1734,6 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2" -[[package]] -name = "sct" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "sct" version = "0.7.0" @@ -2021,12 +1906,12 @@ dependencies = [ name = "shared" version = "0.1.0" dependencies = [ + "async-nats", "config", "enumflags2", "hyper", "inner", "log", - "nats", "pretty_env_logger", "prometheus", "redis", @@ -2112,6 +1997,15 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "subslice" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0a8e4809a3bb02de01f1f7faf1ba01a83af9e8eabcd4d31dd6e413d14d56aae" +dependencies = [ + "memchr", +] + [[package]] name = "subtle" version = "2.4.1" @@ -2295,15 +2189,26 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-retry" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" +dependencies = [ + "pin-project", + "rand 0.8.5", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.23.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" dependencies = [ - "rustls 0.20.7", + "rustls", "tokio", - "webpki 0.22.0", + "webpki", ] [[package]] @@ -2314,12 +2219,12 @@ checksum = "f714dd15bead90401d77e04243611caec13726c2408afd5b31901dfcdcb3b181" dependencies = [ "futures-util", "log", - "rustls 0.20.7", - "rustls-native-certs 0.6.2", + "rustls", + "rustls-native-certs", "tokio", "tokio-rustls", "tungstenite", - "webpki 0.22.0", + "webpki", ] [[package]] @@ -2402,12 +2307,12 @@ dependencies = [ "httparse", "log", "rand 0.8.5", - "rustls 0.20.7", + "rustls", "sha-1", "thiserror", "url", "utf-8", - "webpki 0.22.0", + "webpki", ] [[package]] @@ -2420,8 +2325,8 @@ dependencies = [ "flate2", "futures-util", "leaky-bucket-lite", - "rustls 0.20.7", - "rustls-native-certs 0.6.2", + "rustls", + "rustls-native-certs", "serde", "serde_json", "tokio", @@ -2577,12 +2482,6 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" -[[package]] -name = "waker-fn" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" - [[package]] name = "want" version = "0.3.0" @@ -2693,16 +2592,6 @@ dependencies = [ "twilight-model", ] -[[package]] -name = "webpki" -version = "0.21.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "webpki" version = "0.22.0" diff --git a/exes/cache/Cargo.toml b/exes/cache/Cargo.toml index 853903b..61bb449 100644 --- a/exes/cache/Cargo.toml +++ b/exes/cache/Cargo.toml @@ -7,8 +7,11 @@ edition = "2018" [dependencies] shared = { path = "../../libs/shared" } -nats = "0.23.1" +async-nats = "0.25.1" +tokio = { version = "1", features = ["full"] } serde = { version = "1.0.8", features = ["derive"] } log = { version = "0.4", features = ["std"] } serde_json = { version = "1.0" } -redis = "*" \ No newline at end of file +redis = "*" +futures-util = "*" +twilight-model = "0.14" \ No newline at end of file diff --git a/exes/cache/src/config.rs b/exes/cache/src/config.rs index 37b1e73..2aa71a8 100644 --- a/exes/cache/src/config.rs +++ b/exes/cache/src/config.rs @@ -1,4 +1,6 @@ use serde::Deserialize; #[derive(Debug, Deserialize, Clone, Default)] -pub struct CacheConfiguration {} \ No newline at end of file +pub struct CacheConfiguration { + pub toggles: Vec +} \ No newline at end of file diff --git a/exes/cache/src/main.rs b/exes/cache/src/main.rs index ab79dc9..a74cfa6 100644 --- a/exes/cache/src/main.rs +++ b/exes/cache/src/main.rs @@ -1,14 +1,201 @@ -use shared::config::Settings; +use std::error::Error; + +use async_nats::{Client, Subscriber}; +use futures_util::stream::StreamExt; use log::info; +use managers::{ + automoderation::Automoderation, bans::Bans, channels::Channels, + guild_schedules::GuildSchedules, guilds::Guilds, integrations::Integrations, invites::Invites, + members::Members, messages::Messages, reactions::Reactions, roles::Roles, + stage_instances::StageInstances, threads::Threads, CacheManager, +}; +use shared::{config::Settings, payloads::CachePayload}; +use twilight_model::gateway::event::DispatchEvent; use crate::config::CacheConfiguration; mod config; +mod managers; + +pub enum CacheSourcedEvents { + None, +} +#[derive(Default)] +struct MegaCache { + automoderation: Automoderation, + channels: Channels, + bans: Bans, + guild_schedules: GuildSchedules, + guilds: Guilds, + integrations: Integrations, + invites: Invites, + members: Members, + messages: Messages, + reactions: Reactions, + roles: Roles, + stage_instances: StageInstances, + threads: Threads, +} -fn main() { +#[tokio::main] +async fn main() -> Result<(), Box> { let settings: Settings = Settings::new("cache").unwrap(); info!("loaded configuration: {:?}", settings); - + let nats: Client = settings.nats.to_client().await?; + // let redis: redis::Client = settings.redis.into(); + + let mut cache = MegaCache::default(); + + let mut sub = nats.subscribe("nova.cache.dispatch.*".to_string()).await?; + listen(&mut sub, &mut cache, settings.config.toggles).await; + Ok(()) +} + +async fn listen(sub: &mut Subscriber, cache: &mut MegaCache, features: Vec) { + while let Some(data) = sub.next().await { + let cp: CachePayload = serde_json::from_slice(&data.payload).unwrap(); + let event = cp.data.data; + match event { + // Channel events + DispatchEvent::ChannelCreate(_) + | DispatchEvent::ChannelDelete(_) + | DispatchEvent::ChannelPinsUpdate(_) + | DispatchEvent::ChannelUpdate(_) + if features.contains(&"channels_cache".to_string()) => + { + cache.channels.handle(event); + } + + // Guild Cache + DispatchEvent::GuildCreate(_) + | DispatchEvent::GuildDelete(_) + | DispatchEvent::UnavailableGuild(_) + | DispatchEvent::GuildUpdate(_) + | DispatchEvent::WebhooksUpdate(_) + | DispatchEvent::GuildStickersUpdate(_) + | DispatchEvent::GuildEmojisUpdate(_) + | DispatchEvent::VoiceServerUpdate(_) + | DispatchEvent::GuildIntegrationsUpdate(_) + | DispatchEvent::CommandPermissionsUpdate(_) + if features.contains(&"guilds_cache".to_string()) => + { + cache.guilds.handle(event); + } + + // Guild Scheduled event + DispatchEvent::GuildScheduledEventCreate(_) + | DispatchEvent::GuildScheduledEventDelete(_) + | DispatchEvent::GuildScheduledEventUpdate(_) + | DispatchEvent::GuildScheduledEventUserAdd(_) + | DispatchEvent::GuildScheduledEventUserRemove(_) + if features.contains(&"guild_schedules_cache".to_string()) => + { + cache.guild_schedules.handle(event); + } + + // Stage events + DispatchEvent::StageInstanceCreate(_) + | DispatchEvent::StageInstanceDelete(_) + | DispatchEvent::StageInstanceUpdate(_) + if features.contains(&"stage_instances_cache".to_string()) => + { + cache.stage_instances.handle(event); + } + + // Integration events + DispatchEvent::IntegrationCreate(_) + | DispatchEvent::IntegrationDelete(_) + | DispatchEvent::IntegrationUpdate(_) + | DispatchEvent::InteractionCreate(_) + if features.contains(&"integrations_cache".to_string()) => + { + cache.integrations.handle(event); + } + + // Member events + DispatchEvent::MemberAdd(_) + | DispatchEvent::MemberRemove(_) + | DispatchEvent::MemberUpdate(_) + | DispatchEvent::MemberChunk(_) + | DispatchEvent::UserUpdate(_) + if features.contains(&"members_cache".to_string()) => + { + cache.members.handle(event); + } + + // Ban cache + DispatchEvent::BanAdd(_) | DispatchEvent::BanRemove(_) + if features.contains(&"bans_cache".to_string()) => + { + cache.bans.handle(event); + } + + // Reaction cache + DispatchEvent::ReactionAdd(_) + | DispatchEvent::ReactionRemove(_) + | DispatchEvent::ReactionRemoveAll(_) + | DispatchEvent::ReactionRemoveEmoji(_) + if features.contains(&"reactions_cache".to_string()) => + { + cache.reactions.handle(event); + } + + // Message cache + DispatchEvent::MessageCreate(_) + | DispatchEvent::MessageDelete(_) + | DispatchEvent::MessageDeleteBulk(_) + | DispatchEvent::MessageUpdate(_) + if features.contains(&"messages_cache".to_string()) => + { + cache.messages.handle(event); + } + + // Thread cache + DispatchEvent::ThreadCreate(_) + | DispatchEvent::ThreadDelete(_) + | DispatchEvent::ThreadListSync(_) + | DispatchEvent::ThreadMemberUpdate(_) + | DispatchEvent::ThreadMembersUpdate(_) + | DispatchEvent::ThreadUpdate(_) + if features.contains(&"threads_cache".to_string()) => + { + cache.threads.handle(event); + } + + // Invite cache + DispatchEvent::InviteCreate(_) | DispatchEvent::InviteDelete(_) + if features.contains(&"invites_cache".to_string()) => + { + cache.invites.handle(event); + } + + // Roles cache + DispatchEvent::RoleCreate(_) + | DispatchEvent::RoleDelete(_) + | DispatchEvent::RoleUpdate(_) + if features.contains(&"roles_cache".to_string()) => + { + cache.roles.handle(event); + } + + // Automod rules + DispatchEvent::AutoModerationRuleCreate(_) + | DispatchEvent::AutoModerationRuleDelete(_) + | DispatchEvent::AutoModerationRuleUpdate(_) + if features.contains(&"automoderation_cache".to_string()) => + { + cache.automoderation.handle(event); + } + + // Voice State + DispatchEvent::VoiceStateUpdate(_) + if features.contains(&"voice_states_cache".to_string()) => {} + + _ => { + // just forward + } + } + } } diff --git a/exes/cache/src/managers/automoderation.rs b/exes/cache/src/managers/automoderation.rs new file mode 100644 index 0000000..4a1b119 --- /dev/null +++ b/exes/cache/src/managers/automoderation.rs @@ -0,0 +1,26 @@ +use twilight_model::gateway::event::DispatchEvent; + +use crate::CacheSourcedEvents; + +use super::CacheManager; +use std::future::Future; + +#[derive(Default)] +pub struct Automoderation {} +impl CacheManager for Automoderation { + fn handle( + &self, + event: twilight_model::gateway::event::DispatchEvent, + ) -> std::pin::Pin>> { + Box::pin(async move { + match event { + DispatchEvent::AutoModerationRuleCreate(_) => {} + DispatchEvent::AutoModerationRuleDelete(_) => {} + DispatchEvent::AutoModerationRuleUpdate(_) => {} + _ => unreachable!(), + }; + + CacheSourcedEvents::None + }) + } +} diff --git a/exes/cache/src/managers/bans.rs b/exes/cache/src/managers/bans.rs new file mode 100644 index 0000000..27e6a34 --- /dev/null +++ b/exes/cache/src/managers/bans.rs @@ -0,0 +1,26 @@ +use twilight_model::gateway::event::DispatchEvent; + +use crate::CacheSourcedEvents; + +use super::CacheManager; +use std::future::Future; + + +#[derive(Default)] +pub struct Bans {} +impl CacheManager for Bans { + fn handle( + &self, + event: twilight_model::gateway::event::DispatchEvent, + ) -> std::pin::Pin>> { + Box::pin(async move { + match event { + DispatchEvent::BanAdd(_) => {} + DispatchEvent::BanRemove(_) => {} + _ => unreachable!(), + }; + + CacheSourcedEvents::None + }) + } +} diff --git a/exes/cache/src/managers/channels.rs b/exes/cache/src/managers/channels.rs new file mode 100644 index 0000000..fe34acc --- /dev/null +++ b/exes/cache/src/managers/channels.rs @@ -0,0 +1,28 @@ +use twilight_model::gateway::event::DispatchEvent; + +use crate::CacheSourcedEvents; + +use super::CacheManager; +use std::future::Future; + + +#[derive(Default)] +pub struct Channels {} +impl CacheManager for Channels { + fn handle( + &self, + event: twilight_model::gateway::event::DispatchEvent, + ) -> std::pin::Pin>> { + Box::pin(async move { + match event { + DispatchEvent::ChannelCreate(_) => {} + DispatchEvent::ChannelDelete(_) => {} + DispatchEvent::ChannelPinsUpdate(_) => {} + DispatchEvent::ChannelUpdate(_) => {} + _ => unreachable!(), + }; + + CacheSourcedEvents::None + }) + } +} diff --git a/exes/cache/src/managers/guild_schedules.rs b/exes/cache/src/managers/guild_schedules.rs new file mode 100644 index 0000000..bcc79c5 --- /dev/null +++ b/exes/cache/src/managers/guild_schedules.rs @@ -0,0 +1,29 @@ +use twilight_model::gateway::event::DispatchEvent; + +use crate::CacheSourcedEvents; + +use super::CacheManager; +use std::future::Future; + + +#[derive(Default)] +pub struct GuildSchedules {} +impl CacheManager for GuildSchedules { + fn handle( + &self, + event: twilight_model::gateway::event::DispatchEvent, + ) -> std::pin::Pin>> { + Box::pin(async move { + match event { + DispatchEvent::GuildScheduledEventCreate(_) => {} + DispatchEvent::GuildScheduledEventDelete(_) => {} + DispatchEvent::GuildScheduledEventUpdate(_) => {} + DispatchEvent::GuildScheduledEventUserAdd(_) => {} + DispatchEvent::GuildScheduledEventUserRemove(_) => {} + _ => unreachable!(), + }; + + CacheSourcedEvents::None + }) + } +} diff --git a/exes/cache/src/managers/guilds.rs b/exes/cache/src/managers/guilds.rs new file mode 100644 index 0000000..3f5f4c4 --- /dev/null +++ b/exes/cache/src/managers/guilds.rs @@ -0,0 +1,34 @@ +use twilight_model::gateway::event::DispatchEvent; + +use crate::CacheSourcedEvents; + +use super::CacheManager; +use std::future::Future; + + +#[derive(Default)] +pub struct Guilds {} +impl CacheManager for Guilds { + fn handle( + &self, + event: twilight_model::gateway::event::DispatchEvent, + ) -> std::pin::Pin>> { + Box::pin(async move { + match event { + DispatchEvent::GuildCreate(_) => {}, + DispatchEvent::GuildDelete(_) => {}, + DispatchEvent::UnavailableGuild(_) => {}, + DispatchEvent::GuildUpdate(_) => {}, + DispatchEvent::WebhooksUpdate(_) => {}, + DispatchEvent::GuildStickersUpdate(_) => {}, + DispatchEvent::GuildEmojisUpdate(_) => {}, + DispatchEvent::VoiceServerUpdate(_) => {}, + DispatchEvent::GuildIntegrationsUpdate(_) => {}, + DispatchEvent::CommandPermissionsUpdate(_) => {}, + _ => unreachable!(), + }; + + CacheSourcedEvents::None + }) + } +} diff --git a/exes/cache/src/managers/integrations.rs b/exes/cache/src/managers/integrations.rs new file mode 100644 index 0000000..99d292e --- /dev/null +++ b/exes/cache/src/managers/integrations.rs @@ -0,0 +1,28 @@ +use twilight_model::gateway::event::DispatchEvent; + +use crate::CacheSourcedEvents; + +use super::CacheManager; +use std::future::Future; + + +#[derive(Default)] +pub struct Integrations {} +impl CacheManager for Integrations { + fn handle( + &self, + event: twilight_model::gateway::event::DispatchEvent, + ) -> std::pin::Pin>> { + Box::pin(async move { + match event { + DispatchEvent::IntegrationCreate(_) => {} + DispatchEvent::IntegrationDelete(_) => {} + DispatchEvent::IntegrationUpdate(_) => {} + DispatchEvent::InteractionCreate(_) => {} + _ => unreachable!(), + }; + + CacheSourcedEvents::None + }) + } +} diff --git a/exes/cache/src/managers/invites.rs b/exes/cache/src/managers/invites.rs new file mode 100644 index 0000000..21da64f --- /dev/null +++ b/exes/cache/src/managers/invites.rs @@ -0,0 +1,26 @@ +use twilight_model::gateway::event::DispatchEvent; + +use crate::CacheSourcedEvents; + +use super::CacheManager; +use std::future::Future; + + +#[derive(Default)] +pub struct Invites {} +impl CacheManager for Invites { + fn handle( + &self, + event: twilight_model::gateway::event::DispatchEvent, + ) -> std::pin::Pin>> { + Box::pin(async move { + match event { + DispatchEvent::InviteCreate(_) => {} + DispatchEvent::InviteDelete(_) => {} + _ => unreachable!(), + }; + + CacheSourcedEvents::None + }) + } +} diff --git a/exes/cache/src/managers/members.rs b/exes/cache/src/managers/members.rs new file mode 100644 index 0000000..3a483f1 --- /dev/null +++ b/exes/cache/src/managers/members.rs @@ -0,0 +1,29 @@ +use twilight_model::gateway::event::DispatchEvent; + +use crate::CacheSourcedEvents; + +use super::CacheManager; +use std::future::Future; + + +#[derive(Default)] +pub struct Members {} +impl CacheManager for Members { + fn handle( + &self, + event: twilight_model::gateway::event::DispatchEvent, + ) -> std::pin::Pin>> { + Box::pin(async move { + match event { + DispatchEvent::MemberAdd(_) => {}, + DispatchEvent::MemberRemove(_) => {}, + DispatchEvent::MemberUpdate(_) => {}, + DispatchEvent::MemberChunk(_) => {}, + DispatchEvent::UserUpdate(_) => {}, + _ => unreachable!(), + }; + + CacheSourcedEvents::None + }) + } +} diff --git a/exes/cache/src/managers/messages.rs b/exes/cache/src/managers/messages.rs new file mode 100644 index 0000000..7b06ae7 --- /dev/null +++ b/exes/cache/src/managers/messages.rs @@ -0,0 +1,28 @@ +use twilight_model::gateway::event::DispatchEvent; + +use crate::CacheSourcedEvents; + +use super::CacheManager; +use std::future::Future; + + +#[derive(Default)] +pub struct Messages {} +impl CacheManager for Messages { + fn handle( + &self, + event: twilight_model::gateway::event::DispatchEvent, + ) -> std::pin::Pin>> { + Box::pin(async move { + match event { + DispatchEvent::MessageCreate(_) => {}, + DispatchEvent::MessageDelete(_) => {}, + DispatchEvent::MessageDeleteBulk(_) => {}, + DispatchEvent::MessageUpdate(_) => {}, + _ => unreachable!(), + }; + + CacheSourcedEvents::None + }) + } +} diff --git a/exes/cache/src/managers/mod.rs b/exes/cache/src/managers/mod.rs new file mode 100644 index 0000000..370cda6 --- /dev/null +++ b/exes/cache/src/managers/mod.rs @@ -0,0 +1,23 @@ +use std::pin::Pin; +use twilight_model::gateway::event::DispatchEvent; +use std::future::Future; + +use crate::CacheSourcedEvents; + +pub mod channels; +pub mod guilds; +pub mod guild_schedules; +pub mod stage_instances; +pub mod integrations; +pub mod members; +pub mod bans; +pub mod reactions; +pub mod messages; +pub mod threads; +pub mod invites; +pub mod roles; +pub mod automoderation; + +pub trait CacheManager { + fn handle(&self, event: DispatchEvent) -> Pin>>; +} diff --git a/exes/cache/src/managers/reactions.rs b/exes/cache/src/managers/reactions.rs new file mode 100644 index 0000000..5d21e0b --- /dev/null +++ b/exes/cache/src/managers/reactions.rs @@ -0,0 +1,28 @@ +use twilight_model::gateway::event::DispatchEvent; + +use crate::CacheSourcedEvents; + +use super::CacheManager; +use std::future::Future; + + +#[derive(Default)] +pub struct Reactions {} +impl CacheManager for Reactions { + fn handle( + &self, + event: twilight_model::gateway::event::DispatchEvent, + ) -> std::pin::Pin>> { + Box::pin(async move { + match event { + DispatchEvent::ReactionAdd(_) => {}, + DispatchEvent::ReactionRemove(_) => {}, + DispatchEvent::ReactionRemoveAll(_) => {}, + DispatchEvent::ReactionRemoveEmoji(_) => {}, + _ => unreachable!(), + }; + + CacheSourcedEvents::None + }) + } +} diff --git a/exes/cache/src/managers/roles.rs b/exes/cache/src/managers/roles.rs new file mode 100644 index 0000000..5fa0f22 --- /dev/null +++ b/exes/cache/src/managers/roles.rs @@ -0,0 +1,27 @@ +use twilight_model::gateway::event::DispatchEvent; + +use crate::CacheSourcedEvents; + +use super::CacheManager; +use std::future::Future; + + +#[derive(Default)] +pub struct Roles {} +impl CacheManager for Roles { + fn handle( + &self, + event: twilight_model::gateway::event::DispatchEvent, + ) -> std::pin::Pin>> { + Box::pin(async move { + match event { + DispatchEvent::RoleCreate(_) => {}, + DispatchEvent::RoleDelete(_) => {}, + DispatchEvent::RoleUpdate(_) => {}, + _ => unreachable!(), + }; + + CacheSourcedEvents::None + }) + } +} diff --git a/exes/cache/src/managers/stage_instances.rs b/exes/cache/src/managers/stage_instances.rs new file mode 100644 index 0000000..314d089 --- /dev/null +++ b/exes/cache/src/managers/stage_instances.rs @@ -0,0 +1,27 @@ +use twilight_model::gateway::event::DispatchEvent; + +use crate::CacheSourcedEvents; + +use super::CacheManager; +use std::future::Future; + + +#[derive(Default)] +pub struct StageInstances {} +impl CacheManager for StageInstances { + fn handle( + &self, + event: twilight_model::gateway::event::DispatchEvent, + ) -> std::pin::Pin>> { + Box::pin(async move { + match event { + DispatchEvent::StageInstanceCreate(_) => {}, + DispatchEvent::StageInstanceDelete(_) => {}, + DispatchEvent::StageInstanceUpdate(_) => {}, + _ => unreachable!(), + }; + + CacheSourcedEvents::None + }) + } +} diff --git a/exes/cache/src/managers/threads.rs b/exes/cache/src/managers/threads.rs new file mode 100644 index 0000000..d4efc2e --- /dev/null +++ b/exes/cache/src/managers/threads.rs @@ -0,0 +1,29 @@ +use twilight_model::gateway::event::DispatchEvent; + +use crate::CacheSourcedEvents; + +use super::CacheManager; +use std::future::Future; + +#[derive(Default)] +pub struct Threads {} +impl CacheManager for Threads { + fn handle( + &self, + event: twilight_model::gateway::event::DispatchEvent, + ) -> std::pin::Pin>> { + Box::pin(async move { + match event { + DispatchEvent::ThreadCreate(_) => {}, + DispatchEvent::ThreadDelete(_) => {}, + DispatchEvent::ThreadListSync(_) => {}, + DispatchEvent::ThreadMemberUpdate(_) => {}, + DispatchEvent::ThreadMembersUpdate(_) => {}, + DispatchEvent::ThreadUpdate(_) => {}, + _ => unreachable!(), + }; + + CacheSourcedEvents::None + }) + } +} diff --git a/exes/gateway/Cargo.toml b/exes/gateway/Cargo.toml index 27269c1..7ef3d98 100644 --- a/exes/gateway/Cargo.toml +++ b/exes/gateway/Cargo.toml @@ -10,4 +10,5 @@ twilight-gateway = { version = "0.14" } twilight-model = "0.14" serde = { version = "1.0.8", features = ["derive"] } futures = "0.3" -serde_json = { version = "1.0" } \ No newline at end of file +serde_json = { version = "1.0" } +bytes = "*" \ No newline at end of file diff --git a/exes/gateway/src/main.rs b/exes/gateway/src/main.rs index 93ca022..6968fe4 100644 --- a/exes/gateway/src/main.rs +++ b/exes/gateway/src/main.rs @@ -2,7 +2,7 @@ use config::Config; use shared::{ config::Settings, log::{debug, info}, - nats_crate::Connection, + nats_crate::Client, payloads::{CachePayload, DispatchEventTagged, Tracing}, }; use std::{convert::TryFrom, error::Error}; @@ -15,7 +15,7 @@ use twilight_model::gateway::event::DispatchEvent; async fn main() -> Result<(), Box> { let settings: Settings = Settings::new("gateway").unwrap(); let (shard, mut events) = Shard::new(settings.config.token, settings.config.intents); - let nats: Connection = settings.nats.into(); + let nats: Client = settings.nats.to_client().await?; shard.start().await?; @@ -26,7 +26,7 @@ async fn main() -> Result<(), Box> { } _ => { - let name = event.kind().name().unwrap(); + let name = event.kind().name(); if let Ok(dispatch_event) = DispatchEvent::try_from(event) { let data = CachePayload { tracing: Tracing { @@ -39,7 +39,9 @@ async fn main() -> Result<(), Box> { }; let value = serde_json::to_string(&data)?; debug!("nats send: {}", value); - nats.publish(&format!("nova.cache.dispatch.{}", name), value)?; + let bytes = bytes::Bytes::from(value); + nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes) + .await?; } } } diff --git a/exes/webhook/src/handler/handler.rs b/exes/webhook/src/handler/handler.rs index 3294c22..b2ef44c 100644 --- a/exes/webhook/src/handler/handler.rs +++ b/exes/webhook/src/handler/handler.rs @@ -8,7 +8,7 @@ use hyper::{ Body, Method, Request, Response, StatusCode, }; use serde::{Deserialize, Serialize}; -use shared::nats_crate::Connection; +use shared::nats_crate::Client; use shared::{ log::{debug, error}, payloads::{CachePayload, DispatchEventTagged, Tracing}, @@ -19,7 +19,6 @@ use std::{ str::from_utf8, sync::Arc, task::{Context, Poll}, - time::Duration, }; use twilight_model::gateway::event::{DispatchEvent}; use twilight_model::{ @@ -31,7 +30,7 @@ use twilight_model::{ #[derive(Clone)] pub struct HandlerService { pub config: Arc, - pub nats: Arc, + pub nats: Arc, pub public_key: Arc, } @@ -107,14 +106,13 @@ impl HandlerService { let payload = serde_json::to_string(&data).unwrap(); - match self.nats.request_timeout( - "nova.cache.dispatch.INTERACTION_CREATE", - payload, - Duration::from_secs(2), - ) { + match self.nats.request( + "nova.cache.dispatch.INTERACTION_CREATE".to_string(), + Bytes::from(payload), + ).await { Ok(response) => Ok(Response::builder() .header("Content-Type", "application/json") - .body(Body::from(response.data)) + .body(Body::from(response.reply.unwrap())) .unwrap()), Err(error) => { diff --git a/exes/webhook/src/handler/make_service.rs b/exes/webhook/src/handler/make_service.rs index 2774917..48672a1 100644 --- a/exes/webhook/src/handler/make_service.rs +++ b/exes/webhook/src/handler/make_service.rs @@ -1,7 +1,7 @@ use super::handler::HandlerService; use crate::config::Config; use hyper::service::Service; -use shared::nats_crate::Connection; +use shared::nats_crate::Client; use std::{ future::{ready, Ready}, sync::Arc, @@ -11,7 +11,7 @@ use ed25519_dalek::PublicKey; pub struct MakeSvc { pub settings: Arc, - pub nats: Arc, + pub nats: Arc, pub public_key: Arc } diff --git a/exes/webhook/src/main.rs b/exes/webhook/src/main.rs index 9527d0f..336dd82 100644 --- a/exes/webhook/src/main.rs +++ b/exes/webhook/src/main.rs @@ -4,10 +4,10 @@ mod handler; use crate::handler::make_service::MakeSvc; use crate::config::Config; -use shared::config::Settings; -use shared::log::{error, info}; use ed25519_dalek::PublicKey; use hyper::Server; +use shared::config::Settings; +use shared::log::{error, info}; #[tokio::main] async fn main() { @@ -35,7 +35,7 @@ async fn start(settings: Settings) { Arc::new(PublicKey::from_bytes(&hex::decode(&config.discord.public_key).unwrap()).unwrap()); let server = Server::bind(&addr).serve(MakeSvc { settings: config, - nats: Arc::new(settings.nats.into()), + nats: Arc::new(settings.nats.to_client().await.unwrap()), public_key: public_key, }); diff --git a/libs/shared/Cargo.toml b/libs/shared/Cargo.toml index b3cdee7..6d6b6f6 100644 --- a/libs/shared/Cargo.toml +++ b/libs/shared/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "shared" version = "0.1.0" -edition = "2018" +edition = "2021" [dependencies] pretty_env_logger = "0.4" @@ -13,7 +13,7 @@ hyper = { version = "0.14", features = ["full"] } tokio = { version = "1", features = ["full"] } enumflags2 = { version = "0.7.1", features = ["serde"] } prometheus = { version = "0.13", features = ["process"] } -nats = "0.23" +async-nats = "0.25.1" testcontainers = "0.14" twilight-model = "0.14" serde_json = { version = "1.0" } diff --git a/libs/shared/src/error.rs b/libs/shared/src/error.rs index 0899579..31b1dcd 100644 --- a/libs/shared/src/error.rs +++ b/libs/shared/src/error.rs @@ -1,5 +1,5 @@ use config::ConfigError; -use std::fmt::Debug; +use std::{fmt::Debug, io}; use thiserror::Error; #[derive(Debug, Error)] @@ -12,4 +12,7 @@ pub enum GenericError { #[error("step `{0}` failed")] StepFailed(String), + + #[error("io error")] + Io(#[from] io::Error) } diff --git a/libs/shared/src/lib.rs b/libs/shared/src/lib.rs index 58df5e1..62d8689 100644 --- a/libs/shared/src/lib.rs +++ b/libs/shared/src/lib.rs @@ -1,5 +1,5 @@ pub use ::config as config_crate; -pub use ::nats as nats_crate; +pub use ::async_nats as nats_crate; pub use ::redis as redis_crate; pub use log; pub use prometheus; diff --git a/libs/shared/src/nats.rs b/libs/shared/src/nats.rs index 0616398..05953cc 100644 --- a/libs/shared/src/nats.rs +++ b/libs/shared/src/nats.rs @@ -1,5 +1,8 @@ -use nats::{Connection, Options}; +use async_nats::Client; use serde::Deserialize; +use std::future::Future; + +use crate::error::GenericError; #[derive(Clone, Debug, Deserialize)] pub struct NatsConfigurationClientCert { @@ -14,46 +17,13 @@ pub struct NatsConfigurationTls { #[derive(Clone, Debug, Deserialize)] pub struct NatsConfiguration { - pub client_cert: Option, - pub root_cert: Option>, - pub jetstream_api_prefix: Option, - pub max_reconnects: Option, - pub reconnect_buffer_size: Option, - pub tls: Option, - pub client_name: Option, - pub tls_required: Option, pub host: String, } // todo: Prefer From since it automatically gives a free Into implementation // Allows the configuration to directly create a nats connection -impl Into for NatsConfiguration { - fn into(self) -> Connection { - let mut options = Options::new(); - - if let Some(client_cert) = self.client_cert { - options = options.client_cert(client_cert.cert, client_cert.key); - } - - if let Some(root_certs) = self.root_cert { - for root_cert in root_certs { - options = options.add_root_certificate(root_cert); - } - } - - options = options.max_reconnects(self.max_reconnects); - options = options.no_echo(); - options = options.reconnect_buffer_size(self.reconnect_buffer_size.unwrap_or(64 * 1024)); - options = options.tls_required(self.tls_required.unwrap_or(false)); - options = options.with_name(&self.client_name.unwrap_or_else(|| "Nova".to_string())); - - if let Some(tls) = self.tls { - let mut config = nats::rustls::ClientConfig::new(); - config.set_mtu(&tls.mtu); - // todo: more options? - options = options.tls_client_config(config); - } - - options.connect(&self.host).unwrap() +impl NatsConfiguration { + pub async fn to_client(self) -> Result { + Ok(async_nats::connect(self.host).await?) } -} +} \ No newline at end of file