checksum = "983cd8b9d4b02a6dc6ffa557262eb5858a27a0038ffffe21a0f133eaa819a164"
[[package]]
-name = "async-channel"
-version = "1.8.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833"
-dependencies = [
- "concurrent-queue",
- "event-listener",
- "futures-core",
-]
-
-[[package]]
-name = "async-lock"
-version = "2.6.0"
+name = "async-nats"
+version = "0.25.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c8101efe8695a6c17e02911402145357e718ac92d3ff88ae8419e84b1707b685"
+checksum = "f69bf051b7d96b3275cdea9a4abbe2e937ce6de66c742c57050c5c98b4a6db32"
dependencies = [
- "event-listener",
- "futures-lite",
+ "base64",
+ "base64-url",
+ "bytes",
+ "futures",
+ "http",
+ "itertools",
+ "itoa",
+ "lazy_static",
+ "nkeys",
+ "nuid",
+ "once_cell",
+ "regex",
+ "ring",
+ "rustls-native-certs",
+ "rustls-pemfile",
+ "serde",
+ "serde_json",
+ "serde_nanos",
+ "serde_repr",
+ "subslice",
+ "time 0.3.17",
+ "tokio",
+ "tokio-retry",
+ "tokio-rustls",
+ "tracing",
+ "url",
]
-[[package]]
-name = "async-task"
-version = "4.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7a40729d2133846d9ed0ea60a8b9541bccddab49cd30f0715a1da672fe9a2524"
-
[[package]]
name = "async-trait"
version = "0.1.60"
"syn",
]
-[[package]]
-name = "atomic-waker"
-version = "1.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a"
-
[[package]]
name = "atty"
version = "0.2.14"
"generic-array",
]
-[[package]]
-name = "blocking"
-version = "1.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3c67b173a56acffd6d2326fb7ab938ba0b00a71480e14902b2591c87bc5741e8"
-dependencies = [
- "async-channel",
- "async-lock",
- "async-task",
- "atomic-waker",
- "fastrand",
- "futures-lite",
-]
-
[[package]]
name = "bollard-stubs"
version = "1.41.0"
name = "cache"
version = "0.1.0"
dependencies = [
+ "async-nats",
+ "futures-util",
"log",
- "nats",
"redis",
"serde",
"serde_json",
"shared",
+ "tokio",
+ "twilight-model",
]
[[package]]
"tokio-util",
]
-[[package]]
-name = "concurrent-queue"
-version = "2.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bd7bef69dc86e3c610e4e7aed41035e2a7ed12e72dd7530f61327a6579a4390b"
-dependencies = [
- "crossbeam-utils",
-]
-
[[package]]
name = "config"
version = "0.13.3"
"cfg-if",
]
-[[package]]
-name = "crossbeam-channel"
-version = "0.5.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521"
-dependencies = [
- "cfg-if",
- "crossbeam-utils",
-]
-
-[[package]]
-name = "crossbeam-utils"
-version = "0.8.14"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4fb766fa798726286dbbb842f174001dab8abc7b627a1dd86e0b7222a95d929f"
-dependencies = [
- "cfg-if",
-]
-
[[package]]
name = "crypto-common"
version = "0.1.6"
"zeroize",
]
+[[package]]
+name = "either"
+version = "1.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797"
+
[[package]]
name = "enumflags2"
version = "0.7.5"
"libc",
]
-[[package]]
-name = "event-listener"
-version = "2.5.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
-
[[package]]
name = "fastrand"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb"
-[[package]]
-name = "futures-lite"
-version = "1.12.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48"
-dependencies = [
- "fastrand",
- "futures-core",
- "futures-io",
- "memchr",
- "parking",
- "pin-project-lite",
- "waker-fn",
-]
-
[[package]]
name = "futures-macro"
version = "0.3.25"
name = "gateway"
version = "0.1.0"
dependencies = [
+ "bytes",
"futures",
"serde",
"serde_json",
dependencies = [
"http",
"hyper",
- "rustls 0.20.7",
- "rustls-native-certs 0.6.2",
+ "rustls",
+ "rustls-native-certs",
"tokio",
"tokio-rustls",
]
"windows-sys 0.42.0",
]
+[[package]]
+name = "itertools"
+version = "0.10.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473"
+dependencies = [
+ "either",
+]
+
[[package]]
name = "itoa"
version = "1.0.5"
"wasm-bindgen",
]
-[[package]]
-name = "json"
-version = "0.12.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd"
-
[[package]]
name = "json5"
version = "0.4.1"
"tempfile",
]
-[[package]]
-name = "nats"
-version = "0.23.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b3d877cd2e71146efa7065300fc5f5da967f938694b4d65e8bc64cc4a409092c"
-dependencies = [
- "base64",
- "base64-url",
- "blocking",
- "crossbeam-channel",
- "fastrand",
- "itoa",
- "json",
- "lazy_static",
- "libc",
- "log",
- "memchr",
- "nkeys",
- "nuid",
- "once_cell",
- "parking_lot",
- "regex",
- "ring",
- "rustls 0.19.1",
- "rustls-native-certs 0.5.0",
- "rustls-pemfile 0.2.1",
- "serde",
- "serde_json",
- "serde_nanos",
- "serde_repr",
- "time 0.3.17",
- "url",
- "webpki 0.21.4",
- "winapi",
-]
-
[[package]]
name = "nkeys"
version = "0.2.0"
"hashbrown",
]
-[[package]]
-name = "parking"
-version = "2.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72"
-
[[package]]
name = "parking_lot"
version = "0.12.1"
"sha1",
]
+[[package]]
+name = "pin-project"
+version = "1.0.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc"
+dependencies = [
+ "pin-project-internal",
+]
+
+[[package]]
+name = "pin-project-internal"
+version = "1.0.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "pin-project-lite"
version = "0.2.9"
"windows-sys 0.42.0",
]
-[[package]]
-name = "rustls"
-version = "0.19.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7"
-dependencies = [
- "base64",
- "log",
- "ring",
- "sct 0.6.1",
- "webpki 0.21.4",
-]
-
[[package]]
name = "rustls"
version = "0.20.7"
dependencies = [
"log",
"ring",
- "sct 0.7.0",
- "webpki 0.22.0",
-]
-
-[[package]]
-name = "rustls-native-certs"
-version = "0.5.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5a07b7c1885bd8ed3831c289b7870b13ef46fe0e856d288c30d9cc17d75a2092"
-dependencies = [
- "openssl-probe",
- "rustls 0.19.1",
- "schannel",
- "security-framework",
+ "sct",
+ "webpki",
]
[[package]]
checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50"
dependencies = [
"openssl-probe",
- "rustls-pemfile 1.0.1",
+ "rustls-pemfile",
"schannel",
"security-framework",
]
-[[package]]
-name = "rustls-pemfile"
-version = "0.2.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9"
-dependencies = [
- "base64",
-]
-
[[package]]
name = "rustls-pemfile"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2"
-[[package]]
-name = "sct"
-version = "0.6.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce"
-dependencies = [
- "ring",
- "untrusted",
-]
-
[[package]]
name = "sct"
version = "0.7.0"
name = "shared"
version = "0.1.0"
dependencies = [
+ "async-nats",
"config",
"enumflags2",
"hyper",
"inner",
"log",
- "nats",
"pretty_env_logger",
"prometheus",
"redis",
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
+[[package]]
+name = "subslice"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e0a8e4809a3bb02de01f1f7faf1ba01a83af9e8eabcd4d31dd6e413d14d56aae"
+dependencies = [
+ "memchr",
+]
+
[[package]]
name = "subtle"
version = "2.4.1"
"tokio",
]
+[[package]]
+name = "tokio-retry"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f"
+dependencies = [
+ "pin-project",
+ "rand 0.8.5",
+ "tokio",
+]
+
[[package]]
name = "tokio-rustls"
version = "0.23.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
dependencies = [
- "rustls 0.20.7",
+ "rustls",
"tokio",
- "webpki 0.22.0",
+ "webpki",
]
[[package]]
dependencies = [
"futures-util",
"log",
- "rustls 0.20.7",
- "rustls-native-certs 0.6.2",
+ "rustls",
+ "rustls-native-certs",
"tokio",
"tokio-rustls",
"tungstenite",
- "webpki 0.22.0",
+ "webpki",
]
[[package]]
"httparse",
"log",
"rand 0.8.5",
- "rustls 0.20.7",
+ "rustls",
"sha-1",
"thiserror",
"url",
"utf-8",
- "webpki 0.22.0",
+ "webpki",
]
[[package]]
"flate2",
"futures-util",
"leaky-bucket-lite",
- "rustls 0.20.7",
- "rustls-native-certs 0.6.2",
+ "rustls",
+ "rustls-native-certs",
"serde",
"serde_json",
"tokio",
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
-[[package]]
-name = "waker-fn"
-version = "1.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
-
[[package]]
name = "want"
version = "0.3.0"
"twilight-model",
]
-[[package]]
-name = "webpki"
-version = "0.21.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea"
-dependencies = [
- "ring",
- "untrusted",
-]
-
[[package]]
name = "webpki"
version = "0.22.0"
[dependencies]
shared = { path = "../../libs/shared" }
-nats = "0.23.1"
+async-nats = "0.25.1"
+tokio = { version = "1", features = ["full"] }
serde = { version = "1.0.8", features = ["derive"] }
log = { version = "0.4", features = ["std"] }
serde_json = { version = "1.0" }
-redis = "*"
\ No newline at end of file
+redis = "*"
+futures-util = "*"
+twilight-model = "0.14"
\ No newline at end of file
use serde::Deserialize;
#[derive(Debug, Deserialize, Clone, Default)]
-pub struct CacheConfiguration {}
\ No newline at end of file
+pub struct CacheConfiguration {
+ pub toggles: Vec<String>
+}
\ No newline at end of file
-use shared::config::Settings;
+use std::error::Error;
+
+use async_nats::{Client, Subscriber};
+use futures_util::stream::StreamExt;
use log::info;
+use managers::{
+ automoderation::Automoderation, bans::Bans, channels::Channels,
+ guild_schedules::GuildSchedules, guilds::Guilds, integrations::Integrations, invites::Invites,
+ members::Members, messages::Messages, reactions::Reactions, roles::Roles,
+ stage_instances::StageInstances, threads::Threads, CacheManager,
+};
+use shared::{config::Settings, payloads::CachePayload};
+use twilight_model::gateway::event::DispatchEvent;
use crate::config::CacheConfiguration;
mod config;
+mod managers;
+
+pub enum CacheSourcedEvents {
+ None,
+}
+#[derive(Default)]
+struct MegaCache {
+ automoderation: Automoderation,
+ channels: Channels,
+ bans: Bans,
+ guild_schedules: GuildSchedules,
+ guilds: Guilds,
+ integrations: Integrations,
+ invites: Invites,
+ members: Members,
+ messages: Messages,
+ reactions: Reactions,
+ roles: Roles,
+ stage_instances: StageInstances,
+ threads: Threads,
+}
-fn main() {
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let settings: Settings<CacheConfiguration> = Settings::new("cache").unwrap();
info!("loaded configuration: {:?}", settings);
-
+ let nats: Client = settings.nats.to_client().await?;
+ // let redis: redis::Client = settings.redis.into();
+
+ let mut cache = MegaCache::default();
+
+ let mut sub = nats.subscribe("nova.cache.dispatch.*".to_string()).await?;
+ listen(&mut sub, &mut cache, settings.config.toggles).await;
+ Ok(())
+}
+
+async fn listen(sub: &mut Subscriber, cache: &mut MegaCache, features: Vec<String>) {
+ while let Some(data) = sub.next().await {
+ let cp: CachePayload = serde_json::from_slice(&data.payload).unwrap();
+ let event = cp.data.data;
+ match event {
+ // Channel events
+ DispatchEvent::ChannelCreate(_)
+ | DispatchEvent::ChannelDelete(_)
+ | DispatchEvent::ChannelPinsUpdate(_)
+ | DispatchEvent::ChannelUpdate(_)
+ if features.contains(&"channels_cache".to_string()) =>
+ {
+ cache.channels.handle(event);
+ }
+
+ // Guild Cache
+ DispatchEvent::GuildCreate(_)
+ | DispatchEvent::GuildDelete(_)
+ | DispatchEvent::UnavailableGuild(_)
+ | DispatchEvent::GuildUpdate(_)
+ | DispatchEvent::WebhooksUpdate(_)
+ | DispatchEvent::GuildStickersUpdate(_)
+ | DispatchEvent::GuildEmojisUpdate(_)
+ | DispatchEvent::VoiceServerUpdate(_)
+ | DispatchEvent::GuildIntegrationsUpdate(_)
+ | DispatchEvent::CommandPermissionsUpdate(_)
+ if features.contains(&"guilds_cache".to_string()) =>
+ {
+ cache.guilds.handle(event);
+ }
+
+ // Guild Scheduled event
+ DispatchEvent::GuildScheduledEventCreate(_)
+ | DispatchEvent::GuildScheduledEventDelete(_)
+ | DispatchEvent::GuildScheduledEventUpdate(_)
+ | DispatchEvent::GuildScheduledEventUserAdd(_)
+ | DispatchEvent::GuildScheduledEventUserRemove(_)
+ if features.contains(&"guild_schedules_cache".to_string()) =>
+ {
+ cache.guild_schedules.handle(event);
+ }
+
+ // Stage events
+ DispatchEvent::StageInstanceCreate(_)
+ | DispatchEvent::StageInstanceDelete(_)
+ | DispatchEvent::StageInstanceUpdate(_)
+ if features.contains(&"stage_instances_cache".to_string()) =>
+ {
+ cache.stage_instances.handle(event);
+ }
+
+ // Integration events
+ DispatchEvent::IntegrationCreate(_)
+ | DispatchEvent::IntegrationDelete(_)
+ | DispatchEvent::IntegrationUpdate(_)
+ | DispatchEvent::InteractionCreate(_)
+ if features.contains(&"integrations_cache".to_string()) =>
+ {
+ cache.integrations.handle(event);
+ }
+
+ // Member events
+ DispatchEvent::MemberAdd(_)
+ | DispatchEvent::MemberRemove(_)
+ | DispatchEvent::MemberUpdate(_)
+ | DispatchEvent::MemberChunk(_)
+ | DispatchEvent::UserUpdate(_)
+ if features.contains(&"members_cache".to_string()) =>
+ {
+ cache.members.handle(event);
+ }
+
+ // Ban cache
+ DispatchEvent::BanAdd(_) | DispatchEvent::BanRemove(_)
+ if features.contains(&"bans_cache".to_string()) =>
+ {
+ cache.bans.handle(event);
+ }
+
+ // Reaction cache
+ DispatchEvent::ReactionAdd(_)
+ | DispatchEvent::ReactionRemove(_)
+ | DispatchEvent::ReactionRemoveAll(_)
+ | DispatchEvent::ReactionRemoveEmoji(_)
+ if features.contains(&"reactions_cache".to_string()) =>
+ {
+ cache.reactions.handle(event);
+ }
+
+ // Message cache
+ DispatchEvent::MessageCreate(_)
+ | DispatchEvent::MessageDelete(_)
+ | DispatchEvent::MessageDeleteBulk(_)
+ | DispatchEvent::MessageUpdate(_)
+ if features.contains(&"messages_cache".to_string()) =>
+ {
+ cache.messages.handle(event);
+ }
+
+ // Thread cache
+ DispatchEvent::ThreadCreate(_)
+ | DispatchEvent::ThreadDelete(_)
+ | DispatchEvent::ThreadListSync(_)
+ | DispatchEvent::ThreadMemberUpdate(_)
+ | DispatchEvent::ThreadMembersUpdate(_)
+ | DispatchEvent::ThreadUpdate(_)
+ if features.contains(&"threads_cache".to_string()) =>
+ {
+ cache.threads.handle(event);
+ }
+
+ // Invite cache
+ DispatchEvent::InviteCreate(_) | DispatchEvent::InviteDelete(_)
+ if features.contains(&"invites_cache".to_string()) =>
+ {
+ cache.invites.handle(event);
+ }
+
+ // Roles cache
+ DispatchEvent::RoleCreate(_)
+ | DispatchEvent::RoleDelete(_)
+ | DispatchEvent::RoleUpdate(_)
+ if features.contains(&"roles_cache".to_string()) =>
+ {
+ cache.roles.handle(event);
+ }
+
+ // Automod rules
+ DispatchEvent::AutoModerationRuleCreate(_)
+ | DispatchEvent::AutoModerationRuleDelete(_)
+ | DispatchEvent::AutoModerationRuleUpdate(_)
+ if features.contains(&"automoderation_cache".to_string()) =>
+ {
+ cache.automoderation.handle(event);
+ }
+
+ // Voice State
+ DispatchEvent::VoiceStateUpdate(_)
+ if features.contains(&"voice_states_cache".to_string()) => {}
+
+ _ => {
+ // just forward
+ }
+ }
+ }
}
--- /dev/null
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+#[derive(Default)]
+pub struct Automoderation {}
+impl CacheManager for Automoderation {
+ fn handle(
+ &self,
+ event: twilight_model::gateway::event::DispatchEvent,
+ ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+ Box::pin(async move {
+ match event {
+ DispatchEvent::AutoModerationRuleCreate(_) => {}
+ DispatchEvent::AutoModerationRuleDelete(_) => {}
+ DispatchEvent::AutoModerationRuleUpdate(_) => {}
+ _ => unreachable!(),
+ };
+
+ CacheSourcedEvents::None
+ })
+ }
+}
--- /dev/null
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Bans {}
+impl CacheManager for Bans {
+ fn handle(
+ &self,
+ event: twilight_model::gateway::event::DispatchEvent,
+ ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+ Box::pin(async move {
+ match event {
+ DispatchEvent::BanAdd(_) => {}
+ DispatchEvent::BanRemove(_) => {}
+ _ => unreachable!(),
+ };
+
+ CacheSourcedEvents::None
+ })
+ }
+}
--- /dev/null
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Channels {}
+impl CacheManager for Channels {
+ fn handle(
+ &self,
+ event: twilight_model::gateway::event::DispatchEvent,
+ ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+ Box::pin(async move {
+ match event {
+ DispatchEvent::ChannelCreate(_) => {}
+ DispatchEvent::ChannelDelete(_) => {}
+ DispatchEvent::ChannelPinsUpdate(_) => {}
+ DispatchEvent::ChannelUpdate(_) => {}
+ _ => unreachable!(),
+ };
+
+ CacheSourcedEvents::None
+ })
+ }
+}
--- /dev/null
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct GuildSchedules {}
+impl CacheManager for GuildSchedules {
+ fn handle(
+ &self,
+ event: twilight_model::gateway::event::DispatchEvent,
+ ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+ Box::pin(async move {
+ match event {
+ DispatchEvent::GuildScheduledEventCreate(_) => {}
+ DispatchEvent::GuildScheduledEventDelete(_) => {}
+ DispatchEvent::GuildScheduledEventUpdate(_) => {}
+ DispatchEvent::GuildScheduledEventUserAdd(_) => {}
+ DispatchEvent::GuildScheduledEventUserRemove(_) => {}
+ _ => unreachable!(),
+ };
+
+ CacheSourcedEvents::None
+ })
+ }
+}
--- /dev/null
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Guilds {}
+impl CacheManager for Guilds {
+ fn handle(
+ &self,
+ event: twilight_model::gateway::event::DispatchEvent,
+ ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+ Box::pin(async move {
+ match event {
+ DispatchEvent::GuildCreate(_) => {},
+ DispatchEvent::GuildDelete(_) => {},
+ DispatchEvent::UnavailableGuild(_) => {},
+ DispatchEvent::GuildUpdate(_) => {},
+ DispatchEvent::WebhooksUpdate(_) => {},
+ DispatchEvent::GuildStickersUpdate(_) => {},
+ DispatchEvent::GuildEmojisUpdate(_) => {},
+ DispatchEvent::VoiceServerUpdate(_) => {},
+ DispatchEvent::GuildIntegrationsUpdate(_) => {},
+ DispatchEvent::CommandPermissionsUpdate(_) => {},
+ _ => unreachable!(),
+ };
+
+ CacheSourcedEvents::None
+ })
+ }
+}
--- /dev/null
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Integrations {}
+impl CacheManager for Integrations {
+ fn handle(
+ &self,
+ event: twilight_model::gateway::event::DispatchEvent,
+ ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+ Box::pin(async move {
+ match event {
+ DispatchEvent::IntegrationCreate(_) => {}
+ DispatchEvent::IntegrationDelete(_) => {}
+ DispatchEvent::IntegrationUpdate(_) => {}
+ DispatchEvent::InteractionCreate(_) => {}
+ _ => unreachable!(),
+ };
+
+ CacheSourcedEvents::None
+ })
+ }
+}
--- /dev/null
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Invites {}
+impl CacheManager for Invites {
+ fn handle(
+ &self,
+ event: twilight_model::gateway::event::DispatchEvent,
+ ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+ Box::pin(async move {
+ match event {
+ DispatchEvent::InviteCreate(_) => {}
+ DispatchEvent::InviteDelete(_) => {}
+ _ => unreachable!(),
+ };
+
+ CacheSourcedEvents::None
+ })
+ }
+}
--- /dev/null
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Members {}
+impl CacheManager for Members {
+ fn handle(
+ &self,
+ event: twilight_model::gateway::event::DispatchEvent,
+ ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+ Box::pin(async move {
+ match event {
+ DispatchEvent::MemberAdd(_) => {},
+ DispatchEvent::MemberRemove(_) => {},
+ DispatchEvent::MemberUpdate(_) => {},
+ DispatchEvent::MemberChunk(_) => {},
+ DispatchEvent::UserUpdate(_) => {},
+ _ => unreachable!(),
+ };
+
+ CacheSourcedEvents::None
+ })
+ }
+}
--- /dev/null
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Messages {}
+impl CacheManager for Messages {
+ fn handle(
+ &self,
+ event: twilight_model::gateway::event::DispatchEvent,
+ ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+ Box::pin(async move {
+ match event {
+ DispatchEvent::MessageCreate(_) => {},
+ DispatchEvent::MessageDelete(_) => {},
+ DispatchEvent::MessageDeleteBulk(_) => {},
+ DispatchEvent::MessageUpdate(_) => {},
+ _ => unreachable!(),
+ };
+
+ CacheSourcedEvents::None
+ })
+ }
+}
--- /dev/null
+use std::pin::Pin;
+use twilight_model::gateway::event::DispatchEvent;
+use std::future::Future;
+
+use crate::CacheSourcedEvents;
+
+pub mod channels;
+pub mod guilds;
+pub mod guild_schedules;
+pub mod stage_instances;
+pub mod integrations;
+pub mod members;
+pub mod bans;
+pub mod reactions;
+pub mod messages;
+pub mod threads;
+pub mod invites;
+pub mod roles;
+pub mod automoderation;
+
+pub trait CacheManager {
+ fn handle(&self, event: DispatchEvent) -> Pin<Box<dyn Future<Output = CacheSourcedEvents>>>;
+}
--- /dev/null
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Reactions {}
+impl CacheManager for Reactions {
+ fn handle(
+ &self,
+ event: twilight_model::gateway::event::DispatchEvent,
+ ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+ Box::pin(async move {
+ match event {
+ DispatchEvent::ReactionAdd(_) => {},
+ DispatchEvent::ReactionRemove(_) => {},
+ DispatchEvent::ReactionRemoveAll(_) => {},
+ DispatchEvent::ReactionRemoveEmoji(_) => {},
+ _ => unreachable!(),
+ };
+
+ CacheSourcedEvents::None
+ })
+ }
+}
--- /dev/null
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Roles {}
+impl CacheManager for Roles {
+ fn handle(
+ &self,
+ event: twilight_model::gateway::event::DispatchEvent,
+ ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+ Box::pin(async move {
+ match event {
+ DispatchEvent::RoleCreate(_) => {},
+ DispatchEvent::RoleDelete(_) => {},
+ DispatchEvent::RoleUpdate(_) => {},
+ _ => unreachable!(),
+ };
+
+ CacheSourcedEvents::None
+ })
+ }
+}
--- /dev/null
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct StageInstances {}
+impl CacheManager for StageInstances {
+ fn handle(
+ &self,
+ event: twilight_model::gateway::event::DispatchEvent,
+ ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+ Box::pin(async move {
+ match event {
+ DispatchEvent::StageInstanceCreate(_) => {},
+ DispatchEvent::StageInstanceDelete(_) => {},
+ DispatchEvent::StageInstanceUpdate(_) => {},
+ _ => unreachable!(),
+ };
+
+ CacheSourcedEvents::None
+ })
+ }
+}
--- /dev/null
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+#[derive(Default)]
+pub struct Threads {}
+impl CacheManager for Threads {
+ fn handle(
+ &self,
+ event: twilight_model::gateway::event::DispatchEvent,
+ ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+ Box::pin(async move {
+ match event {
+ DispatchEvent::ThreadCreate(_) => {},
+ DispatchEvent::ThreadDelete(_) => {},
+ DispatchEvent::ThreadListSync(_) => {},
+ DispatchEvent::ThreadMemberUpdate(_) => {},
+ DispatchEvent::ThreadMembersUpdate(_) => {},
+ DispatchEvent::ThreadUpdate(_) => {},
+ _ => unreachable!(),
+ };
+
+ CacheSourcedEvents::None
+ })
+ }
+}
twilight-model = "0.14"
serde = { version = "1.0.8", features = ["derive"] }
futures = "0.3"
-serde_json = { version = "1.0" }
\ No newline at end of file
+serde_json = { version = "1.0" }
+bytes = "*"
\ No newline at end of file
use shared::{
config::Settings,
log::{debug, info},
- nats_crate::Connection,
+ nats_crate::Client,
payloads::{CachePayload, DispatchEventTagged, Tracing},
};
use std::{convert::TryFrom, error::Error};
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let settings: Settings<Config> = Settings::new("gateway").unwrap();
let (shard, mut events) = Shard::new(settings.config.token, settings.config.intents);
- let nats: Connection = settings.nats.into();
+ let nats: Client = settings.nats.to_client().await?;
shard.start().await?;
}
_ => {
- let name = event.kind().name().unwrap();
+ let name = event.kind().name();
if let Ok(dispatch_event) = DispatchEvent::try_from(event) {
let data = CachePayload {
tracing: Tracing {
};
let value = serde_json::to_string(&data)?;
debug!("nats send: {}", value);
- nats.publish(&format!("nova.cache.dispatch.{}", name), value)?;
+ let bytes = bytes::Bytes::from(value);
+ nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes)
+ .await?;
}
}
}
Body, Method, Request, Response, StatusCode,
};
use serde::{Deserialize, Serialize};
-use shared::nats_crate::Connection;
+use shared::nats_crate::Client;
use shared::{
log::{debug, error},
payloads::{CachePayload, DispatchEventTagged, Tracing},
str::from_utf8,
sync::Arc,
task::{Context, Poll},
- time::Duration,
};
use twilight_model::gateway::event::{DispatchEvent};
use twilight_model::{
#[derive(Clone)]
pub struct HandlerService {
pub config: Arc<Config>,
- pub nats: Arc<Connection>,
+ pub nats: Arc<Client>,
pub public_key: Arc<PublicKey>,
}
let payload = serde_json::to_string(&data).unwrap();
- match self.nats.request_timeout(
- "nova.cache.dispatch.INTERACTION_CREATE",
- payload,
- Duration::from_secs(2),
- ) {
+ match self.nats.request(
+ "nova.cache.dispatch.INTERACTION_CREATE".to_string(),
+ Bytes::from(payload),
+ ).await {
Ok(response) => Ok(Response::builder()
.header("Content-Type", "application/json")
- .body(Body::from(response.data))
+ .body(Body::from(response.reply.unwrap()))
.unwrap()),
Err(error) => {
use super::handler::HandlerService;
use crate::config::Config;
use hyper::service::Service;
-use shared::nats_crate::Connection;
+use shared::nats_crate::Client;
use std::{
future::{ready, Ready},
sync::Arc,
pub struct MakeSvc {
pub settings: Arc<Config>,
- pub nats: Arc<Connection>,
+ pub nats: Arc<Client>,
pub public_key: Arc<PublicKey>
}
use crate::handler::make_service::MakeSvc;
use crate::config::Config;
-use shared::config::Settings;
-use shared::log::{error, info};
use ed25519_dalek::PublicKey;
use hyper::Server;
+use shared::config::Settings;
+use shared::log::{error, info};
#[tokio::main]
async fn main() {
Arc::new(PublicKey::from_bytes(&hex::decode(&config.discord.public_key).unwrap()).unwrap());
let server = Server::bind(&addr).serve(MakeSvc {
settings: config,
- nats: Arc::new(settings.nats.into()),
+ nats: Arc::new(settings.nats.to_client().await.unwrap()),
public_key: public_key,
});
[package]
name = "shared"
version = "0.1.0"
-edition = "2018"
+edition = "2021"
[dependencies]
pretty_env_logger = "0.4"
tokio = { version = "1", features = ["full"] }
enumflags2 = { version = "0.7.1", features = ["serde"] }
prometheus = { version = "0.13", features = ["process"] }
-nats = "0.23"
+async-nats = "0.25.1"
testcontainers = "0.14"
twilight-model = "0.14"
serde_json = { version = "1.0" }
use config::ConfigError;
-use std::fmt::Debug;
+use std::{fmt::Debug, io};
use thiserror::Error;
#[derive(Debug, Error)]
#[error("step `{0}` failed")]
StepFailed(String),
+
+ #[error("io error")]
+ Io(#[from] io::Error)
}
pub use ::config as config_crate;
-pub use ::nats as nats_crate;
+pub use ::async_nats as nats_crate;
pub use ::redis as redis_crate;
pub use log;
pub use prometheus;
-use nats::{Connection, Options};
+use async_nats::Client;
use serde::Deserialize;
+use std::future::Future;
+
+use crate::error::GenericError;
#[derive(Clone, Debug, Deserialize)]
pub struct NatsConfigurationClientCert {
#[derive(Clone, Debug, Deserialize)]
pub struct NatsConfiguration {
- pub client_cert: Option<NatsConfigurationClientCert>,
- pub root_cert: Option<Vec<String>>,
- pub jetstream_api_prefix: Option<String>,
- pub max_reconnects: Option<usize>,
- pub reconnect_buffer_size: Option<usize>,
- pub tls: Option<NatsConfigurationTls>,
- pub client_name: Option<String>,
- pub tls_required: Option<bool>,
pub host: String,
}
// todo: Prefer From since it automatically gives a free Into implementation
// Allows the configuration to directly create a nats connection
-impl Into<Connection> for NatsConfiguration {
- fn into(self) -> Connection {
- let mut options = Options::new();
-
- if let Some(client_cert) = self.client_cert {
- options = options.client_cert(client_cert.cert, client_cert.key);
- }
-
- if let Some(root_certs) = self.root_cert {
- for root_cert in root_certs {
- options = options.add_root_certificate(root_cert);
- }
- }
-
- options = options.max_reconnects(self.max_reconnects);
- options = options.no_echo();
- options = options.reconnect_buffer_size(self.reconnect_buffer_size.unwrap_or(64 * 1024));
- options = options.tls_required(self.tls_required.unwrap_or(false));
- options = options.with_name(&self.client_name.unwrap_or_else(|| "Nova".to_string()));
-
- if let Some(tls) = self.tls {
- let mut config = nats::rustls::ClientConfig::new();
- config.set_mtu(&tls.mtu);
- // todo: more options?
- options = options.tls_client_config(config);
- }
-
- options.connect(&self.host).unwrap()
+impl NatsConfiguration {
+ pub async fn to_client(self) -> Result<Client, GenericError> {
+ Ok(async_nats::connect(self.host).await?)
}
-}
+}
\ No newline at end of file