diff options
42 files changed, 943 insertions, 450 deletions
@@ -64,6 +64,12 @@ dependencies = [  ]  [[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + +[[package]]  name = "anyhow"  version = "1.0.68"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -302,6 +308,12 @@ dependencies = [  ]  [[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + +[[package]]  name = "cbindgen"  version = "0.24.3"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -333,6 +345,33 @@ source = "registry+https://github.com/rust-lang/crates.io-index"  checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"  [[package]] +name = "ciborium" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0c137568cc60b904a7724001b35ce2630fd00d5d84805fbb608ab89509d788f" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "346de753af073cc87b52b2083a506b38ac176a44cfb05497b622e27be899b369" + +[[package]] +name = "ciborium-ll" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "213030a2b5a4e0c0892b6652260cf6ccac84827b83a85a534e178e3906c4cf1b" +dependencies = [ + "ciborium-io", + "half", +] + +[[package]]  name = "clap"  version = "3.2.23"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -436,6 +475,44 @@ dependencies = [  ]  [[package]] +name = "criterion" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7c76e09c1aae2bc52b3d2f29e13c6572553b30c4aa1b8a49fd70de6412654cb" +dependencies = [ + "anes", + "atty", + "cast", + "ciborium", + "clap", + "criterion-plot", + "futures", + "itertools", + "lazy_static", + "num-traits", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "tokio", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" +dependencies = [ + "cast", + "itertools", +] + +[[package]]  name = "crossbeam-channel"  version = "0.5.6"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -446,6 +523,30 @@ dependencies = [  ]  [[package]] +name = "crossbeam-deque" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "715e8152b692bba2d374b53d4875445368fdf21a94751410af607a5ac677d1fc" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01a9af1f4c2ef74bb8aa1f7e19706bc72d03598c8a570bb5de72243c7a9d9d5a" +dependencies = [ + "autocfg", + "cfg-if", + "crossbeam-utils", + "memoffset", + "scopeguard", +] + +[[package]]  name = "crossbeam-utils"  version = "0.8.14"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -845,6 +946,12 @@ dependencies = [  ]  [[package]] +name = "half" +version = "1.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" + +[[package]]  name = "hashbrown"  version = "0.12.3"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1186,6 +1293,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"  checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"  [[package]] +name = "memoffset" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" +dependencies = [ + "autocfg", +] + +[[package]]  name = "mime"  version = "0.3.16"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1316,6 +1432,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"  checksum = "6f61fba1741ea2b3d6a1e3178721804bb716a68a6aeba1149b5d52e3d464ea66"  [[package]] +name = "oorandom" +version = "11.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" + +[[package]]  name = "opaque-debug"  version = "0.3.0"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1599,6 +1721,34 @@ source = "registry+https://github.com/rust-lang/crates.io-index"  checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160"  [[package]] +name = "plotters" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2538b639e642295546c50fcd545198c9d64ee2a38620a628724a3b266d5fbf97" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "193228616381fecdc1224c62e96946dfbc73ff4384fba576e052ff8c1bea8142" + +[[package]] +name = "plotters-svg" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9a81d2759aae1dae668f783c308bc5c8ebd191ff4184aaa1b37f65a6ae5a56f" +dependencies = [ + "plotters-backend", +] + +[[package]]  name = "ppv-lite86"  version = "0.2.17"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1779,6 +1929,8 @@ name = "ratelimit"  version = "0.1.0"  dependencies = [   "anyhow", + "criterion", + "env_logger 0.7.1",   "hyper",   "leash",   "opentelemetry", @@ -1787,15 +1939,41 @@ dependencies = [   "redis",   "serde",   "shared", + "test-log",   "tokio",   "tokio-stream", + "tokio-test",   "tonic",   "tracing",   "tracing-opentelemetry", + "tracing-subscriber", + "tracing-test",   "twilight-http-ratelimiting 0.14.0 (git+https://github.com/MatthieuCoder/twilight.git)",  ]  [[package]] +name = "rayon" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6db3a213adf02b3bcfd2d3846bb41cb22857d131789e01df434fb7e7bc0759b7" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cac410af5d00ab6884528b4ab69d1e8e146e8d471201800fa1b4524126de6ad3" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-utils", + "num_cpus", +] + +[[package]]  name = "redis"  version = "0.22.1"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1992,6 +2170,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"  checksum = "7b4b9743ed687d4b4bcedf9ff5eaa7398495ae14e61cba0a295704edbc7decde"  [[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + +[[package]]  name = "schannel"  version = "0.1.20"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2331,6 +2518,17 @@ dependencies = [  ]  [[package]] +name = "test-log" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38f0c854faeb68a048f0f2dc410c5ddae3bf83854ef0e4977d58306a5edef50e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]]  name = "textwrap"  version = "0.16.0"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2393,6 +2591,16 @@ dependencies = [  ]  [[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + +[[package]]  name = "tinyvec"  version = "1.6.0"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2482,6 +2690,19 @@ dependencies = [  ]  [[package]] +name = "tokio-test" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53474327ae5e166530d17f2d956afcb4f8a004de581b3cae10f12006bc8163e3" +dependencies = [ + "async-stream", + "bytes", + "futures-core", + "tokio", + "tokio-stream", +] + +[[package]]  name = "tokio-tungstenite"  version = "0.17.2"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2705,6 +2926,29 @@ dependencies = [  ]  [[package]] +name = "tracing-test" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e3d272c44878d2bbc9f4a20ad463724f03e19dbc667c6e84ac433ab7ffcc70b" +dependencies = [ + "lazy_static", + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "744324b12d69a9fc1edea4b38b7b1311295b662d161ad5deac17bb1358224a08" +dependencies = [ + "lazy_static", + "quote", + "syn", +] + +[[package]]  name = "try-lock"  version = "0.2.3"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2910,6 +3154,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"  checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"  [[package]] +name = "walkdir" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" +dependencies = [ + "same-file", + "winapi", + "winapi-util", +] + +[[package]]  name = "want"  version = "0.3.0"  source = "registry+https://github.com/rust-lang/crates.io-index" diff --git a/docs/architecture.md b/docs/architecture.md new file mode 100644 index 0000000..b30f98f --- /dev/null +++ b/docs/architecture.md @@ -0,0 +1,42 @@ +# Nova architecture + +The nova architecture is composed of multiple components. Each of them is horizontally scale-able. + +``` +                                            ┌──────────────────┐ +                                            │                  │ +                              ┌─────────────┤    Discord API   ├──────────────┐ +                              │             │                  │              │ +                              │             └────────┬─────────┘              │ +                              │                      │                        │ +                              │                      │                        │ +                              │                      │                        │ +                    ┌─────────┴────────┐    ┌────────┴─────────┐    ┌─────────┴────────┐ +                    │                  │    │                  │    │                  │ +                    │    Rest Proxy    │    │  Gateway client  │    │  Webhook Server  │ +                    │                  │    │                  │    │                  │ +                    └─────────┬──┬─────┘    └────────┬─────────┘    └─────────┬────────┘ +                              │  │                   │                        │ +                              │  │                   │                        │ +                              │  │                   │                        │ +                              │  │                   │                        │ +                              │  │                   │                        │ +                              │  │                   │                        │ +                              │  └───────┐           │                        │ +┌────────────────┐   ┌────────┴───────┐  │   ┌───────┴────────┐               │ +│                │   │                │  │   │                ├───────────────┘ +│     Redis      ├───┤    Ratelimit   │  │   │  Nats broker   │ +│                │   │                │  │   │                ├──────────────────┐ +└────────────────┘   └────────────────┘  │   └───────┬────────┘                  │ +                                         │           │                           │ +                                         │           │                           │ +                                         │   ┌───────┴────────┐           ┌──────┴─────┐ +                                         │   │                │           │            │ +                                         │   │  Cache manager ├───────────┤    User    │ +                                         │   │                │           │            │ +                                         │   └────────────────┘           └──────┬─────┘ +                                         └───────────────────────────────────────┘ +``` + +## Rest Proxy + diff --git a/docs/quickstart.md b/docs/quickstart.md index 1eace72..d4ca78b 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -1,24 +1,2 @@  # 5 Minutes quickstart -This page shows you how to start a new nova project -under five minutes using a typescript project, -hold tight this is going to be fast. - -## Requirements - -* A discord bot application available -* [Docker](https://docker.io/) (or alternatives) available to you. -* A domain name / [ngrok.io](https://ngrok.com/) domain (for webhooks only) - -> If you are deploying nova to production, consider following the -> production guide instead. - -## Setting up a nova instance - -Clone the [example repo](https://github.com/discordnova/nova-quickstart.git) like so - -`git clone https://github.com/discordnova/nova-quickstart.git`, - -In this folder, find the `config.yml.example` file and rename it to match `config.yml` - -Next, you need to fill all the environment variables to match your discord bot. diff --git a/exes/all-in-one/Makefile b/exes/all-in-one/Makefile deleted file mode 100644 index 8ed17c4..0000000 --- a/exes/all-in-one/Makefile +++ /dev/null @@ -1,15 +0,0 @@ -clean: -	rm ./build/* - -library: -	cargo build --release - -build: library -	cp ../../target/release/liball_in_one.a ./build -	go build -a -ldflags '-s' -o build/all-in-one - -all: library build - -run: all-in-one -	./build/all-in-one - diff --git a/exes/all-in-one/main.go b/exes/all-in-one/main.go deleted file mode 100644 index 1de08db..0000000 --- a/exes/all-in-one/main.go +++ /dev/null @@ -1,72 +0,0 @@ -package main - -/* -#cgo LDFLAGS: -L./build -lall_in_one -lz -lm -#include "./build/all-in-one.h" -*/ -import "C" - -import ( -	"fmt" -	"os" -	"os/signal" -	"syscall" -	"time" - -	"github.com/Jeffail/gabs" -	"github.com/alicebob/miniredis/v2" - -	server "github.com/nats-io/nats-server/v2/server" -) - -func main() { -	// Intialise les logs de la librarie Rust -	C.init_logs() -	// Charge la configuration -	str := C.GoString(C.load_config()) - -	// Démarre une instance MiniRedis -	mr := miniredis.NewMiniRedis() -	err := mr.Start() - -	if err != nil { -		panic(err) -	} - -	// Démarre un serveur Nats -	opts := &server.Options{} -	opts.Host = "0.0.0.0" -	ns, err := server.NewServer(opts) - -	if err != nil { -		panic(err) -	} - -	go ns.Start() - -	if !ns.ReadyForConnections(4 * time.Second) { -		panic("not ready for connection") -	} - -	// Edite le json de configuration donné -	// Et injecte la configuration des servers Nats et MiniRedis -	json, _ := gabs.ParseJSON([]byte(str)) -	json.Set(fmt.Sprintf("redis://%s", mr.Addr()), "redis", "url") -	json.Set("localhost", "nats", "host") -	json.Set(1, "webhook", "discord", "client_id") - -	// Démarre une instance de nova -	instance := C.start_instance(C.CString(json.String())) - -	// Wait for a SIGINT -	c := make(chan os.Signal, 1) -	signal.Notify(c, -		syscall.SIGHUP, -		syscall.SIGINT, -		syscall.SIGTERM, -		syscall.SIGQUIT) -	<-c - -	println("Arret de nova all in one") -	C.stop_instance(instance) -} diff --git a/exes/all-in-one/src/errors.rs b/exes/all-in-one/src/errors.rs index d676e8d..1d2a9e2 100644 --- a/exes/all-in-one/src/errors.rs +++ b/exes/all-in-one/src/errors.rs @@ -1,7 +1,6 @@  use std::cell::RefCell;  use anyhow::Result; -use libc::c_int;  use tracing::error;  thread_local! { @@ -9,7 +8,7 @@ thread_local! {  }  /// Update the most recent error, clearing whatever may have been there before. -pub fn stacktrace(err: anyhow::Error) -> String { +#[must_use] pub fn stacktrace(err: &anyhow::Error) -> String {      format!("{err}")  } @@ -23,13 +22,15 @@ where          Ok(ok) => Some(ok),          Err(error) => {              // Call the handler -            handle_error(error); +            handle_error(&error);              None          }      }  } -pub fn handle_error(error: anyhow::Error) { +/// # Panics +/// Panics if the stacktrace size is > than an i32 +pub fn handle_error(error: &anyhow::Error) {      ERROR_HANDLER.with(|val| {          let mut stacktrace = stacktrace(error); @@ -38,8 +39,8 @@ pub fn handle_error(error: anyhow::Error) {              // Call the error handler              unsafe {                  func( -                    stacktrace.len() as c_int + 1, -                    stacktrace.as_mut_ptr() as *mut i8, +                    (stacktrace.len() + 1).try_into().unwrap(), +                    stacktrace.as_mut_ptr().cast::<i8>(),                  );              }          } diff --git a/exes/all-in-one/src/ffi.rs b/exes/all-in-one/src/ffi.rs index 7a8821f..449d1cc 100644 --- a/exes/all-in-one/src/ffi.rs +++ b/exes/all-in-one/src/ffi.rs @@ -69,8 +69,12 @@ pub unsafe extern "C" fn stop_instance(instance: *mut AllInOneInstance) {      });  } +/// # Panics +/// Panics if an incorrect `RUST_LOG` variables is specified.  #[no_mangle]  pub unsafe extern "C" fn create_instance(config: *mut c_char) -> *mut AllInOneInstance { +    // Returning a null pointer (unaligned) is expected. +    #[allow(clippy::cast_ptr_alignment)]      wrap_result(move || {          let value = CString::from_raw(config);          let json = value.to_str()?; @@ -87,7 +91,7 @@ pub unsafe extern "C" fn create_instance(config: *mut c_char) -> *mut AllInOneIn              .with(fmt::layer())              .with(                  EnvFilter::builder() -                    .with_default_directive(Directive::from_str("info").unwrap()) +                    .with_default_directive(Directive::from_str("info").expect(""))                      .from_env()                      .unwrap(),              ) @@ -96,7 +100,7 @@ pub unsafe extern "C" fn create_instance(config: *mut c_char) -> *mut AllInOneIn          // Error handling task          runtime.spawn(async move {              while let Some(error) = errors.recv().await { -                handle_error(error) +                handle_error(&error);              }          }); diff --git a/exes/all-in-one/src/lib.rs b/exes/all-in-one/src/lib.rs index 74625c0..493bf46 100644 --- a/exes/all-in-one/src/lib.rs +++ b/exes/all-in-one/src/lib.rs @@ -1,3 +1,14 @@ +#![deny( +    clippy::all, +    clippy::correctness, +    clippy::suspicious, +    clippy::style, +    clippy::complexity, +    clippy::perf, +    clippy::pedantic, +    clippy::nursery, +)] +  pub mod errors;  pub mod ffi;  pub mod utils; diff --git a/exes/all-in-one/src/utils.rs b/exes/all-in-one/src/utils.rs index 7a7134d..159d98d 100644 --- a/exes/all-in-one/src/utils.rs +++ b/exes/all-in-one/src/utils.rs @@ -26,7 +26,7 @@ fn load_settings_for<T: Default + DeserializeOwned + Clone>(      name: &str,  ) -> Result<Settings<T>> {      let value: Value = serde_json::from_str(settings)?; -    let section: T = serde_json::from_value(value.get(name).unwrap().to_owned())?; +    let section: T = serde_json::from_value(value.get(name).unwrap().clone())?;      let mut settings: Settings<T> = serde_json::from_value(value)?;      settings.config = section; @@ -69,7 +69,7 @@ pub(crate) fn load_config_file() -> Result<Value> {      let mode = std::env::var("ENV").unwrap_or_else(|_| "development".into());      info!("Configuration Environment: {}", mode); -    builder = builder.add_source(File::with_name(&format!("config/{}", mode)).required(false)); +    builder = builder.add_source(File::with_name(&format!("config/{mode}")).required(false));      builder = builder.add_source(File::with_name("config/local").required(false));      let env = Environment::with_prefix("NOVA").separator("__"); diff --git a/exes/cache/Cargo.toml b/exes/cache/Cargo.toml index 349deaa..70cb6a3 100644 --- a/exes/cache/Cargo.toml +++ b/exes/cache/Cargo.toml @@ -2,8 +2,12 @@  name = "cache"  version = "0.1.0"  edition = "2018" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +description = "Stores the data from discord if needed" +readme = "README.md" +repository = "https://github.com/discordnova/nova.git" +keywords = ["discord", "scaleable", "cache"] +categories = ["microservices", "nova"] +license = "APACHE2"  [dependencies]  shared = { path = "../../libs/shared" } diff --git a/exes/cache/README.md b/exes/cache/README.md new file mode 100644 index 0000000..0395009 --- /dev/null +++ b/exes/cache/README.md @@ -0,0 +1,3 @@ +# Cache + +Stores the data from discord if needed
\ No newline at end of file diff --git a/exes/gateway/src/config.rs b/exes/gateway/src/config.rs index 7b12bfe..a1a2fad 100644 --- a/exes/gateway/src/config.rs +++ b/exes/gateway/src/config.rs @@ -2,14 +2,14 @@ use serde::{Deserialize, Serialize};  use twilight_gateway::Intents;  #[derive(Serialize, Deserialize, Clone)] -pub struct GatewayConfig { +pub struct Gateway {      pub token: String,      pub intents: Intents,      pub shard: u64,      pub shard_total: u64,  } -impl Default for GatewayConfig { +impl Default for Gateway {      fn default() -> Self {          Self {              intents: Intents::empty(), diff --git a/exes/gateway/src/lib.rs b/exes/gateway/src/lib.rs index ec3337b..e54bb5c 100644 --- a/exes/gateway/src/lib.rs +++ b/exes/gateway/src/lib.rs @@ -1,5 +1,17 @@ +#![deny( +    clippy::all, +    clippy::correctness, +    clippy::suspicious, +    clippy::style, +    clippy::complexity, +    clippy::perf, +    clippy::pedantic, +    clippy::nursery, +    unsafe_code +)] +#![allow(clippy::redundant_pub_crate)]  use async_nats::{Client, HeaderMap, HeaderValue}; -use config::GatewayConfig; +use config::Gateway;  use leash::{AnyhowResultFuture, Component};  use opentelemetry::{global, propagation::Injector};  use shared::{ @@ -12,21 +24,21 @@ use tokio_stream::StreamExt;  use tracing_opentelemetry::OpenTelemetrySpanExt;  use twilight_gateway::{Event, Shard};  pub mod config; -use tracing::{debug, info, trace_span}; +use tracing::{debug, error, info, trace_span};  use twilight_model::gateway::event::DispatchEvent;  struct MetadataMap<'a>(&'a mut HeaderMap);  impl<'a> Injector for MetadataMap<'a> {      fn set(&mut self, key: &str, value: String) { -        self.0.insert(key, HeaderValue::from_str(&value).unwrap()) +        self.0.insert(key, HeaderValue::from_str(&value).unwrap());      }  }  pub struct GatewayServer {}  impl Component for GatewayServer { -    type Config = GatewayConfig; +    type Config = Gateway;      const SERVICE_NAME: &'static str = "gateway";      fn start( @@ -35,7 +47,7 @@ impl Component for GatewayServer {          mut stop: oneshot::Receiver<()>,      ) -> AnyhowResultFuture<()> {          Box::pin(async move { -            let (shard, mut events) = Shard::builder(settings.token.to_owned(), settings.intents) +            let (shard, mut events) = Shard::builder(settings.token.clone(), settings.intents)                  .shard(settings.shard, settings.shard_total)?                  .build(); @@ -48,40 +60,10 @@ impl Component for GatewayServer {              loop {                  select! {                      event = events.next() => { -                          if let Some(event) = event { -                            match event { -                                Event::Ready(ready) => { -                                    info!("Logged in as {}", ready.user.name); -                                }, - -                                _ => { - -                                    let name = event.kind().name(); -                                    if let Ok(dispatch_event) = DispatchEvent::try_from(event) { -                                        debug!("handling event {}", name.unwrap()); - -                                        let data = CachePayload { -                                            data: DispatchEventTagged { -                                                data: dispatch_event, -                                            }, -                                        }; -                                        let value = serde_json::to_string(&data)?; -                                        let bytes = bytes::Bytes::from(value); - -                                        let span = trace_span!("nats send"); - -                                        let mut header_map = HeaderMap::new(); -                                        let context = span.context(); -                                        global::get_text_map_propagator(|propagator| { -                                            propagator.inject_context(&context, &mut MetadataMap(&mut header_map)) -                                        }); - -                                        nats.publish_with_headers(format!("nova.cache.dispatch.{}", name.unwrap()), header_map, bytes) -                                            .await?; -                                    } -                                } -                            } +                           let _ = handle_event(event, &nats) +                            .await +                            .map_err(|err| error!(error = ?err, "event publish failed"));                          } else {                              break                          } @@ -101,3 +83,39 @@ impl Component for GatewayServer {          Self {}      }  } + +async fn handle_event(event: Event, nats: &Client) -> anyhow::Result<()> { +    if let Event::Ready(ready) = event { +        info!("Logged in as {}", ready.user.name); +    } else { +        let name = event.kind().name(); +        if let Ok(dispatch_event) = DispatchEvent::try_from(event) { +            debug!("handling event {}", name.unwrap()); + +            let data = CachePayload { +                data: DispatchEventTagged { +                    data: dispatch_event, +                }, +            }; +            let value = serde_json::to_string(&data)?; +            let bytes = bytes::Bytes::from(value); + +            let span = trace_span!("nats send"); + +            let mut header_map = HeaderMap::new(); +            let context = span.context(); +            global::get_text_map_propagator(|propagator| { +                propagator.inject_context(&context, &mut MetadataMap(&mut header_map)); +            }); + +            nats.publish_with_headers( +                format!("nova.cache.dispatch.{}", name.unwrap()), +                header_map, +                bytes, +            ) +            .await?; +        } +    } + +    Ok(()) +} diff --git a/exes/ratelimit/Cargo.toml b/exes/ratelimit/Cargo.toml index d82d8c9..3989735 100644 --- a/exes/ratelimit/Cargo.toml +++ b/exes/ratelimit/Cargo.toml @@ -26,4 +26,16 @@ tonic = "0.8.3"  tokio-stream = "0.1.11" -redis = { version = "0.22.1", features = ["cluster", "connection-manager", "tokio-comp"] }
\ No newline at end of file +redis = { version = "0.22.1", features = ["cluster", "connection-manager", "tokio-comp"] } + +[dev-dependencies] +criterion = { version = "0.4", features = ["async_tokio"] } +tokio-test = "*" +tracing-test = "0.2.3" +tracing-subscriber = "*" +env_logger = "*" +test-log = { version = "0.2.11", features = ["log", "trace"] } + +[[bench]] +name = "bucket" +harness = false diff --git a/exes/ratelimit/bench/req.rs b/exes/ratelimit/bench/req.rs deleted file mode 100644 index e69de29..0000000 --- a/exes/ratelimit/bench/req.rs +++ /dev/null diff --git a/exes/ratelimit/benches/bucket.rs b/exes/ratelimit/benches/bucket.rs new file mode 100644 index 0000000..d049d62 --- /dev/null +++ b/exes/ratelimit/benches/bucket.rs @@ -0,0 +1,56 @@ +use std::ops::Add; +use std::time::{Duration, SystemTime}; + +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use ratelimit::buckets::bucket::Bucket; +use tokio::runtime::Runtime; +use twilight_http_ratelimiting::RatelimitHeaders; + +pub fn acquire_ticket(c: &mut Criterion) { +    let rt = Runtime::new().unwrap(); + +    let bucket = rt.block_on(async move { +        let bucket = Bucket::new(); + +        let mreset = SystemTime::now() +            .add(Duration::from_secs(3600)) +            .duration_since(SystemTime::UNIX_EPOCH) +            .unwrap() +            .as_millis() +            .to_string(); +        let headers = [ +            ( +                "x-ratelimit-bucket", +                "d721dea6054f6322373d361f98e5c38b".as_bytes(), +            ), +            ("x-ratelimit-limit", "100".as_bytes()), +            ("x-ratelimit-remaining", "1".as_bytes()), +            ("x-ratelimit-reset", mreset.as_bytes()), +            ("x-ratelimit-reset-after", "100000.000".as_bytes()), +        ]; +        if let RatelimitHeaders::Present(present) = +            RatelimitHeaders::from_pairs(headers.into_iter()).unwrap() +        { +            bucket.update( +                &present, +                SystemTime::now() +                    .duration_since(SystemTime::UNIX_EPOCH) +                    .unwrap() +                    .as_millis() as u64, +            ); +        } +        bucket +    }); + +    let size: usize = 1024; +    c.bench_with_input(BenchmarkId::new("input_example", size), &size, |b, _| { +        // Insert a call to `to_async` to convert the bencher to async mode. +        // The timing loops are the same as with the normal bencher. +        b.to_async(&rt).iter(|| async { +            bucket.ticket().await.unwrap(); +        }); +    }); +} + +criterion_group!(benches, acquire_ticket); +criterion_main!(benches); diff --git a/exes/ratelimit/src/buckets/atomic_instant.rs b/exes/ratelimit/src/buckets/atomic_instant.rs index 67dd0ee..992adb9 100644 --- a/exes/ratelimit/src/buckets/atomic_instant.rs +++ b/exes/ratelimit/src/buckets/atomic_instant.rs @@ -3,33 +3,46 @@ use std::{      time::{Duration, SystemTime, UNIX_EPOCH},  }; +use tracing::debug; +  #[derive(Default, Debug)]  pub struct AtomicInstant(AtomicU64);  impl AtomicInstant { +    #[must_use]      pub const fn empty() -> Self {          Self(AtomicU64::new(0))      }      pub fn elapsed(&self) -> Duration { +        // Truncation is expected +        #[allow(clippy::cast_possible_truncation)]          Duration::from_millis(              SystemTime::now()                  .duration_since(UNIX_EPOCH) -                .unwrap() +                .expect("time went backwards")                  .as_millis() as u64 -                - self.0.load(Ordering::SeqCst), +                - self.0.load(Ordering::Relaxed),          )      }      pub fn as_millis(&self) -> u64 { -        self.0.load(Ordering::SeqCst) +        self.0.load(Ordering::Relaxed)      }      pub fn set_millis(&self, millis: u64) { -        self.0.store(millis, Ordering::SeqCst); +        // get address of struct +        let b = self as *const _ as usize; +        debug!(millis, this = ?b, "settings instant millis"); +        self.0.store(millis, Ordering::Relaxed);      }      pub fn is_empty(&self) -> bool { -        self.as_millis() == 0 +        let millis = self.as_millis(); +        // get address of struct +        let b = self as *const _ as usize; +        debug!(millis, this = ?b, "settings instant millis"); +        debug!(empty = (millis == 0), millis, this = ?b, "instant empty check"); +        millis == 0      }  } diff --git a/exes/ratelimit/src/buckets/bucket.rs b/exes/ratelimit/src/buckets/bucket.rs index f8fa8b9..c40f059 100644 --- a/exes/ratelimit/src/buckets/bucket.rs +++ b/exes/ratelimit/src/buckets/bucket.rs @@ -1,3 +1,4 @@ +use super::{async_queue::AsyncQueue, atomic_instant::AtomicInstant};  use std::{      sync::{          atomic::{AtomicU64, Ordering}, @@ -9,8 +10,6 @@ use tokio::{sync::oneshot, task::JoinHandle};  use tracing::{debug, trace};  use twilight_http_ratelimiting::headers::Present; -use super::{async_queue::AsyncQueue, atomic_instant::AtomicInstant, redis_lock::RedisLock}; -  #[derive(Clone, Debug)]  pub enum TimeRemaining {      Finished, @@ -18,22 +17,57 @@ pub enum TimeRemaining {      Some(Duration),  } +/// A bucket is a simple atomic implementation of a bucket used for ratelimiting +/// It can be updated dynamically depending on the discord api responses. +/// +/// # Usage +/// ``` +/// use ratelimit::buckets::bucket::Bucket; +/// use twilight_http_ratelimiting::RatelimitHeaders; +/// use std::time::SystemTime; +/// +/// let bucket = Bucket::new(); +/// +/// // Feed the headers informations into the bucket to update it +/// let headers = [ +///     ( "x-ratelimit-bucket", "bucket id".as_bytes()), +///     ("x-ratelimit-limit", "100".as_bytes()), +///     ("x-ratelimit-remaining", "0".as_bytes()), +///     ("x-ratelimit-reset", "".as_bytes()), +///     ("x-ratelimit-reset-after", "10.000".as_bytes()), +/// ]; +/// +/// // Parse the headers +/// let present = if let Ok(RatelimitHeaders::Present(present)) = RatelimitHeaders::from_pairs(headers.into_iter()) { present } else { todo!() }; +/// +/// // this should idealy the time of the request +/// let current_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis() as u64; +/// +/// bucket.update(present, current_time).await; +/// +/// ``` +/// +/// # Async +/// You need to call this struct new method in a tokio 1.x async runtime.  #[derive(Debug)]  pub struct Bucket { +    /// Limits of tickets that can be accepted      pub limit: AtomicU64, -    /// Queue associated with this bucket. -    pub queue: AsyncQueue, -    /// Number of tickets remaining. +    /// Remaining requests that can be executed      pub remaining: AtomicU64, -    /// Duration after the [`Self::last_update`] time the bucket will refresh. +    /// Time to wait after [`Self::last_update`] before accepting new tickets.      pub reset_after: AtomicU64, -    /// When the bucket's ratelimit refresh countdown started (unix millis) +    /// Last update got from the discord upstream      pub last_update: AtomicInstant, -    pub tasks: Vec<JoinHandle<()>>, +    /// List of tasks that dequeue tasks from [`Self::queue`] +    tasks: Vec<JoinHandle<()>>, +    /// Queue of tickets to be processed. +    queue: AsyncQueue,  }  impl Drop for Bucket { +    /// Simply abord the dequeue tasks to aboid leaking memory via arc(s)      fn drop(&mut self) {          for join in &self.tasks {              join.abort(); @@ -42,8 +76,11 @@ impl Drop for Bucket {  }  impl Bucket { -    /// Create a new bucket for the specified [`Path`]. -    pub fn new(global: Arc<RedisLock>) -> Arc<Self> { +    /// Creates a new bucket with four dequeue tasks +    /// # Async +    /// This functions **should** be called in a tokio 1.x runtime, otherwise the function *will* panic. +    #[must_use] +    pub fn new() -> Arc<Self> {          let tasks = vec![];          let this = Arc::new(Self { @@ -58,19 +95,16 @@ impl Bucket {          // Run with 4 dequeue tasks          for _ in 0..4 {              let this = this.clone(); -            let global = global.clone();              tokio::spawn(async move { +                // continuously wait for elements in the queue to process them sequantially. +                // this is using parallel tasks to allow (hopefully) better performance.                  while let Some(element) = this.queue.pop().await { -                    // we need to wait -                    if let Some(duration) = global.locked_for().await { -                        tokio::time::sleep(duration).await; -                    } -                      if this.remaining() == 0 {                          debug!("0 tickets remaining, we have to wait.");                          match this.time_remaining() {                              TimeRemaining::Finished => { +                                debug!("waiting seems finished.");                                  this.try_reset();                              }                              TimeRemaining::Some(duration) => { @@ -79,7 +113,9 @@ impl Bucket {                                  this.try_reset();                              } -                            TimeRemaining::NotStarted => {} +                            TimeRemaining::NotStarted => { +                                debug!("we should not wait"); +                            }                          }                      } @@ -94,19 +130,17 @@ impl Bucket {          this      } -    /// Total number of tickets allotted in a cycle. +    /// Total number of tickets allowed in a cycle.      pub fn limit(&self) -> u64 {          self.limit.load(Ordering::Relaxed)      } -    /// Number of tickets remaining. +    /// Number of tickets remaining in the current cycle.      pub fn remaining(&self) -> u64 {          self.remaining.load(Ordering::Relaxed)      } -    /// Duration after the [`started_at`] time the bucket will refresh. -    /// -    /// [`started_at`]: Self::started_at +    /// Duration after the [`Self::last_update`] time the bucket will refresh.      pub fn reset_after(&self) -> u64 {          self.reset_after.load(Ordering::Relaxed)      } @@ -117,6 +151,10 @@ impl Bucket {          let last_update = &self.last_update;          if last_update.is_empty() { +            debug!("last update is empty"); + +            TimeRemaining::NotStarted +        } else {              let elapsed = last_update.elapsed();              if elapsed > Duration::from_millis(reset_after) { @@ -124,16 +162,12 @@ impl Bucket {              }              TimeRemaining::Some(Duration::from_millis(reset_after) - elapsed) -        } else { -            TimeRemaining::NotStarted          }      } -    /// Try to reset this bucket's [`started_at`] value if it has finished. +    /// Try to reset this bucket's [`Self::last_update`] value if it has finished.      ///      /// Returns whether resetting was possible. -    /// -    /// [`started_at`]: Self::started_at      pub fn try_reset(&self) -> bool {          if self.last_update.is_empty() {              return false; @@ -150,10 +184,12 @@ impl Bucket {      }      /// Update this bucket's ratelimit data after a request has been made. -    pub fn update(&self, ratelimits: Present, time: u64) { +    /// The time of the request should be given. +    pub fn update(&self, ratelimits: &Present, time: u64) {          let bucket_limit = self.limit();          if self.last_update.is_empty() { +            debug!(millis = time, "updated the last update time");              self.last_update.set_millis(time);          } @@ -167,9 +203,122 @@ impl Bucket {              .store(ratelimits.remaining(), Ordering::Relaxed);      } -    pub async fn ticket(&self) -> oneshot::Receiver<()> { +    /// Submits a ticket to the queue +    /// A oneshot receiver is returned and will be called when the ticket is accepted. +    pub fn ticket(&self) -> oneshot::Receiver<()> {          let (tx, rx) = oneshot::channel();          self.queue.push(tx);          rx      }  } + +#[cfg(test)] +mod tests { +    use std::{ +        ops::Add, +        time::{Duration, Instant, SystemTime}, +    }; + +    use tokio::time::timeout; +    use tracing::info; +    use twilight_http_ratelimiting::RatelimitHeaders; + +    use super::Bucket; + +    #[test_log::test(tokio::test)] +    async fn should_ratelimit() { +        let bucket = Bucket::new(); + +        // Intialize a bucket with one remaining ticket +        // and that resets in oue hour +        let mreset = SystemTime::now() +            .add(Duration::from_secs(100)) +            .duration_since(SystemTime::UNIX_EPOCH) +            .unwrap() +            .as_millis() +            .to_string(); +        let headers: [(&str, &[u8]); 5] = [ +            ("x-ratelimit-bucket", b"123"), +            ("x-ratelimit-limit", b"100"), +            ("x-ratelimit-remaining", b"1"), +            ("x-ratelimit-reset", mreset.as_bytes()), +            ("x-ratelimit-reset-after", b"100.000"), +        ]; +        if let RatelimitHeaders::Present(present) = +            RatelimitHeaders::from_pairs(headers.into_iter()).unwrap() +        { +            // Integer truncating is expected +            #[allow(clippy::cast_possible_truncation)] +            bucket.update( +                &present, +                SystemTime::now() +                    .duration_since(SystemTime::UNIX_EPOCH) +                    .unwrap() +                    .as_millis() as u64, +            ); +        } + +        let ticket = bucket.ticket(); + +        info!("first request"); +        // We should accept one ticket +        let respo = timeout(Duration::from_secs(10), ticket).await; +        assert!(respo.is_ok()); + +        info!("second request"); + +        let ticket = bucket.ticket(); +        // We should accept one ticket +        let respo = timeout(Duration::from_secs(1), ticket).await; + +        // the ticket should not have responded because the queue is locked +        assert!(respo.is_err()); +    } + +    #[test_log::test(tokio::test)] +    async fn should_block_until_possible() { +        let bucket = Bucket::new(); + +        // Intialize a bucket with one remaining ticket +        // and that resets in oue hour +        let mreset = SystemTime::now() +            .add(Duration::from_secs(100)) +            .duration_since(SystemTime::UNIX_EPOCH) +            .unwrap() +            .as_millis() +            .to_string(); +        let headers: [(&str, &[u8]); 5] = [ +            ("x-ratelimit-bucket", b"123"), +            ("x-ratelimit-limit", b"100"), +            ("x-ratelimit-remaining", b"0"), +            ("x-ratelimit-reset", mreset.as_bytes()), +            ("x-ratelimit-reset-after", b"100.000"), +        ]; + +        if let RatelimitHeaders::Present(present) = +            RatelimitHeaders::from_pairs(headers.into_iter()).unwrap() +        { +            // Integer truncating is expected +            #[allow(clippy::cast_possible_truncation)] +            bucket.update( +                &present, +                SystemTime::now() +                    .duration_since(SystemTime::UNIX_EPOCH) +                    .unwrap() +                    .as_millis() as u64, +            ); +        } + +        let ticket = bucket.ticket(); +        let start = Instant::now(); + +        // in this case, the ratelimiter should wait 10 seconds +        let respo = timeout(Duration::from_secs(12), ticket).await; +        let end = start.elapsed().as_secs(); + +        // we should have waited 10 seconds (+- 1s) +        assert_eq!(10, end); +        // and the ticket should be a success +        assert!(respo.is_ok()); +    } +} diff --git a/exes/ratelimit/src/buckets/mod.rs b/exes/ratelimit/src/buckets/mod.rs index c86d623..5263765 100644 --- a/exes/ratelimit/src/buckets/mod.rs +++ b/exes/ratelimit/src/buckets/mod.rs @@ -1,4 +1,17 @@ +use std::{future::Future, pin::Pin, sync::Arc, time::Duration}; +  pub mod async_queue;  pub mod atomic_instant;  pub mod bucket; +pub mod noop_lock;  pub mod redis_lock; + +pub trait GlobalLock: Send + Sync { +    fn lock_for<'a>( +        self: &'a Arc<Self>, +        duration: Duration, +    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>; +    fn is_locked<'a>( +        self: &'a Arc<Self>, +    ) -> Pin<Box<dyn Future<Output = Option<Duration>> + Send + 'a>>; +} diff --git a/exes/ratelimit/src/buckets/noop_lock.rs b/exes/ratelimit/src/buckets/noop_lock.rs new file mode 100644 index 0000000..614eba3 --- /dev/null +++ b/exes/ratelimit/src/buckets/noop_lock.rs @@ -0,0 +1,18 @@ +use super::GlobalLock; + +pub struct NoOpLock; +impl GlobalLock for NoOpLock { +    fn lock_for<'a>( +        self: &'a std::sync::Arc<Self>, +        _duration: std::time::Duration, +    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'a>> { +        Box::pin(async move {}) +    } + +    fn is_locked<'a>( +        self: &'a std::sync::Arc<Self>, +    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Option<std::time::Duration>> + Send + 'a>> +    { +        Box::pin(async move { None }) +    } +} diff --git a/exes/ratelimit/src/buckets/redis_lock.rs b/exes/ratelimit/src/buckets/redis_lock.rs index fdae149..333d726 100644 --- a/exes/ratelimit/src/buckets/redis_lock.rs +++ b/exes/ratelimit/src/buckets/redis_lock.rs @@ -1,4 +1,6 @@  use std::{ +    future::Future, +    pin::Pin,      sync::{atomic::AtomicU64, Arc},      time::{Duration, SystemTime},  }; @@ -7,7 +9,9 @@ use redis::{aio::MultiplexedConnection, AsyncCommands};  use tokio::sync::Mutex;  use tracing::debug; -/// This is flawed and needs to be replaced sometime with the real RedisLock algorithm +use super::GlobalLock; + +/// This is flawed and needs to be replaced sometime with the real `RedisLock` algorithm  #[derive(Debug)]  pub struct RedisLock {      redis: Mutex<MultiplexedConnection>, @@ -15,70 +19,89 @@ pub struct RedisLock {  }  impl RedisLock { -    /// Set the global ratelimit as exhausted. -    pub async fn lock_for(self: &Arc<Self>, duration: Duration) { -        debug!("locking globally for {}", duration.as_secs()); -        let _: () = self -            .redis -            .lock() -            .await -            .set_ex( -                "nova:rls:lock", -                1, -                (duration.as_secs() + 1).try_into().unwrap(), -            ) -            .await -            .unwrap(); - -        self.is_locked.store( -            (SystemTime::now() + duration) -                .duration_since(SystemTime::UNIX_EPOCH) -                .unwrap() -                .as_millis() as u64, -            std::sync::atomic::Ordering::Relaxed, -        ); +    #[must_use] +    pub fn new(redis: MultiplexedConnection) -> Arc<Self> { +        Arc::new(Self { +            redis: Mutex::new(redis), +            is_locked: AtomicU64::new(0), +        })      } +} + +impl GlobalLock for RedisLock { +    fn lock_for<'a>( +        self: &'a Arc<Self>, +        duration: Duration, +    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> { +        Box::pin(async move { +            debug!("locking globally for {}", duration.as_secs()); +            let _: () = self +                .redis +                .lock() +                .await +                .set_ex( +                    "nova:rls:lock", +                    1, +                    (duration.as_secs() + 1).try_into().unwrap(), +                ) +                .await +                .unwrap(); -    pub async fn locked_for(self: &Arc<Self>) -> Option<Duration> { -        let load = self.is_locked.load(std::sync::atomic::Ordering::Relaxed); -        if load != 0 { -            if load -                > SystemTime::now() +            // Integer truncating is expected +            #[allow(clippy::cast_possible_truncation)] +            self.is_locked.store( +                (SystemTime::now() + duration)                      .duration_since(SystemTime::UNIX_EPOCH)                      .unwrap() -                    .as_millis() as u64 -            { -                return Some(Duration::from_millis(load)); -            } else { +                    .as_millis() as u64, +                std::sync::atomic::Ordering::Relaxed, +            ); +        }) +    } + +    fn is_locked<'a>( +        self: &'a Arc<Self>, +    ) -> Pin<Box<dyn Future<Output = Option<Duration>> + Send + 'a>> { +        Box::pin(async move { +            let load = self.is_locked.load(std::sync::atomic::Ordering::Relaxed); +            if load != 0 { +                // Integer truncating is expected +                #[allow(clippy::cast_possible_truncation)] +                if load +                    > SystemTime::now() +                        .duration_since(SystemTime::UNIX_EPOCH) +                        .unwrap() +                        .as_millis() as u64 +                { +                    return Some(Duration::from_millis(load)); +                }                  self.is_locked                      .store(0, std::sync::atomic::Ordering::Relaxed);              } -        } -        let result = self.redis.lock().await.ttl::<_, i64>("nova:rls:lock").await; -        match result { -            Ok(remaining_time) => { -                if remaining_time > 0 { -                    let duration = Duration::from_secs(remaining_time as u64); -                    debug!("external global lock detected, locking"); -                    self.lock_for(duration).await; -                    Some(duration) -                } else { -                    None +            let result = self.redis.lock().await.ttl::<_, i64>("nova:rls:lock").await; +            match result { +                Ok(remaining_time) => { +                    if remaining_time > 0 { +                        // Sign loss is allowed since we know it's a positive number +                        // because a ttl is always positive when the key exists and have a ttl +                        // otherwise redis *will* return a negative number, hence the check for +                        // a positive sign. +                        #[allow(clippy::cast_sign_loss)] +                        let duration = Duration::from_secs(remaining_time as u64); +                        debug!("external global lock detected, locking"); +                        self.lock_for(duration).await; +                        Some(duration) +                    } else { +                        None +                    }                  } -            } -            Err(error) => { -                debug!("redis call failed: {}", error); +                Err(error) => { +                    debug!("redis call failed: {}", error); -                None +                    None +                }              } -        } -    } - -    pub fn new(redis: MultiplexedConnection) -> Arc<Self> { -        Arc::new(Self { -            redis: Mutex::new(redis), -            is_locked: AtomicU64::new(0),          })      }  } diff --git a/exes/ratelimit/src/config.rs b/exes/ratelimit/src/config.rs index df18b76..4114c7b 100644 --- a/exes/ratelimit/src/config.rs +++ b/exes/ratelimit/src/config.rs @@ -18,6 +18,6 @@ impl Default for ServerSettings {  }  #[derive(Debug, Deserialize, Clone, Default)] -pub struct RatelimitServerConfig { +pub struct Ratelimit {      pub server: ServerSettings,  } diff --git a/exes/ratelimit/src/grpc.rs b/exes/ratelimit/src/grpc.rs index 09885c1..9e5d31c 100644 --- a/exes/ratelimit/src/grpc.rs +++ b/exes/ratelimit/src/grpc.rs @@ -16,6 +16,7 @@ use twilight_http_ratelimiting::RatelimitHeaders;  use crate::buckets::bucket::Bucket;  use crate::buckets::redis_lock::RedisLock; +use crate::buckets::GlobalLock;  pub struct RLServer {      global: Arc<RedisLock>, @@ -34,12 +35,12 @@ impl RLServer {  struct MetadataMap<'a>(&'a tonic::metadata::MetadataMap);  impl<'a> Extractor for MetadataMap<'a> { -    /// Get a value for a key from the MetadataMap.  If the value can't be converted to &str, returns None +    /// Get a value for a key from the `MetadataMap`.  If the value can't be converted to &str, returns None      fn get(&self, key: &str) -> Option<&str> {          self.0.get(key).and_then(|metadata| metadata.to_str().ok())      } -    /// Collect all the keys from the MetadataMap. +    /// Collect all the keys from the `MetadataMap`.      fn keys(&self) -> Vec<&str> {          self.0              .keys() @@ -70,6 +71,10 @@ impl Ratelimiter for RLServer {          )          .unwrap(); +        if let Some(duration) = self.global.is_locked().await { +            tokio::time::sleep(duration).await; +        } +          let bucket: Arc<Bucket> = if self.buckets.read().await.contains_key(&data.path) {              self.buckets                  .read() @@ -78,7 +83,7 @@ impl Ratelimiter for RLServer {                  .expect("impossible")                  .clone()          } else { -            let bucket = Bucket::new(self.global.clone()); +            let bucket = Bucket::new();              self.buckets.write().await.insert(data.path, bucket.clone());              bucket          }; @@ -93,13 +98,14 @@ impl Ratelimiter for RLServer {                      global.retry_after()                  );                  self.global +                    .clone()                      .lock_for(Duration::from_secs(global.retry_after()))                      .await;              }              RatelimitHeaders::None => {}              RatelimitHeaders::Present(present) => {                  // we should update the bucket. -                bucket.update(present, data.precise_time); +                bucket.update(&present, data.precise_time);              }              _ => unreachable!(),          }; @@ -127,13 +133,13 @@ impl Ratelimiter for RLServer {                  .expect("impossible")                  .clone()          } else { -            let bucket = Bucket::new(self.global.clone()); +            let bucket = Bucket::new();              self.buckets.write().await.insert(data.path, bucket.clone());              bucket          };          // wait for the ticket to be accepted -        bucket.ticket().await; +        let _ = bucket.ticket().await;          Ok(Response::new(()))      } diff --git a/exes/ratelimit/src/lib.rs b/exes/ratelimit/src/lib.rs index 6653157..d1bd6e0 100644 --- a/exes/ratelimit/src/lib.rs +++ b/exes/ratelimit/src/lib.rs @@ -1,5 +1,17 @@ +#![deny( +    clippy::all, +    clippy::correctness, +    clippy::suspicious, +    clippy::style, +    clippy::complexity, +    clippy::perf, +    clippy::pedantic, +    clippy::nursery, +    unsafe_code +)] +  use buckets::redis_lock::RedisLock; -use config::RatelimitServerConfig; +use config::Ratelimit;  use grpc::RLServer;  use leash::{AnyhowResultFuture, Component};  use proto::nova::ratelimit::ratelimiter::ratelimiter_server::RatelimiterServer; @@ -10,13 +22,13 @@ use std::pin::Pin;  use tokio::sync::oneshot;  use tonic::transport::Server; -mod buckets; +pub mod buckets;  mod config;  mod grpc;  pub struct RatelimiterServerComponent {}  impl Component for RatelimiterServerComponent { -    type Config = RatelimitServerConfig; +    type Config = Ratelimit;      const SERVICE_NAME: &'static str = "ratelimiter";      fn start( diff --git a/exes/rest/src/config.rs b/exes/rest/src/config.rs index e87757c..9593ac8 100644 --- a/exes/rest/src/config.rs +++ b/exes/rest/src/config.rs @@ -23,7 +23,7 @@ pub struct Discord {  }  #[derive(Debug, Deserialize, Clone, Default)] -pub struct ReverseProxyConfig { +pub struct ReverseProxy {      pub server: ServerSettings,      pub discord: Discord,      pub ratelimiter_address: String, diff --git a/exes/rest/src/handler.rs b/exes/rest/src/handler.rs index a8f43cd..ab158fe 100644 --- a/exes/rest/src/handler.rs +++ b/exes/rest/src/handler.rs @@ -58,7 +58,7 @@ pub async fn handle_request(          let request_path = request.uri().path();          let (api_path, trimmed_path) = normalize_path(request_path); -        let mut uri_string = format!("http://127.0.0.1:9999{}{}", api_path, trimmed_path); +        let mut uri_string = format!("http://127.0.0.1:9999{api_path}{trimmed_path}");          if let Some(query) = request.uri().query() {              uri_string.push('?');              uri_string.push_str(query); @@ -103,12 +103,12 @@ pub async fn handle_request(              .expect("Failed to check header")              .starts_with("Bot")          { -            *auth = HeaderValue::from_str(&format!("Bot {}", token))?; +            *auth = HeaderValue::from_str(&format!("Bot {token}"))?;          }      } else {          request.headers_mut().insert(              AUTHORIZATION, -            HeaderValue::from_str(&format!("Bot {}", token))?, +            HeaderValue::from_str(&format!("Bot {token}"))?,          );      } @@ -132,12 +132,12 @@ pub async fn handle_request(      let headers = resp          .headers()          .into_iter() -        .map(|(k, v)| (k.to_string(), v.to_str().map(|f| f.to_string()))) +        .map(|(k, v)| (k.to_string(), v.to_str().map(std::string::ToString::to_string)))          .filter(|f| f.1.is_ok())          .map(|f| (f.0, f.1.expect("errors should be filtered")))          .collect(); -    let _ = ratelimiter +    let _submit_headers = ratelimiter          .submit_headers(hash, headers)          .instrument(info_span!("submitting headers"))          .await; diff --git a/exes/rest/src/lib.rs b/exes/rest/src/lib.rs index 53ab12a..174bea0 100644 --- a/exes/rest/src/lib.rs +++ b/exes/rest/src/lib.rs @@ -1,4 +1,16 @@ -use config::ReverseProxyConfig; +#![deny( +    clippy::all, +    clippy::correctness, +    clippy::suspicious, +    clippy::style, +    clippy::complexity, +    clippy::perf, +    clippy::pedantic, +    clippy::nursery, +    unsafe_code +)] + +use config::ReverseProxy;  use handler::handle_request;  use hyper::{ @@ -20,7 +32,7 @@ mod ratelimit_client;  pub struct ReverseProxyServer {}  impl Component for ReverseProxyServer { -    type Config = ReverseProxyConfig; +    type Config = ReverseProxy;      const SERVICE_NAME: &'static str = "rest";      fn start( diff --git a/exes/rest/src/ratelimit_client/mod.rs b/exes/rest/src/ratelimit_client/mod.rs index c9bd52e..e2f7e0e 100644 --- a/exes/rest/src/ratelimit_client/mod.rs +++ b/exes/rest/src/ratelimit_client/mod.rs @@ -1,4 +1,4 @@ -use crate::config::ReverseProxyConfig; +use crate::config::ReverseProxy;  use self::remote_hashring::{HashRingWrapper, MetadataMap, VNode};  use anyhow::anyhow; @@ -23,7 +23,7 @@ pub struct RemoteRatelimiter {      current_remotes: Vec<String>,      stop: Arc<tokio::sync::broadcast::Sender<()>>, -    config: ReverseProxyConfig, +    config: ReverseProxy,  }  impl Drop for RemoteRatelimiter { @@ -41,15 +41,15 @@ impl RemoteRatelimiter {          // get list of dns responses          let responses: Vec<String> = dns_lookup::lookup_host(&self.config.ratelimiter_address)?              .into_iter() -            .filter(|address| address.is_ipv4()) +            .filter(std::net::IpAddr::is_ipv4)              .map(|address| address.to_string())              .collect();          let mut write = self.remotes.write().await;          for ip in &responses { -            if !self.current_remotes.contains(&ip) { -                let a = VNode::new(ip.to_owned(), self.config.ratelimiter_port).await?; +            if !self.current_remotes.contains(ip) { +                let a = VNode::new(ip.clone(), self.config.ratelimiter_port).await?;                  write.add(a.clone());              }          } @@ -58,13 +58,13 @@ impl RemoteRatelimiter {      }      #[must_use] -    pub fn new(config: ReverseProxyConfig) -> Self { +    pub fn new(config: ReverseProxy) -> Self {          let (rx, mut tx) = broadcast::channel(1);          let obj = Self {              remotes: Arc::new(RwLock::new(HashRingWrapper::default())),              stop: Arc::new(rx),              config, -            current_remotes: vec![] +            current_remotes: vec![],          };          let obj_clone = obj.clone(); @@ -75,7 +75,7 @@ impl RemoteRatelimiter {                  match obj_clone.get_ratelimiters().await {                      Ok(_) => { -                        debug!("refreshed ratelimiting servers") +                        debug!("refreshed ratelimiting servers");                      }                      Err(err) => {                          error!("refreshing ratelimiting servers failed {}", err); @@ -122,7 +122,7 @@ impl RemoteRatelimiter {                  let context = span.context();                  let mut request = Request::new(BucketSubmitTicketRequest { path });                  global::get_text_map_propagator(|propagator| { -                    propagator.inject_context(&context, &mut MetadataMap(request.metadata_mut())) +                    propagator.inject_context(&context, &mut MetadataMap(request.metadata_mut()));                  });                  // Requesting @@ -158,13 +158,15 @@ impl RemoteRatelimiter {              let time = SystemTime::now()                  .duration_since(SystemTime::UNIX_EPOCH)?                  .as_millis(); +            // truncation is expected +            #[allow(clippy::cast_possible_truncation)]              let mut request = Request::new(HeadersSubmitRequest {                  path,                  precise_time: time as u64,                  headers,              });              global::get_text_map_propagator(|propagator| { -                propagator.inject_context(&context, &mut MetadataMap(request.metadata_mut())) +                propagator.inject_context(&context, &mut MetadataMap(request.metadata_mut()));              });              node.submit_headers(request).await?; diff --git a/exes/rest/src/ratelimit_client/remote_hashring.rs b/exes/rest/src/ratelimit_client/remote_hashring.rs index ac025c8..b99276d 100644 --- a/exes/rest/src/ratelimit_client/remote_hashring.rs +++ b/exes/rest/src/ratelimit_client/remote_hashring.rs @@ -38,7 +38,7 @@ impl Hash for VNode {  pub struct MetadataMap<'a>(pub &'a mut tonic::metadata::MetadataMap);  impl<'a> Injector for MetadataMap<'a> { -    /// Set a key and value in the MetadataMap.  Does nothing if the key or value are not valid inputs +    /// Set a key and value in the `MetadataMap`.  Does nothing if the key or value are not valid inputs      fn set(&mut self, key: &str, value: String) {          if let Ok(key) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) {              if let Ok(val) = tonic::metadata::MetadataValue::try_from(&value) { @@ -50,11 +50,11 @@ impl<'a> Injector for MetadataMap<'a> {  impl VNode {      pub async fn new(address: String, port: u16) -> Result<Self, tonic::transport::Error> { -        let host = format!("http://{}:{}", address.clone(), port); +        let host = format!("http://{}:{port}", address.clone());          debug!("connecting to {}", host);          let client = RatelimiterClient::connect(host).await?; -        Ok(VNode { client, address }) +        Ok(Self { address, client })      }  } diff --git a/exes/webhook/src/config.rs b/exes/webhook/src/config.rs index b96f368..ccc7894 100644 --- a/exes/webhook/src/config.rs +++ b/exes/webhook/src/config.rs @@ -35,7 +35,7 @@ pub struct Discord {  }  #[derive(Debug, Deserialize, Clone, Default, Copy)] -pub struct WebhookConfig { +pub struct Webhook {      pub server: ServerSettings,      pub discord: Discord,  } diff --git a/exes/webhook/src/handler/error.rs b/exes/webhook/src/handler/error.rs deleted file mode 100644 index ffa4cca..0000000 --- a/exes/webhook/src/handler/error.rs +++ /dev/null @@ -1,36 +0,0 @@ -use hyper::{header::ToStrError, Body, Response, StatusCode}; - -pub struct WebhookError { -    pub code: StatusCode, -    pub message: String, -} - -impl WebhookError { -    pub fn new(code: StatusCode, message: &str) -> WebhookError { -        WebhookError { -            code, -            message: message.to_string(), -        } -    } -} - -impl From<WebhookError> for Response<Body> { -    fn from(value: WebhookError) -> Self { -        Response::builder() -            .status(value.code) -            .body(value.message.into()) -            .unwrap() -    } -} - -impl From<hyper::Error> for WebhookError { -    fn from(_: hyper::Error) -> Self { -        WebhookError::new(StatusCode::BAD_REQUEST, "invalid request") -    } -} - -impl From<ToStrError> for WebhookError { -    fn from(_: ToStrError) -> Self { -        WebhookError::new(StatusCode::BAD_REQUEST, "invalid request") -    } -} diff --git a/exes/webhook/src/handler/make_service.rs b/exes/webhook/src/handler/make_service.rs index b51494a..4202cd7 100644 --- a/exes/webhook/src/handler/make_service.rs +++ b/exes/webhook/src/handler/make_service.rs @@ -23,7 +23,7 @@ impl<T, V: Clone> Service<T> for MakeSvc<V> {  }  impl<T: Clone> MakeSvc<T> { -    pub fn new(service: T) -> Self { +    pub const fn new(service: T) -> Self {          Self { service }      }  } diff --git a/exes/webhook/src/handler/mod.rs b/exes/webhook/src/handler/mod.rs index 594919b..ea7ecca 100644 --- a/exes/webhook/src/handler/mod.rs +++ b/exes/webhook/src/handler/mod.rs @@ -1,18 +1,17 @@ -use crate::config::WebhookConfig; +use crate::config::Webhook; +use anyhow::bail;  use async_nats::Client;  use ed25519_dalek::PublicKey; -use error::WebhookError;  use hyper::{      body::{to_bytes, Bytes},      service::Service, -    Body, Method, Request, Response, StatusCode, +    Body, Method, Request, Response,  };  use shared::payloads::{CachePayload, DispatchEventTagged}; -use signature::validate_signature; +use signature::validate;  use std::{      future::Future,      pin::Pin, -    str::from_utf8,      task::{Context, Poll},  };  use tracing::{debug, error}; @@ -22,7 +21,6 @@ use twilight_model::{      gateway::payload::incoming::InteractionCreate,  }; -mod error;  pub mod make_service;  mod signature; @@ -32,46 +30,37 @@ pub mod tests;  /// Hyper service used to handle the discord webhooks  #[derive(Clone)]  pub struct WebhookService { -    pub config: WebhookConfig, +    pub config: Webhook,      pub nats: Client,  }  impl WebhookService { -    async fn check_request(req: Request<Body>, pk: PublicKey) -> Result<Bytes, WebhookError> { +    async fn check_request(req: Request<Body>, pk: PublicKey) -> Result<Bytes, anyhow::Error> {          if req.method() == Method::POST {              let signature = if let Some(sig) = req.headers().get("X-Signature-Ed25519") { -                sig.to_owned() +                sig.clone()              } else { -                return Err(WebhookError::new( -                    StatusCode::BAD_REQUEST, -                    "missing signature header", -                )); +                bail!("Missing signature header");              };              let timestamp = if let Some(timestamp) = req.headers().get("X-Signature-Timestamp") { -                timestamp.to_owned() +                timestamp.clone()              } else { -                return Err(WebhookError::new( -                    StatusCode::BAD_REQUEST, -                    "missing timestamp header", -                )); +                bail!("Missing timestamp header");              };              let data = to_bytes(req.into_body()).await?; -            if validate_signature( +            if validate(                  &pk,                  &[timestamp.as_bytes().to_vec(), data.to_vec()].concat(),                  signature.to_str()?,              ) {                  Ok(data)              } else { -                Err(WebhookError::new( -                    StatusCode::UNAUTHORIZED, -                    "invalid signature", -                )) +                bail!("invalid signature");              }          } else { -            Err(WebhookError::new(StatusCode::NOT_FOUND, "not found")) +            bail!("not found");          }      } @@ -79,69 +68,46 @@ impl WebhookService {          req: Request<Body>,          nats: Client,          pk: PublicKey, -    ) -> Result<Response<Body>, WebhookError> { -        match Self::check_request(req, pk).await { -            Ok(data) => { -                let utf8 = from_utf8(&data); -                match utf8 { -                    Ok(data) => match serde_json::from_str::<Interaction>(data) { -                        Ok(value) => { -                            match value.kind { -                                InteractionType::Ping => Ok(Response::builder() -                                    .header("Content-Type", "application/json") -                                    .body(r#"{"type":1}"#.into()) -                                    .unwrap()), -                                _ => { -                                    debug!("calling nats"); -                                    // this should hopefully not fail ? - -                                    let data = CachePayload { -                                        data: DispatchEventTagged { -                                            data: DispatchEvent::InteractionCreate(Box::new( -                                                InteractionCreate(value), -                                            )), -                                        }, -                                    }; - -                                    let payload = serde_json::to_string(&data).unwrap(); - -                                    match nats -                                        .request( -                                            "nova.cache.dispatch.INTERACTION_CREATE".to_string(), -                                            Bytes::from(payload), -                                        ) -                                        .await -                                    { -                                        Ok(response) => Ok(Response::builder() -                                            .header("Content-Type", "application/json") -                                            .body(Body::from(response.payload)) -                                            .unwrap()), - -                                        Err(error) => { -                                            error!("failed to request nats: {}", error); -                                            Err(WebhookError::new( -                                                StatusCode::INTERNAL_SERVER_ERROR, -                                                "failed to request nats", -                                            )) -                                        } -                                    } -                                } -                            } -                        } - -                        Err(error) => { -                            error!("invalid json body: {}", error); -                            Err(WebhookError::new( -                                StatusCode::BAD_REQUEST, -                                "invalid json body", -                            )) -                        } -                    }, - -                    Err(_) => Err(WebhookError::new(StatusCode::BAD_REQUEST, "not utf-8 body")), +    ) -> Result<Response<Body>, anyhow::Error> { +        let data = Self::check_request(req, pk).await?; +        let interaction: Interaction = serde_json::from_slice(&data)?; + +        if interaction.kind == InteractionType::Ping { +            Ok(Response::builder() +                .header("Content-Type", "application/json") +                .body(r#"{"type":1}"#.into()) +                .unwrap()) +        } else { +            debug!("calling nats"); +            // this should hopefully not fail ? + +            let data = CachePayload { +                data: DispatchEventTagged { +                    data: DispatchEvent::InteractionCreate(Box::new(InteractionCreate( +                        interaction, +                    ))), +                }, +            }; + +            let payload = serde_json::to_string(&data).unwrap(); + +            match nats +                .request( +                    "nova.cache.dispatch.INTERACTION_CREATE".to_string(), +                    Bytes::from(payload), +                ) +                .await +            { +                Ok(response) => Ok(Response::builder() +                    .header("Content-Type", "application/json") +                    .body(Body::from(response.payload)) +                    .unwrap()), + +                Err(error) => { +                    error!("failed to request nats: {}", error); +                    Err(anyhow::anyhow!("internal error"))                  }              } -            Err(error) => Err(error),          }      }  } @@ -149,7 +115,7 @@ impl WebhookService {  /// Implementation of the service  impl Service<hyper::Request<Body>> for WebhookService {      type Response = hyper::Response<Body>; -    type Error = hyper::Error; +    type Error = anyhow::Error;      type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;      fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> { @@ -163,7 +129,7 @@ impl Service<hyper::Request<Body>> for WebhookService {              match response {                  Ok(r) => Ok(r), -                Err(e) => Ok(e.into()), +                Err(e) => Err(e),              }          })      } diff --git a/exes/webhook/src/handler/signature.rs b/exes/webhook/src/handler/signature.rs index 05221d3..5a48645 100644 --- a/exes/webhook/src/handler/signature.rs +++ b/exes/webhook/src/handler/signature.rs @@ -1,14 +1,13 @@  use ed25519_dalek::{PublicKey, Signature, Verifier};  #[inline] -pub fn validate_signature(public_key: &PublicKey, data: &[u8], hex_signature: &str) -> bool { +pub fn validate(public_key: &PublicKey, data: &[u8], hex_signature: &str) -> bool {      let mut slice: [u8; Signature::BYTE_SIZE] = [0; Signature::BYTE_SIZE];      let signature_result = hex::decode_to_slice(hex_signature, &mut slice); -    let mut result = false;      if signature_result.is_ok() { -        result = public_key.verify(data, &Signature::from(slice)).is_ok(); +        public_key.verify(data, &Signature::from(slice)).is_ok() +    } else { +        false      } - -    result  } diff --git a/exes/webhook/src/handler/tests/signature.rs b/exes/webhook/src/handler/tests/signature.rs index 0bed86a..4ff52ff 100644 --- a/exes/webhook/src/handler/tests/signature.rs +++ b/exes/webhook/src/handler/tests/signature.rs @@ -1,16 +1,16 @@ -use crate::handler::signature::validate_signature; +use crate::handler::signature::validate;  use ed25519_dalek::PublicKey;  #[test]  fn validate_signature_test() {      let signature = "543ec3547d57f9ddb1ec4c5c36503ebf288ffda3da3d510764c9a49c2abb57690ef974c63d174771bdd2481de1066966f57abbec12a3ec171b9f6e2373837002"; -    let content = "message de test incroyable".as_bytes().to_vec(); +    let content = b"message de test incroyable";      let public_key = PublicKey::from_bytes(          &hex::decode("eefe0c24473737cb2035232e3b4eb91c206f0a14684168f3503f7d8316058d6f").unwrap(),      )      .unwrap(); -    assert!(validate_signature(&public_key, &content, signature)) +    assert!(validate(&public_key, content, signature));  }  #[test] @@ -21,10 +21,8 @@ fn validate_signature_reverse_test() {      )      .unwrap(); -    let content = "ceci est un test qui ne fonctionnera pas!" -        .as_bytes() -        .to_vec(); -    assert!(!validate_signature(&public_key, &content, signature)) +    let content = b"ceci est un test qui ne fonctionnera pas!"; +    assert!(!validate(&public_key, content, signature));  }  #[test] @@ -35,8 +33,6 @@ fn invalid_hex() {      )      .unwrap(); -    let content = "ceci est un test qui ne fonctionnera pas!" -        .as_bytes() -        .to_vec(); -    assert!(!validate_signature(&public_key, &content, signature)) +    let content = b"ceci est un test qui ne fonctionnera pas!"; +    assert!(!validate(&public_key, content, signature));  } diff --git a/exes/webhook/src/lib.rs b/exes/webhook/src/lib.rs index 057e70f..933f38e 100644 --- a/exes/webhook/src/lib.rs +++ b/exes/webhook/src/lib.rs @@ -1,9 +1,21 @@ +#![deny( +    clippy::all, +    clippy::correctness, +    clippy::suspicious, +    clippy::style, +    clippy::complexity, +    clippy::perf, +    clippy::pedantic, +    clippy::nursery, +    unsafe_code +)] +  mod config;  mod handler;  use std::{future::Future, pin::Pin};  use crate::{ -    config::WebhookConfig, +    config::Webhook,      handler::{make_service::MakeSvc, WebhookService},  };  use async_nats::Client; @@ -16,7 +28,7 @@ use tracing::info;  pub struct WebhookServer {}  impl Component for WebhookServer { -    type Config = WebhookConfig; +    type Config = Webhook;      const SERVICE_NAME: &'static str = "webhook";      fn start( diff --git a/libs/leash/src/lib.rs b/libs/leash/src/lib.rs index cd67075..37e2b7c 100644 --- a/libs/leash/src/lib.rs +++ b/libs/leash/src/lib.rs @@ -1,3 +1,14 @@ +#![deny( +    clippy::all, +    clippy::correctness, +    clippy::suspicious, +    clippy::style, +    clippy::complexity, +    clippy::perf, +    clippy::pedantic, +    clippy::nursery, +)] +  use anyhow::Result;  use opentelemetry::sdk::propagation::TraceContextPropagator;  use opentelemetry::sdk::trace::{self}; diff --git a/libs/shared/src/config.rs b/libs/shared/src/config.rs index adcaf79..cdf0bd3 100644 --- a/libs/shared/src/config.rs +++ b/libs/shared/src/config.rs @@ -5,22 +5,24 @@ use std::{env, ops::Deref};  use tracing::info;  #[derive(Debug, Deserialize, Clone)] -pub struct Settings<T: Clone + DeserializeOwned + Default> { +pub struct Settings<T: Clone + DeserializeOwned> {      #[serde(skip_deserializing)]      pub config: T, -    pub nats: crate::nats::NatsConfiguration, -    pub redis: crate::redis::RedisConfiguration, +    pub nats: crate::nats::Configuration, +    pub redis: crate::redis::Configuration,  }  impl<T: Clone + DeserializeOwned + Default> Settings<T> { -    pub fn new(service_name: &str) -> Result<Settings<T>> { +    /// # Errors +    /// Fails it the config could not be deserialized to `Self::T` +    pub fn new(service_name: &str) -> Result<Self> {          let mut builder = Config::builder();          builder = builder.add_source(File::with_name("config/default"));          let mode = env::var("ENV").unwrap_or_else(|_| "development".into());          info!("Configuration Environment: {}", mode); -        builder = builder.add_source(File::with_name(&format!("config/{}", mode)).required(false)); +        builder = builder.add_source(File::with_name(&format!("config/{mode}")).required(false));          builder = builder.add_source(File::with_name("config/local").required(false));          let env = Environment::with_prefix("NOVA").separator("__"); @@ -28,7 +30,7 @@ impl<T: Clone + DeserializeOwned + Default> Settings<T> {          builder = builder.add_source(env);          let config = builder.build()?; -        let mut settings: Settings<T> = config.clone().try_deserialize()?; +        let mut settings: Self = config.clone().try_deserialize()?;          //  try to load the config          settings.config = config.get::<T>(service_name)?; diff --git a/libs/shared/src/lib.rs b/libs/shared/src/lib.rs index a714a1b..92e7a05 100644 --- a/libs/shared/src/lib.rs +++ b/libs/shared/src/lib.rs @@ -1,5 +1,14 @@ -/// This crate is all the utilities shared by the nova rust projects -/// It includes logging, config and protocols. +#![deny( +    clippy::all, +    clippy::correctness, +    clippy::suspicious, +    clippy::style, +    clippy::complexity, +    clippy::perf, +    clippy::pedantic, +    clippy::nursery, +)] +  pub mod config;  pub mod nats;  pub mod payloads; diff --git a/libs/shared/src/nats.rs b/libs/shared/src/nats.rs index 3529c46..7d4d3d1 100644 --- a/libs/shared/src/nats.rs +++ b/libs/shared/src/nats.rs @@ -4,23 +4,12 @@ use async_nats::Client;  use serde::Deserialize;  #[derive(Clone, Debug, Deserialize)] -pub struct NatsConfigurationClientCert { -    pub cert: String, -    pub key: String, -} - -#[derive(Clone, Debug, Deserialize)] -pub struct NatsConfigurationTls { -    pub mtu: Option<usize>, -} - -#[derive(Clone, Debug, Deserialize)] -pub struct NatsConfiguration { +pub struct Configuration {      pub host: String,  } -impl From<NatsConfiguration> for Pin<Box<dyn Future<Output = anyhow::Result<Client>> + Send>> { -    fn from(value: NatsConfiguration) -> Self { +impl From<Configuration> for Pin<Box<dyn Future<Output = anyhow::Result<Client>> + Send>> { +    fn from(value: Configuration) -> Self {          Box::pin(async move { Ok(async_nats::connect(value.host).await?) })      }  } diff --git a/libs/shared/src/payloads.rs b/libs/shared/src/payloads.rs index 6e51b2d..3f183a9 100644 --- a/libs/shared/src/payloads.rs +++ b/libs/shared/src/payloads.rs @@ -31,7 +31,7 @@ impl<'de> Deserialize<'de> for DispatchEventTagged {          let tagged = DispatchEventTaggedSerialized::deserialize(deserializer)?;          let deserializer_seed = DispatchEventWithTypeDeserializer::new(&tagged.kind);          let dispatch_event = deserializer_seed.deserialize(tagged.data).unwrap(); -        Ok(DispatchEventTagged { +        Ok(Self {              data: dispatch_event,          })      } diff --git a/libs/shared/src/redis.rs b/libs/shared/src/redis.rs index a623c2f..77aa97f 100644 --- a/libs/shared/src/redis.rs +++ b/libs/shared/src/redis.rs @@ -3,14 +3,14 @@ use serde::Deserialize;  use std::{future::Future, pin::Pin};  #[derive(Clone, Debug, Deserialize)] -pub struct RedisConfiguration { +pub struct Configuration {      pub url: String,  } -impl From<RedisConfiguration> +impl From<Configuration>      for Pin<Box<dyn Future<Output = anyhow::Result<MultiplexedConnection>> + Send>>  { -    fn from(value: RedisConfiguration) -> Self { +    fn from(value: Configuration) -> Self {          Box::pin(async move {              let con = Client::open(value.url)?;              let (multiplex, ready) = con.create_multiplexed_tokio_connection().await?;  | 
