]> git.puffer.fish Git - matthieu/nova.git/commitdiff
add base of cache component
authorMatthieuCoder <matthieu@matthieu-dev.xyz>
Sat, 31 Dec 2022 18:48:40 +0000 (22:48 +0400)
committerMatthieuCoder <matthieu@matthieu-dev.xyz>
Sat, 31 Dec 2022 18:48:40 +0000 (22:48 +0400)
27 files changed:
Cargo.lock
exes/cache/Cargo.toml
exes/cache/src/config.rs
exes/cache/src/main.rs
exes/cache/src/managers/automoderation.rs [new file with mode: 0644]
exes/cache/src/managers/bans.rs [new file with mode: 0644]
exes/cache/src/managers/channels.rs [new file with mode: 0644]
exes/cache/src/managers/guild_schedules.rs [new file with mode: 0644]
exes/cache/src/managers/guilds.rs [new file with mode: 0644]
exes/cache/src/managers/integrations.rs [new file with mode: 0644]
exes/cache/src/managers/invites.rs [new file with mode: 0644]
exes/cache/src/managers/members.rs [new file with mode: 0644]
exes/cache/src/managers/messages.rs [new file with mode: 0644]
exes/cache/src/managers/mod.rs [new file with mode: 0644]
exes/cache/src/managers/reactions.rs [new file with mode: 0644]
exes/cache/src/managers/roles.rs [new file with mode: 0644]
exes/cache/src/managers/stage_instances.rs [new file with mode: 0644]
exes/cache/src/managers/threads.rs [new file with mode: 0644]
exes/gateway/Cargo.toml
exes/gateway/src/main.rs
exes/webhook/src/handler/handler.rs
exes/webhook/src/handler/make_service.rs
exes/webhook/src/main.rs
libs/shared/Cargo.toml
libs/shared/src/error.rs
libs/shared/src/lib.rs
libs/shared/src/nats.rs

index c1ba53e84f819e4289ed6af97a2b8cc27e63d99b..943bb096c85ea7e9e7bf3858da82e087d9a7b74b 100644 (file)
@@ -44,32 +44,39 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "983cd8b9d4b02a6dc6ffa557262eb5858a27a0038ffffe21a0f133eaa819a164"
 
 [[package]]
-name = "async-channel"
-version = "1.8.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833"
-dependencies = [
- "concurrent-queue",
- "event-listener",
- "futures-core",
-]
-
-[[package]]
-name = "async-lock"
-version = "2.6.0"
+name = "async-nats"
+version = "0.25.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c8101efe8695a6c17e02911402145357e718ac92d3ff88ae8419e84b1707b685"
+checksum = "f69bf051b7d96b3275cdea9a4abbe2e937ce6de66c742c57050c5c98b4a6db32"
 dependencies = [
- "event-listener",
- "futures-lite",
+ "base64",
+ "base64-url",
+ "bytes",
+ "futures",
+ "http",
+ "itertools",
+ "itoa",
+ "lazy_static",
+ "nkeys",
+ "nuid",
+ "once_cell",
+ "regex",
+ "ring",
+ "rustls-native-certs",
+ "rustls-pemfile",
+ "serde",
+ "serde_json",
+ "serde_nanos",
+ "serde_repr",
+ "subslice",
+ "time 0.3.17",
+ "tokio",
+ "tokio-retry",
+ "tokio-rustls",
+ "tracing",
+ "url",
 ]
 
-[[package]]
-name = "async-task"
-version = "4.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7a40729d2133846d9ed0ea60a8b9541bccddab49cd30f0715a1da672fe9a2524"
-
 [[package]]
 name = "async-trait"
 version = "0.1.60"
@@ -81,12 +88,6 @@ dependencies = [
  "syn",
 ]
 
-[[package]]
-name = "atomic-waker"
-version = "1.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a"
-
 [[package]]
 name = "atty"
 version = "0.2.14"
@@ -149,20 +150,6 @@ dependencies = [
  "generic-array",
 ]
 
-[[package]]
-name = "blocking"
-version = "1.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3c67b173a56acffd6d2326fb7ab938ba0b00a71480e14902b2591c87bc5741e8"
-dependencies = [
- "async-channel",
- "async-lock",
- "async-task",
- "atomic-waker",
- "fastrand",
- "futures-lite",
-]
-
 [[package]]
 name = "bollard-stubs"
 version = "1.41.0"
@@ -196,12 +183,15 @@ checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c"
 name = "cache"
 version = "0.1.0"
 dependencies = [
+ "async-nats",
+ "futures-util",
  "log",
- "nats",
  "redis",
  "serde",
  "serde_json",
  "shared",
+ "tokio",
+ "twilight-model",
 ]
 
 [[package]]
@@ -256,15 +246,6 @@ dependencies = [
  "tokio-util",
 ]
 
-[[package]]
-name = "concurrent-queue"
-version = "2.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bd7bef69dc86e3c610e4e7aed41035e2a7ed12e72dd7530f61327a6579a4390b"
-dependencies = [
- "crossbeam-utils",
-]
-
 [[package]]
 name = "config"
 version = "0.13.3"
@@ -330,25 +311,6 @@ dependencies = [
  "cfg-if",
 ]
 
-[[package]]
-name = "crossbeam-channel"
-version = "0.5.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521"
-dependencies = [
- "cfg-if",
- "crossbeam-utils",
-]
-
-[[package]]
-name = "crossbeam-utils"
-version = "0.8.14"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4fb766fa798726286dbbb842f174001dab8abc7b627a1dd86e0b7222a95d929f"
-dependencies = [
- "cfg-if",
-]
-
 [[package]]
 name = "crypto-common"
 version = "0.1.6"
