summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthieuCoder <matthieu@matthieu-dev.xyz>2022-12-31 22:48:40 +0400
committerMatthieuCoder <matthieu@matthieu-dev.xyz>2022-12-31 22:48:40 +0400
commit46fd26962ef55f8b557f7e36d3aee915a819c88c (patch)
tree18038653ee532831aca0000afa7849924c2c63cc
parent0fcc68291a5f7526dbeffe33f4a84a649200e847 (diff)
add base of cache component
-rw-r--r--Cargo.lock319
-rw-r--r--exes/cache/Cargo.toml7
-rw-r--r--exes/cache/src/config.rs4
-rw-r--r--exes/cache/src/main.rs193
-rw-r--r--exes/cache/src/managers/automoderation.rs26
-rw-r--r--exes/cache/src/managers/bans.rs26
-rw-r--r--exes/cache/src/managers/channels.rs28
-rw-r--r--exes/cache/src/managers/guild_schedules.rs29
-rw-r--r--exes/cache/src/managers/guilds.rs34
-rw-r--r--exes/cache/src/managers/integrations.rs28
-rw-r--r--exes/cache/src/managers/invites.rs26
-rw-r--r--exes/cache/src/managers/members.rs29
-rw-r--r--exes/cache/src/managers/messages.rs28
-rw-r--r--exes/cache/src/managers/mod.rs23
-rw-r--r--exes/cache/src/managers/reactions.rs28
-rw-r--r--exes/cache/src/managers/roles.rs27
-rw-r--r--exes/cache/src/managers/stage_instances.rs27
-rw-r--r--exes/cache/src/managers/threads.rs29
-rw-r--r--exes/gateway/Cargo.toml3
-rw-r--r--exes/gateway/src/main.rs10
-rw-r--r--exes/webhook/src/handler/handler.rs16
-rw-r--r--exes/webhook/src/handler/make_service.rs4
-rw-r--r--exes/webhook/src/main.rs6
-rw-r--r--libs/shared/Cargo.toml4
-rw-r--r--libs/shared/src/error.rs5
-rw-r--r--libs/shared/src/lib.rs2
-rw-r--r--libs/shared/src/nats.rs46
27 files changed, 725 insertions, 282 deletions
diff --git a/Cargo.lock b/Cargo.lock
index c1ba53e..943bb09 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -44,33 +44,40 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "983cd8b9d4b02a6dc6ffa557262eb5858a27a0038ffffe21a0f133eaa819a164"
[[package]]
-name = "async-channel"
-version = "1.8.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833"
-dependencies = [
- "concurrent-queue",
- "event-listener",
- "futures-core",
-]
-
-[[package]]
-name = "async-lock"
-version = "2.6.0"
+name = "async-nats"
+version = "0.25.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c8101efe8695a6c17e02911402145357e718ac92d3ff88ae8419e84b1707b685"
+checksum = "f69bf051b7d96b3275cdea9a4abbe2e937ce6de66c742c57050c5c98b4a6db32"
dependencies = [
- "event-listener",
- "futures-lite",
+ "base64",
+ "base64-url",
+ "bytes",
+ "futures",
+ "http",
+ "itertools",
+ "itoa",
+ "lazy_static",
+ "nkeys",
+ "nuid",
+ "once_cell",
+ "regex",
+ "ring",
+ "rustls-native-certs",
+ "rustls-pemfile",
+ "serde",
+ "serde_json",
+ "serde_nanos",
+ "serde_repr",
+ "subslice",
+ "time 0.3.17",
+ "tokio",
+ "tokio-retry",
+ "tokio-rustls",
+ "tracing",
+ "url",
]
[[package]]
-name = "async-task"
-version = "4.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7a40729d2133846d9ed0ea60a8b9541bccddab49cd30f0715a1da672fe9a2524"
-
-[[package]]
name = "async-trait"
version = "0.1.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -82,12 +89,6 @@ dependencies = [
]
[[package]]
-name = "atomic-waker"
-version = "1.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a"
-
-[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -150,20 +151,6 @@ dependencies = [
]
[[package]]
-name = "blocking"
-version = "1.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3c67b173a56acffd6d2326fb7ab938ba0b00a71480e14902b2591c87bc5741e8"
-dependencies = [
- "async-channel",
- "async-lock",
- "async-task",
- "atomic-waker",
- "fastrand",
- "futures-lite",
-]
-
-[[package]]
name = "bollard-stubs"
version = "1.41.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -196,12 +183,15 @@ checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c"
name = "cache"
version = "0.1.0"
dependencies = [
+ "async-nats",
+ "futures-util",
"log",
- "nats",
"redis",
"serde",
"serde_json",
"shared",
+ "tokio",
+ "twilight-model",
]
[[package]]
@@ -257,15 +247,6 @@ dependencies = [
]
[[package]]
-name = "concurrent-queue"
-version = "2.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bd7bef69dc86e3c610e4e7aed41035e2a7ed12e72dd7530f61327a6579a4390b"
-dependencies = [
- "crossbeam-utils",
-]
-
-[[package]]
name = "config"
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -331,25 +312,6 @@ dependencies = [
]
[[package]]
-name = "crossbeam-channel"
-version = "0.5.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521"
-dependencies = [
- "cfg-if",
- "crossbeam-utils",
-]
-
-[[package]]
-name = "crossbeam-utils"
-version = "0.8.14"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4fb766fa798726286dbbb842f174001dab8abc7b627a1dd86e0b7222a95d929f"
-dependencies = [
- "cfg-if",
-]
-
-[[package]]
name = "crypto-common"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -526,6 +488,12 @@ dependencies = [
]
[[package]]
+name = "either"
+version = "1.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797"
+
+[[package]]
name = "enumflags2"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -581,12 +549,6 @@ dependencies = [
]
[[package]]
-name = "event-listener"
-version = "2.5.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
-
-[[package]]
name = "fastrand"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -685,21 +647,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb"
[[package]]
-name = "futures-lite"
-version = "1.12.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48"
-dependencies = [
- "fastrand",
- "futures-core",
- "futures-io",
- "memchr",
- "parking",
- "pin-project-lite",
- "waker-fn",
-]
-
-[[package]]
name = "futures-macro"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -744,6 +691,7 @@ dependencies = [
name = "gateway"
version = "0.1.0"
dependencies = [
+ "bytes",
"futures",
"serde",
"serde_json",
@@ -921,8 +869,8 @@ checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c"
dependencies = [
"http",
"hyper",
- "rustls 0.20.7",
- "rustls-native-certs 0.6.2",
+ "rustls",
+ "rustls-native-certs",
"tokio",
"tokio-rustls",
]
@@ -1016,6 +964,15 @@ dependencies = [
]
[[package]]
+name = "itertools"
+version = "0.10.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473"
+dependencies = [
+ "either",
+]
+
+[[package]]
name = "itoa"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1031,12 +988,6 @@ dependencies = [
]
[[package]]
-name = "json"
-version = "0.12.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd"
-
-[[package]]
name = "json5"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1171,42 +1122,6 @@ dependencies = [
]
[[package]]
-name = "nats"
-version = "0.23.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b3d877cd2e71146efa7065300fc5f5da967f938694b4d65e8bc64cc4a409092c"
-dependencies = [
- "base64",
- "base64-url",
- "blocking",
- "crossbeam-channel",
- "fastrand",
- "itoa",
- "json",
- "lazy_static",
- "libc",
- "log",
- "memchr",
- "nkeys",
- "nuid",
- "once_cell",
- "parking_lot",
- "regex",
- "ring",
- "rustls 0.19.1",
- "rustls-native-certs 0.5.0",
- "rustls-pemfile 0.2.1",
- "serde",
- "serde_json",
- "serde_nanos",
- "serde_repr",
- "time 0.3.17",
- "url",
- "webpki 0.21.4",
- "winapi",
-]
-
-[[package]]
name = "nkeys"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1347,12 +1262,6 @@ dependencies = [
]
[[package]]
-name = "parking"
-version = "2.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72"
-
-[[package]]
name = "parking_lot"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1441,6 +1350,26 @@ dependencies = [
]
[[package]]
+name = "pin-project"
+version = "1.0.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc"
+dependencies = [
+ "pin-project-internal",
+]
+
+[[package]]
+name = "pin-project-internal"
+version = "1.0.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
name = "pin-project-lite"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1746,39 +1675,14 @@ dependencies = [
[[package]]
name = "rustls"
-version = "0.19.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7"
-dependencies = [
- "base64",
- "log",
- "ring",
- "sct 0.6.1",
- "webpki 0.21.4",
-]
-
-[[package]]
-name = "rustls"
version = "0.20.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "539a2bfe908f471bfa933876bd1eb6a19cf2176d375f82ef7f99530a40e48c2c"
dependencies = [
"log",
"ring",
- "sct 0.7.0",
- "webpki 0.22.0",
-]
-
-[[package]]
-name = "rustls-native-certs"
-version = "0.5.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5a07b7c1885bd8ed3831c289b7870b13ef46fe0e856d288c30d9cc17d75a2092"
-dependencies = [
- "openssl-probe",
- "rustls 0.19.1",
- "schannel",
- "security-framework",
+ "sct",
+ "webpki",
]
[[package]]
@@ -1788,22 +1692,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50"
dependencies = [
"openssl-probe",
- "rustls-pemfile 1.0.1",
+ "rustls-pemfile",
"schannel",
"security-framework",
]
[[package]]
name = "rustls-pemfile"
-version = "0.2.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9"
-dependencies = [
- "base64",
-]
-
-[[package]]
-name = "rustls-pemfile"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0864aeff53f8c05aa08d86e5ef839d3dfcf07aeba2db32f12db0ef716e87bd55"
@@ -1841,16 +1736,6 @@ checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2"
[[package]]
name = "sct"
-version = "0.6.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce"
-dependencies = [
- "ring",
- "untrusted",
-]
-
-[[package]]
-name = "sct"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4"
@@ -2021,12 +1906,12 @@ dependencies = [
name = "shared"
version = "0.1.0"
dependencies = [
+ "async-nats",
"config",
"enumflags2",
"hyper",
"inner",
"log",
- "nats",
"pretty_env_logger",
"prometheus",
"redis",
@@ -2113,6 +1998,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
+name = "subslice"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e0a8e4809a3bb02de01f1f7faf1ba01a83af9e8eabcd4d31dd6e413d14d56aae"
+dependencies = [
+ "memchr",
+]
+
+[[package]]
name = "subtle"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2296,14 +2190,25 @@ dependencies = [
]
[[package]]
+name = "tokio-retry"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f"
+dependencies = [
+ "pin-project",
+ "rand 0.8.5",
+ "tokio",
+]
+
+[[package]]
name = "tokio-rustls"
version = "0.23.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
dependencies = [
- "rustls 0.20.7",
+ "rustls",
"tokio",
- "webpki 0.22.0",
+ "webpki",
]
[[package]]
@@ -2314,12 +2219,12 @@ checksum = "f714dd15bead90401d77e04243611caec13726c2408afd5b31901dfcdcb3b181"
dependencies = [
"futures-util",
"log",
- "rustls 0.20.7",
- "rustls-native-certs 0.6.2",
+ "rustls",
+ "rustls-native-certs",
"tokio",
"tokio-rustls",
"tungstenite",
- "webpki 0.22.0",
+ "webpki",
]
[[package]]
@@ -2402,12 +2307,12 @@ dependencies = [
"httparse",
"log",
"rand 0.8.5",
- "rustls 0.20.7",
+ "rustls",
"sha-1",
"thiserror",
"url",
"utf-8",
- "webpki 0.22.0",
+ "webpki",
]
[[package]]
@@ -2420,8 +2325,8 @@ dependencies = [
"flate2",
"futures-util",
"leaky-bucket-lite",
- "rustls 0.20.7",
- "rustls-native-certs 0.6.2",
+ "rustls",
+ "rustls-native-certs",
"serde",
"serde_json",
"tokio",
@@ -2578,12 +2483,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
-name = "waker-fn"
-version = "1.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
-
-[[package]]
name = "want"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2695,16 +2594,6 @@ dependencies = [
[[package]]
name = "webpki"
-version = "0.21.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea"
-dependencies = [
- "ring",
- "untrusted",
-]
-
-[[package]]
-name = "webpki"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd"
diff --git a/exes/cache/Cargo.toml b/exes/cache/Cargo.toml
index 853903b..61bb449 100644
--- a/exes/cache/Cargo.toml
+++ b/exes/cache/Cargo.toml
@@ -7,8 +7,11 @@ edition = "2018"
[dependencies]
shared = { path = "../../libs/shared" }
-nats = "0.23.1"
+async-nats = "0.25.1"
+tokio = { version = "1", features = ["full"] }
serde = { version = "1.0.8", features = ["derive"] }
log = { version = "0.4", features = ["std"] }
serde_json = { version = "1.0" }
-redis = "*" \ No newline at end of file
+redis = "*"
+futures-util = "*"
+twilight-model = "0.14" \ No newline at end of file
diff --git a/exes/cache/src/config.rs b/exes/cache/src/config.rs
index 37b1e73..2aa71a8 100644
--- a/exes/cache/src/config.rs
+++ b/exes/cache/src/config.rs
@@ -1,4 +1,6 @@
use serde::Deserialize;
#[derive(Debug, Deserialize, Clone, Default)]
-pub struct CacheConfiguration {} \ No newline at end of file
+pub struct CacheConfiguration {
+ pub toggles: Vec<String>
+} \ No newline at end of file
diff --git a/exes/cache/src/main.rs b/exes/cache/src/main.rs
index ab79dc9..a74cfa6 100644
--- a/exes/cache/src/main.rs
+++ b/exes/cache/src/main.rs
@@ -1,14 +1,201 @@
-use shared::config::Settings;
+use std::error::Error;
+
+use async_nats::{Client, Subscriber};
+use futures_util::stream::StreamExt;
use log::info;
+use managers::{
+ automoderation::Automoderation, bans::Bans, channels::Channels,
+ guild_schedules::GuildSchedules, guilds::Guilds, integrations::Integrations, invites::Invites,
+ members::Members, messages::Messages, reactions::Reactions, roles::Roles,
+ stage_instances::StageInstances, threads::Threads, CacheManager,
+};
+use shared::{config::Settings, payloads::CachePayload};
+use twilight_model::gateway::event::DispatchEvent;
use crate::config::CacheConfiguration;
mod config;
+mod managers;
+
+pub enum CacheSourcedEvents {
+ None,
+}
+#[derive(Default)]
+struct MegaCache {
+ automoderation: Automoderation,
+ channels: Channels,
+ bans: Bans,
+ guild_schedules: GuildSchedules,
+ guilds: Guilds,
+ integrations: Integrations,
+ invites: Invites,
+ members: Members,
+ messages: Messages,
+ reactions: Reactions,
+ roles: Roles,
+ stage_instances: StageInstances,
+ threads: Threads,
+}
-fn main() {
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let settings: Settings<CacheConfiguration> = Settings::new("cache").unwrap();
info!("loaded configuration: {:?}", settings);
-
+ let nats: Client = settings.nats.to_client().await?;
+ // let redis: redis::Client = settings.redis.into();
+
+ let mut cache = MegaCache::default();
+
+ let mut sub = nats.subscribe("nova.cache.dispatch.*".to_string()).await?;
+ listen(&mut sub, &mut cache, settings.config.toggles).await;
+ Ok(())
+}
+
+async fn listen(sub: &mut Subscriber, cache: &mut MegaCache, features: Vec<String>) {
+ while let Some(data) = sub.next().await {
+ let cp: CachePayload = serde_json::from_slice(&data.payload).unwrap();
+ let event = cp.data.data;
+ match event {
+ // Channel events
+ DispatchEvent::ChannelCreate(_)
+ | DispatchEvent::ChannelDelete(_)
+ | DispatchEvent::ChannelPinsUpdate(_)
+ | DispatchEvent::ChannelUpdate(_)
+ if features.contains(&"channels_cache".to_string()) =>
+ {
+ cache.channels.handle(event);
+ }
+
+ // Guild Cache
+ DispatchEvent::GuildCreate(_)
+ | DispatchEvent::GuildDelete(_)
+ | DispatchEvent::UnavailableGuild(_)
+ | DispatchEvent::GuildUpdate(_)
+ | DispatchEvent::WebhooksUpdate(_)
+ | DispatchEvent::GuildStickersUpdate(_)
+ | DispatchEvent::GuildEmojisUpdate(_)
+ | DispatchEvent::VoiceServerUpdate(_)
+ | DispatchEvent::GuildIntegrationsUpdate(_)
+ | DispatchEvent::CommandPermissionsUpdate(_)
+ if features.contains(&"guilds_cache".to_string()) =>
+ {
+ cache.guilds.handle(event);
+ }
+
+ // Guild Scheduled event
+ DispatchEvent::GuildScheduledEventCreate(_)
+ | DispatchEvent::GuildScheduledEventDelete(_)
+ | DispatchEvent::GuildScheduledEventUpdate(_)
+ | DispatchEvent::GuildScheduledEventUserAdd(_)
+ | DispatchEvent::GuildScheduledEventUserRemove(_)
+ if features.contains(&"guild_schedules_cache".to_string()) =>
+ {
+ cache.guild_schedules.handle(event);
+ }
+
+ // Stage events
+ DispatchEvent::StageInstanceCreate(_)
+ | DispatchEvent::StageInstanceDelete(_)
+ | DispatchEvent::StageInstanceUpdate(_)
+ if features.contains(&"stage_instances_cache".to_string()) =>
+ {
+ cache.stage_instances.handle(event);
+ }
+
+ // Integration events
+ DispatchEvent::IntegrationCreate(_)
+ | DispatchEvent::IntegrationDelete(_)
+ | DispatchEvent::IntegrationUpdate(_)
+ | DispatchEvent::InteractionCreate(_)
+ if features.contains(&"integrations_cache".to_string()) =>
+ {
+ cache.integrations.handle(event);
+ }
+
+ // Member events
+ DispatchEvent::MemberAdd(_)
+ | DispatchEvent::MemberRemove(_)
+ | DispatchEvent::MemberUpdate(_)
+ | DispatchEvent::MemberChunk(_)
+ | DispatchEvent::UserUpdate(_)
+ if features.contains(&"members_cache".to_string()) =>
+ {
+ cache.members.handle(event);
+ }
+
+ // Ban cache
+ DispatchEvent::BanAdd(_) | DispatchEvent::BanRemove(_)
+ if features.contains(&"bans_cache".to_string()) =>
+ {
+ cache.bans.handle(event);
+ }
+
+ // Reaction cache
+ DispatchEvent::ReactionAdd(_)
+ | DispatchEvent::ReactionRemove(_)
+ | DispatchEvent::ReactionRemoveAll(_)
+ | DispatchEvent::ReactionRemoveEmoji(_)
+ if features.contains(&"reactions_cache".to_string()) =>
+ {
+ cache.reactions.handle(event);
+ }
+
+ // Message cache
+ DispatchEvent::MessageCreate(_)
+ | DispatchEvent::MessageDelete(_)
+ | DispatchEvent::MessageDeleteBulk(_)
+ | DispatchEvent::MessageUpdate(_)
+ if features.contains(&"messages_cache".to_string()) =>
+ {
+ cache.messages.handle(event);
+ }
+
+ // Thread cache
+ DispatchEvent::ThreadCreate(_)
+ | DispatchEvent::ThreadDelete(_)
+ | DispatchEvent::ThreadListSync(_)
+ | DispatchEvent::ThreadMemberUpdate(_)
+ | DispatchEvent::ThreadMembersUpdate(_)
+ | DispatchEvent::ThreadUpdate(_)
+ if features.contains(&"threads_cache".to_string()) =>
+ {
+ cache.threads.handle(event);
+ }
+
+ // Invite cache
+ DispatchEvent::InviteCreate(_) | DispatchEvent::InviteDelete(_)
+ if features.contains(&"invites_cache".to_string()) =>
+ {
+ cache.invites.handle(event);
+ }
+
+ // Roles cache
+ DispatchEvent::RoleCreate(_)
+ | DispatchEvent::RoleDelete(_)
+ | DispatchEvent::RoleUpdate(_)
+ if features.contains(&"roles_cache".to_string()) =>
+ {
+ cache.roles.handle(event);
+ }
+
+ // Automod rules
+ DispatchEvent::AutoModerationRuleCreate(_)
+ | DispatchEvent::AutoModerationRuleDelete(_)
+ | DispatchEvent::AutoModerationRuleUpdate(_)
+ if features.contains(&"automoderation_cache".to_string()) =>
+ {
+ cache.automoderation.handle(event);
+ }
+
+ // Voice State
+ DispatchEvent::VoiceStateUpdate(_)
+ if features.contains(&"voice_states_cache".to_string()) => {}
+
+ _ => {
+ // just forward
+ }
+ }
+ }
}
diff --git a/exes/cache/src/managers/automoderation.rs b/exes/cache/src/managers/automoderation.rs
new file mode 100644
index 0000000..4a1b119
--- /dev/null
+++ b/exes/cache/src/managers/automoderation.rs
@@ -0,0 +1,26 @@
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+#[derive(Default)]
+pub struct Automoderation {}
+impl CacheManager for Automoderation {
+ fn handle(
+ &self,
+ event: twilight_model::gateway::event::DispatchEvent,
+ ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+ Box::pin(async move {
+ match event {
+ DispatchEvent::AutoModerationRuleCreate(_) => {}
+ DispatchEvent::AutoModerationRuleDelete(_) => {}
+ DispatchEvent::AutoModerationRuleUpdate(_) => {}
+ _ => unreachable!(),
+ };
+
+ CacheSourcedEvents::None
+ })
+ }
+}
diff --git a/exes/cache/src/managers/bans.rs b/exes/cache/src/managers/bans.rs
new file mode 100644
index 0000000..27e6a34
--- /dev/null
+++ b/exes/cache/src/managers/bans.rs
@@ -0,0 +1,26 @@
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Bans {}
+impl CacheManager for Bans {
+ fn handle(
+ &self,
+ event: twilight_model::gateway::event::DispatchEvent,
+ ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+ Box::pin(async move {
+ match event {
+ DispatchEvent::BanAdd(_) => {}
+ DispatchEvent::BanRemove(_) => {}
+ _ => unreachable!(),
+ };
+
+ CacheSourcedEvents::None
+ })
+ }
+}
diff --git a/exes/cache/src/managers/channels.rs b/exes/cache/src/managers/channels.rs
new file mode 100644
index 0000000..fe34acc
--- /dev/null
+++ b/exes/cache/src/managers/channels.rs
@@ -0,0 +1,28 @@
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Channels {}
+impl CacheManager for Channels {
+ fn handle(
+ &self,
+ event: twilight_model::gateway::event::DispatchEvent,
+ ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+ Box::pin(async move {
+ match event {
+ DispatchEvent::ChannelCreate(_) => {}
+ DispatchEvent::ChannelDelete(_) => {}
+ DispatchEvent::ChannelPinsUpdate(_) => {}
+ DispatchEvent::ChannelUpdate(_) => {}
+ _ => unreachable!(),
+ };
+
+ CacheSourcedEvents::None
+ })
+ }
+}
diff --git a/exes/cache/src/managers/guild_schedules.rs b/exes/cache/src/managers/guild_schedules.rs
new file mode 100644
index 0000000..bcc79c5
--- /dev/null
+++ b/exes/cache/src/managers/guild_schedules.rs
@@ -0,0 +1,29 @@
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct GuildSchedules {}
+impl CacheManager for GuildSchedules {
+ fn handle(
+ &self,
+ event: twilight_model::gateway::event::DispatchEvent,
+ ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+ Box::pin(async move {
+ match event {
+ DispatchEvent::GuildScheduledEventCreate(_) => {}
+ DispatchEvent::GuildScheduledEventDelete(_) => {}
+ DispatchEvent::GuildScheduledEventUpdate(_) => {}
+ DispatchEvent::GuildScheduledEventUserAdd(_) => {}
+ DispatchEvent::GuildScheduledEventUserRemove(_) => {}
+ _ => unreachable!(),
+ };
+
+ CacheSourcedEvents::None
+ })
+ }
+}
diff --git a/exes/cache/src/managers/guilds.rs b/exes/cache/src/managers/guilds.rs
new file mode 100644
index 0000000..3f5f4c4
--- /dev/null
+++ b/exes/cache/src/managers/guilds.rs
@@ -0,0 +1,34 @@
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Guilds {}
+impl CacheManager for Guilds {
+ fn handle(
+ &self,
+ event: twilight_model::gateway::event::DispatchEvent,
+ ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+ Box::pin(async move {
+ match event {
+ DispatchEvent::GuildCreate(_) => {},
+ DispatchEvent::GuildDelete(_) => {},
+ DispatchEvent::UnavailableGuild(_) => {},
+ DispatchEvent::GuildUpdate(_) => {},
+ DispatchEvent::WebhooksUpdate(_) => {},
+ DispatchEvent::GuildStickersUpdate(_) => {},
+ DispatchEvent::GuildEmojisUpdate(_) => {},
+ DispatchEvent::VoiceServerUpdate(_) => {},
+ DispatchEvent::GuildIntegrationsUpdate(_) => {},
+ DispatchEvent::CommandPermissionsUpdate(_) => {},
+ _ => unreachable!(),
+ };
+
+ CacheSourcedEvents::None
+ })
+ }
+}
diff --git a/exes/cache/src/managers/integrations.rs b/exes/cache/src/managers/integrations.rs
new file mode 100644
index 0000000..99d292e
--- /dev/null
+++ b/exes/cache/src/managers/integrations.rs
@@ -0,0 +1,28 @@
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Integrations {}
+impl CacheManager for Integrations {
+ fn handle(
+ &self,
+ event: twilight_model::gateway::event::DispatchEvent,
+ ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+ Box::pin(async move {
+ match event {
+ DispatchEvent::IntegrationCreate(_) => {}
+ DispatchEvent::IntegrationDelete(_) => {}
+ DispatchEvent::IntegrationUpdate(_) => {}
+ DispatchEvent::InteractionCreate(_) => {}
+ _ => unreachable!(),
+ };
+
+ CacheSourcedEvents::None
+ })
+ }
+}
diff --git a/exes/cache/src/managers/invites.rs b/exes/cache/src/managers/invites.rs
new file mode 100644
index 0000000..21da64f
--- /dev/null
+++ b/exes/cache/src/managers/invites.rs
@@ -0,0 +1,26 @@
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Invites {}
+impl CacheManager for Invites {
+ fn handle(
+ &self,
+ event: twilight_model::gateway::event::DispatchEvent,
+ ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+ Box::pin(async move {
+ match event {
+ DispatchEvent::InviteCreate(_) => {}
+ DispatchEvent::InviteDelete(_) => {}
+ _ => unreachable!(),
+ };
+
+ CacheSourcedEvents::None
+ })
+ }
+}
diff --git a/exes/cache/src/managers/members.rs b/exes/cache/src/managers/members.rs
new file mode 100644
index 0000000..3a483f1
--- /dev/null
+++ b/exes/cache/src/managers/members.rs
@@ -0,0 +1,29 @@
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Members {}
+impl CacheManager for Members {
+ fn handle(
+ &self,
+ event: twilight_model::gateway::event::DispatchEvent,
+ ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+ Box::pin(async move {
+ match event {
+ DispatchEvent::MemberAdd(_) => {},
+ DispatchEvent::MemberRemove(_) => {},
+ DispatchEvent::MemberUpdate(_) => {},
+ DispatchEvent::MemberChunk(_) => {},
+ DispatchEvent::UserUpdate(_) => {},
+ _ => unreachable!(),
+ };
+
+ CacheSourcedEvents::None
+ })
+ }
+}
diff --git a/exes/cache/src/managers/messages.rs b/exes/cache/src/managers/messages.rs
new file mode 100644
index 0000000..7b06ae7
--- /dev/null
+++ b/exes/cache/src/managers/messages.rs
@@ -0,0 +1,28 @@
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Messages {}
+impl CacheManager for Messages {
+ fn handle(
+ &self,
+ event: twilight_model::gateway::event::DispatchEvent,
+ ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+ Box::pin(async move {
+ match event {
+ DispatchEvent::MessageCreate(_) => {},
+ DispatchEvent::MessageDelete(_) => {},
+ DispatchEvent::MessageDeleteBulk(_) => {},
+ DispatchEvent::MessageUpdate(_) => {},
+ _ => unreachable!(),
+ };
+
+ CacheSourcedEvents::None
+ })
+ }
+}
diff --git a/exes/cache/src/managers/mod.rs b/exes/cache/src/managers/mod.rs
new file mode 100644
index 0000000..370cda6
--- /dev/null
+++ b/exes/cache/src/managers/mod.rs
@@ -0,0 +1,23 @@
+use std::pin::Pin;
+use twilight_model::gateway::event::DispatchEvent;
+use std::future::Future;
+
+use crate::CacheSourcedEvents;
+
+pub mod channels;
+pub mod guilds;
+pub mod guild_schedules;
+pub mod stage_instances;
+pub mod integrations;
+pub mod members;
+pub mod bans;
+pub mod reactions;
+pub mod messages;
+pub mod threads;
+pub mod invites;
+pub mod roles;
+pub mod automoderation;
+
+pub trait CacheManager {
+ fn handle(&self, event: DispatchEvent) -> Pin<Box<dyn Future<Output = CacheSourcedEvents>>>;
+}
diff --git a/exes/cache/src/managers/reactions.rs b/exes/cache/src/managers/reactions.rs
new file mode 100644
index 0000000..5d21e0b
--- /dev/null
+++ b/exes/cache/src/managers/reactions.rs
@@ -0,0 +1,28 @@
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Reactions {}
+impl CacheManager for Reactions {
+ fn handle(
+ &self,
+ event: twilight_model::gateway::event::DispatchEvent,
+ ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+ Box::pin(async move {
+ match event {
+ DispatchEvent::ReactionAdd(_) => {},
+ DispatchEvent::ReactionRemove(_) => {},
+ DispatchEvent::ReactionRemoveAll(_) => {},
+ DispatchEvent::ReactionRemoveEmoji(_) => {},
+ _ => unreachable!(),
+ };
+
+ CacheSourcedEvents::None
+ })
+ }
+}
diff --git a/exes/cache/src/managers/roles.rs b/exes/cache/src/managers/roles.rs
new file mode 100644
index 0000000..5fa0f22
--- /dev/null
+++ b/exes/cache/src/managers/roles.rs
@@ -0,0 +1,27 @@
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct Roles {}
+impl CacheManager for Roles {
+ fn handle(
+ &self,
+ event: twilight_model::gateway::event::DispatchEvent,
+ ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+ Box::pin(async move {
+ match event {
+ DispatchEvent::RoleCreate(_) => {},
+ DispatchEvent::RoleDelete(_) => {},
+ DispatchEvent::RoleUpdate(_) => {},
+ _ => unreachable!(),
+ };
+
+ CacheSourcedEvents::None
+ })
+ }
+}
diff --git a/exes/cache/src/managers/stage_instances.rs b/exes/cache/src/managers/stage_instances.rs
new file mode 100644
index 0000000..314d089
--- /dev/null
+++ b/exes/cache/src/managers/stage_instances.rs
@@ -0,0 +1,27 @@
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+
+#[derive(Default)]
+pub struct StageInstances {}
+impl CacheManager for StageInstances {
+ fn handle(
+ &self,
+ event: twilight_model::gateway::event::DispatchEvent,
+ ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+ Box::pin(async move {
+ match event {
+ DispatchEvent::StageInstanceCreate(_) => {},
+ DispatchEvent::StageInstanceDelete(_) => {},
+ DispatchEvent::StageInstanceUpdate(_) => {},
+ _ => unreachable!(),
+ };
+
+ CacheSourcedEvents::None
+ })
+ }
+}
diff --git a/exes/cache/src/managers/threads.rs b/exes/cache/src/managers/threads.rs
new file mode 100644
index 0000000..d4efc2e
--- /dev/null
+++ b/exes/cache/src/managers/threads.rs
@@ -0,0 +1,29 @@
+use twilight_model::gateway::event::DispatchEvent;
+
+use crate::CacheSourcedEvents;
+
+use super::CacheManager;
+use std::future::Future;
+
+#[derive(Default)]
+pub struct Threads {}
+impl CacheManager for Threads {
+ fn handle(
+ &self,
+ event: twilight_model::gateway::event::DispatchEvent,
+ ) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
+ Box::pin(async move {
+ match event {
+ DispatchEvent::ThreadCreate(_) => {},
+ DispatchEvent::ThreadDelete(_) => {},
+ DispatchEvent::ThreadListSync(_) => {},
+ DispatchEvent::ThreadMemberUpdate(_) => {},
+ DispatchEvent::ThreadMembersUpdate(_) => {},
+ DispatchEvent::ThreadUpdate(_) => {},
+ _ => unreachable!(),
+ };
+
+ CacheSourcedEvents::None
+ })
+ }
+}
diff --git a/exes/gateway/Cargo.toml b/exes/gateway/Cargo.toml
index 27269c1..7ef3d98 100644
--- a/exes/gateway/Cargo.toml
+++ b/exes/gateway/Cargo.toml
@@ -10,4 +10,5 @@ twilight-gateway = { version = "0.14" }
twilight-model = "0.14"
serde = { version = "1.0.8", features = ["derive"] }
futures = "0.3"
-serde_json = { version = "1.0" } \ No newline at end of file
+serde_json = { version = "1.0" }
+bytes = "*" \ No newline at end of file
diff --git a/exes/gateway/src/main.rs b/exes/gateway/src/main.rs
index 93ca022..6968fe4 100644
--- a/exes/gateway/src/main.rs
+++ b/exes/gateway/src/main.rs
@@ -2,7 +2,7 @@ use config::Config;
use shared::{
config::Settings,
log::{debug, info},
- nats_crate::Connection,
+ nats_crate::Client,
payloads::{CachePayload, DispatchEventTagged, Tracing},
};
use std::{convert::TryFrom, error::Error};
@@ -15,7 +15,7 @@ use twilight_model::gateway::event::DispatchEvent;
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let settings: Settings<Config> = Settings::new("gateway").unwrap();
let (shard, mut events) = Shard::new(settings.config.token, settings.config.intents);
- let nats: Connection = settings.nats.into();
+ let nats: Client = settings.nats.to_client().await?;
shard.start().await?;
@@ -26,7 +26,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
}
_ => {
- let name = event.kind().name().unwrap();
+ let name = event.kind().name();
if let Ok(dispatch_event) = DispatchEvent::try_from(event) {
let data = CachePayload {
tracing: Tracing {
@@ -39,7 +39,9 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
};
let value = serde_json::to_string(&data)?;
debug!("nats send: {}", value);
- nats.publish(&format!("nova.cache.dispatch.{}", name), value)?;
+ let bytes = bytes::Bytes::from(value);
+ nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes)
+ .await?;
}
}
}
diff --git a/exes/webhook/src/handler/handler.rs b/exes/webhook/src/handler/handler.rs
index 3294c22..b2ef44c 100644
--- a/exes/webhook/src/handler/handler.rs
+++ b/exes/webhook/src/handler/handler.rs
@@ -8,7 +8,7 @@ use hyper::{
Body, Method, Request, Response, StatusCode,
};
use serde::{Deserialize, Serialize};
-use shared::nats_crate::Connection;
+use shared::nats_crate::Client;
use shared::{
log::{debug, error},
payloads::{CachePayload, DispatchEventTagged, Tracing},
@@ -19,7 +19,6 @@ use std::{
str::from_utf8,
sync::Arc,
task::{Context, Poll},
- time::Duration,
};
use twilight_model::gateway::event::{DispatchEvent};
use twilight_model::{
@@ -31,7 +30,7 @@ use twilight_model::{
#[derive(Clone)]
pub struct HandlerService {
pub config: Arc<Config>,
- pub nats: Arc<Connection>,
+ pub nats: Arc<Client>,
pub public_key: Arc<PublicKey>,
}
@@ -107,14 +106,13 @@ impl HandlerService {
let payload = serde_json::to_string(&data).unwrap();
- match self.nats.request_timeout(
- "nova.cache.dispatch.INTERACTION_CREATE",
- payload,
- Duration::from_secs(2),
- ) {
+ match self.nats.request(
+ "nova.cache.dispatch.INTERACTION_CREATE".to_string(),
+ Bytes::from(payload),
+ ).await {
Ok(response) => Ok(Response::builder()
.header("Content-Type", "application/json")
- .body(Body::from(response.data))
+ .body(Body::from(response.reply.unwrap()))
.unwrap()),
Err(error) => {
diff --git a/exes/webhook/src/handler/make_service.rs b/exes/webhook/src/handler/make_service.rs
index 2774917..48672a1 100644
--- a/exes/webhook/src/handler/make_service.rs
+++ b/exes/webhook/src/handler/make_service.rs
@@ -1,7 +1,7 @@
use super::handler::HandlerService;
use crate::config::Config;
use hyper::service::Service;
-use shared::nats_crate::Connection;
+use shared::nats_crate::Client;
use std::{
future::{ready, Ready},
sync::Arc,
@@ -11,7 +11,7 @@ use ed25519_dalek::PublicKey;
pub struct MakeSvc {
pub settings: Arc<Config>,
- pub nats: Arc<Connection>,
+ pub nats: Arc<Client>,
pub public_key: Arc<PublicKey>
}
diff --git a/exes/webhook/src/main.rs b/exes/webhook/src/main.rs
index 9527d0f..336dd82 100644
--- a/exes/webhook/src/main.rs
+++ b/exes/webhook/src/main.rs
@@ -4,10 +4,10 @@ mod handler;
use crate::handler::make_service::MakeSvc;
use crate::config::Config;
-use shared::config::Settings;
-use shared::log::{error, info};
use ed25519_dalek::PublicKey;
use hyper::Server;
+use shared::config::Settings;
+use shared::log::{error, info};
#[tokio::main]
async fn main() {
@@ -35,7 +35,7 @@ async fn start(settings: Settings<Config>) {
Arc::new(PublicKey::from_bytes(&hex::decode(&config.discord.public_key).unwrap()).unwrap());
let server = Server::bind(&addr).serve(MakeSvc {
settings: config,
- nats: Arc::new(settings.nats.into()),
+ nats: Arc::new(settings.nats.to_client().await.unwrap()),
public_key: public_key,
});
diff --git a/libs/shared/Cargo.toml b/libs/shared/Cargo.toml
index b3cdee7..6d6b6f6 100644
--- a/libs/shared/Cargo.toml
+++ b/libs/shared/Cargo.toml
@@ -1,7 +1,7 @@
[package]
name = "shared"
version = "0.1.0"
-edition = "2018"
+edition = "2021"
[dependencies]
pretty_env_logger = "0.4"
@@ -13,7 +13,7 @@ hyper = { version = "0.14", features = ["full"] }
tokio = { version = "1", features = ["full"] }
enumflags2 = { version = "0.7.1", features = ["serde"] }
prometheus = { version = "0.13", features = ["process"] }
-nats = "0.23"
+async-nats = "0.25.1"
testcontainers = "0.14"
twilight-model = "0.14"
serde_json = { version = "1.0" }
diff --git a/libs/shared/src/error.rs b/libs/shared/src/error.rs
index 0899579..31b1dcd 100644
--- a/libs/shared/src/error.rs
+++ b/libs/shared/src/error.rs
@@ -1,5 +1,5 @@
use config::ConfigError;
-use std::fmt::Debug;
+use std::{fmt::Debug, io};
use thiserror::Error;
#[derive(Debug, Error)]
@@ -12,4 +12,7 @@ pub enum GenericError {
#[error("step `{0}` failed")]
StepFailed(String),
+
+ #[error("io error")]
+ Io(#[from] io::Error)
}
diff --git a/libs/shared/src/lib.rs b/libs/shared/src/lib.rs
index 58df5e1..62d8689 100644
--- a/libs/shared/src/lib.rs
+++ b/libs/shared/src/lib.rs
@@ -1,5 +1,5 @@
pub use ::config as config_crate;
-pub use ::nats as nats_crate;
+pub use ::async_nats as nats_crate;
pub use ::redis as redis_crate;
pub use log;
pub use prometheus;
diff --git a/libs/shared/src/nats.rs b/libs/shared/src/nats.rs
index 0616398..05953cc 100644
--- a/libs/shared/src/nats.rs
+++ b/libs/shared/src/nats.rs
@@ -1,5 +1,8 @@
-use nats::{Connection, Options};
+use async_nats::Client;
use serde::Deserialize;
+use std::future::Future;
+
+use crate::error::GenericError;
#[derive(Clone, Debug, Deserialize)]
pub struct NatsConfigurationClientCert {
@@ -14,46 +17,13 @@ pub struct NatsConfigurationTls {
#[derive(Clone, Debug, Deserialize)]
pub struct NatsConfiguration {
- pub client_cert: Option<NatsConfigurationClientCert>,
- pub root_cert: Option<Vec<String>>,
- pub jetstream_api_prefix: Option<String>,
- pub max_reconnects: Option<usize>,
- pub reconnect_buffer_size: Option<usize>,
- pub tls: Option<NatsConfigurationTls>,
- pub client_name: Option<String>,
- pub tls_required: Option<bool>,
pub host: String,
}
// todo: Prefer From since it automatically gives a free Into implementation
// Allows the configuration to directly create a nats connection
-impl Into<Connection> for NatsConfiguration {
- fn into(self) -> Connection {
- let mut options = Options::new();
-
- if let Some(client_cert) = self.client_cert {
- options = options.client_cert(client_cert.cert, client_cert.key);
- }
-
- if let Some(root_certs) = self.root_cert {
- for root_cert in root_certs {
- options = options.add_root_certificate(root_cert);
- }
- }
-
- options = options.max_reconnects(self.max_reconnects);
- options = options.no_echo();
- options = options.reconnect_buffer_size(self.reconnect_buffer_size.unwrap_or(64 * 1024));
- options = options.tls_required(self.tls_required.unwrap_or(false));
- options = options.with_name(&self.client_name.unwrap_or_else(|| "Nova".to_string()));
-
- if let Some(tls) = self.tls {
- let mut config = nats::rustls::ClientConfig::new();
- config.set_mtu(&tls.mtu);
- // todo: more options?
- options = options.tls_client_config(config);
- }
-
- options.connect(&self.host).unwrap()
+impl NatsConfiguration {
+ pub async fn to_client(self) -> Result<Client, GenericError> {
+ Ok(async_nats::connect(self.host).await?)
}
-}
+} \ No newline at end of file