From 3f342846149c0b1f8d1ac1f0b857a9d9fdf2034b Mon Sep 17 00:00:00 2001 From: MatthieuCoder Date: Sat, 14 Jan 2023 17:45:48 +0400 Subject: [PATCH] clippy, tests and a bit of docs --- Cargo.lock | 255 ++++++++++++++++++ docs/architecture.md | 42 +++ docs/quickstart.md | 22 -- exes/all-in-one/Makefile | 15 -- exes/all-in-one/main.go | 72 ----- exes/all-in-one/src/errors.rs | 13 +- exes/all-in-one/src/ffi.rs | 8 +- exes/all-in-one/src/lib.rs | 11 + exes/all-in-one/src/utils.rs | 4 +- exes/cache/Cargo.toml | 8 +- exes/cache/README.md | 3 + exes/gateway/src/config.rs | 4 +- exes/gateway/src/lib.rs | 94 ++++--- exes/ratelimit/Cargo.toml | 14 +- exes/ratelimit/bench/req.rs | 0 exes/ratelimit/benches/bucket.rs | 56 ++++ exes/ratelimit/src/buckets/atomic_instant.rs | 23 +- exes/ratelimit/src/buckets/bucket.rs | 207 ++++++++++++-- exes/ratelimit/src/buckets/mod.rs | 13 + exes/ratelimit/src/buckets/noop_lock.rs | 18 ++ exes/ratelimit/src/buckets/redis_lock.rs | 131 +++++---- exes/ratelimit/src/config.rs | 2 +- exes/ratelimit/src/grpc.rs | 18 +- exes/ratelimit/src/lib.rs | 18 +- exes/rest/src/config.rs | 2 +- exes/rest/src/handler.rs | 10 +- exes/rest/src/lib.rs | 16 +- exes/rest/src/ratelimit_client/mod.rs | 22 +- .../src/ratelimit_client/remote_hashring.rs | 6 +- exes/webhook/src/config.rs | 2 +- exes/webhook/src/handler/error.rs | 36 --- exes/webhook/src/handler/make_service.rs | 2 +- exes/webhook/src/handler/mod.rs | 140 ++++------ exes/webhook/src/handler/signature.rs | 9 +- exes/webhook/src/handler/tests/signature.rs | 18 +- exes/webhook/src/lib.rs | 16 +- libs/leash/src/lib.rs | 11 + libs/shared/src/config.rs | 14 +- libs/shared/src/lib.rs | 13 +- libs/shared/src/nats.rs | 17 +- libs/shared/src/payloads.rs | 2 +- libs/shared/src/redis.rs | 6 +- 42 files changed, 943 insertions(+), 450 deletions(-) create mode 100644 docs/architecture.md delete mode 100644 exes/all-in-one/Makefile delete mode 100644 exes/all-in-one/main.go create mode 100644 exes/cache/README.md delete mode 100644 exes/ratelimit/bench/req.rs create mode 100644 exes/ratelimit/benches/bucket.rs create mode 100644 exes/ratelimit/src/buckets/noop_lock.rs delete mode 100644 exes/webhook/src/handler/error.rs diff --git a/Cargo.lock b/Cargo.lock index 24585f9..4588509 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -63,6 +63,12 @@ dependencies = [ "webhook", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + [[package]] name = "anyhow" version = "1.0.68" @@ -301,6 +307,12 @@ dependencies = [ "twilight-model", ] +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + [[package]] name = "cbindgen" version = "0.24.3" @@ -332,6 +344,33 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "ciborium" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0c137568cc60b904a7724001b35ce2630fd00d5d84805fbb608ab89509d788f" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "346de753af073cc87b52b2083a506b38ac176a44cfb05497b622e27be899b369" + +[[package]] +name = "ciborium-ll" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "213030a2b5a4e0c0892b6652260cf6ccac84827b83a85a534e178e3906c4cf1b" +dependencies = [ + "ciborium-io", + "half", +] + [[package]] name = "clap" version = "3.2.23" @@ -435,6 +474,44 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "criterion" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7c76e09c1aae2bc52b3d2f29e13c6572553b30c4aa1b8a49fd70de6412654cb" +dependencies = [ + "anes", + "atty", + "cast", + "ciborium", + "clap", + "criterion-plot", + "futures", + "itertools", + "lazy_static", + "num-traits", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "tokio", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" +dependencies = [ + "cast", + "itertools", +] + [[package]] name = "crossbeam-channel" version = "0.5.6" @@ -445,6 +522,30 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "715e8152b692bba2d374b53d4875445368fdf21a94751410af607a5ac677d1fc" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01a9af1f4c2ef74bb8aa1f7e19706bc72d03598c8a570bb5de72243c7a9d9d5a" +dependencies = [ + "autocfg", + "cfg-if", + "crossbeam-utils", + "memoffset", + "scopeguard", +] + [[package]] name = "crossbeam-utils" version = "0.8.14" @@ -844,6 +945,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "half" +version = "1.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" + [[package]] name = "hashbrown" version = "0.12.3" @@ -1185,6 +1292,15 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +[[package]] +name = "memoffset" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" +dependencies = [ + "autocfg", +] + [[package]] name = "mime" version = "0.3.16" @@ -1315,6 +1431,12 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f61fba1741ea2b3d6a1e3178721804bb716a68a6aeba1149b5d52e3d464ea66" +[[package]] +name = "oorandom" +version = "11.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" + [[package]] name = "opaque-debug" version = "0.3.0" @@ -1598,6 +1720,34 @@ version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160" +[[package]] +name = "plotters" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2538b639e642295546c50fcd545198c9d64ee2a38620a628724a3b266d5fbf97" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "193228616381fecdc1224c62e96946dfbc73ff4384fba576e052ff8c1bea8142" + +[[package]] +name = "plotters-svg" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9a81d2759aae1dae668f783c308bc5c8ebd191ff4184aaa1b37f65a6ae5a56f" +dependencies = [ + "plotters-backend", +] + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -1779,6 +1929,8 @@ name = "ratelimit" version = "0.1.0" dependencies = [ "anyhow", + "criterion", + "env_logger 0.7.1", "hyper", "leash", "opentelemetry", @@ -1787,14 +1939,40 @@ dependencies = [ "redis", "serde", "shared", + "test-log", "tokio", "tokio-stream", + "tokio-test", "tonic", "tracing", "tracing-opentelemetry", + "tracing-subscriber", + "tracing-test", "twilight-http-ratelimiting 0.14.0 (git+https://github.com/MatthieuCoder/twilight.git)", ] +[[package]] +name = "rayon" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6db3a213adf02b3bcfd2d3846bb41cb22857d131789e01df434fb7e7bc0759b7" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cac410af5d00ab6884528b4ab69d1e8e146e8d471201800fa1b4524126de6ad3" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-utils", + "num_cpus", +] + [[package]] name = "redis" version = "0.22.1" @@ -1991,6 +2169,15 @@ version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b4b9743ed687d4b4bcedf9ff5eaa7398495ae14e61cba0a295704edbc7decde" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.20" @@ -2330,6 +2517,17 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "test-log" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38f0c854faeb68a048f0f2dc410c5ddae3bf83854ef0e4977d58306a5edef50e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "textwrap" version = "0.16.0" @@ -2392,6 +2590,16 @@ dependencies = [ "time-core", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tinyvec" version = "1.6.0" @@ -2481,6 +2689,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-test" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53474327ae5e166530d17f2d956afcb4f8a004de581b3cae10f12006bc8163e3" +dependencies = [ + "async-stream", + "bytes", + "futures-core", + "tokio", + "tokio-stream", +] + [[package]] name = "tokio-tungstenite" version = "0.17.2" @@ -2704,6 +2925,29 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "tracing-test" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e3d272c44878d2bbc9f4a20ad463724f03e19dbc667c6e84ac433ab7ffcc70b" +dependencies = [ + "lazy_static", + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "744324b12d69a9fc1edea4b38b7b1311295b662d161ad5deac17bb1358224a08" +dependencies = [ + "lazy_static", + "quote", + "syn", +] + [[package]] name = "try-lock" version = "0.2.3" @@ -2909,6 +3153,17 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "walkdir" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" +dependencies = [ + "same-file", + "winapi", + "winapi-util", +] + [[package]] name = "want" version = "0.3.0" diff --git a/docs/architecture.md b/docs/architecture.md new file mode 100644 index 0000000..b30f98f --- /dev/null +++ b/docs/architecture.md @@ -0,0 +1,42 @@ +# Nova architecture + +The nova architecture is composed of multiple components. Each of them is horizontally scale-able. + +``` + ┌──────────────────┐ + │ │ + ┌─────────────┤ Discord API ├──────────────┐ + │ │ │ │ + │ └────────┬─────────┘ │ + │ │ │ + │ │ │ + │ │ │ + ┌─────────┴────────┐ ┌────────┴─────────┐ ┌─────────┴────────┐ + │ │ │ │ │ │ + │ Rest Proxy │ │ Gateway client │ │ Webhook Server │ + │ │ │ │ │ │ + └─────────┬──┬─────┘ └────────┬─────────┘ └─────────┬────────┘ + │ │ │ │ + │ │ │ │ + │ │ │ │ + │ │ │ │ + │ │ │ │ + │ │ │ │ + │ └───────┐ │ │ +┌────────────────┐ ┌────────┴───────┐ │ ┌───────┴────────┐ │ +│ │ │ │ │ │ ├───────────────┘ +│ Redis ├───┤ Ratelimit │ │ │ Nats broker │ +│ │ │ │ │ │ ├──────────────────┐ +└────────────────┘ └────────────────┘ │ └───────┬────────┘ │ + │ │ │ + │ │ │ + │ ┌───────┴────────┐ ┌──────┴─────┐ + │ │ │ │ │ + │ │ Cache manager ├───────────┤ User │ + │ │ │ │ │ + │ └────────────────┘ └──────┬─────┘ + └───────────────────────────────────────┘ +``` + +## Rest Proxy + diff --git a/docs/quickstart.md b/docs/quickstart.md index 1eace72..d4ca78b 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -1,24 +1,2 @@ # 5 Minutes quickstart -This page shows you how to start a new nova project -under five minutes using a typescript project, -hold tight this is going to be fast. - -## Requirements - -* A discord bot application available -* [Docker](https://docker.io/) (or alternatives) available to you. -* A domain name / [ngrok.io](https://ngrok.com/) domain (for webhooks only) - -> If you are deploying nova to production, consider following the -> production guide instead. - -## Setting up a nova instance - -Clone the [example repo](https://github.com/discordnova/nova-quickstart.git) like so - -`git clone https://github.com/discordnova/nova-quickstart.git`, - -In this folder, find the `config.yml.example` file and rename it to match `config.yml` - -Next, you need to fill all the environment variables to match your discord bot. diff --git a/exes/all-in-one/Makefile b/exes/all-in-one/Makefile deleted file mode 100644 index 8ed17c4..0000000 --- a/exes/all-in-one/Makefile +++ /dev/null @@ -1,15 +0,0 @@ -clean: - rm ./build/* - -library: - cargo build --release - -build: library - cp ../../target/release/liball_in_one.a ./build - go build -a -ldflags '-s' -o build/all-in-one - -all: library build - -run: all-in-one - ./build/all-in-one - diff --git a/exes/all-in-one/main.go b/exes/all-in-one/main.go deleted file mode 100644 index 1de08db..0000000 --- a/exes/all-in-one/main.go +++ /dev/null @@ -1,72 +0,0 @@ -package main - -/* -#cgo LDFLAGS: -L./build -lall_in_one -lz -lm -#include "./build/all-in-one.h" -*/ -import "C" - -import ( - "fmt" - "os" - "os/signal" - "syscall" - "time" - - "github.com/Jeffail/gabs" - "github.com/alicebob/miniredis/v2" - - server "github.com/nats-io/nats-server/v2/server" -) - -func main() { - // Intialise les logs de la librarie Rust - C.init_logs() - // Charge la configuration - str := C.GoString(C.load_config()) - - // Démarre une instance MiniRedis - mr := miniredis.NewMiniRedis() - err := mr.Start() - - if err != nil { - panic(err) - } - - // Démarre un serveur Nats - opts := &server.Options{} - opts.Host = "0.0.0.0" - ns, err := server.NewServer(opts) - - if err != nil { - panic(err) - } - - go ns.Start() - - if !ns.ReadyForConnections(4 * time.Second) { - panic("not ready for connection") - } - - // Edite le json de configuration donné - // Et injecte la configuration des servers Nats et MiniRedis - json, _ := gabs.ParseJSON([]byte(str)) - json.Set(fmt.Sprintf("redis://%s", mr.Addr()), "redis", "url") - json.Set("localhost", "nats", "host") - json.Set(1, "webhook", "discord", "client_id") - - // Démarre une instance de nova - instance := C.start_instance(C.CString(json.String())) - - // Wait for a SIGINT - c := make(chan os.Signal, 1) - signal.Notify(c, - syscall.SIGHUP, - syscall.SIGINT, - syscall.SIGTERM, - syscall.SIGQUIT) - <-c - - println("Arret de nova all in one") - C.stop_instance(instance) -} diff --git a/exes/all-in-one/src/errors.rs b/exes/all-in-one/src/errors.rs index d676e8d..1d2a9e2 100644 --- a/exes/all-in-one/src/errors.rs +++ b/exes/all-in-one/src/errors.rs @@ -1,7 +1,6 @@ use std::cell::RefCell; use anyhow::Result; -use libc::c_int; use tracing::error; thread_local! { @@ -9,7 +8,7 @@ thread_local! { } /// Update the most recent error, clearing whatever may have been there before. -pub fn stacktrace(err: anyhow::Error) -> String { +#[must_use] pub fn stacktrace(err: &anyhow::Error) -> String { format!("{err}") } @@ -23,13 +22,15 @@ where Ok(ok) => Some(ok), Err(error) => { // Call the handler - handle_error(error); + handle_error(&error); None } } } -pub fn handle_error(error: anyhow::Error) { +/// # Panics +/// Panics if the stacktrace size is > than an i32 +pub fn handle_error(error: &anyhow::Error) { ERROR_HANDLER.with(|val| { let mut stacktrace = stacktrace(error); @@ -38,8 +39,8 @@ pub fn handle_error(error: anyhow::Error) { // Call the error handler unsafe { func( - stacktrace.len() as c_int + 1, - stacktrace.as_mut_ptr() as *mut i8, + (stacktrace.len() + 1).try_into().unwrap(), + stacktrace.as_mut_ptr().cast::(), ); } } diff --git a/exes/all-in-one/src/ffi.rs b/exes/all-in-one/src/ffi.rs index 7a8821f..449d1cc 100644 --- a/exes/all-in-one/src/ffi.rs +++ b/exes/all-in-one/src/ffi.rs @@ -69,8 +69,12 @@ pub unsafe extern "C" fn stop_instance(instance: *mut AllInOneInstance) { }); } +/// # Panics +/// Panics if an incorrect `RUST_LOG` variables is specified. #[no_mangle] pub unsafe extern "C" fn create_instance(config: *mut c_char) -> *mut AllInOneInstance { + // Returning a null pointer (unaligned) is expected. + #[allow(clippy::cast_ptr_alignment)] wrap_result(move || { let value = CString::from_raw(config); let json = value.to_str()?; @@ -87,7 +91,7 @@ pub unsafe extern "C" fn create_instance(config: *mut c_char) -> *mut AllInOneIn .with(fmt::layer()) .with( EnvFilter::builder() - .with_default_directive(Directive::from_str("info").unwrap()) + .with_default_directive(Directive::from_str("info").expect("")) .from_env() .unwrap(), ) @@ -96,7 +100,7 @@ pub unsafe extern "C" fn create_instance(config: *mut c_char) -> *mut AllInOneIn // Error handling task runtime.spawn(async move { while let Some(error) = errors.recv().await { - handle_error(error) + handle_error(&error); } }); diff --git a/exes/all-in-one/src/lib.rs b/exes/all-in-one/src/lib.rs index 74625c0..493bf46 100644 --- a/exes/all-in-one/src/lib.rs +++ b/exes/all-in-one/src/lib.rs @@ -1,3 +1,14 @@ +#![deny( + clippy::all, + clippy::correctness, + clippy::suspicious, + clippy::style, + clippy::complexity, + clippy::perf, + clippy::pedantic, + clippy::nursery, +)] + pub mod errors; pub mod ffi; pub mod utils; diff --git a/exes/all-in-one/src/utils.rs b/exes/all-in-one/src/utils.rs index 7a7134d..159d98d 100644 --- a/exes/all-in-one/src/utils.rs +++ b/exes/all-in-one/src/utils.rs @@ -26,7 +26,7 @@ fn load_settings_for( name: &str, ) -> Result> { let value: Value = serde_json::from_str(settings)?; - let section: T = serde_json::from_value(value.get(name).unwrap().to_owned())?; + let section: T = serde_json::from_value(value.get(name).unwrap().clone())?; let mut settings: Settings = serde_json::from_value(value)?; settings.config = section; @@ -69,7 +69,7 @@ pub(crate) fn load_config_file() -> Result { let mode = std::env::var("ENV").unwrap_or_else(|_| "development".into()); info!("Configuration Environment: {}", mode); - builder = builder.add_source(File::with_name(&format!("config/{}", mode)).required(false)); + builder = builder.add_source(File::with_name(&format!("config/{mode}")).required(false)); builder = builder.add_source(File::with_name("config/local").required(false)); let env = Environment::with_prefix("NOVA").separator("__"); diff --git a/exes/cache/Cargo.toml b/exes/cache/Cargo.toml index 349deaa..70cb6a3 100644 --- a/exes/cache/Cargo.toml +++ b/exes/cache/Cargo.toml @@ -2,8 +2,12 @@ name = "cache" version = "0.1.0" edition = "2018" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +description = "Stores the data from discord if needed" +readme = "README.md" +repository = "https://github.com/discordnova/nova.git" +keywords = ["discord", "scaleable", "cache"] +categories = ["microservices", "nova"] +license = "APACHE2" [dependencies] shared = { path = "../../libs/shared" } diff --git a/exes/cache/README.md b/exes/cache/README.md new file mode 100644 index 0000000..0395009 --- /dev/null +++ b/exes/cache/README.md @@ -0,0 +1,3 @@ +# Cache + +Stores the data from discord if needed \ No newline at end of file diff --git a/exes/gateway/src/config.rs b/exes/gateway/src/config.rs index 7b12bfe..a1a2fad 100644 --- a/exes/gateway/src/config.rs +++ b/exes/gateway/src/config.rs @@ -2,14 +2,14 @@ use serde::{Deserialize, Serialize}; use twilight_gateway::Intents; #[derive(Serialize, Deserialize, Clone)] -pub struct GatewayConfig { +pub struct Gateway { pub token: String, pub intents: Intents, pub shard: u64, pub shard_total: u64, } -impl Default for GatewayConfig { +impl Default for Gateway { fn default() -> Self { Self { intents: Intents::empty(), diff --git a/exes/gateway/src/lib.rs b/exes/gateway/src/lib.rs index ec3337b..e54bb5c 100644 --- a/exes/gateway/src/lib.rs +++ b/exes/gateway/src/lib.rs @@ -1,5 +1,17 @@ +#![deny( + clippy::all, + clippy::correctness, + clippy::suspicious, + clippy::style, + clippy::complexity, + clippy::perf, + clippy::pedantic, + clippy::nursery, + unsafe_code +)] +#![allow(clippy::redundant_pub_crate)] use async_nats::{Client, HeaderMap, HeaderValue}; -use config::GatewayConfig; +use config::Gateway; use leash::{AnyhowResultFuture, Component}; use opentelemetry::{global, propagation::Injector}; use shared::{ @@ -12,21 +24,21 @@ use tokio_stream::StreamExt; use tracing_opentelemetry::OpenTelemetrySpanExt; use twilight_gateway::{Event, Shard}; pub mod config; -use tracing::{debug, info, trace_span}; +use tracing::{debug, error, info, trace_span}; use twilight_model::gateway::event::DispatchEvent; struct MetadataMap<'a>(&'a mut HeaderMap); impl<'a> Injector for MetadataMap<'a> { fn set(&mut self, key: &str, value: String) { - self.0.insert(key, HeaderValue::from_str(&value).unwrap()) + self.0.insert(key, HeaderValue::from_str(&value).unwrap()); } } pub struct GatewayServer {} impl Component for GatewayServer { - type Config = GatewayConfig; + type Config = Gateway; const SERVICE_NAME: &'static str = "gateway"; fn start( @@ -35,7 +47,7 @@ impl Component for GatewayServer { mut stop: oneshot::Receiver<()>, ) -> AnyhowResultFuture<()> { Box::pin(async move { - let (shard, mut events) = Shard::builder(settings.token.to_owned(), settings.intents) + let (shard, mut events) = Shard::builder(settings.token.clone(), settings.intents) .shard(settings.shard, settings.shard_total)? .build(); @@ -48,40 +60,10 @@ impl Component for GatewayServer { loop { select! { event = events.next() => { - if let Some(event) = event { - match event { - Event::Ready(ready) => { - info!("Logged in as {}", ready.user.name); - }, - - _ => { - - let name = event.kind().name(); - if let Ok(dispatch_event) = DispatchEvent::try_from(event) { - debug!("handling event {}", name.unwrap()); - - let data = CachePayload { - data: DispatchEventTagged { - data: dispatch_event, - }, - }; - let value = serde_json::to_string(&data)?; - let bytes = bytes::Bytes::from(value); - - let span = trace_span!("nats send"); - - let mut header_map = HeaderMap::new(); - let context = span.context(); - global::get_text_map_propagator(|propagator| { - propagator.inject_context(&context, &mut MetadataMap(&mut header_map)) - }); - - nats.publish_with_headers(format!("nova.cache.dispatch.{}", name.unwrap()), header_map, bytes) - .await?; - } - } - } + let _ = handle_event(event, &nats) + .await + .map_err(|err| error!(error = ?err, "event publish failed")); } else { break } @@ -101,3 +83,39 @@ impl Component for GatewayServer { Self {} } } + +async fn handle_event(event: Event, nats: &Client) -> anyhow::Result<()> { + if let Event::Ready(ready) = event { + info!("Logged in as {}", ready.user.name); + } else { + let name = event.kind().name(); + if let Ok(dispatch_event) = DispatchEvent::try_from(event) { + debug!("handling event {}", name.unwrap()); + + let data = CachePayload { + data: DispatchEventTagged { + data: dispatch_event, + }, + }; + let value = serde_json::to_string(&data)?; + let bytes = bytes::Bytes::from(value); + + let span = trace_span!("nats send"); + + let mut header_map = HeaderMap::new(); + let context = span.context(); + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&context, &mut MetadataMap(&mut header_map)); + }); + + nats.publish_with_headers( + format!("nova.cache.dispatch.{}", name.unwrap()), + header_map, + bytes, + ) + .await?; + } + } + + Ok(()) +} diff --git a/exes/ratelimit/Cargo.toml b/exes/ratelimit/Cargo.toml index d82d8c9..3989735 100644 --- a/exes/ratelimit/Cargo.toml +++ b/exes/ratelimit/Cargo.toml @@ -26,4 +26,16 @@ tonic = "0.8.3" tokio-stream = "0.1.11" -redis = { version = "0.22.1", features = ["cluster", "connection-manager", "tokio-comp"] } \ No newline at end of file +redis = { version = "0.22.1", features = ["cluster", "connection-manager", "tokio-comp"] } + +[dev-dependencies] +criterion = { version = "0.4", features = ["async_tokio"] } +tokio-test = "*" +tracing-test = "0.2.3" +tracing-subscriber = "*" +env_logger = "*" +test-log = { version = "0.2.11", features = ["log", "trace"] } + +[[bench]] +name = "bucket" +harness = false diff --git a/exes/ratelimit/bench/req.rs b/exes/ratelimit/bench/req.rs deleted file mode 100644 index e69de29..0000000 diff --git a/exes/ratelimit/benches/bucket.rs b/exes/ratelimit/benches/bucket.rs new file mode 100644 index 0000000..d049d62 --- /dev/null +++ b/exes/ratelimit/benches/bucket.rs @@ -0,0 +1,56 @@ +use std::ops::Add; +use std::time::{Duration, SystemTime}; + +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use ratelimit::buckets::bucket::Bucket; +use tokio::runtime::Runtime; +use twilight_http_ratelimiting::RatelimitHeaders; + +pub fn acquire_ticket(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + let bucket = rt.block_on(async move { + let bucket = Bucket::new(); + + let mreset = SystemTime::now() + .add(Duration::from_secs(3600)) + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() + .to_string(); + let headers = [ + ( + "x-ratelimit-bucket", + "d721dea6054f6322373d361f98e5c38b".as_bytes(), + ), + ("x-ratelimit-limit", "100".as_bytes()), + ("x-ratelimit-remaining", "1".as_bytes()), + ("x-ratelimit-reset", mreset.as_bytes()), + ("x-ratelimit-reset-after", "100000.000".as_bytes()), + ]; + if let RatelimitHeaders::Present(present) = + RatelimitHeaders::from_pairs(headers.into_iter()).unwrap() + { + bucket.update( + &present, + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() as u64, + ); + } + bucket + }); + + let size: usize = 1024; + c.bench_with_input(BenchmarkId::new("input_example", size), &size, |b, _| { + // Insert a call to `to_async` to convert the bencher to async mode. + // The timing loops are the same as with the normal bencher. + b.to_async(&rt).iter(|| async { + bucket.ticket().await.unwrap(); + }); + }); +} + +criterion_group!(benches, acquire_ticket); +criterion_main!(benches); diff --git a/exes/ratelimit/src/buckets/atomic_instant.rs b/exes/ratelimit/src/buckets/atomic_instant.rs index 67dd0ee..992adb9 100644 --- a/exes/ratelimit/src/buckets/atomic_instant.rs +++ b/exes/ratelimit/src/buckets/atomic_instant.rs @@ -3,33 +3,46 @@ use std::{ time::{Duration, SystemTime, UNIX_EPOCH}, }; +use tracing::debug; + #[derive(Default, Debug)] pub struct AtomicInstant(AtomicU64); impl AtomicInstant { + #[must_use] pub const fn empty() -> Self { Self(AtomicU64::new(0)) } pub fn elapsed(&self) -> Duration { + // Truncation is expected + #[allow(clippy::cast_possible_truncation)] Duration::from_millis( SystemTime::now() .duration_since(UNIX_EPOCH) - .unwrap() + .expect("time went backwards") .as_millis() as u64 - - self.0.load(Ordering::SeqCst), + - self.0.load(Ordering::Relaxed), ) } pub fn as_millis(&self) -> u64 { - self.0.load(Ordering::SeqCst) + self.0.load(Ordering::Relaxed) } pub fn set_millis(&self, millis: u64) { - self.0.store(millis, Ordering::SeqCst); + // get address of struct + let b = self as *const _ as usize; + debug!(millis, this = ?b, "settings instant millis"); + self.0.store(millis, Ordering::Relaxed); } pub fn is_empty(&self) -> bool { - self.as_millis() == 0 + let millis = self.as_millis(); + // get address of struct + let b = self as *const _ as usize; + debug!(millis, this = ?b, "settings instant millis"); + debug!(empty = (millis == 0), millis, this = ?b, "instant empty check"); + millis == 0 } } diff --git a/exes/ratelimit/src/buckets/bucket.rs b/exes/ratelimit/src/buckets/bucket.rs index f8fa8b9..c40f059 100644 --- a/exes/ratelimit/src/buckets/bucket.rs +++ b/exes/ratelimit/src/buckets/bucket.rs @@ -1,3 +1,4 @@ +use super::{async_queue::AsyncQueue, atomic_instant::AtomicInstant}; use std::{ sync::{ atomic::{AtomicU64, Ordering}, @@ -9,8 +10,6 @@ use tokio::{sync::oneshot, task::JoinHandle}; use tracing::{debug, trace}; use twilight_http_ratelimiting::headers::Present; -use super::{async_queue::AsyncQueue, atomic_instant::AtomicInstant, redis_lock::RedisLock}; - #[derive(Clone, Debug)] pub enum TimeRemaining { Finished, @@ -18,22 +17,57 @@ pub enum TimeRemaining { Some(Duration), } +/// A bucket is a simple atomic implementation of a bucket used for ratelimiting +/// It can be updated dynamically depending on the discord api responses. +/// +/// # Usage +/// ``` +/// use ratelimit::buckets::bucket::Bucket; +/// use twilight_http_ratelimiting::RatelimitHeaders; +/// use std::time::SystemTime; +/// +/// let bucket = Bucket::new(); +/// +/// // Feed the headers informations into the bucket to update it +/// let headers = [ +/// ( "x-ratelimit-bucket", "bucket id".as_bytes()), +/// ("x-ratelimit-limit", "100".as_bytes()), +/// ("x-ratelimit-remaining", "0".as_bytes()), +/// ("x-ratelimit-reset", "".as_bytes()), +/// ("x-ratelimit-reset-after", "10.000".as_bytes()), +/// ]; +/// +/// // Parse the headers +/// let present = if let Ok(RatelimitHeaders::Present(present)) = RatelimitHeaders::from_pairs(headers.into_iter()) { present } else { todo!() }; +/// +/// // this should idealy the time of the request +/// let current_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis() as u64; +/// +/// bucket.update(present, current_time).await; +/// +/// ``` +/// +/// # Async +/// You need to call this struct new method in a tokio 1.x async runtime. #[derive(Debug)] pub struct Bucket { + /// Limits of tickets that can be accepted pub limit: AtomicU64, - /// Queue associated with this bucket. - pub queue: AsyncQueue, - /// Number of tickets remaining. + /// Remaining requests that can be executed pub remaining: AtomicU64, - /// Duration after the [`Self::last_update`] time the bucket will refresh. + /// Time to wait after [`Self::last_update`] before accepting new tickets. pub reset_after: AtomicU64, - /// When the bucket's ratelimit refresh countdown started (unix millis) + /// Last update got from the discord upstream pub last_update: AtomicInstant, - pub tasks: Vec>, + /// List of tasks that dequeue tasks from [`Self::queue`] + tasks: Vec>, + /// Queue of tickets to be processed. + queue: AsyncQueue, } impl Drop for Bucket { + /// Simply abord the dequeue tasks to aboid leaking memory via arc(s) fn drop(&mut self) { for join in &self.tasks { join.abort(); @@ -42,8 +76,11 @@ impl Drop for Bucket { } impl Bucket { - /// Create a new bucket for the specified [`Path`]. - pub fn new(global: Arc) -> Arc { + /// Creates a new bucket with four dequeue tasks + /// # Async + /// This functions **should** be called in a tokio 1.x runtime, otherwise the function *will* panic. + #[must_use] + pub fn new() -> Arc { let tasks = vec![]; let this = Arc::new(Self { @@ -58,19 +95,16 @@ impl Bucket { // Run with 4 dequeue tasks for _ in 0..4 { let this = this.clone(); - let global = global.clone(); tokio::spawn(async move { + // continuously wait for elements in the queue to process them sequantially. + // this is using parallel tasks to allow (hopefully) better performance. while let Some(element) = this.queue.pop().await { - // we need to wait - if let Some(duration) = global.locked_for().await { - tokio::time::sleep(duration).await; - } - if this.remaining() == 0 { debug!("0 tickets remaining, we have to wait."); match this.time_remaining() { TimeRemaining::Finished => { + debug!("waiting seems finished."); this.try_reset(); } TimeRemaining::Some(duration) => { @@ -79,7 +113,9 @@ impl Bucket { this.try_reset(); } - TimeRemaining::NotStarted => {} + TimeRemaining::NotStarted => { + debug!("we should not wait"); + } } } @@ -94,19 +130,17 @@ impl Bucket { this } - /// Total number of tickets allotted in a cycle. + /// Total number of tickets allowed in a cycle. pub fn limit(&self) -> u64 { self.limit.load(Ordering::Relaxed) } - /// Number of tickets remaining. + /// Number of tickets remaining in the current cycle. pub fn remaining(&self) -> u64 { self.remaining.load(Ordering::Relaxed) } - /// Duration after the [`started_at`] time the bucket will refresh. - /// - /// [`started_at`]: Self::started_at + /// Duration after the [`Self::last_update`] time the bucket will refresh. pub fn reset_after(&self) -> u64 { self.reset_after.load(Ordering::Relaxed) } @@ -117,6 +151,10 @@ impl Bucket { let last_update = &self.last_update; if last_update.is_empty() { + debug!("last update is empty"); + + TimeRemaining::NotStarted + } else { let elapsed = last_update.elapsed(); if elapsed > Duration::from_millis(reset_after) { @@ -124,16 +162,12 @@ impl Bucket { } TimeRemaining::Some(Duration::from_millis(reset_after) - elapsed) - } else { - TimeRemaining::NotStarted } } - /// Try to reset this bucket's [`started_at`] value if it has finished. + /// Try to reset this bucket's [`Self::last_update`] value if it has finished. /// /// Returns whether resetting was possible. - /// - /// [`started_at`]: Self::started_at pub fn try_reset(&self) -> bool { if self.last_update.is_empty() { return false; @@ -150,10 +184,12 @@ impl Bucket { } /// Update this bucket's ratelimit data after a request has been made. - pub fn update(&self, ratelimits: Present, time: u64) { + /// The time of the request should be given. + pub fn update(&self, ratelimits: &Present, time: u64) { let bucket_limit = self.limit(); if self.last_update.is_empty() { + debug!(millis = time, "updated the last update time"); self.last_update.set_millis(time); } @@ -167,9 +203,122 @@ impl Bucket { .store(ratelimits.remaining(), Ordering::Relaxed); } - pub async fn ticket(&self) -> oneshot::Receiver<()> { + /// Submits a ticket to the queue + /// A oneshot receiver is returned and will be called when the ticket is accepted. + pub fn ticket(&self) -> oneshot::Receiver<()> { let (tx, rx) = oneshot::channel(); self.queue.push(tx); rx } } + +#[cfg(test)] +mod tests { + use std::{ + ops::Add, + time::{Duration, Instant, SystemTime}, + }; + + use tokio::time::timeout; + use tracing::info; + use twilight_http_ratelimiting::RatelimitHeaders; + + use super::Bucket; + + #[test_log::test(tokio::test)] + async fn should_ratelimit() { + let bucket = Bucket::new(); + + // Intialize a bucket with one remaining ticket + // and that resets in oue hour + let mreset = SystemTime::now() + .add(Duration::from_secs(100)) + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() + .to_string(); + let headers: [(&str, &[u8]); 5] = [ + ("x-ratelimit-bucket", b"123"), + ("x-ratelimit-limit", b"100"), + ("x-ratelimit-remaining", b"1"), + ("x-ratelimit-reset", mreset.as_bytes()), + ("x-ratelimit-reset-after", b"100.000"), + ]; + if let RatelimitHeaders::Present(present) = + RatelimitHeaders::from_pairs(headers.into_iter()).unwrap() + { + // Integer truncating is expected + #[allow(clippy::cast_possible_truncation)] + bucket.update( + &present, + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() as u64, + ); + } + + let ticket = bucket.ticket(); + + info!("first request"); + // We should accept one ticket + let respo = timeout(Duration::from_secs(10), ticket).await; + assert!(respo.is_ok()); + + info!("second request"); + + let ticket = bucket.ticket(); + // We should accept one ticket + let respo = timeout(Duration::from_secs(1), ticket).await; + + // the ticket should not have responded because the queue is locked + assert!(respo.is_err()); + } + + #[test_log::test(tokio::test)] + async fn should_block_until_possible() { + let bucket = Bucket::new(); + + // Intialize a bucket with one remaining ticket + // and that resets in oue hour + let mreset = SystemTime::now() + .add(Duration::from_secs(100)) + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() + .to_string(); + let headers: [(&str, &[u8]); 5] = [ + ("x-ratelimit-bucket", b"123"), + ("x-ratelimit-limit", b"100"), + ("x-ratelimit-remaining", b"0"), + ("x-ratelimit-reset", mreset.as_bytes()), + ("x-ratelimit-reset-after", b"100.000"), + ]; + + if let RatelimitHeaders::Present(present) = + RatelimitHeaders::from_pairs(headers.into_iter()).unwrap() + { + // Integer truncating is expected + #[allow(clippy::cast_possible_truncation)] + bucket.update( + &present, + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() as u64, + ); + } + + let ticket = bucket.ticket(); + let start = Instant::now(); + + // in this case, the ratelimiter should wait 10 seconds + let respo = timeout(Duration::from_secs(12), ticket).await; + let end = start.elapsed().as_secs(); + + // we should have waited 10 seconds (+- 1s) + assert_eq!(10, end); + // and the ticket should be a success + assert!(respo.is_ok()); + } +} diff --git a/exes/ratelimit/src/buckets/mod.rs b/exes/ratelimit/src/buckets/mod.rs index c86d623..5263765 100644 --- a/exes/ratelimit/src/buckets/mod.rs +++ b/exes/ratelimit/src/buckets/mod.rs @@ -1,4 +1,17 @@ +use std::{future::Future, pin::Pin, sync::Arc, time::Duration}; + pub mod async_queue; pub mod atomic_instant; pub mod bucket; +pub mod noop_lock; pub mod redis_lock; + +pub trait GlobalLock: Send + Sync { + fn lock_for<'a>( + self: &'a Arc, + duration: Duration, + ) -> Pin + Send + 'a>>; + fn is_locked<'a>( + self: &'a Arc, + ) -> Pin> + Send + 'a>>; +} diff --git a/exes/ratelimit/src/buckets/noop_lock.rs b/exes/ratelimit/src/buckets/noop_lock.rs new file mode 100644 index 0000000..614eba3 --- /dev/null +++ b/exes/ratelimit/src/buckets/noop_lock.rs @@ -0,0 +1,18 @@ +use super::GlobalLock; + +pub struct NoOpLock; +impl GlobalLock for NoOpLock { + fn lock_for<'a>( + self: &'a std::sync::Arc, + _duration: std::time::Duration, + ) -> std::pin::Pin + Send + 'a>> { + Box::pin(async move {}) + } + + fn is_locked<'a>( + self: &'a std::sync::Arc, + ) -> std::pin::Pin> + Send + 'a>> + { + Box::pin(async move { None }) + } +} diff --git a/exes/ratelimit/src/buckets/redis_lock.rs b/exes/ratelimit/src/buckets/redis_lock.rs index fdae149..333d726 100644 --- a/exes/ratelimit/src/buckets/redis_lock.rs +++ b/exes/ratelimit/src/buckets/redis_lock.rs @@ -1,4 +1,6 @@ use std::{ + future::Future, + pin::Pin, sync::{atomic::AtomicU64, Arc}, time::{Duration, SystemTime}, }; @@ -7,7 +9,9 @@ use redis::{aio::MultiplexedConnection, AsyncCommands}; use tokio::sync::Mutex; use tracing::debug; -/// This is flawed and needs to be replaced sometime with the real RedisLock algorithm +use super::GlobalLock; + +/// This is flawed and needs to be replaced sometime with the real `RedisLock` algorithm #[derive(Debug)] pub struct RedisLock { redis: Mutex, @@ -15,70 +19,89 @@ pub struct RedisLock { } impl RedisLock { - /// Set the global ratelimit as exhausted. - pub async fn lock_for(self: &Arc, duration: Duration) { - debug!("locking globally for {}", duration.as_secs()); - let _: () = self - .redis - .lock() - .await - .set_ex( - "nova:rls:lock", - 1, - (duration.as_secs() + 1).try_into().unwrap(), - ) - .await - .unwrap(); - - self.is_locked.store( - (SystemTime::now() + duration) - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_millis() as u64, - std::sync::atomic::Ordering::Relaxed, - ); + #[must_use] + pub fn new(redis: MultiplexedConnection) -> Arc { + Arc::new(Self { + redis: Mutex::new(redis), + is_locked: AtomicU64::new(0), + }) } +} + +impl GlobalLock for RedisLock { + fn lock_for<'a>( + self: &'a Arc, + duration: Duration, + ) -> Pin + Send + 'a>> { + Box::pin(async move { + debug!("locking globally for {}", duration.as_secs()); + let _: () = self + .redis + .lock() + .await + .set_ex( + "nova:rls:lock", + 1, + (duration.as_secs() + 1).try_into().unwrap(), + ) + .await + .unwrap(); - pub async fn locked_for(self: &Arc) -> Option { - let load = self.is_locked.load(std::sync::atomic::Ordering::Relaxed); - if load != 0 { - if load - > SystemTime::now() + // Integer truncating is expected + #[allow(clippy::cast_possible_truncation)] + self.is_locked.store( + (SystemTime::now() + duration) .duration_since(SystemTime::UNIX_EPOCH) .unwrap() - .as_millis() as u64 - { - return Some(Duration::from_millis(load)); - } else { + .as_millis() as u64, + std::sync::atomic::Ordering::Relaxed, + ); + }) + } + + fn is_locked<'a>( + self: &'a Arc, + ) -> Pin> + Send + 'a>> { + Box::pin(async move { + let load = self.is_locked.load(std::sync::atomic::Ordering::Relaxed); + if load != 0 { + // Integer truncating is expected + #[allow(clippy::cast_possible_truncation)] + if load + > SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() as u64 + { + return Some(Duration::from_millis(load)); + } self.is_locked .store(0, std::sync::atomic::Ordering::Relaxed); } - } - let result = self.redis.lock().await.ttl::<_, i64>("nova:rls:lock").await; - match result { - Ok(remaining_time) => { - if remaining_time > 0 { - let duration = Duration::from_secs(remaining_time as u64); - debug!("external global lock detected, locking"); - self.lock_for(duration).await; - Some(duration) - } else { - None + let result = self.redis.lock().await.ttl::<_, i64>("nova:rls:lock").await; + match result { + Ok(remaining_time) => { + if remaining_time > 0 { + // Sign loss is allowed since we know it's a positive number + // because a ttl is always positive when the key exists and have a ttl + // otherwise redis *will* return a negative number, hence the check for + // a positive sign. + #[allow(clippy::cast_sign_loss)] + let duration = Duration::from_secs(remaining_time as u64); + debug!("external global lock detected, locking"); + self.lock_for(duration).await; + Some(duration) + } else { + None + } } - } - Err(error) => { - debug!("redis call failed: {}", error); + Err(error) => { + debug!("redis call failed: {}", error); - None + None + } } - } - } - - pub fn new(redis: MultiplexedConnection) -> Arc { - Arc::new(Self { - redis: Mutex::new(redis), - is_locked: AtomicU64::new(0), }) } } diff --git a/exes/ratelimit/src/config.rs b/exes/ratelimit/src/config.rs index df18b76..4114c7b 100644 --- a/exes/ratelimit/src/config.rs +++ b/exes/ratelimit/src/config.rs @@ -18,6 +18,6 @@ impl Default for ServerSettings { } #[derive(Debug, Deserialize, Clone, Default)] -pub struct RatelimitServerConfig { +pub struct Ratelimit { pub server: ServerSettings, } diff --git a/exes/ratelimit/src/grpc.rs b/exes/ratelimit/src/grpc.rs index 09885c1..9e5d31c 100644 --- a/exes/ratelimit/src/grpc.rs +++ b/exes/ratelimit/src/grpc.rs @@ -16,6 +16,7 @@ use twilight_http_ratelimiting::RatelimitHeaders; use crate::buckets::bucket::Bucket; use crate::buckets::redis_lock::RedisLock; +use crate::buckets::GlobalLock; pub struct RLServer { global: Arc, @@ -34,12 +35,12 @@ impl RLServer { struct MetadataMap<'a>(&'a tonic::metadata::MetadataMap); impl<'a> Extractor for MetadataMap<'a> { - /// Get a value for a key from the MetadataMap. If the value can't be converted to &str, returns None + /// Get a value for a key from the `MetadataMap`. If the value can't be converted to &str, returns None fn get(&self, key: &str) -> Option<&str> { self.0.get(key).and_then(|metadata| metadata.to_str().ok()) } - /// Collect all the keys from the MetadataMap. + /// Collect all the keys from the `MetadataMap`. fn keys(&self) -> Vec<&str> { self.0 .keys() @@ -70,6 +71,10 @@ impl Ratelimiter for RLServer { ) .unwrap(); + if let Some(duration) = self.global.is_locked().await { + tokio::time::sleep(duration).await; + } + let bucket: Arc = if self.buckets.read().await.contains_key(&data.path) { self.buckets .read() @@ -78,7 +83,7 @@ impl Ratelimiter for RLServer { .expect("impossible") .clone() } else { - let bucket = Bucket::new(self.global.clone()); + let bucket = Bucket::new(); self.buckets.write().await.insert(data.path, bucket.clone()); bucket }; @@ -93,13 +98,14 @@ impl Ratelimiter for RLServer { global.retry_after() ); self.global + .clone() .lock_for(Duration::from_secs(global.retry_after())) .await; } RatelimitHeaders::None => {} RatelimitHeaders::Present(present) => { // we should update the bucket. - bucket.update(present, data.precise_time); + bucket.update(&present, data.precise_time); } _ => unreachable!(), }; @@ -127,13 +133,13 @@ impl Ratelimiter for RLServer { .expect("impossible") .clone() } else { - let bucket = Bucket::new(self.global.clone()); + let bucket = Bucket::new(); self.buckets.write().await.insert(data.path, bucket.clone()); bucket }; // wait for the ticket to be accepted - bucket.ticket().await; + let _ = bucket.ticket().await; Ok(Response::new(())) } diff --git a/exes/ratelimit/src/lib.rs b/exes/ratelimit/src/lib.rs index 6653157..d1bd6e0 100644 --- a/exes/ratelimit/src/lib.rs +++ b/exes/ratelimit/src/lib.rs @@ -1,5 +1,17 @@ +#![deny( + clippy::all, + clippy::correctness, + clippy::suspicious, + clippy::style, + clippy::complexity, + clippy::perf, + clippy::pedantic, + clippy::nursery, + unsafe_code +)] + use buckets::redis_lock::RedisLock; -use config::RatelimitServerConfig; +use config::Ratelimit; use grpc::RLServer; use leash::{AnyhowResultFuture, Component}; use proto::nova::ratelimit::ratelimiter::ratelimiter_server::RatelimiterServer; @@ -10,13 +22,13 @@ use std::pin::Pin; use tokio::sync::oneshot; use tonic::transport::Server; -mod buckets; +pub mod buckets; mod config; mod grpc; pub struct RatelimiterServerComponent {} impl Component for RatelimiterServerComponent { - type Config = RatelimitServerConfig; + type Config = Ratelimit; const SERVICE_NAME: &'static str = "ratelimiter"; fn start( diff --git a/exes/rest/src/config.rs b/exes/rest/src/config.rs index e87757c..9593ac8 100644 --- a/exes/rest/src/config.rs +++ b/exes/rest/src/config.rs @@ -23,7 +23,7 @@ pub struct Discord { } #[derive(Debug, Deserialize, Clone, Default)] -pub struct ReverseProxyConfig { +pub struct ReverseProxy { pub server: ServerSettings, pub discord: Discord, pub ratelimiter_address: String, diff --git a/exes/rest/src/handler.rs b/exes/rest/src/handler.rs index a8f43cd..ab158fe 100644 --- a/exes/rest/src/handler.rs +++ b/exes/rest/src/handler.rs @@ -58,7 +58,7 @@ pub async fn handle_request( let request_path = request.uri().path(); let (api_path, trimmed_path) = normalize_path(request_path); - let mut uri_string = format!("http://127.0.0.1:9999{}{}", api_path, trimmed_path); + let mut uri_string = format!("http://127.0.0.1:9999{api_path}{trimmed_path}"); if let Some(query) = request.uri().query() { uri_string.push('?'); uri_string.push_str(query); @@ -103,12 +103,12 @@ pub async fn handle_request( .expect("Failed to check header") .starts_with("Bot") { - *auth = HeaderValue::from_str(&format!("Bot {}", token))?; + *auth = HeaderValue::from_str(&format!("Bot {token}"))?; } } else { request.headers_mut().insert( AUTHORIZATION, - HeaderValue::from_str(&format!("Bot {}", token))?, + HeaderValue::from_str(&format!("Bot {token}"))?, ); } @@ -132,12 +132,12 @@ pub async fn handle_request( let headers = resp .headers() .into_iter() - .map(|(k, v)| (k.to_string(), v.to_str().map(|f| f.to_string()))) + .map(|(k, v)| (k.to_string(), v.to_str().map(std::string::ToString::to_string))) .filter(|f| f.1.is_ok()) .map(|f| (f.0, f.1.expect("errors should be filtered"))) .collect(); - let _ = ratelimiter + let _submit_headers = ratelimiter .submit_headers(hash, headers) .instrument(info_span!("submitting headers")) .await; diff --git a/exes/rest/src/lib.rs b/exes/rest/src/lib.rs index 53ab12a..174bea0 100644 --- a/exes/rest/src/lib.rs +++ b/exes/rest/src/lib.rs @@ -1,4 +1,16 @@ -use config::ReverseProxyConfig; +#![deny( + clippy::all, + clippy::correctness, + clippy::suspicious, + clippy::style, + clippy::complexity, + clippy::perf, + clippy::pedantic, + clippy::nursery, + unsafe_code +)] + +use config::ReverseProxy; use handler::handle_request; use hyper::{ @@ -20,7 +32,7 @@ mod ratelimit_client; pub struct ReverseProxyServer {} impl Component for ReverseProxyServer { - type Config = ReverseProxyConfig; + type Config = ReverseProxy; const SERVICE_NAME: &'static str = "rest"; fn start( diff --git a/exes/rest/src/ratelimit_client/mod.rs b/exes/rest/src/ratelimit_client/mod.rs index c9bd52e..e2f7e0e 100644 --- a/exes/rest/src/ratelimit_client/mod.rs +++ b/exes/rest/src/ratelimit_client/mod.rs @@ -1,4 +1,4 @@ -use crate::config::ReverseProxyConfig; +use crate::config::ReverseProxy; use self::remote_hashring::{HashRingWrapper, MetadataMap, VNode}; use anyhow::anyhow; @@ -23,7 +23,7 @@ pub struct RemoteRatelimiter { current_remotes: Vec, stop: Arc>, - config: ReverseProxyConfig, + config: ReverseProxy, } impl Drop for RemoteRatelimiter { @@ -41,15 +41,15 @@ impl RemoteRatelimiter { // get list of dns responses let responses: Vec = dns_lookup::lookup_host(&self.config.ratelimiter_address)? .into_iter() - .filter(|address| address.is_ipv4()) + .filter(std::net::IpAddr::is_ipv4) .map(|address| address.to_string()) .collect(); let mut write = self.remotes.write().await; for ip in &responses { - if !self.current_remotes.contains(&ip) { - let a = VNode::new(ip.to_owned(), self.config.ratelimiter_port).await?; + if !self.current_remotes.contains(ip) { + let a = VNode::new(ip.clone(), self.config.ratelimiter_port).await?; write.add(a.clone()); } } @@ -58,13 +58,13 @@ impl RemoteRatelimiter { } #[must_use] - pub fn new(config: ReverseProxyConfig) -> Self { + pub fn new(config: ReverseProxy) -> Self { let (rx, mut tx) = broadcast::channel(1); let obj = Self { remotes: Arc::new(RwLock::new(HashRingWrapper::default())), stop: Arc::new(rx), config, - current_remotes: vec![] + current_remotes: vec![], }; let obj_clone = obj.clone(); @@ -75,7 +75,7 @@ impl RemoteRatelimiter { match obj_clone.get_ratelimiters().await { Ok(_) => { - debug!("refreshed ratelimiting servers") + debug!("refreshed ratelimiting servers"); } Err(err) => { error!("refreshing ratelimiting servers failed {}", err); @@ -122,7 +122,7 @@ impl RemoteRatelimiter { let context = span.context(); let mut request = Request::new(BucketSubmitTicketRequest { path }); global::get_text_map_propagator(|propagator| { - propagator.inject_context(&context, &mut MetadataMap(request.metadata_mut())) + propagator.inject_context(&context, &mut MetadataMap(request.metadata_mut())); }); // Requesting @@ -158,13 +158,15 @@ impl RemoteRatelimiter { let time = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH)? .as_millis(); + // truncation is expected + #[allow(clippy::cast_possible_truncation)] let mut request = Request::new(HeadersSubmitRequest { path, precise_time: time as u64, headers, }); global::get_text_map_propagator(|propagator| { - propagator.inject_context(&context, &mut MetadataMap(request.metadata_mut())) + propagator.inject_context(&context, &mut MetadataMap(request.metadata_mut())); }); node.submit_headers(request).await?; diff --git a/exes/rest/src/ratelimit_client/remote_hashring.rs b/exes/rest/src/ratelimit_client/remote_hashring.rs index ac025c8..b99276d 100644 --- a/exes/rest/src/ratelimit_client/remote_hashring.rs +++ b/exes/rest/src/ratelimit_client/remote_hashring.rs @@ -38,7 +38,7 @@ impl Hash for VNode { pub struct MetadataMap<'a>(pub &'a mut tonic::metadata::MetadataMap); impl<'a> Injector for MetadataMap<'a> { - /// Set a key and value in the MetadataMap. Does nothing if the key or value are not valid inputs + /// Set a key and value in the `MetadataMap`. Does nothing if the key or value are not valid inputs fn set(&mut self, key: &str, value: String) { if let Ok(key) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) { if let Ok(val) = tonic::metadata::MetadataValue::try_from(&value) { @@ -50,11 +50,11 @@ impl<'a> Injector for MetadataMap<'a> { impl VNode { pub async fn new(address: String, port: u16) -> Result { - let host = format!("http://{}:{}", address.clone(), port); + let host = format!("http://{}:{port}", address.clone()); debug!("connecting to {}", host); let client = RatelimiterClient::connect(host).await?; - Ok(VNode { client, address }) + Ok(Self { address, client }) } } diff --git a/exes/webhook/src/config.rs b/exes/webhook/src/config.rs index b96f368..ccc7894 100644 --- a/exes/webhook/src/config.rs +++ b/exes/webhook/src/config.rs @@ -35,7 +35,7 @@ pub struct Discord { } #[derive(Debug, Deserialize, Clone, Default, Copy)] -pub struct WebhookConfig { +pub struct Webhook { pub server: ServerSettings, pub discord: Discord, } diff --git a/exes/webhook/src/handler/error.rs b/exes/webhook/src/handler/error.rs deleted file mode 100644 index ffa4cca..0000000 --- a/exes/webhook/src/handler/error.rs +++ /dev/null @@ -1,36 +0,0 @@ -use hyper::{header::ToStrError, Body, Response, StatusCode}; - -pub struct WebhookError { - pub code: StatusCode, - pub message: String, -} - -impl WebhookError { - pub fn new(code: StatusCode, message: &str) -> WebhookError { - WebhookError { - code, - message: message.to_string(), - } - } -} - -impl From for Response { - fn from(value: WebhookError) -> Self { - Response::builder() - .status(value.code) - .body(value.message.into()) - .unwrap() - } -} - -impl From for WebhookError { - fn from(_: hyper::Error) -> Self { - WebhookError::new(StatusCode::BAD_REQUEST, "invalid request") - } -} - -impl From for WebhookError { - fn from(_: ToStrError) -> Self { - WebhookError::new(StatusCode::BAD_REQUEST, "invalid request") - } -} diff --git a/exes/webhook/src/handler/make_service.rs b/exes/webhook/src/handler/make_service.rs index b51494a..4202cd7 100644 --- a/exes/webhook/src/handler/make_service.rs +++ b/exes/webhook/src/handler/make_service.rs @@ -23,7 +23,7 @@ impl Service for MakeSvc { } impl MakeSvc { - pub fn new(service: T) -> Self { + pub const fn new(service: T) -> Self { Self { service } } } diff --git a/exes/webhook/src/handler/mod.rs b/exes/webhook/src/handler/mod.rs index 594919b..ea7ecca 100644 --- a/exes/webhook/src/handler/mod.rs +++ b/exes/webhook/src/handler/mod.rs @@ -1,18 +1,17 @@ -use crate::config::WebhookConfig; +use crate::config::Webhook; +use anyhow::bail; use async_nats::Client; use ed25519_dalek::PublicKey; -use error::WebhookError; use hyper::{ body::{to_bytes, Bytes}, service::Service, - Body, Method, Request, Response, StatusCode, + Body, Method, Request, Response, }; use shared::payloads::{CachePayload, DispatchEventTagged}; -use signature::validate_signature; +use signature::validate; use std::{ future::Future, pin::Pin, - str::from_utf8, task::{Context, Poll}, }; use tracing::{debug, error}; @@ -22,7 +21,6 @@ use twilight_model::{ gateway::payload::incoming::InteractionCreate, }; -mod error; pub mod make_service; mod signature; @@ -32,46 +30,37 @@ pub mod tests; /// Hyper service used to handle the discord webhooks #[derive(Clone)] pub struct WebhookService { - pub config: WebhookConfig, + pub config: Webhook, pub nats: Client, } impl WebhookService { - async fn check_request(req: Request, pk: PublicKey) -> Result { + async fn check_request(req: Request, pk: PublicKey) -> Result { if req.method() == Method::POST { let signature = if let Some(sig) = req.headers().get("X-Signature-Ed25519") { - sig.to_owned() + sig.clone() } else { - return Err(WebhookError::new( - StatusCode::BAD_REQUEST, - "missing signature header", - )); + bail!("Missing signature header"); }; let timestamp = if let Some(timestamp) = req.headers().get("X-Signature-Timestamp") { - timestamp.to_owned() + timestamp.clone() } else { - return Err(WebhookError::new( - StatusCode::BAD_REQUEST, - "missing timestamp header", - )); + bail!("Missing timestamp header"); }; let data = to_bytes(req.into_body()).await?; - if validate_signature( + if validate( &pk, &[timestamp.as_bytes().to_vec(), data.to_vec()].concat(), signature.to_str()?, ) { Ok(data) } else { - Err(WebhookError::new( - StatusCode::UNAUTHORIZED, - "invalid signature", - )) + bail!("invalid signature"); } } else { - Err(WebhookError::new(StatusCode::NOT_FOUND, "not found")) + bail!("not found"); } } @@ -79,69 +68,46 @@ impl WebhookService { req: Request, nats: Client, pk: PublicKey, - ) -> Result, WebhookError> { - match Self::check_request(req, pk).await { - Ok(data) => { - let utf8 = from_utf8(&data); - match utf8 { - Ok(data) => match serde_json::from_str::(data) { - Ok(value) => { - match value.kind { - InteractionType::Ping => Ok(Response::builder() - .header("Content-Type", "application/json") - .body(r#"{"type":1}"#.into()) - .unwrap()), - _ => { - debug!("calling nats"); - // this should hopefully not fail ? - - let data = CachePayload { - data: DispatchEventTagged { - data: DispatchEvent::InteractionCreate(Box::new( - InteractionCreate(value), - )), - }, - }; - - let payload = serde_json::to_string(&data).unwrap(); - - match nats - .request( - "nova.cache.dispatch.INTERACTION_CREATE".to_string(), - Bytes::from(payload), - ) - .await - { - Ok(response) => Ok(Response::builder() - .header("Content-Type", "application/json") - .body(Body::from(response.payload)) - .unwrap()), - - Err(error) => { - error!("failed to request nats: {}", error); - Err(WebhookError::new( - StatusCode::INTERNAL_SERVER_ERROR, - "failed to request nats", - )) - } - } - } - } - } - - Err(error) => { - error!("invalid json body: {}", error); - Err(WebhookError::new( - StatusCode::BAD_REQUEST, - "invalid json body", - )) - } - }, - - Err(_) => Err(WebhookError::new(StatusCode::BAD_REQUEST, "not utf-8 body")), + ) -> Result, anyhow::Error> { + let data = Self::check_request(req, pk).await?; + let interaction: Interaction = serde_json::from_slice(&data)?; + + if interaction.kind == InteractionType::Ping { + Ok(Response::builder() + .header("Content-Type", "application/json") + .body(r#"{"type":1}"#.into()) + .unwrap()) + } else { + debug!("calling nats"); + // this should hopefully not fail ? + + let data = CachePayload { + data: DispatchEventTagged { + data: DispatchEvent::InteractionCreate(Box::new(InteractionCreate( + interaction, + ))), + }, + }; + + let payload = serde_json::to_string(&data).unwrap(); + + match nats + .request( + "nova.cache.dispatch.INTERACTION_CREATE".to_string(), + Bytes::from(payload), + ) + .await + { + Ok(response) => Ok(Response::builder() + .header("Content-Type", "application/json") + .body(Body::from(response.payload)) + .unwrap()), + + Err(error) => { + error!("failed to request nats: {}", error); + Err(anyhow::anyhow!("internal error")) } } - Err(error) => Err(error), } } } @@ -149,7 +115,7 @@ impl WebhookService { /// Implementation of the service impl Service> for WebhookService { type Response = hyper::Response; - type Error = hyper::Error; + type Error = anyhow::Error; type Future = Pin> + Send>>; fn poll_ready(&mut self, _: &mut Context) -> Poll> { @@ -163,7 +129,7 @@ impl Service> for WebhookService { match response { Ok(r) => Ok(r), - Err(e) => Ok(e.into()), + Err(e) => Err(e), } }) } diff --git a/exes/webhook/src/handler/signature.rs b/exes/webhook/src/handler/signature.rs index 05221d3..5a48645 100644 --- a/exes/webhook/src/handler/signature.rs +++ b/exes/webhook/src/handler/signature.rs @@ -1,14 +1,13 @@ use ed25519_dalek::{PublicKey, Signature, Verifier}; #[inline] -pub fn validate_signature(public_key: &PublicKey, data: &[u8], hex_signature: &str) -> bool { +pub fn validate(public_key: &PublicKey, data: &[u8], hex_signature: &str) -> bool { let mut slice: [u8; Signature::BYTE_SIZE] = [0; Signature::BYTE_SIZE]; let signature_result = hex::decode_to_slice(hex_signature, &mut slice); - let mut result = false; if signature_result.is_ok() { - result = public_key.verify(data, &Signature::from(slice)).is_ok(); + public_key.verify(data, &Signature::from(slice)).is_ok() + } else { + false } - - result } diff --git a/exes/webhook/src/handler/tests/signature.rs b/exes/webhook/src/handler/tests/signature.rs index 0bed86a..4ff52ff 100644 --- a/exes/webhook/src/handler/tests/signature.rs +++ b/exes/webhook/src/handler/tests/signature.rs @@ -1,16 +1,16 @@ -use crate::handler::signature::validate_signature; +use crate::handler::signature::validate; use ed25519_dalek::PublicKey; #[test] fn validate_signature_test() { let signature = "543ec3547d57f9ddb1ec4c5c36503ebf288ffda3da3d510764c9a49c2abb57690ef974c63d174771bdd2481de1066966f57abbec12a3ec171b9f6e2373837002"; - let content = "message de test incroyable".as_bytes().to_vec(); + let content = b"message de test incroyable"; let public_key = PublicKey::from_bytes( &hex::decode("eefe0c24473737cb2035232e3b4eb91c206f0a14684168f3503f7d8316058d6f").unwrap(), ) .unwrap(); - assert!(validate_signature(&public_key, &content, signature)) + assert!(validate(&public_key, content, signature)); } #[test] @@ -21,10 +21,8 @@ fn validate_signature_reverse_test() { ) .unwrap(); - let content = "ceci est un test qui ne fonctionnera pas!" - .as_bytes() - .to_vec(); - assert!(!validate_signature(&public_key, &content, signature)) + let content = b"ceci est un test qui ne fonctionnera pas!"; + assert!(!validate(&public_key, content, signature)); } #[test] @@ -35,8 +33,6 @@ fn invalid_hex() { ) .unwrap(); - let content = "ceci est un test qui ne fonctionnera pas!" - .as_bytes() - .to_vec(); - assert!(!validate_signature(&public_key, &content, signature)) + let content = b"ceci est un test qui ne fonctionnera pas!"; + assert!(!validate(&public_key, content, signature)); } diff --git a/exes/webhook/src/lib.rs b/exes/webhook/src/lib.rs index 057e70f..933f38e 100644 --- a/exes/webhook/src/lib.rs +++ b/exes/webhook/src/lib.rs @@ -1,9 +1,21 @@ +#![deny( + clippy::all, + clippy::correctness, + clippy::suspicious, + clippy::style, + clippy::complexity, + clippy::perf, + clippy::pedantic, + clippy::nursery, + unsafe_code +)] + mod config; mod handler; use std::{future::Future, pin::Pin}; use crate::{ - config::WebhookConfig, + config::Webhook, handler::{make_service::MakeSvc, WebhookService}, }; use async_nats::Client; @@ -16,7 +28,7 @@ use tracing::info; pub struct WebhookServer {} impl Component for WebhookServer { - type Config = WebhookConfig; + type Config = Webhook; const SERVICE_NAME: &'static str = "webhook"; fn start( diff --git a/libs/leash/src/lib.rs b/libs/leash/src/lib.rs index cd67075..37e2b7c 100644 --- a/libs/leash/src/lib.rs +++ b/libs/leash/src/lib.rs @@ -1,3 +1,14 @@ +#![deny( + clippy::all, + clippy::correctness, + clippy::suspicious, + clippy::style, + clippy::complexity, + clippy::perf, + clippy::pedantic, + clippy::nursery, +)] + use anyhow::Result; use opentelemetry::sdk::propagation::TraceContextPropagator; use opentelemetry::sdk::trace::{self}; diff --git a/libs/shared/src/config.rs b/libs/shared/src/config.rs index adcaf79..cdf0bd3 100644 --- a/libs/shared/src/config.rs +++ b/libs/shared/src/config.rs @@ -5,22 +5,24 @@ use std::{env, ops::Deref}; use tracing::info; #[derive(Debug, Deserialize, Clone)] -pub struct Settings { +pub struct Settings { #[serde(skip_deserializing)] pub config: T, - pub nats: crate::nats::NatsConfiguration, - pub redis: crate::redis::RedisConfiguration, + pub nats: crate::nats::Configuration, + pub redis: crate::redis::Configuration, } impl Settings { - pub fn new(service_name: &str) -> Result> { + /// # Errors + /// Fails it the config could not be deserialized to `Self::T` + pub fn new(service_name: &str) -> Result { let mut builder = Config::builder(); builder = builder.add_source(File::with_name("config/default")); let mode = env::var("ENV").unwrap_or_else(|_| "development".into()); info!("Configuration Environment: {}", mode); - builder = builder.add_source(File::with_name(&format!("config/{}", mode)).required(false)); + builder = builder.add_source(File::with_name(&format!("config/{mode}")).required(false)); builder = builder.add_source(File::with_name("config/local").required(false)); let env = Environment::with_prefix("NOVA").separator("__"); @@ -28,7 +30,7 @@ impl Settings { builder = builder.add_source(env); let config = builder.build()?; - let mut settings: Settings = config.clone().try_deserialize()?; + let mut settings: Self = config.clone().try_deserialize()?; // try to load the config settings.config = config.get::(service_name)?; diff --git a/libs/shared/src/lib.rs b/libs/shared/src/lib.rs index a714a1b..92e7a05 100644 --- a/libs/shared/src/lib.rs +++ b/libs/shared/src/lib.rs @@ -1,5 +1,14 @@ -/// This crate is all the utilities shared by the nova rust projects -/// It includes logging, config and protocols. +#![deny( + clippy::all, + clippy::correctness, + clippy::suspicious, + clippy::style, + clippy::complexity, + clippy::perf, + clippy::pedantic, + clippy::nursery, +)] + pub mod config; pub mod nats; pub mod payloads; diff --git a/libs/shared/src/nats.rs b/libs/shared/src/nats.rs index 3529c46..7d4d3d1 100644 --- a/libs/shared/src/nats.rs +++ b/libs/shared/src/nats.rs @@ -4,23 +4,12 @@ use async_nats::Client; use serde::Deserialize; #[derive(Clone, Debug, Deserialize)] -pub struct NatsConfigurationClientCert { - pub cert: String, - pub key: String, -} - -#[derive(Clone, Debug, Deserialize)] -pub struct NatsConfigurationTls { - pub mtu: Option, -} - -#[derive(Clone, Debug, Deserialize)] -pub struct NatsConfiguration { +pub struct Configuration { pub host: String, } -impl From for Pin> + Send>> { - fn from(value: NatsConfiguration) -> Self { +impl From for Pin> + Send>> { + fn from(value: Configuration) -> Self { Box::pin(async move { Ok(async_nats::connect(value.host).await?) }) } } diff --git a/libs/shared/src/payloads.rs b/libs/shared/src/payloads.rs index 6e51b2d..3f183a9 100644 --- a/libs/shared/src/payloads.rs +++ b/libs/shared/src/payloads.rs @@ -31,7 +31,7 @@ impl<'de> Deserialize<'de> for DispatchEventTagged { let tagged = DispatchEventTaggedSerialized::deserialize(deserializer)?; let deserializer_seed = DispatchEventWithTypeDeserializer::new(&tagged.kind); let dispatch_event = deserializer_seed.deserialize(tagged.data).unwrap(); - Ok(DispatchEventTagged { + Ok(Self { data: dispatch_event, }) } diff --git a/libs/shared/src/redis.rs b/libs/shared/src/redis.rs index a623c2f..77aa97f 100644 --- a/libs/shared/src/redis.rs +++ b/libs/shared/src/redis.rs @@ -3,14 +3,14 @@ use serde::Deserialize; use std::{future::Future, pin::Pin}; #[derive(Clone, Debug, Deserialize)] -pub struct RedisConfiguration { +pub struct Configuration { pub url: String, } -impl From +impl From for Pin> + Send>> { - fn from(value: RedisConfiguration) -> Self { + fn from(value: Configuration) -> Self { Box::pin(async move { let con = Client::open(value.url)?; let (multiplex, ready) = con.create_multiplexed_tokio_connection().await?; -- 2.39.5