@@ -525,6 +487,12 @@ dependencies = [
  "zeroize",
 ]
 
+[[package]]
+name = "either"
+version = "1.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797"
+
 [[package]]
 name = "enumflags2"
 version = "0.7.5"
@@ -580,12 +548,6 @@ dependencies = [
  "libc",
 ]
 
-[[package]]
-name = "event-listener"
-version = "2.5.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
-
 [[package]]
 name = "fastrand"
 version = "1.8.0"
@@ -684,21 +646,6 @@ version = "0.3.25"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb"
 
-[[package]]
-name = "futures-lite"
-version = "1.12.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48"
-dependencies = [
- "fastrand",
- "futures-core",
- "futures-io",
- "memchr",
- "parking",
- "pin-project-lite",
- "waker-fn",
-]
-
 [[package]]
 name = "futures-macro"
 version = "0.3.25"
@@ -744,6 +691,7 @@ dependencies = [
 name = "gateway"
 version = "0.1.0"
 dependencies = [
+ "bytes",
  "futures",
  "serde",
  "serde_json",
@@ -921,8 +869,8 @@ checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c"
 dependencies = [
  "http",
  "hyper",
- "rustls 0.20.7",
- "rustls-native-certs 0.6.2",
+ "rustls",
+ "rustls-native-certs",
  "tokio",
  "tokio-rustls",
 ]
@@ -1015,6 +963,15 @@ dependencies = [
  "windows-sys 0.42.0",
 ]
 
+[[package]]
+name = "itertools"
+version = "0.10.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473"
+dependencies = [
+ "either",
+]
+
 [[package]]
 name = "itoa"
 version = "1.0.5"
@@ -1030,12 +987,6 @@ dependencies = [
  "wasm-bindgen",
 ]
 
-[[package]]
-name = "json"
-version = "0.12.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd"
-
 [[package]]
 name = "json5"
 version = "0.4.1"
@@ -1170,42 +1121,6 @@ dependencies = [
  "tempfile",
 ]
 
-[[package]]
-name = "nats"
-version = "0.23.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b3d877cd2e71146efa7065300fc5f5da967f938694b4d65e8bc64cc4a409092c"
-dependencies = [
- "base64",
- "base64-url",
- "blocking",
- "crossbeam-channel",
- "fastrand",
- "itoa",
- "json",
- "lazy_static",
- "libc",
- "log",
- "memchr",
- "nkeys",
- "nuid",
- "once_cell",
- "parking_lot",
- "regex",
- "ring",
- "rustls 0.19.1",
- "rustls-native-certs 0.5.0",
- "rustls-pemfile 0.2.1",
- "serde",
- "serde_json",
- "serde_nanos",
- "serde_repr",
- "time 0.3.17",
- "url",
- "webpki 0.21.4",
- "winapi",
-]
-
 [[package]]
 name = "nkeys"
 version = "0.2.0"
@@ -1346,12 +1261,6 @@ dependencies = [
  "hashbrown",
 ]
 
-[[package]]
-name = "parking"
-version = "2.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72"
-
 [[package]]
 name = "parking_lot"
 version = "0.12.1"
@@ -1440,6 +1349,26 @@ dependencies = [
  "sha1",
 ]
 
+[[package]]
+name = "pin-project"
+version = "1.0.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc"
+dependencies = [
+ "pin-project-internal",
+]
+
+[[package]]
+name = "pin-project-internal"
+version = "1.0.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "pin-project-lite"
 version = "0.2.9"
@@ -1744,19 +1673,6 @@ dependencies = [
  "windows-sys 0.42.0",
 ]
 
-[[package]]
-name = "rustls"
-version = "0.19.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7"
-dependencies = [
- "base64",
- "log",
- "ring",
- "sct 0.6.1",
- "webpki 0.21.4",
-]
-
 [[package]]
 name = "rustls"
 version = "0.20.7"
@@ -1765,20 +1681,8 @@ checksum = "539a2bfe908f471bfa933876bd1eb6a19cf2176d375f82ef7f99530a40e48c2c"
 dependencies = [
  "log",
  "ring",
- "sct 0.7.0",
- "webpki 0.22.0",
-]
-
-[[package]]
-name = "rustls-native-certs"
-version = "0.5.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5a07b7c1885bd8ed3831c289b7870b13ef46fe0e856d288c30d9cc17d75a2092"
-dependencies = [
- "openssl-probe",
- "rustls 0.19.1",
- "schannel",
- "security-framework",
+ "sct",
+ "webpki",
 ]
 
 [[package]]
@@ -1788,20 +1692,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50"
 dependencies = [
  "openssl-probe",
- "rustls-pemfile 1.0.1",
+ "rustls-pemfile",
  "schannel",
  "security-framework",
 ]
 
-[[package]]
-name = "rustls-pemfile"
-version = "0.2.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9"
-dependencies = [
- "base64",
-]
-
 [[package]]
 name = "rustls-pemfile"
 version = "1.0.1"
@@ -1839,16 +1734,6 @@ version = "1.0.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2"
 
-[[package]]
-name = "sct"
-version = "0.6.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce"
-dependencies = [
- "ring",
- "untrusted",
-]
-
 [[package]]
 name = "sct"
 version = "0.7.0"
@@ -2021,12 +1906,12 @@ dependencies = [
 name = "shared"
 version = "0.1.0"
 dependencies = [
+ "async-nats",
  "config",
  "enumflags2",
  "hyper",
  "inner",
  "log",
- "nats",
  "pretty_env_logger",
  "prometheus",
  "redis",
@@ -2112,6 +1997,15 @@ version = "0.10.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
 
+[[package]]
+name = "subslice"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e0a8e4809a3bb02de01f1f7faf1ba01a83af9e8eabcd4d31dd6e413d14d56aae"
+dependencies = [
+ "memchr",
+]
+
 [[package]]
 name = "subtle"
 version = "2.4.1"
@@ -2295,15 +2189,26 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "tokio-retry"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f"
+dependencies = [
+ "pin-project",
+ "rand 0.8.5",
+ "tokio",
+]
+
 [[package]]
 name = "tokio-rustls"
 version = "0.23.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
 dependencies = [
- "rustls 0.20.7",
+ "rustls",
  "tokio",
- "webpki 0.22.0",
+ "webpki",
 ]
 
 [[package]]
@@ -2314,12 +2219,12 @@ checksum = "f714dd15bead90401d77e04243611caec13726c2408afd5b31901dfcdcb3b181"
 dependencies = [
  "futures-util",
  "log",
- "rustls 0.20.7",
- "rustls-native-certs 0.6.2",
+ "rustls",
+ "rustls-native-certs",
  "tokio",
  "tokio-rustls",
  "tungstenite",
- "webpki 0.22.0",
+ "webpki",
 ]
 
 [[package]]
@@ -2402,12 +2307,12 @@ dependencies = [
  "httparse",
  "log",
  "rand 0.8.5",
- "rustls 0.20.7",
+ "rustls",
  "sha-1",
  "thiserror",
  "url",
  "utf-8",
- "webpki 0.22.0",
+ "webpki",
 ]
 
 [[package]]
@@ -2420,8 +2325,8 @@ dependencies = [
  "flate2",
  "futures-util",
  "leaky-bucket-lite",
- "rustls 0.20.7",
- "rustls-native-certs 0.6.2",
+ "rustls",
+ "rustls-native-certs",
  "serde",
  "serde_json",
  "tokio",
@@ -2577,12 +2482,6 @@ version = "0.9.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
 
-[[package]]
-name = "waker-fn"
-version = "1.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
-
 [[package]]
 name = "want"
 version = "0.3.0"
@@ -2693,16 +2592,6 @@ dependencies = [
  "twilight-model",
 ]
 
-[[package]]
-name = "webpki"
-version = "0.21.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea"
-dependencies = [
- "ring",
- "untrusted",
-]
-
 [[package]]
 name = "webpki"
 version = "0.22.0"
index 853903ba0cd72641d02a20bb2e6405318c6c8e1b..61bb449f54217af05fe3724b2a3f96b3602dcc58 100644 (file)
@@ -7,8 +7,11 @@ edition = "2018"
 
 [dependencies]
 shared = { path = "../../libs/shared" }
-nats = "0.23.1"
+async-nats = "0.25.1"
+tokio = { version = "1", features = ["full"] }
 serde = { version = "1.0.8", features = ["derive"] }
 log = { version = "0.4", features = ["std"] }
 serde_json = { version = "1.0" }
-redis = "*"
\ No newline at end of file
+redis = "*"
+futures-util = "*"
+twilight-model = "0.14"
\ No newline at end of file
index 37b1e7305613f424d2045ccd645707aec46e962f..2aa71a8b4229700ae22de0e8da2cf2bf2ec46891 100644 (file)
@@ -1,4 +1,6 @@
 use serde::Deserialize;
 
 #[derive(Debug, Deserialize, Clone, Default)]
-pub struct CacheConfiguration {}
\ No newline at end of file
+pub struct CacheConfiguration {
+    pub toggles: Vec<String>
+}
\ No newline at end of file
index ab79dc9c393bbcdc531e37cc103fdb7de0ad63f2..a74cfa6491d830c15cc753e92f679b69bcf906a6 100644 (file)
-use shared::config::Settings;
+use std::error::Error;
+
+use async_nats::{Client, Subscriber};
+use futures_util::stream::StreamExt;
 use log::info;
+use managers::{
+    automoderation::Automoderation, bans::Bans, channels::Channels,
+    guild_schedules::GuildSchedules, guilds::Guilds, integrations::Integrations, invites::Invites,
+    members::Members, messages::Messages, reactions::Reactions, roles::Roles,
+    stage_instances::StageInstances, threads::Threads, CacheManager,
+};
+use shared::{config::Settings, payloads::CachePayload};
+use twilight_model::gateway::event::DispatchEvent;
 
 use crate::config::CacheConfiguration;
 
 mod config;
+mod managers;
+
+pub enum CacheSourcedEvents {
+    None,
+}
 
+#[derive(Default)]
+struct MegaCache {
+    automoderation: Automoderation,
+    channels: Channels,
+    bans: Bans,
+    guild_schedules: GuildSchedules,
+    guilds: Guilds,
+    integrations: Integrations,
+    invites: Invites,
+    members: Members,
+    messages: Messages,
+    reactions: Reactions,
+    roles: Roles,
+    stage_instances: StageInstances,
+    threads: Threads,
+}
 
-fn main() {
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
     let settings: Settings<CacheConfiguration> = Settings::new("cache").unwrap();
     info!("loaded configuration: {:?}", settings);
 
-    
+    let nats: Client = settings.nats.to_client().await?;
+    // let redis: redis::Client = settings.redis.into();
+
+    let mut cache = MegaCache::default();
+
+    let mut sub = nats.subscribe("nova.cache.dispatch.*".to_string()).await?;
+    listen(&mut sub, &mut cache, settings.config.toggles).await;
+    Ok(())
+}
+
+async fn listen(sub: &mut Subscriber, cache: &mut MegaCache, features: Vec<String>) {
+    while let Some(data) = sub.next().await {
+        let cp: CachePayload = serde_json::from_slice(&data.payload).unwrap();
+        let event = cp.data.data;
+        match event {
+            // Channel events
+            DispatchEvent::ChannelCreate(_)
+            | DispatchEvent::ChannelDelete(_)
+            | DispatchEvent::ChannelPinsUpdate(_)
+            | DispatchEvent::ChannelUpdate(_)
+                if features.contains(&"channels_cache".to_string()) =>
+            {
+                cache.channels.handle(event);
+            }
+
+            // Guild Cache
+            DispatchEvent::GuildCreate(_)
+            | DispatchEvent::GuildDelete(_)
+            | DispatchEvent::UnavailableGuild(_)
+            | DispatchEvent::GuildUpdate(_)
+            | DispatchEvent::WebhooksUpdate(_)
+            | DispatchEvent::GuildStickersUpdate(_)
+            | DispatchEvent::GuildEmojisUpdate(_)
+            | DispatchEvent::VoiceServerUpdate(_)
+            | DispatchEvent::GuildIntegrationsUpdate(_)
+            | DispatchEvent::CommandPermissionsUpdate(_)
+                if features.contains(&"guilds_cache".to_string()) =>
+            {
+                cache.guilds.handle(event);
+            }
+
+            // Guild Scheduled event
+            DispatchEvent::GuildScheduledEventCreate(_)
+            | DispatchEvent::GuildScheduledEventDelete(_)
+            | DispatchEvent::GuildScheduledEventUpdate(_)
+            | DispatchEvent::GuildScheduledEventUserAdd(_)
+            | DispatchEvent::GuildScheduledEventUserRemove(_)
+                if features.contains(&"guild_schedules_cache".to_string()) =>
+            {
+                cache.guild_schedules.handle(event);
+            }
+
+            // Stage events
+            DispatchEvent::StageInstanceCreate(_)
+            | DispatchEvent::StageInstanceDelete(_)
+            | DispatchEvent::StageInstanceUpdate(_)
+                if features.contains(&"stage_instances_cache".to_string()) =>
+            {
+                cache.stage_instances.handle(event);
+            }
+
+            // Integration events
+            DispatchEvent::IntegrationCreate(_)
+            | DispatchEvent::IntegrationDelete(_)
+            | DispatchEvent::IntegrationUpdate(_)
+            | DispatchEvent::InteractionCreate(_)
+                if features.contains(&"integrations_cache".to_string()) =>
+            {
+                cache.integrations.handle(event);
+            }
+
+            // Member events
+            DispatchEvent::MemberAdd(_)
+            | DispatchEvent::MemberRemove(_)
+            | DispatchEvent::MemberUpdate(_)
+            | DispatchEvent::MemberChunk(_)
+            | DispatchEvent::UserUpdate(_)
+                if features.contains(&"members_cache".to_string()) =>
+            {
+                cache.members.handle(event);
+            }
+
+            // Ban cache
+            DispatchEvent::BanAdd(_) | DispatchEvent::BanRemove(_)
+                if features.contains(&"bans_cache".to_string()) =>
+            {
+                cache.bans.handle(event);
+            }
+
+            // Reaction cache
+            DispatchEvent::ReactionAdd(_)
+            | DispatchEvent::ReactionRemove(_)
+            | DispatchEvent::ReactionRemoveAll(_)
+            | DispatchEvent::ReactionRemoveEmoji(_)
+                if features.contains(&"reactions_cache".to_string()) =>
+            {
+                cache.reactions.handle(event);
+            }
+
+            // Message cache
+            DispatchEvent::MessageCreate(_)
+            | DispatchEvent::MessageDelete(_)
+            | DispatchEvent::MessageDeleteBulk(_)
+            | DispatchEvent::MessageUpdate(_)
+                if features.contains(&"messages_cache".to_string()) =>
+            {
+                cache.messages.handle(event);
+            }
+
+            // Thread cache
+            DispatchEvent::ThreadCreate(_)
+            | DispatchEvent::ThreadDelete(_)
+            | DispatchEvent::ThreadListSync(_)
+            | DispatchEvent::ThreadMemberUpdate(_)
+            | DispatchEvent::ThreadMembersUpdate(_)
+            | DispatchEvent::ThreadUpdate(_)
+                if features.contains(&"threads_cache".to_string()) =>
+            {
+                cache.threads.handle(event);
+            }
+
+            // Invite cache
+            DispatchEvent::InviteCreate(_) | DispatchEvent::InviteDelete(_)
+                if features.contains(&"invites_cache".to_string()) =>
+            {
+                cache.invites.handle(event);
+            }
+
+            // Roles cache
+            DispatchEvent::RoleCreate(_)
+            | DispatchEvent::RoleDelete(_)
+            | DispatchEvent::RoleUpdate(_)
+                if features.contains(&"roles_cache".to_string()) =>
+            {
+                cache.roles.handle(event);
+            }
+
+            // Automod rules
+            DispatchEvent::AutoModerationRuleCreate(_)
+            | DispatchEvent::AutoModerationRuleDelete(_)
+            | DispatchEvent::AutoModerationRuleUpdate(_)
+                if features.contains(&"automoderation_cache".to_string()) =>
+            {
+                cache.automoderation.handle(event);
+            }
+
+            // Voice State
+            DispatchEvent::VoiceStateUpdate(_)
+                if features.contains(&"voice_states_cache".to_string()) => {}
+
+            _ => {
+                // just forward
+            }
+        }
+    }
 }
diff --git a/exes/cache/src/managers/automoderation.rs b/exes/cache/src/managers/automoderation.rs
new file mode 100644 (file)
index 0000000..4a1b119
--- /dev/null
@@ -0,0 +1,26 @@
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+#[derive(Default)]
+pub struct Automoderation {}
+impl CacheManager for Automoderation {
+    fn handle(
+        &self,
+        event: twilight_model::gateway::event::DispatchEvent,
+    ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+        Box::pin(async move {
+            match event {
+                DispatchEvent::AutoModerationRuleCreate(_) => {}
+                DispatchEvent::AutoModerationRuleDelete(_) => {}
+                DispatchEvent::AutoModerationRuleUpdate(_) => {}
+                _ => unreachable!(),
+            };
+
+            CacheSourcedEvents::None
+        })
+    }
+}
diff --git a/exes/cache/src/managers/bans.rs b/exes/cache/src/managers/bans.rs
new file mode 100644 (file)
index 0000000..27e6a34
--- /dev/null
@@ -0,0 +1,26 @@
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Bans {}
+impl CacheManager for Bans {
+    fn handle(
+        &self,
+        event: twilight_model::gateway::event::DispatchEvent,
+    ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+        Box::pin(async move {
+            match event {
+                DispatchEvent::BanAdd(_) => {}
+                DispatchEvent::BanRemove(_) => {}
+                _ => unreachable!(),
+            };
+
+            CacheSourcedEvents::None
+        })
+    }
+}
diff --git a/exes/cache/src/managers/channels.rs b/exes/cache/src/managers/channels.rs
new file mode 100644 (file)
index 0000000..fe34acc
--- /dev/null
@@ -0,0 +1,28 @@
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Channels {}
+impl CacheManager for Channels {
+    fn handle(
+        &self,
+        event: twilight_model::gateway::event::DispatchEvent,
+    ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+        Box::pin(async move {
+            match event {
+                DispatchEvent::ChannelCreate(_) => {}
+                DispatchEvent::ChannelDelete(_) => {}
+                DispatchEvent::ChannelPinsUpdate(_) => {}
+                DispatchEvent::ChannelUpdate(_) => {}
+                _ => unreachable!(),
+            };
+
+            CacheSourcedEvents::None
+        })
+    }
+}
diff --git a/exes/cache/src/managers/guild_schedules.rs b/exes/cache/src/managers/guild_schedules.rs
new file mode 100644 (file)
index 0000000..bcc79c5
--- /dev/null
@@ -0,0 +1,29 @@
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct GuildSchedules {}
+impl CacheManager for GuildSchedules {
+    fn handle(
+        &self,
+        event: twilight_model::gateway::event::DispatchEvent,
+    ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+        Box::pin(async move {
+            match event {
+                DispatchEvent::GuildScheduledEventCreate(_) => {}
+                DispatchEvent::GuildScheduledEventDelete(_) => {}
+                DispatchEvent::GuildScheduledEventUpdate(_) => {}
+                DispatchEvent::GuildScheduledEventUserAdd(_) => {}
+                DispatchEvent::GuildScheduledEventUserRemove(_) => {}
+                _ => unreachable!(),
+            };
+
+            CacheSourcedEvents::None
+        })
+    }
+}
diff --git a/exes/cache/src/managers/guilds.rs b/exes/cache/src/managers/guilds.rs
new file mode 100644 (file)
index 0000000..3f5f4c4
--- /dev/null
@@ -0,0 +1,34 @@
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Guilds {}
+impl CacheManager for Guilds {
+    fn handle(
+        &self,
+        event: twilight_model::gateway::event::DispatchEvent,
+    ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+        Box::pin(async move {
+            match event {
+                DispatchEvent::GuildCreate(_) => {},
+                DispatchEvent::GuildDelete(_) => {},
+                DispatchEvent::UnavailableGuild(_) => {},
+                DispatchEvent::GuildUpdate(_) => {},
+                DispatchEvent::WebhooksUpdate(_) => {},
+                DispatchEvent::GuildStickersUpdate(_) => {},
+                DispatchEvent::GuildEmojisUpdate(_) => {},
+                DispatchEvent::VoiceServerUpdate(_) => {},
+                DispatchEvent::GuildIntegrationsUpdate(_) => {},
+                DispatchEvent::CommandPermissionsUpdate(_) => {},
+                _ => unreachable!(),
+            };
+
+            CacheSourcedEvents::None
+        })
+    }
+}
diff --git a/exes/cache/src/managers/integrations.rs b/exes/cache/src/managers/integrations.rs
new file mode 100644 (file)
index 0000000..99d292e
--- /dev/null
@@ -0,0 +1,28 @@
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Integrations {}
+impl CacheManager for Integrations {
+    fn handle(
+        &self,
+        event: twilight_model::gateway::event::DispatchEvent,
+    ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+        Box::pin(async move {
+            match event {
+                DispatchEvent::IntegrationCreate(_) => {}
+                DispatchEvent::IntegrationDelete(_) => {}
+                DispatchEvent::IntegrationUpdate(_) => {}
+                DispatchEvent::InteractionCreate(_) => {}
+                _ => unreachable!(),
+            };
+
+            CacheSourcedEvents::None
+        })
+    }
+}
diff --git a/exes/cache/src/managers/invites.rs b/exes/cache/src/managers/invites.rs
new file mode 100644 (file)
index 0000000..21da64f
--- /dev/null
@@ -0,0 +1,26 @@
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Invites {}
+impl CacheManager for Invites {
+    fn handle(
+        &self,
+        event: twilight_model::gateway::event::DispatchEvent,
+    ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+        Box::pin(async move {
+            match event {
+                DispatchEvent::InviteCreate(_) => {}
+                DispatchEvent::InviteDelete(_) => {}
+                _ => unreachable!(),
+            };
+
+            CacheSourcedEvents::None
+        })
+    }
+}
diff --git a/exes/cache/src/managers/members.rs b/exes/cache/src/managers/members.rs
new file mode 100644 (file)
index 0000000..3a483f1
--- /dev/null
@@ -0,0 +1,29 @@
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Members {}
+impl CacheManager for Members {
+    fn handle(
+        &self,
+        event: twilight_model::gateway::event::DispatchEvent,
+    ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+        Box::pin(async move {
+            match event {
+                DispatchEvent::MemberAdd(_) => {},
+                DispatchEvent::MemberRemove(_) => {},
+                DispatchEvent::MemberUpdate(_) => {},
+                DispatchEvent::MemberChunk(_) => {},
+                DispatchEvent::UserUpdate(_) => {},
+                _ => unreachable!(),
+            };
+
+            CacheSourcedEvents::None
+        })
+    }
+}
diff --git a/exes/cache/src/managers/messages.rs b/exes/cache/src/managers/messages.rs
new file mode 100644 (file)
index 0000000..7b06ae7
--- /dev/null
@@ -0,0 +1,28 @@
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Messages {}
+impl CacheManager for Messages {
+    fn handle(
+        &self,
+        event: twilight_model::gateway::event::DispatchEvent,
+    ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+        Box::pin(async move {
+            match event {
+                DispatchEvent::MessageCreate(_) => {},
+                DispatchEvent::MessageDelete(_) => {},
+                DispatchEvent::MessageDeleteBulk(_) => {},
+                DispatchEvent::MessageUpdate(_) => {},
+                _ => unreachable!(),
+            };
+
+            CacheSourcedEvents::None
+        })
+    }
+}
diff --git a/exes/cache/src/managers/mod.rs b/exes/cache/src/managers/mod.rs
new file mode 100644 (file)
index 0000000..370cda6
--- /dev/null
@@ -0,0 +1,23 @@
+use std::pin::Pin;
+use twilight_model::gateway::event::DispatchEvent;
+use std::future::Future;
+
+use crate::CacheSourcedEvents;
+
+pub mod channels;
+pub mod guilds;
+pub mod guild_schedules;
+pub mod stage_instances;
+pub mod integrations;
+pub mod members;
+pub mod bans;
+pub mod reactions;
+pub mod messages;
+pub mod threads;
+pub mod invites;
+pub mod roles;
+pub mod automoderation;
+
+pub trait CacheManager {
+    fn handle(&self, event: DispatchEvent) -> Pin<Box<dyn Future<Output = CacheSourcedEvents>>>;
+}
diff --git a/exes/cache/src/managers/reactions.rs b/exes/cache/src/managers/reactions.rs
new file mode 100644 (file)
index 0000000..5d21e0b
--- /dev/null
@@ -0,0 +1,28 @@
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Reactions {}
+impl CacheManager for Reactions {
+    fn handle(
+        &self,
+        event: twilight_model::gateway::event::DispatchEvent,
+    ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+        Box::pin(async move {
+            match event {
+                DispatchEvent::ReactionAdd(_) => {},
+                DispatchEvent::ReactionRemove(_) => {},
+                DispatchEvent::ReactionRemoveAll(_) => {},
+                DispatchEvent::ReactionRemoveEmoji(_) => {},
+                _ => unreachable!(),
+            };
+
+            CacheSourcedEvents::None
+        })
+    }
+}
diff --git a/exes/cache/src/managers/roles.rs b/exes/cache/src/managers/roles.rs
new file mode 100644 (file)
index 0000000..5fa0f22
--- /dev/null
@@ -0,0 +1,27 @@
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Roles {}
+impl CacheManager for Roles {
+    fn handle(
+        &self,
+        event: twilight_model::gateway::event::DispatchEvent,
+    ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+        Box::pin(async move {
+            match event {
+                DispatchEvent::RoleCreate(_) => {},
+                DispatchEvent::RoleDelete(_) => {},
+                DispatchEvent::RoleUpdate(_) => {},
+                _ => unreachable!(),
+            };
+
+            CacheSourcedEvents::None
+        })
+    }
+}
diff --git a/exes/cache/src/managers/stage_instances.rs b/exes/cache/src/managers/stage_instances.rs
new file mode 100644 (file)
index 0000000..314d089
--- /dev/null
@@ -0,0 +1,27 @@
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct StageInstances {}
+impl CacheManager for StageInstances {
+    fn handle(
+        &self,
+        event: twilight_model::gateway::event::DispatchEvent,
+    ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+        Box::pin(async move {
+            match event {
+                DispatchEvent::StageInstanceCreate(_) => {},
+                DispatchEvent::StageInstanceDelete(_) => {},
+                DispatchEvent::StageInstanceUpdate(_) => {},
+                _ => unreachable!(),
+            };
+
+            CacheSourcedEvents::None
+        })
+    }
+}
diff --git a/exes/cache/src/managers/threads.rs b/exes/cache/src/managers/threads.rs
new file mode 100644 (file)
index 0000000..d4efc2e
--- /dev/null
@@ -0,0 +1,29 @@
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+#[derive(Default)]
+pub struct Threads {}
+impl CacheManager for Threads {
+    fn handle(
+        &self,
+        event: twilight_model::gateway::event::DispatchEvent,
+    ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+        Box::pin(async move {
+            match event {
+                DispatchEvent::ThreadCreate(_) => {},
+                DispatchEvent::ThreadDelete(_) => {},
+                DispatchEvent::ThreadListSync(_) => {},
+                DispatchEvent::ThreadMemberUpdate(_) => {},
+                DispatchEvent::ThreadMembersUpdate(_) => {},
+                DispatchEvent::ThreadUpdate(_) => {},
+                _ => unreachable!(),
+            };
+
+            CacheSourcedEvents::None
+        })
+    }
+}
index 27269c16f8133a446c4ce6430fcb87ea2c5acd99..7ef3d9882fa4ccb2c0d9e0963e0badb2a22b47e6 100644 (file)
@@ -10,4 +10,5 @@ twilight-gateway = { version = "0.14" }
 twilight-model = "0.14"
 serde = { version = "1.0.8", features = ["derive"] }
 futures = "0.3"
-serde_json = { version = "1.0" }
\ No newline at end of file
+serde_json = { version = "1.0" }
+bytes = "*"
\ No newline at end of file
index 93ca022c26e6a301f0eca6474e9919992dda5227..6968fe431a5704acb1ddb24750d5e3b7333dcfee 100644 (file)
@@ -2,7 +2,7 @@ use config::Config;
 use shared::{
     config::Settings,
     log::{debug, info},
-    nats_crate::Connection,
+    nats_crate::Client,
     payloads::{CachePayload, DispatchEventTagged, Tracing},
 };
 use std::{convert::TryFrom, error::Error};
@@ -15,7 +15,7 @@ use twilight_model::gateway::event::DispatchEvent;
 async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
     let settings: Settings<Config> = Settings::new("gateway").unwrap();
     let (shard, mut events) = Shard::new(settings.config.token, settings.config.intents);
-    let nats: Connection = settings.nats.into();
+    let nats: Client = settings.nats.to_client().await?;
 
     shard.start().await?;
 
@@ -26,7 +26,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
             }
 
             _ => {
-                let name = event.kind().name().unwrap();
+                let name = event.kind().name();
                 if let Ok(dispatch_event) = DispatchEvent::try_from(event) {
                     let data = CachePayload {
                         tracing: Tracing {
@@ -39,7 +39,9 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
                     };
                     let value = serde_json::to_string(&data)?;
                     debug!("nats send: {}", value);
-                    nats.publish(&format!("nova.cache.dispatch.{}", name), value)?;
+                    let bytes = bytes::Bytes::from(value);
+                    nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes)
+                        .await?;
                 }
             }
         }
index 3294c22019da4679c2a2a6bf18e1341aa0aa56e4..b2ef44cf4f04806ee49285e60094182552b5b9df 100644 (file)
@@ -8,7 +8,7 @@ use hyper::{
     Body, Method, Request, Response, StatusCode,
 };
 use serde::{Deserialize, Serialize};
-use shared::nats_crate::Connection;
+use shared::nats_crate::Client;
 use shared::{
     log::{debug, error},
     payloads::{CachePayload, DispatchEventTagged, Tracing},
@@ -19,7 +19,6 @@ use std::{
     str::from_utf8,
     sync::Arc,
     task::{Context, Poll},
-    time::Duration,
 };
 use twilight_model::gateway::event::{DispatchEvent};
 use twilight_model::{
@@ -31,7 +30,7 @@ use twilight_model::{
 #[derive(Clone)]
 pub struct HandlerService {
     pub config: Arc<Config>,
-    pub nats: Arc<Connection>,
+    pub nats: Arc<Client>,
     pub public_key: Arc<PublicKey>,
 }
 
@@ -107,14 +106,13 @@ impl HandlerService {
 
                                     let payload = serde_json::to_string(&data).unwrap();
 
-                                    match self.nats.request_timeout(
-                                        "nova.cache.dispatch.INTERACTION_CREATE",
-                                        payload,
-                                        Duration::from_secs(2),
-                                    ) {
+                                    match self.nats.request(
+                                        "nova.cache.dispatch.INTERACTION_CREATE".to_string(),
+                                        Bytes::from(payload),
+                                    ).await {
                                         Ok(response) => Ok(Response::builder()
                                             .header("Content-Type", "application/json")
-                                            .body(Body::from(response.data))
+                                            .body(Body::from(response.reply.unwrap()))
                                             .unwrap()),
 
                                         Err(error) => {
index 27749172e90f44f77b26644f293e16ae6e8a547c..48672a17a386d08002ef0ec32923b665419a0a65 100644 (file)
@@ -1,7 +1,7 @@
 use super::handler::HandlerService;
 use crate::config::Config;
 use hyper::service::Service;
-use shared::nats_crate::Connection;
+use shared::nats_crate::Client;
 use std::{
     future::{ready, Ready},
     sync::Arc,
@@ -11,7 +11,7 @@ use ed25519_dalek::PublicKey;
 
 pub struct MakeSvc {
     pub settings: Arc<Config>,
-    pub nats: Arc<Connection>,
+    pub nats: Arc<Client>,
     pub public_key: Arc<PublicKey>
 }
 
index 9527d0fd1d8982da727fd1caf2a7f81cdef7d831..336dd820385242e9ffade082c832cebe13326120 100644 (file)
@@ -4,10 +4,10 @@ mod handler;
 use crate::handler::make_service::MakeSvc;
 
 use crate::config::Config;
-use shared::config::Settings;
-use shared::log::{error, info};
 use ed25519_dalek::PublicKey;
 use hyper::Server;
+use shared::config::Settings;
+use shared::log::{error, info};
 
 #[tokio::main]
 async fn main() {
@@ -35,7 +35,7 @@ async fn start(settings: Settings<Config>) {
         Arc::new(PublicKey::from_bytes(&hex::decode(&config.discord.public_key).unwrap()).unwrap());
     let server = Server::bind(&addr).serve(MakeSvc {
         settings: config,
-        nats: Arc::new(settings.nats.into()),
+        nats: Arc::new(settings.nats.to_client().await.unwrap()),
         public_key: public_key,
     });
 
index b3cdee7f4ea7051d81cc4876260127ddf1b0ecb5..6d6b6f67321ab0fb6e96aee138e9f930f4e9b541 100644 (file)
@@ -1,7 +1,7 @@
 [package]
 name = "shared"
 version = "0.1.0"
-edition = "2018"
+edition = "2021"
 
 [dependencies]
 pretty_env_logger = "0.4"
@@ -13,7 +13,7 @@ hyper = { version = "0.14", features = ["full"] }
 tokio = { version = "1", features = ["full"] }
 enumflags2 = { version = "0.7.1", features = ["serde"] }
 prometheus = { version = "0.13", features = ["process"] }
-nats = "0.23"
+async-nats = "0.25.1"
 testcontainers = "0.14"
 twilight-model = "0.14"
 serde_json = { version = "1.0" }
index 08995790c7d8702385ed733b62216b52eb8d47c1..31b1dcdc23695d0a7709b91fb369a70728bcaab4 100644 (file)
@@ -1,5 +1,5 @@
 use config::ConfigError;
-use std::fmt::Debug;
+use std::{fmt::Debug, io};
 use thiserror::Error;
 
 #[derive(Debug, Error)]
@@ -12,4 +12,7 @@ pub enum GenericError {
 
     #[error("step `{0}` failed")]
     StepFailed(String),
+
+    #[error("io error")]
+    Io(#[from] io::Error)
 }
index 58df5e132f20461894cceb055fee9ec47702a8d9..62d8689d059a9fcc15d9d74dd7ecfbf898a2cd75 100644 (file)
@@ -1,5 +1,5 @@
 pub use ::config as config_crate;
-pub use ::nats as nats_crate;
+pub use ::async_nats as nats_crate;
 pub use ::redis as redis_crate;
 pub use log;
 pub use prometheus;
index 06163984644fcf29474dea42c9f88e864c672702..05953cc6f0c2abcb63f18457d93cd0d40b1f1cb6 100644 (file)
@@ -1,5 +1,8 @@
-use nats::{Connection, Options};
+use async_nats::Client;
 use serde::Deserialize;
+use std::future::Future;
+
+use crate::error::GenericError;
 
 #[derive(Clone, Debug, Deserialize)]
 pub struct NatsConfigurationClientCert {
@@ -14,46 +17,13 @@ pub struct NatsConfigurationTls {
 
 #[derive(Clone, Debug, Deserialize)]
 pub struct NatsConfiguration {
-    pub client_cert: Option<NatsConfigurationClientCert>,
-    pub root_cert: Option<Vec<String>>,
-    pub jetstream_api_prefix: Option<String>,
-    pub max_reconnects: Option<usize>,
-    pub reconnect_buffer_size: Option<usize>,
-    pub tls: Option<NatsConfigurationTls>,
-    pub client_name: Option<String>,
-    pub tls_required: Option<bool>,
     pub host: String,
 }
 
 // todo: Prefer From since it automatically gives a free Into implementation
 // Allows the configuration to directly create a nats connection
-impl Into<Connection> for NatsConfiguration {
-    fn into(self) -> Connection {
-        let mut options = Options::new();
-
-        if let Some(client_cert) = self.client_cert {
-            options = options.client_cert(client_cert.cert, client_cert.key);
-        }
-
-        if let Some(root_certs) = self.root_cert {
-            for root_cert in root_certs {
-                options = options.add_root_certificate(root_cert);
-            }
-        }
-
-        options = options.max_reconnects(self.max_reconnects);
-        options = options.no_echo();
-        options = options.reconnect_buffer_size(self.reconnect_buffer_size.unwrap_or(64 * 1024));
-        options = options.tls_required(self.tls_required.unwrap_or(false));
-        options = options.with_name(&self.client_name.unwrap_or_else(|| "Nova".to_string()));
-
-        if let Some(tls) = self.tls {
-            let mut config = nats::rustls::ClientConfig::new();
-            config.set_mtu(&tls.mtu);
-            // todo: more options?
-            options = options.tls_client_config(config);
-        }
-
-        options.connect(&self.host).unwrap()
+impl NatsConfiguration {
+    pub async fn to_client(self) -> Result<Client, GenericError> {
+        Ok(async_nats::connect(self.host).await?)
     }
-}
+}
\ No newline at end of file