git.puffer.fish Git - matthieu/nova.git/commitdiff
clippy, tests and a bit of docs
author MatthieuCoder <matthieu@matthieu-dev.xyz>
Sat, 14 Jan 2023 13:45:48 +0000 (17:45 +0400)
committer MatthieuCoder <matthieu@matthieu-dev.xyz>
Sat, 14 Jan 2023 13:45:48 +0000 (17:45 +0400)
42 files changed:
Cargo.lock
docs/architecture.md [new file with mode: 0644]
docs/quickstart.md
exes/all-in-one/Makefile [deleted file]
exes/all-in-one/main.go [deleted file]
exes/all-in-one/src/errors.rs
exes/all-in-one/src/ffi.rs
exes/all-in-one/src/lib.rs
exes/all-in-one/src/utils.rs
exes/cache/Cargo.toml
exes/cache/README.md [new file with mode: 0644]
exes/gateway/src/config.rs
exes/gateway/src/lib.rs
exes/ratelimit/Cargo.toml
exes/ratelimit/bench/req.rs [deleted file]
exes/ratelimit/benches/bucket.rs [new file with mode: 0644]
exes/ratelimit/src/buckets/atomic_instant.rs
exes/ratelimit/src/buckets/bucket.rs
exes/ratelimit/src/buckets/mod.rs
exes/ratelimit/src/buckets/noop_lock.rs [new file with mode: 0644]
exes/ratelimit/src/buckets/redis_lock.rs
exes/ratelimit/src/config.rs
exes/ratelimit/src/grpc.rs
exes/ratelimit/src/lib.rs
exes/rest/src/config.rs
exes/rest/src/handler.rs
exes/rest/src/lib.rs
exes/rest/src/ratelimit_client/mod.rs
exes/rest/src/ratelimit_client/remote_hashring.rs
exes/webhook/src/config.rs
exes/webhook/src/handler/error.rs [deleted file]
exes/webhook/src/handler/make_service.rs
exes/webhook/src/handler/mod.rs
exes/webhook/src/handler/signature.rs
exes/webhook/src/handler/tests/signature.rs
exes/webhook/src/lib.rs
libs/leash/src/lib.rs
libs/shared/src/config.rs
libs/shared/src/lib.rs
libs/shared/src/nats.rs
libs/shared/src/payloads.rs
libs/shared/src/redis.rs

index 24585f9193143ef97b453df2f660aa9f55d3224c..45885095514cf643142726ead7f5a139ca453f43 100644 (file)
@@ -63,6 +63,12 @@ dependencies = [
  "webhook",
 ]
 
+[[package]]
+name = "anes"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299"
+
 [[package]]
 name = "anyhow"
 version = "1.0.68"
@@ -301,6 +307,12 @@ dependencies = [
  "twilight-model",
 ]
 
+[[package]]
+name = "cast"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
+
 [[package]]
 name = "cbindgen"
 version = "0.24.3"
@@ -332,6 +344,33 @@ version = "1.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
 
+[[package]]
+name = "ciborium"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b0c137568cc60b904a7724001b35ce2630fd00d5d84805fbb608ab89509d788f"
+dependencies = [
+ "ciborium-io",
+ "ciborium-ll",
+ "serde",
+]
+
+[[package]]
+name = "ciborium-io"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "346de753af073cc87b52b2083a506b38ac176a44cfb05497b622e27be899b369"
+
+[[package]]
+name = "ciborium-ll"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "213030a2b5a4e0c0892b6652260cf6ccac84827b83a85a534e178e3906c4cf1b"
+dependencies = [
+ "ciborium-io",
+ "half",
+]
+
 [[package]]
 name = "clap"
 version = "3.2.23"
@@ -435,6 +474,44 @@ dependencies = [
  "cfg-if",
 ]
 
+[[package]]
+name = "criterion"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e7c76e09c1aae2bc52b3d2f29e13c6572553b30c4aa1b8a49fd70de6412654cb"
+dependencies = [
+ "anes",
+ "atty",
+ "cast",
+ "ciborium",
+ "clap",
+ "criterion-plot",
+ "futures",
+ "itertools",
+ "lazy_static",
+ "num-traits",
+ "oorandom",
+ "plotters",
+ "rayon",
+ "regex",
+ "serde",
+ "serde_derive",
+ "serde_json",
+ "tinytemplate",
+ "tokio",
+ "walkdir",
+]
+
+[[package]]
+name = "criterion-plot"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1"
+dependencies = [
+ "cast",
+ "itertools",
+]
+
 [[package]]
 name = "crossbeam-channel"
 version = "0.5.6"
@@ -445,6 +522,30 @@ dependencies = [
  "crossbeam-utils",
 ]
 
+[[package]]
+name = "crossbeam-deque"
+version = "0.8.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "715e8152b692bba2d374b53d4875445368fdf21a94751410af607a5ac677d1fc"
+dependencies = [
+ "cfg-if",
+ "crossbeam-epoch",
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-epoch"
+version = "0.9.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "01a9af1f4c2ef74bb8aa1f7e19706bc72d03598c8a570bb5de72243c7a9d9d5a"
+dependencies = [
+ "autocfg",
+ "cfg-if",
+ "crossbeam-utils",
+ "memoffset",
+ "scopeguard",
+]
+
 [[package]]
 name = "crossbeam-utils"
 version = "0.8.14"
@@ -844,6 +945,12 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "half"
+version = "1.8.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7"
+
 [[package]]
 name = "hashbrown"
 version = "0.12.3"
@@ -1185,6 +1292,15 @@ version = "2.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
 
+[[package]]
+name = "memoffset"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4"
+dependencies = [
+ "autocfg",
+]
+
 [[package]]
 name = "mime"
 version = "0.3.16"
@@ -1315,6 +1431,12 @@ version = "1.17.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6f61fba1741ea2b3d6a1e3178721804bb716a68a6aeba1149b5d52e3d464ea66"
 
+[[package]]
+name = "oorandom"
+version = "11.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
+
 [[package]]
 name = "opaque-debug"
 version = "0.3.0"
@@ -1598,6 +1720,34 @@ version = "0.3.26"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160"
 
+[[package]]
+name = "plotters"
+version = "0.3.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2538b639e642295546c50fcd545198c9d64ee2a38620a628724a3b266d5fbf97"
+dependencies = [
+ "num-traits",
+ "plotters-backend",
+ "plotters-svg",
+ "wasm-bindgen",
+ "web-sys",
+]
+
+[[package]]
+name = "plotters-backend"
+version = "0.3.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "193228616381fecdc1224c62e96946dfbc73ff4384fba576e052ff8c1bea8142"
+
+[[package]]
+name = "plotters-svg"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f9a81d2759aae1dae668f783c308bc5c8ebd191ff4184aaa1b37f65a6ae5a56f"
+dependencies = [
+ "plotters-backend",
+]
+
 [[package]]
 name = "ppv-lite86"
 version = "0.2.17"
@@ -1779,6 +1929,8 @@ name = "ratelimit"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "criterion",
+ "env_logger 0.7.1",
  "hyper",
  "leash",
  "opentelemetry",
@@ -1787,14 +1939,40 @@ dependencies = [
  "redis",
  "serde",
  "shared",
+ "test-log",
  "tokio",
  "tokio-stream",
+ "tokio-test",
  "tonic",
  "tracing",
  "tracing-opentelemetry",
+ "tracing-subscriber",
+ "tracing-test",
  "twilight-http-ratelimiting 0.14.0 (git+https://github.com/MatthieuCoder/twilight.git)",
 ]
 
+[[package]]
+name = "rayon"
+version = "1.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6db3a213adf02b3bcfd2d3846bb41cb22857d131789e01df434fb7e7bc0759b7"
+dependencies = [
+ "either",
+ "rayon-core",
+]
+
+[[package]]
+name = "rayon-core"
+version = "1.10.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cac410af5d00ab6884528b4ab69d1e8e146e8d471201800fa1b4524126de6ad3"
+dependencies = [
+ "crossbeam-channel",
+ "crossbeam-deque",
+ "crossbeam-utils",
+ "num_cpus",
+]
+
 [[package]]
 name = "redis"
 version = "0.22.1"
@@ -1991,6 +2169,15 @@ version = "1.0.12"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7b4b9743ed687d4b4bcedf9ff5eaa7398495ae14e61cba0a295704edbc7decde"
 
+[[package]]
+name = "same-file"
+version = "1.0.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
+dependencies = [
+ "winapi-util",
+]
+
 [[package]]
 name = "schannel"
 version = "0.1.20"
@@ -2330,6 +2517,17 @@ dependencies = [
  "winapi-util",
 ]
 
+[[package]]
+name = "test-log"
+version = "0.2.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "38f0c854faeb68a048f0f2dc410c5ddae3bf83854ef0e4977d58306a5edef50e"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "textwrap"
 version = "0.16.0"
@@ -2392,6 +2590,16 @@ dependencies = [
  "time-core",
 ]
 
+[[package]]
+name = "tinytemplate"
+version = "1.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc"
+dependencies = [
+ "serde",
+ "serde_json",
+]
+
 [[package]]
 name = "tinyvec"
 version = "1.6.0"
@@ -2481,6 +2689,19 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "tokio-test"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "53474327ae5e166530d17f2d956afcb4f8a004de581b3cae10f12006bc8163e3"
+dependencies = [
+ "async-stream",
+ "bytes",
+ "futures-core",
+ "tokio",
+ "tokio-stream",
+]
+
 [[package]]
 name = "tokio-tungstenite"
 version = "0.17.2"
@@ -2704,6 +2925,29 @@ dependencies = [
  "tracing-log",
 ]
 
+[[package]]
+name = "tracing-test"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9e3d272c44878d2bbc9f4a20ad463724f03e19dbc667c6e84ac433ab7ffcc70b"
+dependencies = [
+ "lazy_static",
+ "tracing-core",
+ "tracing-subscriber",
+ "tracing-test-macro",
+]
+
+[[package]]
+name = "tracing-test-macro"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "744324b12d69a9fc1edea4b38b7b1311295b662d161ad5deac17bb1358224a08"
+dependencies = [
+ "lazy_static",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "try-lock"
 version = "0.2.3"
@@ -2909,6 +3153,17 @@ version = "0.9.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
 
+[[package]]
+name = "walkdir"
+version = "2.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56"
+dependencies = [
+ "same-file",
+ "winapi",
+ "winapi-util",
+]
+
 [[package]]
 name = "want"
 version = "0.3.0"
diff --git a/docs/architecture.md b/docs/architecture.md
new file mode 100644 (file)
index 0000000..b30f98f
--- /dev/null
@@ -0,0 +1,42 @@
+# Nova architecture
+
+The Nova architecture is composed of multiple components, each of which is horizontally scalable.
+
+```
+                                            ┌──────────────────┐
+                                            │                  │
+                              ┌─────────────┤    Discord API   ├──────────────┐
+                              │             │                  │              │
+                              │             └────────┬─────────┘              │
+                              │                      │                        │
+                              │                      │                        │
+                              │                      │                        │
+                    ┌─────────┴────────┐    ┌────────┴─────────┐    ┌─────────┴────────┐
+                    │                  │    │                  │    │                  │
+                    │    Rest Proxy    │    │  Gateway client  │    │  Webhook Server  │
+                    │                  │    │                  │    │                  │
+                    └─────────┬──┬─────┘    └────────┬─────────┘    └─────────┬────────┘
+                              │  │                   │                        │
+                              │  │                   │                        │
+                              │  │                   │                        │
+                              │  │                   │                        │
+                              │  │                   │                        │
+                              │  │                   │                        │
+                              │  └───────┐           │                        │
+┌────────────────┐   ┌────────┴───────┐  │   ┌───────┴────────┐               │
+│                │   │                │  │   │                ├───────────────┘
+│     Redis      ├───┤    Ratelimit   │  │   │  Nats broker   │
+│                │   │                │  │   │                ├──────────────────┐
+└────────────────┘   └────────────────┘  │   └───────┬────────┘                  │
+                                         │           │                           │
+                                         │           │                           │
+                                         │   ┌───────┴────────┐           ┌──────┴─────┐
+                                         │   │                │           │            │
+                                         │   │  Cache manager ├───────────┤    User    │
+                                         │   │                │           │            │
+                                         │   └────────────────┘           └──────┬─────┘
+                                         └───────────────────────────────────────┘
+```
+
+## Rest Proxy
+
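Each box in the diagram maps to one executable under `exes/`, and, as the `lib.rs` changes further down show, every executable implements the `leash::Component` trait (`type Config`, a `SERVICE_NAME` constant, and a `start` method returning an `AnyhowResultFuture<()>`). The real trait lives in `libs/leash` and is not shown in this diff, so the sketch below only mirrors its apparent shape; the simplified `start` signature and the `DemoServer` type are assumptions for illustration, not the actual API:

```rust
use std::{future::Future, pin::Pin};
use tokio::sync::oneshot;

/// Approximation of leash's future alias; the real definition lives in libs/leash.
type AnyhowResultFuture<T> = Pin<Box<dyn Future<Output = anyhow::Result<T>> + Send>>;

/// Simplified stand-in for `leash::Component`, mirroring how the gateway,
/// ratelimit and rest executables implement it in this commit.
trait Component {
    type Config;
    const SERVICE_NAME: &'static str;
    fn start(self, config: Self::Config, stop: oneshot::Receiver<()>) -> AnyhowResultFuture<()>;
}

struct DemoServer;

impl Component for DemoServer {
    type Config = ();
    const SERVICE_NAME: &'static str = "demo";

    fn start(self, _config: Self::Config, stop: oneshot::Receiver<()>) -> AnyhowResultFuture<()> {
        Box::pin(async move {
            // A real component runs its event loop here (gateway shard, gRPC
            // server, HTTP proxy) and exits when the `stop` signal fires.
            let _ = stop.await;
            Ok(())
        })
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (tx, rx) = oneshot::channel();
    let component = DemoServer.start((), rx);
    tx.send(()).ok(); // ask the component to shut down immediately
    component.await
}
```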
index 1eace72aac1e6d77c61fca26fddb0236dce55e1e..d4ca78bc8ea2d8bc147aae061eded21225f67404 100644 (file)
@@ -1,24 +1,2 @@
 # 5 Minutes quickstart
 
-This page shows you how to start a new nova project
-under five minutes using a typescript project,
-hold tight this is going to be fast.
-
-## Requirements
-
-* A discord bot application available
-* [Docker](https://docker.io/) (or alternatives) available to you.
-* A domain name / [ngrok.io](https://ngrok.com/) domain (for webhooks only)
-
-> If you are deploying nova to production, consider following the
-> production guide instead.
-
-## Setting up a nova instance
-
-Clone the [example repo](https://github.com/discordnova/nova-quickstart.git) like so
-
-`git clone https://github.com/discordnova/nova-quickstart.git`,
-
-In this folder, find the `config.yml.example` file and rename it to match `config.yml`
-
-Next, you need to fill all the environment variables to match your discord bot.
diff --git a/exes/all-in-one/Makefile b/exes/all-in-one/Makefile
deleted file mode 100644 (file)
index 8ed17c4..0000000
+++ /dev/null
@@ -1,15 +0,0 @@
-clean:
-       rm ./build/*
-
-library:
-       cargo build --release
-
-build: library
-       cp ../../target/release/liball_in_one.a ./build
-       go build -a -ldflags '-s' -o build/all-in-one
-
-all: library build
-
-run: all-in-one
-       ./build/all-in-one
-
diff --git a/exes/all-in-one/main.go b/exes/all-in-one/main.go
deleted file mode 100644 (file)
index 1de08db..0000000
+++ /dev/null
@@ -1,72 +0,0 @@
-package main
-
-/*
-#cgo LDFLAGS: -L./build -lall_in_one -lz -lm
-#include "./build/all-in-one.h"
-*/
-import "C"
-
-import (
-       "fmt"
-       "os"
-       "os/signal"
-       "syscall"
-       "time"
-
-       "github.com/Jeffail/gabs"
-       "github.com/alicebob/miniredis/v2"
-
-       server "github.com/nats-io/nats-server/v2/server"
-)
-
-func main() {
-       // Initialize the Rust library's logging
-       C.init_logs()
-       // Load the configuration
-       str := C.GoString(C.load_config())
-
-       // Start a MiniRedis instance
-       mr := miniredis.NewMiniRedis()
-       err := mr.Start()
-
-       if err != nil {
-               panic(err)
-       }
-
-       // Start a Nats server
-       opts := &server.Options{}
-       opts.Host = "0.0.0.0"
-       ns, err := server.NewServer(opts)
-
-       if err != nil {
-               panic(err)
-       }
-
-       go ns.Start()
-
-       if !ns.ReadyForConnections(4 * time.Second) {
-               panic("not ready for connection")
-       }
-
-       // Edit the given configuration JSON
-       // and inject the Nats and MiniRedis server configuration
-       json, _ := gabs.ParseJSON([]byte(str))
-       json.Set(fmt.Sprintf("redis://%s", mr.Addr()), "redis", "url")
-       json.Set("localhost", "nats", "host")
-       json.Set(1, "webhook", "discord", "client_id")
-
-       // Start a nova instance
-       instance := C.start_instance(C.CString(json.String()))
-
-       // Wait for a SIGINT
-       c := make(chan os.Signal, 1)
-       signal.Notify(c,
-               syscall.SIGHUP,
-               syscall.SIGINT,
-               syscall.SIGTERM,
-               syscall.SIGQUIT)
-       <-c
-
-       println("Stopping nova all-in-one")
-       C.stop_instance(instance)
-}
index d676e8d3837852dca7951f2f57b86fe12c3eda39..1d2a9e29e5ff53e042ce1ffa2c9e702e04cd37a7 100644 (file)
@@ -1,7 +1,6 @@
 use std::cell::RefCell;
 
 use anyhow::Result;
-use libc::c_int;
 use tracing::error;
 
 thread_local! {
@@ -9,7 +8,7 @@ thread_local! {
 }
 
 /// Update the most recent error, clearing whatever may have been there before.
-pub fn stacktrace(err: anyhow::Error) -> String {
+#[must_use] pub fn stacktrace(err: &anyhow::Error) -> String {
     format!("{err}")
 }
 
@@ -23,13 +22,15 @@ where
         Ok(ok) => Some(ok),
         Err(error) => {
             // Call the handler
-            handle_error(error);
+            handle_error(&error);
             None
         }
     }
 }
 
-pub fn handle_error(error: anyhow::Error) {
+/// # Panics
+/// Panics if the stacktrace length does not fit in an `i32`.
+pub fn handle_error(error: &anyhow::Error) {
     ERROR_HANDLER.with(|val| {
         let mut stacktrace = stacktrace(error);
 
@@ -38,8 +39,8 @@ pub fn handle_error(error: anyhow::Error) {
             // Call the error handler
             unsafe {
                 func(
-                    stacktrace.len() as c_int + 1,
-                    stacktrace.as_mut_ptr() as *mut i8,
+                    (stacktrace.len() + 1).try_into().unwrap(),
+                    stacktrace.as_mut_ptr().cast::<i8>(),
                 );
             }
         }
index 7a8821f82bc62f450b2540b33564d41517d3c8cd..449d1ccb2cbe736ffda70f4dc7dfe2067fa7ca7e 100644 (file)
@@ -69,8 +69,12 @@ pub unsafe extern "C" fn stop_instance(instance: *mut AllInOneInstance) {
     });
 }
 
+/// # Panics
+/// Panics if an invalid `RUST_LOG` variable is specified.
 #[no_mangle]
 pub unsafe extern "C" fn create_instance(config: *mut c_char) -> *mut AllInOneInstance {
+    // Returning a null pointer (unaligned) is expected.
+    #[allow(clippy::cast_ptr_alignment)]
     wrap_result(move || {
         let value = CString::from_raw(config);
         let json = value.to_str()?;
@@ -87,7 +91,7 @@ pub unsafe extern "C" fn create_instance(config: *mut c_char) -> *mut AllInOneIn
             .with(fmt::layer())
             .with(
                 EnvFilter::builder()
-                    .with_default_directive(Directive::from_str("info").unwrap())
+                    .with_default_directive(Directive::from_str("info").expect(""))
                     .from_env()
                     .unwrap(),
             )
@@ -96,7 +100,7 @@ pub unsafe extern "C" fn create_instance(config: *mut c_char) -> *mut AllInOneIn
         // Error handling task
         runtime.spawn(async move {
             while let Some(error) = errors.recv().await {
-                handle_error(error)
+                handle_error(&error);
             }
         });
 
index 74625c02e04c533c26b7041b4730209a7f4f22a6..493bf46525cfdad2e28a21313cd72af6592a67af 100644 (file)
@@ -1,3 +1,14 @@
+#![deny(
+    clippy::all,
+    clippy::correctness,
+    clippy::suspicious,
+    clippy::style,
+    clippy::complexity,
+    clippy::perf,
+    clippy::pedantic,
+    clippy::nursery,
+)]
+
 pub mod errors;
 pub mod ffi;
 pub mod utils;
index 7a7134d11d84214499e6f294078cdc714981da3a..159d98d29366f5c86a51423c097c65e6d1797e0f 100644 (file)
@@ -26,7 +26,7 @@ fn load_settings_for<T: Default + DeserializeOwned + Clone>(
     name: &str,
 ) -> Result<Settings<T>> {
     let value: Value = serde_json::from_str(settings)?;
-    let section: T = serde_json::from_value(value.get(name).unwrap().to_owned())?;
+    let section: T = serde_json::from_value(value.get(name).unwrap().clone())?;
     let mut settings: Settings<T> = serde_json::from_value(value)?;
     settings.config = section;
 
@@ -69,7 +69,7 @@ pub(crate) fn load_config_file() -> Result<Value> {
     let mode = std::env::var("ENV").unwrap_or_else(|_| "development".into());
     info!("Configuration Environment: {}", mode);
 
-    builder = builder.add_source(File::with_name(&format!("config/{}", mode)).required(false));
+    builder = builder.add_source(File::with_name(&format!("config/{mode}")).required(false));
     builder = builder.add_source(File::with_name("config/local").required(false));
 
     let env = Environment::with_prefix("NOVA").separator("__");
index 349deaa633c13e553266ac1a0c9ad07e571e3570..70cb6a3903c3b96d7097ed778c33e7b07c7e35a2 100644 (file)
@@ -2,8 +2,12 @@
 name = "cache"
 version = "0.1.0"
 edition = "2018"
-
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+description = "Stores the data from discord if needed"
+readme = "README.md"
+repository = "https://github.com/discordnova/nova.git"
+keywords = ["discord", "scaleable", "cache"]
+categories = ["microservices", "nova"]
+license = "APACHE2"
 
 [dependencies]
 shared = { path = "../../libs/shared" }
diff --git a/exes/cache/README.md b/exes/cache/README.md
new file mode 100644 (file)
index 0000000..0395009
--- /dev/null
@@ -0,0 +1,3 @@
+# Cache
+
+Stores the data from discord if needed
\ No newline at end of file
index 7b12bfeccc826cb9828ee9c0dac01ad588b9fd33..a1a2fad5852ffdcb7a62383c208bef58fbf61b41 100644 (file)
@@ -2,14 +2,14 @@ use serde::{Deserialize, Serialize};
 use twilight_gateway::Intents;
 
 #[derive(Serialize, Deserialize, Clone)]
-pub struct GatewayConfig {
+pub struct Gateway {
     pub token: String,
     pub intents: Intents,
     pub shard: u64,
     pub shard_total: u64,
 }
 
-impl Default for GatewayConfig {
+impl Default for Gateway {
     fn default() -> Self {
         Self {
             intents: Intents::empty(),
index ec3337b47ebf4698ac4491b815b60e8d4d8e0124..e54bb5cda683ac857fd7b8d12ff225644de3a486 100644 (file)
@@ -1,5 +1,17 @@
+#![deny(
+    clippy::all,
+    clippy::correctness,
+    clippy::suspicious,
+    clippy::style,
+    clippy::complexity,
+    clippy::perf,
+    clippy::pedantic,
+    clippy::nursery,
+    unsafe_code
+)]
+#![allow(clippy::redundant_pub_crate)]
 use async_nats::{Client, HeaderMap, HeaderValue};
-use config::GatewayConfig;
+use config::Gateway;
 use leash::{AnyhowResultFuture, Component};
 use opentelemetry::{global, propagation::Injector};
 use shared::{
@@ -12,21 +24,21 @@ use tokio_stream::StreamExt;
 use tracing_opentelemetry::OpenTelemetrySpanExt;
 use twilight_gateway::{Event, Shard};
 pub mod config;
-use tracing::{debug, info, trace_span};
+use tracing::{debug, error, info, trace_span};
 use twilight_model::gateway::event::DispatchEvent;
 
 struct MetadataMap<'a>(&'a mut HeaderMap);
 
 impl<'a> Injector for MetadataMap<'a> {
     fn set(&mut self, key: &str, value: String) {
-        self.0.insert(key, HeaderValue::from_str(&value).unwrap())
+        self.0.insert(key, HeaderValue::from_str(&value).unwrap());
     }
 }
 
 pub struct GatewayServer {}
 
 impl Component for GatewayServer {
-    type Config = GatewayConfig;
+    type Config = Gateway;
     const SERVICE_NAME: &'static str = "gateway";
 
     fn start(
@@ -35,7 +47,7 @@ impl Component for GatewayServer {
         mut stop: oneshot::Receiver<()>,
     ) -> AnyhowResultFuture<()> {
         Box::pin(async move {
-            let (shard, mut events) = Shard::builder(settings.token.to_owned(), settings.intents)
+            let (shard, mut events) = Shard::builder(settings.token.clone(), settings.intents)
                 .shard(settings.shard, settings.shard_total)?
                 .build();
 
@@ -48,40 +60,10 @@ impl Component for GatewayServer {
             loop {
                 select! {
                     event = events.next() => {
-
                         if let Some(event) = event {
-                            match event {
-                                Event::Ready(ready) => {
-                                    info!("Logged in as {}", ready.user.name);
-                                },
-
-                                _ => {
-
-                                    let name = event.kind().name();
-                                    if let Ok(dispatch_event) = DispatchEvent::try_from(event) {
-                                        debug!("handling event {}", name.unwrap());
-
-                                        let data = CachePayload {
-                                            data: DispatchEventTagged {
-                                                data: dispatch_event,
-                                            },
-                                        };
-                                        let value = serde_json::to_string(&data)?;
-                                        let bytes = bytes::Bytes::from(value);
-
-                                        let span = trace_span!("nats send");
-
-                                        let mut header_map = HeaderMap::new();
-                                        let context = span.context();
-                                        global::get_text_map_propagator(|propagator| {
-                                            propagator.inject_context(&context, &mut MetadataMap(&mut header_map))
-                                        });
-
-                                        nats.publish_with_headers(format!("nova.cache.dispatch.{}", name.unwrap()), header_map, bytes)
-                                            .await?;
-                                    }
-                                }
-                            }
+                           let _ = handle_event(event, &nats)
+                            .await
+                            .map_err(|err| error!(error = ?err, "event publish failed"));
                         } else {
                             break
                         }
@@ -101,3 +83,39 @@ impl Component for GatewayServer {
         Self {}
     }
 }
+
+async fn handle_event(event: Event, nats: &Client) -> anyhow::Result<()> {
+    if let Event::Ready(ready) = event {
+        info!("Logged in as {}", ready.user.name);
+    } else {
+        let name = event.kind().name();
+        if let Ok(dispatch_event) = DispatchEvent::try_from(event) {
+            debug!("handling event {}", name.unwrap());
+
+            let data = CachePayload {
+                data: DispatchEventTagged {
+                    data: dispatch_event,
+                },
+            };
+            let value = serde_json::to_string(&data)?;
+            let bytes = bytes::Bytes::from(value);
+
+            let span = trace_span!("nats send");
+
+            let mut header_map = HeaderMap::new();
+            let context = span.context();
+            global::get_text_map_propagator(|propagator| {
+                propagator.inject_context(&context, &mut MetadataMap(&mut header_map));
+            });
+
+            nats.publish_with_headers(
+                format!("nova.cache.dispatch.{}", name.unwrap()),
+                header_map,
+                bytes,
+            )
+            .await?;
+        }
+    }
+
+    Ok(())
+}
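For context on the refactored `handle_event` above: every dispatch event lands on a NATS subject of the form `nova.cache.dispatch.<EVENT_NAME>` with the JSON-serialized `CachePayload` as the payload. A downstream component such as the cache manager would consume those messages roughly as sketched below; this consumer is not part of the commit, the broker address is illustrative, and the exact `async_nats` call signatures may differ between crate versions:

```rust
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Connect to the same broker the gateway publishes to (address is illustrative).
    let client = async_nats::connect("localhost:4222").await?;

    // `*` matches a single subject token, i.e. one dispatch event name such as MESSAGE_CREATE.
    let mut subscriber = client.subscribe("nova.cache.dispatch.*".to_string()).await?;

    while let Some(message) = subscriber.next().await {
        // The payload is the JSON-serialized `CachePayload` produced by `handle_event`.
        println!("{}: {} bytes", message.subject, message.payload.len());
    }

    Ok(())
}
```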
index d82d8c95f07a76224a53c958f1a84df398fb51aa..3989735e146e2af72b2e640e165d59ee0fbdb40e 100644 (file)
@@ -26,4 +26,16 @@ tonic = "0.8.3"
 tokio-stream = "0.1.11"
 
 
-redis = { version = "0.22.1", features = ["cluster", "connection-manager", "tokio-comp"] }
\ No newline at end of file
+redis = { version = "0.22.1", features = ["cluster", "connection-manager", "tokio-comp"] }
+
+[dev-dependencies]
+criterion = { version = "0.4", features = ["async_tokio"] }
+tokio-test = "*"
+tracing-test = "0.2.3"
+tracing-subscriber = "*"
+env_logger = "*"
+test-log = { version = "0.2.11", features = ["log", "trace"] }
+
+[[bench]]
+name = "bucket"
+harness = false
diff --git a/exes/ratelimit/bench/req.rs b/exes/ratelimit/bench/req.rs
deleted file mode 100644 (file)
index e69de29..0000000
diff --git a/exes/ratelimit/benches/bucket.rs b/exes/ratelimit/benches/bucket.rs
new file mode 100644 (file)
index 0000000..d049d62
--- /dev/null
@@ -0,0 +1,56 @@
+use std::ops::Add;
+use std::time::{Duration, SystemTime};
+
+use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
+use ratelimit::buckets::bucket::Bucket;
+use tokio::runtime::Runtime;
+use twilight_http_ratelimiting::RatelimitHeaders;
+
+pub fn acquire_ticket(c: &mut Criterion) {
+    let rt = Runtime::new().unwrap();
+
+    let bucket = rt.block_on(async move {
+        let bucket = Bucket::new();
+
+        let mreset = SystemTime::now()
+            .add(Duration::from_secs(3600))
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .unwrap()
+            .as_millis()
+            .to_string();
+        let headers = [
+            (
+                "x-ratelimit-bucket",
+                "d721dea6054f6322373d361f98e5c38b".as_bytes(),
+            ),
+            ("x-ratelimit-limit", "100".as_bytes()),
+            ("x-ratelimit-remaining", "1".as_bytes()),
+            ("x-ratelimit-reset", mreset.as_bytes()),
+            ("x-ratelimit-reset-after", "100000.000".as_bytes()),
+        ];
+        if let RatelimitHeaders::Present(present) =
+            RatelimitHeaders::from_pairs(headers.into_iter()).unwrap()
+        {
+            bucket.update(
+                &present,
+                SystemTime::now()
+                    .duration_since(SystemTime::UNIX_EPOCH)
+                    .unwrap()
+                    .as_millis() as u64,
+            );
+        }
+        bucket
+    });
+
+    let size: usize = 1024;
+    c.bench_with_input(BenchmarkId::new("input_example", size), &size, |b, _| {
+        // Insert a call to `to_async` to convert the bencher to async mode.
+        // The timing loops are the same as with the normal bencher.
+        b.to_async(&rt).iter(|| async {
+            bucket.ticket().await.unwrap();
+        });
+    });
+}
+
+criterion_group!(benches, acquire_ticket);
+criterion_main!(benches);
index 67dd0ee758a2a7177077c74d0f8d4f916a3a6a14..992adb927708db5f9db4e6a96010187bb8c40370 100644 (file)
@@ -3,33 +3,46 @@ use std::{
     time::{Duration, SystemTime, UNIX_EPOCH},
 };
 
+use tracing::debug;
+
 #[derive(Default, Debug)]
 pub struct AtomicInstant(AtomicU64);
 
 impl AtomicInstant {
+    #[must_use]
     pub const fn empty() -> Self {
         Self(AtomicU64::new(0))
     }
 
     pub fn elapsed(&self) -> Duration {
+        // Truncation is expected
+        #[allow(clippy::cast_possible_truncation)]
         Duration::from_millis(
             SystemTime::now()
                 .duration_since(UNIX_EPOCH)
-                .unwrap()
+                .expect("time went backwards")
                 .as_millis() as u64
-                - self.0.load(Ordering::SeqCst),
+                - self.0.load(Ordering::Relaxed),
         )
     }
 
     pub fn as_millis(&self) -> u64 {
-        self.0.load(Ordering::SeqCst)
+        self.0.load(Ordering::Relaxed)
     }
 
     pub fn set_millis(&self, millis: u64) {
-        self.0.store(millis, Ordering::SeqCst);
+        // get address of struct
+        let b = self as *const _ as usize;
+        debug!(millis, this = ?b, "setting instant millis");
+        self.0.store(millis, Ordering::Relaxed);
     }
 
     pub fn is_empty(&self) -> bool {
-        self.as_millis() == 0
+        let millis = self.as_millis();
+        // get address of struct
+        let b = self as *const _ as usize;
+        debug!(millis, this = ?b, "reading instant millis");
+        debug!(empty = (millis == 0), millis, this = ?b, "instant empty check");
+        millis == 0
     }
 }
index f8fa8b9c7f8834c7d6478d8e52f92d33a3f6d46f..c40f059739191b1269966d6a817ef3b688487ea6 100644 (file)
@@ -1,3 +1,4 @@
+use super::{async_queue::AsyncQueue, atomic_instant::AtomicInstant};
 use std::{
     sync::{
         atomic::{AtomicU64, Ordering},
@@ -9,8 +10,6 @@ use tokio::{sync::oneshot, task::JoinHandle};
 use tracing::{debug, trace};
 use twilight_http_ratelimiting::headers::Present;
 
-use super::{async_queue::AsyncQueue, atomic_instant::AtomicInstant, redis_lock::RedisLock};
-
 #[derive(Clone, Debug)]
 pub enum TimeRemaining {
     Finished,
@@ -18,22 +17,57 @@ pub enum TimeRemaining {
     Some(Duration),
 }
 
+/// A `Bucket` is a simple atomic ratelimiting bucket.
+/// It can be updated dynamically from the Discord API response headers.
+///
+/// # Usage
+/// ```
+/// use ratelimit::buckets::bucket::Bucket;
+/// use twilight_http_ratelimiting::RatelimitHeaders;
+/// use std::time::SystemTime;
+///
+/// let bucket = Bucket::new();
+///
+/// // Feed the header information into the bucket to update it
+/// let headers = [
+///     ( "x-ratelimit-bucket", "bucket id".as_bytes()),
+///     ("x-ratelimit-limit", "100".as_bytes()),
+///     ("x-ratelimit-remaining", "0".as_bytes()),
+///     ("x-ratelimit-reset", "".as_bytes()),
+///     ("x-ratelimit-reset-after", "10.000".as_bytes()),
+/// ];
+///
+/// // Parse the headers
+/// let present = if let Ok(RatelimitHeaders::Present(present)) = RatelimitHeaders::from_pairs(headers.into_iter()) { present } else { todo!() };
+///
+/// // this should ideally be the time of the request
+/// let current_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis() as u64;
+///
+/// bucket.update(&present, current_time);
+///
+/// ```
+///
+/// # Async
+/// You need to call this struct's `new` method inside a tokio 1.x async runtime.
 #[derive(Debug)]
 pub struct Bucket {
+    /// Maximum number of tickets that can be accepted in a cycle
     pub limit: AtomicU64,
-    /// Queue associated with this bucket.
-    pub queue: AsyncQueue,
-    /// Number of tickets remaining.
+    /// Remaining requests that can be executed
     pub remaining: AtomicU64,
-    /// Duration after the [`Self::last_update`] time the bucket will refresh.
+    /// Time to wait after [`Self::last_update`] before accepting new tickets.
     pub reset_after: AtomicU64,
-    /// When the bucket's ratelimit refresh countdown started (unix millis)
+    /// Last update received from the Discord upstream
     pub last_update: AtomicInstant,
 
-    pub tasks: Vec<JoinHandle<()>>,
+    /// List of tasks that dequeue tickets from [`Self::queue`]
+    tasks: Vec<JoinHandle<()>>,
+    /// Queue of tickets to be processed.
+    queue: AsyncQueue,
 }
 
 impl Drop for Bucket {
+    /// Simply abort the dequeue tasks to avoid leaking memory via `Arc`s
     fn drop(&mut self) {
         for join in &self.tasks {
             join.abort();
@@ -42,8 +76,11 @@ impl Drop for Bucket {
 }
 
 impl Bucket {
-    /// Create a new bucket for the specified [`Path`].
-    pub fn new(global: Arc<RedisLock>) -> Arc<Self> {
+    /// Creates a new bucket with four dequeue tasks
+    /// # Async
+    /// This function **must** be called in a tokio 1.x runtime, otherwise it *will* panic.
+    #[must_use]
+    pub fn new() -> Arc<Self> {
         let tasks = vec![];
 
         let this = Arc::new(Self {
@@ -58,19 +95,16 @@ impl Bucket {
         // Run with 4 dequeue tasks
         for _ in 0..4 {
             let this = this.clone();
-            let global = global.clone();
             tokio::spawn(async move {
+                // continuously wait for elements in the queue and process them sequentially.
+                // multiple parallel tasks are used to (hopefully) improve throughput.
                 while let Some(element) = this.queue.pop().await {
-                    // we need to wait
-                    if let Some(duration) = global.locked_for().await {
-                        tokio::time::sleep(duration).await;
-                    }
-
                     if this.remaining() == 0 {
                         debug!("0 tickets remaining, we have to wait.");
 
                         match this.time_remaining() {
                             TimeRemaining::Finished => {
+                                debug!("waiting seems finished.");
                                 this.try_reset();
                             }
                             TimeRemaining::Some(duration) => {
@@ -79,7 +113,9 @@ impl Bucket {
 
                                 this.try_reset();
                             }
-                            TimeRemaining::NotStarted => {}
+                            TimeRemaining::NotStarted => {
+                                debug!("we should not wait");
+                            }
                         }
                     }
 
@@ -94,19 +130,17 @@ impl Bucket {
         this
     }
 
-    /// Total number of tickets allotted in a cycle.
+    /// Total number of tickets allowed in a cycle.
     pub fn limit(&self) -> u64 {
         self.limit.load(Ordering::Relaxed)
     }
 
-    /// Number of tickets remaining.
+    /// Number of tickets remaining in the current cycle.
     pub fn remaining(&self) -> u64 {
         self.remaining.load(Ordering::Relaxed)
     }
 
-    /// Duration after the [`started_at`] time the bucket will refresh.
-    ///
-    /// [`started_at`]: Self::started_at
+    /// Duration after the [`Self::last_update`] time the bucket will refresh.
     pub fn reset_after(&self) -> u64 {
         self.reset_after.load(Ordering::Relaxed)
     }
@@ -117,6 +151,10 @@ impl Bucket {
         let last_update = &self.last_update;
 
         if last_update.is_empty() {
+            debug!("last update is empty");
+
+            TimeRemaining::NotStarted
+        } else {
             let elapsed = last_update.elapsed();
 
             if elapsed > Duration::from_millis(reset_after) {
@@ -124,16 +162,12 @@ impl Bucket {
             }
 
             TimeRemaining::Some(Duration::from_millis(reset_after) - elapsed)
-        } else {
-            TimeRemaining::NotStarted
         }
     }
 
-    /// Try to reset this bucket's [`started_at`] value if it has finished.
+    /// Try to reset this bucket's [`Self::last_update`] value if it has finished.
     ///
     /// Returns whether resetting was possible.
-    ///
-    /// [`started_at`]: Self::started_at
     pub fn try_reset(&self) -> bool {
         if self.last_update.is_empty() {
             return false;
@@ -150,10 +184,12 @@ impl Bucket {
     }
 
     /// Update this bucket's ratelimit data after a request has been made.
-    pub fn update(&self, ratelimits: Present, time: u64) {
+    /// The time of the request should be given.
+    pub fn update(&self, ratelimits: &Present, time: u64) {
         let bucket_limit = self.limit();
 
         if self.last_update.is_empty() {
+            debug!(millis = time, "updated the last update time");
             self.last_update.set_millis(time);
         }
 
@@ -167,9 +203,122 @@ impl Bucket {
             .store(ratelimits.remaining(), Ordering::Relaxed);
     }
 
-    pub async fn ticket(&self) -> oneshot::Receiver<()> {
+    /// Submits a ticket to the queue
+    /// A oneshot receiver is returned and will resolve once the ticket is accepted.
+    pub fn ticket(&self) -> oneshot::Receiver<()> {
         let (tx, rx) = oneshot::channel();
         self.queue.push(tx);
         rx
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::{
+        ops::Add,
+        time::{Duration, Instant, SystemTime},
+    };
+
+    use tokio::time::timeout;
+    use tracing::info;
+    use twilight_http_ratelimiting::RatelimitHeaders;
+
+    use super::Bucket;
+
+    #[test_log::test(tokio::test)]
+    async fn should_ratelimit() {
+        let bucket = Bucket::new();
+
+        // Initialize a bucket with one remaining ticket
+        // and that resets in 100 seconds
+        let mreset = SystemTime::now()
+            .add(Duration::from_secs(100))
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .unwrap()
+            .as_millis()
+            .to_string();
+        let headers: [(&str, &[u8]); 5] = [
+            ("x-ratelimit-bucket", b"123"),
+            ("x-ratelimit-limit", b"100"),
+            ("x-ratelimit-remaining", b"1"),
+            ("x-ratelimit-reset", mreset.as_bytes()),
+            ("x-ratelimit-reset-after", b"100.000"),
+        ];
+        if let RatelimitHeaders::Present(present) =
+            RatelimitHeaders::from_pairs(headers.into_iter()).unwrap()
+        {
+            // Integer truncating is expected
+            #[allow(clippy::cast_possible_truncation)]
+            bucket.update(
+                &present,
+                SystemTime::now()
+                    .duration_since(SystemTime::UNIX_EPOCH)
+                    .unwrap()
+                    .as_millis() as u64,
+            );
+        }
+
+        let ticket = bucket.ticket();
+
+        info!("first request");
+        // We should accept one ticket
+        let respo = timeout(Duration::from_secs(10), ticket).await;
+        assert!(respo.is_ok());
+
+        info!("second request");
+
+        let ticket = bucket.ticket();
+        // We should accept one ticket
+        let respo = timeout(Duration::from_secs(1), ticket).await;
+
+        // the ticket should not have responded because the queue is locked
+        assert!(respo.is_err());
+    }
+
+    #[test_log::test(tokio::test)]
+    async fn should_block_until_possible() {
+        let bucket = Bucket::new();
+
+        // Initialize a bucket with no remaining tickets
+        // and that resets in 100 seconds
+        let mreset = SystemTime::now()
+            .add(Duration::from_secs(100))
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .unwrap()
+            .as_millis()
+            .to_string();
+        let headers: [(&str, &[u8]); 5] = [
+            ("x-ratelimit-bucket", b"123"),
+            ("x-ratelimit-limit", b"100"),
+            ("x-ratelimit-remaining", b"0"),
+            ("x-ratelimit-reset", mreset.as_bytes()),
+            ("x-ratelimit-reset-after", b"100.000"),
+        ];
+
+        if let RatelimitHeaders::Present(present) =
+            RatelimitHeaders::from_pairs(headers.into_iter()).unwrap()
+        {
+            // Integer truncation is expected
+            #[allow(clippy::cast_possible_truncation)]
+            bucket.update(
+                &present,
+                SystemTime::now()
+                    .duration_since(SystemTime::UNIX_EPOCH)
+                    .unwrap()
+                    .as_millis() as u64,
+            );
+        }
+
+        let ticket = bucket.ticket();
+        let start = Instant::now();
+
+        // in this case, the ratelimiter should wait 10 seconds
+        let respo = timeout(Duration::from_secs(12), ticket).await;
+        let end = start.elapsed().as_secs();
+
+        // we should have waited 10 seconds (+- 1s)
+        assert_eq!(10, end);
+        // and the ticket should be a success
+        assert!(respo.is_ok());
+    }
+}
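Outside of rustdoc and the test module, a complete end-to-end use of the new `Bucket` API looks roughly like the sketch below. It reuses the illustrative header values from the tests above and assumes the crate layout introduced in this commit (the now-public `ratelimit::buckets::bucket::Bucket`); it is an example, not code from the commit:

```rust
use std::ops::Add;
use std::time::{Duration, SystemTime};

use ratelimit::buckets::bucket::Bucket;
use twilight_http_ratelimiting::RatelimitHeaders;

#[tokio::main]
async fn main() {
    // `Bucket::new` spawns its dequeue tasks, so it must run inside a tokio runtime.
    let bucket = Bucket::new();

    // Illustrative headers: limit 100, 1 ticket remaining, reset 100 seconds from now.
    let reset = SystemTime::now()
        .add(Duration::from_secs(100))
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_millis()
        .to_string();
    let headers: [(&str, &[u8]); 5] = [
        ("x-ratelimit-bucket", b"123"),
        ("x-ratelimit-limit", b"100"),
        ("x-ratelimit-remaining", b"1"),
        ("x-ratelimit-reset", reset.as_bytes()),
        ("x-ratelimit-reset-after", b"100.000"),
    ];

    if let Ok(RatelimitHeaders::Present(present)) = RatelimitHeaders::from_pairs(headers.into_iter()) {
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;
        // Feed the parsed headers and the request time into the bucket.
        bucket.update(&present, now);
    }

    // Request a ticket; the receiver resolves once the bucket lets the request through.
    bucket.ticket().await.unwrap();
}
```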
index c86d623f4fd6c4870998adaf3e9e9968863fcb2a..52637659ec066283afd80cc6b888e295c5b05009 100644 (file)
@@ -1,4 +1,17 @@
+use std::{future::Future, pin::Pin, sync::Arc, time::Duration};
+
 pub mod async_queue;
 pub mod atomic_instant;
 pub mod bucket;
+pub mod noop_lock;
 pub mod redis_lock;
+
+pub trait GlobalLock: Send + Sync {
+    fn lock_for<'a>(
+        self: &'a Arc<Self>,
+        duration: Duration,
+    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
+    fn is_locked<'a>(
+        self: &'a Arc<Self>,
+    ) -> Pin<Box<dyn Future<Output = Option<Duration>> + Send + 'a>>;
+}
diff --git a/exes/ratelimit/src/buckets/noop_lock.rs b/exes/ratelimit/src/buckets/noop_lock.rs
new file mode 100644 (file)
index 0000000..614eba3
--- /dev/null
@@ -0,0 +1,18 @@
+use super::GlobalLock;
+
+pub struct NoOpLock;
+impl GlobalLock for NoOpLock {
+    fn lock_for<'a>(
+        self: &'a std::sync::Arc<Self>,
+        _duration: std::time::Duration,
+    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'a>> {
+        Box::pin(async move {})
+    }
+
+    fn is_locked<'a>(
+        self: &'a std::sync::Arc<Self>,
+    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Option<std::time::Duration>> + Send + 'a>>
+    {
+        Box::pin(async move { None })
+    }
+}
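`NoOpLock` is the trivial implementation of the new `GlobalLock` trait from `buckets/mod.rs`: it satisfies the trait without touching Redis while keeping the same call pattern the gRPC server uses, i.e. ask `is_locked()` and sleep for the returned duration before serving a ticket. A minimal illustration of that pattern (the `wait_for_global_lock` helper is an example, not part of the commit):

```rust
use std::sync::Arc;

use ratelimit::buckets::{noop_lock::NoOpLock, GlobalLock};

/// Same wait-if-globally-locked pattern the gRPC server applies before serving a ticket.
async fn wait_for_global_lock<L: GlobalLock>(lock: &Arc<L>) {
    if let Some(duration) = lock.is_locked().await {
        tokio::time::sleep(duration).await;
    }
}

#[tokio::main]
async fn main() {
    let lock = Arc::new(NoOpLock);
    // With `NoOpLock` this returns immediately; with `RedisLock` it would honour a
    // global ratelimit shared across ratelimiter instances.
    wait_for_global_lock(&lock).await;
}
```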
index fdae149881e77a09d1538a2a9d4d4fb45ab8f5b7..333d726684fac632c8e63219b652d62752bd490e 100644 (file)
@@ -1,4 +1,6 @@
 use std::{
+    future::Future,
+    pin::Pin,
     sync::{atomic::AtomicU64, Arc},
     time::{Duration, SystemTime},
 };
@@ -7,7 +9,9 @@ use redis::{aio::MultiplexedConnection, AsyncCommands};
 use tokio::sync::Mutex;
 use tracing::debug;
 
-/// This is flawed and needs to be replaced sometime with the real RedisLock algorithm
+use super::GlobalLock;
+
+/// This is flawed and needs to be replaced sometime with the real `RedisLock` algorithm
 #[derive(Debug)]
 pub struct RedisLock {
     redis: Mutex<MultiplexedConnection>,
@@ -15,70 +19,89 @@ pub struct RedisLock {
 }
 
 impl RedisLock {
-    /// Set the global ratelimit as exhausted.
-    pub async fn lock_for(self: &Arc<Self>, duration: Duration) {
-        debug!("locking globally for {}", duration.as_secs());
-        let _: () = self
-            .redis
-            .lock()
-            .await
-            .set_ex(
-                "nova:rls:lock",
-                1,
-                (duration.as_secs() + 1).try_into().unwrap(),
-            )
-            .await
-            .unwrap();
-
-        self.is_locked.store(
-            (SystemTime::now() + duration)
-                .duration_since(SystemTime::UNIX_EPOCH)
-                .unwrap()
-                .as_millis() as u64,
-            std::sync::atomic::Ordering::Relaxed,
-        );
+    #[must_use]
+    pub fn new(redis: MultiplexedConnection) -> Arc<Self> {
+        Arc::new(Self {
+            redis: Mutex::new(redis),
+            is_locked: AtomicU64::new(0),
+        })
     }
+}
+
+impl GlobalLock for RedisLock {
+    fn lock_for<'a>(
+        self: &'a Arc<Self>,
+        duration: Duration,
+    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
+        Box::pin(async move {
+            debug!("locking globally for {}", duration.as_secs());
+            let _: () = self
+                .redis
+                .lock()
+                .await
+                .set_ex(
+                    "nova:rls:lock",
+                    1,
+                    (duration.as_secs() + 1).try_into().unwrap(),
+                )
+                .await
+                .unwrap();
 
-    pub async fn locked_for(self: &Arc<Self>) -> Option<Duration> {
-        let load = self.is_locked.load(std::sync::atomic::Ordering::Relaxed);
-        if load != 0 {
-            if load
-                > SystemTime::now()
+            // Integer truncation is expected
+            #[allow(clippy::cast_possible_truncation)]
+            self.is_locked.store(
+                (SystemTime::now() + duration)
                     .duration_since(SystemTime::UNIX_EPOCH)
                     .unwrap()
-                    .as_millis() as u64
-            {
-                return Some(Duration::from_millis(load));
-            } else {
+                    .as_millis() as u64,
+                std::sync::atomic::Ordering::Relaxed,
+            );
+        })
+    }
+
+    fn is_locked<'a>(
+        self: &'a Arc<Self>,
+    ) -> Pin<Box<dyn Future<Output = Option<Duration>> + Send + 'a>> {
+        Box::pin(async move {
+            let load = self.is_locked.load(std::sync::atomic::Ordering::Relaxed);
+            if load != 0 {
+                // Integer truncation is expected
+                #[allow(clippy::cast_possible_truncation)]
+                if load
+                    > SystemTime::now()
+                        .duration_since(SystemTime::UNIX_EPOCH)
+                        .unwrap()
+                        .as_millis() as u64
+                {
+                    return Some(Duration::from_millis(load));
+                }
                 self.is_locked
                     .store(0, std::sync::atomic::Ordering::Relaxed);
             }
-        }
 
-        let result = self.redis.lock().await.ttl::<_, i64>("nova:rls:lock").await;
-        match result {
-            Ok(remaining_time) => {
-                if remaining_time > 0 {
-                    let duration = Duration::from_secs(remaining_time as u64);
-                    debug!("external global lock detected, locking");
-                    self.lock_for(duration).await;
-                    Some(duration)
-                } else {
-                    None
+            let result = self.redis.lock().await.ttl::<_, i64>("nova:rls:lock").await;
+            match result {
+                Ok(remaining_time) => {
+                    if remaining_time > 0 {
+                        // Sign loss is allowed since we know it's a positive number:
+                        // a TTL is always positive when the key exists and has a TTL;
+                        // otherwise redis *will* return a negative number, hence the check
+                        // for a positive sign.
+                        #[allow(clippy::cast_sign_loss)]
+                        let duration = Duration::from_secs(remaining_time as u64);
+                        debug!("external global lock detected, locking");
+                        self.lock_for(duration).await;
+                        Some(duration)
+                    } else {
+                        None
+                    }
                 }
-            }
-            Err(error) => {
-                debug!("redis call failed: {}", error);
+                Err(error) => {
+                    debug!("redis call failed: {}", error);
 
-                None
+                    None
+                }
             }
-        }
-    }
-
-    pub fn new(redis: MultiplexedConnection) -> Arc<Self> {
-        Arc::new(Self {
-            redis: Mutex::new(redis),
-            is_locked: AtomicU64::new(0),
         })
     }
 }
index df18b762d779f30c90a24e9c5a11905ff02679c3..4114c7b450e75b4b130aef1eea4b698ad35e3320 100644 (file)
@@ -18,6 +18,6 @@ impl Default for ServerSettings {
 }
 
 #[derive(Debug, Deserialize, Clone, Default)]
-pub struct RatelimitServerConfig {
+pub struct Ratelimit {
     pub server: ServerSettings,
 }
index 09885c1a2f267dc0ead9a61803f16d9431962060..9e5d31c71992bb3982df9f19e839e6ed09f5f97a 100644 (file)
@@ -16,6 +16,7 @@ use twilight_http_ratelimiting::RatelimitHeaders;
 
 use crate::buckets::bucket::Bucket;
 use crate::buckets::redis_lock::RedisLock;
+use crate::buckets::GlobalLock;
 
 pub struct RLServer {
     global: Arc<RedisLock>,
@@ -34,12 +35,12 @@ impl RLServer {
 struct MetadataMap<'a>(&'a tonic::metadata::MetadataMap);
 
 impl<'a> Extractor for MetadataMap<'a> {
-    /// Get a value for a key from the MetadataMap.  If the value can't be converted to &str, returns None
+    /// Get a value for a key from the `MetadataMap`.  If the value can't be converted to &str, returns None
     fn get(&self, key: &str) -> Option<&str> {
         self.0.get(key).and_then(|metadata| metadata.to_str().ok())
     }
 
-    /// Collect all the keys from the MetadataMap.
+    /// Collect all the keys from the `MetadataMap`.
     fn keys(&self) -> Vec<&str> {
         self.0
             .keys()
@@ -70,6 +71,10 @@ impl Ratelimiter for RLServer {
         )
         .unwrap();
 
+        if let Some(duration) = self.global.is_locked().await {
+            tokio::time::sleep(duration).await;
+        }
+
         let bucket: Arc<Bucket> = if self.buckets.read().await.contains_key(&data.path) {
             self.buckets
                 .read()
@@ -78,7 +83,7 @@ impl Ratelimiter for RLServer {
                 .expect("impossible")
                 .clone()
         } else {
-            let bucket = Bucket::new(self.global.clone());
+            let bucket = Bucket::new();
             self.buckets.write().await.insert(data.path, bucket.clone());
             bucket
         };
@@ -93,13 +98,14 @@ impl Ratelimiter for RLServer {
                     global.retry_after()
                 );
                 self.global
+                    .clone()
                     .lock_for(Duration::from_secs(global.retry_after()))
                     .await;
             }
             RatelimitHeaders::None => {}
             RatelimitHeaders::Present(present) => {
                 // we should update the bucket.
-                bucket.update(present, data.precise_time);
+                bucket.update(&present, data.precise_time);
             }
             _ => unreachable!(),
         };
@@ -127,13 +133,13 @@ impl Ratelimiter for RLServer {
                 .expect("impossible")
                 .clone()
         } else {
-            let bucket = Bucket::new(self.global.clone());
+            let bucket = Bucket::new();
             self.buckets.write().await.insert(data.path, bucket.clone());
             bucket
         };
 
         // wait for the ticket to be accepted
-        bucket.ticket().await;
+        let _ = bucket.ticket().await;
 
         Ok(Response::new(()))
     }
index 6653157d9ac15f2951f798cf326a22b1bd73d568..d1bd6e03a27f0b7fe222ba9848e07341fdf3f288 100644 (file)
@@ -1,5 +1,17 @@
+#![deny(
+    clippy::all,
+    clippy::correctness,
+    clippy::suspicious,
+    clippy::style,
+    clippy::complexity,
+    clippy::perf,
+    clippy::pedantic,
+    clippy::nursery,
+    unsafe_code
+)]
+
 use buckets::redis_lock::RedisLock;
-use config::RatelimitServerConfig;
+use config::Ratelimit;
 use grpc::RLServer;
 use leash::{AnyhowResultFuture, Component};
 use proto::nova::ratelimit::ratelimiter::ratelimiter_server::RatelimiterServer;
@@ -10,13 +22,13 @@ use std::pin::Pin;
 use tokio::sync::oneshot;
 use tonic::transport::Server;
 
-mod buckets;
+pub mod buckets;
 mod config;
 mod grpc;
 
 pub struct RatelimiterServerComponent {}
 impl Component for RatelimiterServerComponent {
-    type Config = RatelimitServerConfig;
+    type Config = Ratelimit;
     const SERVICE_NAME: &'static str = "ratelimiter";
 
     fn start(
index e87757c359ad1e0fd041752649c71f2a9b69d582..9593ac8f66f67b2bbc852f2f01b64254d3c23689 100644 (file)
@@ -23,7 +23,7 @@ pub struct Discord {
 }
 
 #[derive(Debug, Deserialize, Clone, Default)]
-pub struct ReverseProxyConfig {
+pub struct ReverseProxy {
     pub server: ServerSettings,
     pub discord: Discord,
     pub ratelimiter_address: String,
index a8f43cdf6fe6a8e22d5250ba934d589893ed5485..ab158fe820e7ee54f22b3245dee0dd070c51d4c3 100644 (file)
@@ -58,7 +58,7 @@ pub async fn handle_request(
         let request_path = request.uri().path();
         let (api_path, trimmed_path) = normalize_path(request_path);
 
-        let mut uri_string = format!("http://127.0.0.1:9999{}{}", api_path, trimmed_path);
+        let mut uri_string = format!("http://127.0.0.1:9999{api_path}{trimmed_path}");
         if let Some(query) = request.uri().query() {
             uri_string.push('?');
             uri_string.push_str(query);
@@ -103,12 +103,12 @@ pub async fn handle_request(
             .expect("Failed to check header")
             .starts_with("Bot")
         {
-            *auth = HeaderValue::from_str(&format!("Bot {}", token))?;
+            *auth = HeaderValue::from_str(&format!("Bot {token}"))?;
         }
     } else {
         request.headers_mut().insert(
             AUTHORIZATION,
-            HeaderValue::from_str(&format!("Bot {}", token))?,
+            HeaderValue::from_str(&format!("Bot {token}"))?,
         );
     }
 
@@ -132,12 +132,12 @@ pub async fn handle_request(
     let headers = resp
         .headers()
         .into_iter()
-        .map(|(k, v)| (k.to_string(), v.to_str().map(|f| f.to_string())))
+        .map(|(k, v)| (k.to_string(), v.to_str().map(std::string::ToString::to_string)))
         .filter(|f| f.1.is_ok())
         .map(|f| (f.0, f.1.expect("errors should be filtered")))
         .collect();
 
-    let _ = ratelimiter
+    let _submit_headers = ratelimiter
         .submit_headers(hash, headers)
         .instrument(info_span!("submitting headers"))
         .await;
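
A sketch of the header flattening above, assuming an `http::HeaderMap` input and a `HashMap<String, String>` output (the concrete collection used by the handler is not shown in this hunk); header values that are not valid UTF-8 are dropped rather than failing the whole request.

    use std::collections::HashMap;

    use http::HeaderMap;

    // Non-UTF-8 header values are filtered out instead of aborting the request.
    fn flatten_headers(headers: &HeaderMap) -> HashMap<String, String> {
        headers
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_str().map(std::string::ToString::to_string)))
            .filter(|(_, v)| v.is_ok())
            .map(|(k, v)| (k, v.expect("errors should be filtered")))
            .collect()
    }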
index 53ab12ade0be5c90ffa7160b701c17c30bb0dc50..174bea0c4b13a0bba3d29b97b7debfd753735d1c 100644 (file)
@@ -1,4 +1,16 @@
-use config::ReverseProxyConfig;
+#![deny(
+    clippy::all,
+    clippy::correctness,
+    clippy::suspicious,
+    clippy::style,
+    clippy::complexity,
+    clippy::perf,
+    clippy::pedantic,
+    clippy::nursery,
+    unsafe_code
+)]
+
+use config::ReverseProxy;
 
 use handler::handle_request;
 use hyper::{
@@ -20,7 +32,7 @@ mod ratelimit_client;
 
 pub struct ReverseProxyServer {}
 impl Component for ReverseProxyServer {
-    type Config = ReverseProxyConfig;
+    type Config = ReverseProxy;
     const SERVICE_NAME: &'static str = "rest";
 
     fn start(
index c9bd52eb824b19d14d96999ce752da2a63a9bbb3..e2f7e0e2ce7d4423a4de3a12b6eaf5ba88ce64a3 100644 (file)
@@ -1,4 +1,4 @@
-use crate::config::ReverseProxyConfig;
+use crate::config::ReverseProxy;
 
 use self::remote_hashring::{HashRingWrapper, MetadataMap, VNode};
 use anyhow::anyhow;
@@ -23,7 +23,7 @@ pub struct RemoteRatelimiter {
     current_remotes: Vec<String>,
 
     stop: Arc<tokio::sync::broadcast::Sender<()>>,
-    config: ReverseProxyConfig,
+    config: ReverseProxy,
 }
 
 impl Drop for RemoteRatelimiter {
@@ -41,15 +41,15 @@ impl RemoteRatelimiter {
         // get list of dns responses
         let responses: Vec<String> = dns_lookup::lookup_host(&self.config.ratelimiter_address)?
             .into_iter()
-            .filter(|address| address.is_ipv4())
+            .filter(std::net::IpAddr::is_ipv4)
             .map(|address| address.to_string())
             .collect();
 
         let mut write = self.remotes.write().await;
 
         for ip in &responses {
-            if !self.current_remotes.contains(&ip) {
-                let a = VNode::new(ip.to_owned(), self.config.ratelimiter_port).await?;
+            if !self.current_remotes.contains(ip) {
+                let a = VNode::new(ip.clone(), self.config.ratelimiter_port).await?;
                 write.add(a.clone());
             }
         }
@@ -58,13 +58,13 @@ impl RemoteRatelimiter {
     }
 
     #[must_use]
-    pub fn new(config: ReverseProxyConfig) -> Self {
+    pub fn new(config: ReverseProxy) -> Self {
         let (rx, mut tx) = broadcast::channel(1);
         let obj = Self {
             remotes: Arc::new(RwLock::new(HashRingWrapper::default())),
             stop: Arc::new(rx),
             config,
-            current_remotes: vec![]
+            current_remotes: vec![],
         };
 
         let obj_clone = obj.clone();
@@ -75,7 +75,7 @@ impl RemoteRatelimiter {
 
                 match obj_clone.get_ratelimiters().await {
                     Ok(_) => {
-                        debug!("refreshed ratelimiting servers")
+                        debug!("refreshed ratelimiting servers");
                     }
                     Err(err) => {
                         error!("refreshing ratelimiting servers failed {}", err);
@@ -122,7 +122,7 @@ impl RemoteRatelimiter {
                 let context = span.context();
                 let mut request = Request::new(BucketSubmitTicketRequest { path });
                 global::get_text_map_propagator(|propagator| {
-                    propagator.inject_context(&context, &mut MetadataMap(request.metadata_mut()))
+                    propagator.inject_context(&context, &mut MetadataMap(request.metadata_mut()));
                 });
 
                 // Requesting
@@ -158,13 +158,15 @@ impl RemoteRatelimiter {
             let time = SystemTime::now()
                 .duration_since(SystemTime::UNIX_EPOCH)?
                 .as_millis();
+            // truncation is expected
+            #[allow(clippy::cast_possible_truncation)]
             let mut request = Request::new(HeadersSubmitRequest {
                 path,
                 precise_time: time as u64,
                 headers,
             });
             global::get_text_map_propagator(|propagator| {
-                propagator.inject_context(&context, &mut MetadataMap(request.metadata_mut()))
+                propagator.inject_context(&context, &mut MetadataMap(request.metadata_mut()));
             });
 
             node.submit_headers(request).await?;
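
A sketch of the millisecond timestamp feeding `precise_time`: `as_millis` yields a `u128`, so the `as u64` cast truncates, which the new `#[allow(clippy::cast_possible_truncation)]` acknowledges.

    use std::time::SystemTime;

    fn unix_millis() -> u64 {
        let millis = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("system clock is before the UNIX epoch")
            .as_millis();
        // truncation is expected: u64 milliseconds cover hundreds of millions of years
        #[allow(clippy::cast_possible_truncation)]
        let millis = millis as u64;
        millis
    }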
index ac025c802897d66187d964127cbc532a516e1f8e..b99276d25166f424ee98aa235e0bc6dd429e24c8 100644 (file)
@@ -38,7 +38,7 @@ impl Hash for VNode {
 pub struct MetadataMap<'a>(pub &'a mut tonic::metadata::MetadataMap);
 
 impl<'a> Injector for MetadataMap<'a> {
-    /// Set a key and value in the MetadataMap.  Does nothing if the key or value are not valid inputs
+    /// Set a key and value in the `MetadataMap`.  Does nothing if the key or value are not valid inputs
     fn set(&mut self, key: &str, value: String) {
         if let Ok(key) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) {
             if let Ok(val) = tonic::metadata::MetadataValue::try_from(&value) {
@@ -50,11 +50,11 @@ impl<'a> Injector for MetadataMap<'a> {
 
 impl VNode {
     pub async fn new(address: String, port: u16) -> Result<Self, tonic::transport::Error> {
-        let host = format!("http://{}:{}", address.clone(), port);
+        let host = format!("http://{}:{port}", address.clone());
         debug!("connecting to {}", host);
         let client = RatelimiterClient::connect(host).await?;
 
-        Ok(VNode { client, address })
+        Ok(Self { address, client })
     }
 }
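
The same `Injector` idea, sketched over a plain `HashMap` instead of tonic metadata, assuming the `opentelemetry::propagation::Injector` trait and the global text-map propagator; `MapCarrier` is a hypothetical name used only for this sketch.

    use std::collections::HashMap;

    use opentelemetry::propagation::Injector;

    // Hypothetical carrier: a HashMap accepts any key, so no validation is needed.
    struct MapCarrier<'a>(pub &'a mut HashMap<String, String>);

    impl<'a> Injector for MapCarrier<'a> {
        fn set(&mut self, key: &str, value: String) {
            self.0.insert(key.to_owned(), value);
        }
    }

    // Inject the current tracing context into the carrier, mirroring what the
    // client does with its tonic request metadata above.
    fn inject(cx: &opentelemetry::Context, map: &mut HashMap<String, String>) {
        opentelemetry::global::get_text_map_propagator(|propagator| {
            propagator.inject_context(cx, &mut MapCarrier(map));
        });
    }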
 
index b96f368b904e388168c16549f5057188959452fb..ccc7894bd527a8782017b9a75bdab7c5373cd257 100644 (file)
@@ -35,7 +35,7 @@ pub struct Discord {
 }
 
 #[derive(Debug, Deserialize, Clone, Default, Copy)]
-pub struct WebhookConfig {
+pub struct Webhook {
     pub server: ServerSettings,
     pub discord: Discord,
 }
diff --git a/exes/webhook/src/handler/error.rs b/exes/webhook/src/handler/error.rs
deleted file mode 100644 (file)
index ffa4cca..0000000
+++ /dev/null
@@ -1,36 +0,0 @@
-use hyper::{header::ToStrError, Body, Response, StatusCode};
-
-pub struct WebhookError {
-    pub code: StatusCode,
-    pub message: String,
-}
-
-impl WebhookError {
-    pub fn new(code: StatusCode, message: &str) -> WebhookError {
-        WebhookError {
-            code,
-            message: message.to_string(),
-        }
-    }
-}
-
-impl From<WebhookError> for Response<Body> {
-    fn from(value: WebhookError) -> Self {
-        Response::builder()
-            .status(value.code)
-            .body(value.message.into())
-            .unwrap()
-    }
-}
-
-impl From<hyper::Error> for WebhookError {
-    fn from(_: hyper::Error) -> Self {
-        WebhookError::new(StatusCode::BAD_REQUEST, "invalid request")
-    }
-}
-
-impl From<ToStrError> for WebhookError {
-    fn from(_: ToStrError) -> Self {
-        WebhookError::new(StatusCode::BAD_REQUEST, "invalid request")
-    }
-}
index b51494a88280565ef52d6f37e5b70483debffc22..4202cd7c20a5f507eda862007cf79ac866b2900c 100644 (file)
@@ -23,7 +23,7 @@ impl<T, V: Clone> Service<T> for MakeSvc<V> {
 }
 
 impl<T: Clone> MakeSvc<T> {
-    pub fn new(service: T) -> Self {
+    pub const fn new(service: T) -> Self {
         Self { service }
     }
 }
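
A sketch of why `new` can become `const`: the constructor only moves its argument into the struct, so it is also usable in `const` contexts. `MakeSvcLike` is a stand-in, not the real service factory.

    // Stand-in for the service factory wrapper.
    struct MakeSvcLike<T> {
        service: T,
    }

    impl<T> MakeSvcLike<T> {
        pub const fn new(service: T) -> Self {
            Self { service }
        }
    }

    // the constructor can now run at compile time
    const UNIT_FACTORY: MakeSvcLike<()> = MakeSvcLike::new(());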
index 594919b1c0ad867b7dda9914293d056e614e53e6..ea7eccacba358116b98800a8999fa2558a169dc0 100644 (file)
@@ -1,18 +1,17 @@
-use crate::config::WebhookConfig;
+use crate::config::Webhook;
+use anyhow::bail;
 use async_nats::Client;
 use ed25519_dalek::PublicKey;
-use error::WebhookError;
 use hyper::{
     body::{to_bytes, Bytes},
     service::Service,
-    Body, Method, Request, Response, StatusCode,
+    Body, Method, Request, Response,
 };
 use shared::payloads::{CachePayload, DispatchEventTagged};
-use signature::validate_signature;
+use signature::validate;
 use std::{
     future::Future,
     pin::Pin,
-    str::from_utf8,
     task::{Context, Poll},
 };
 use tracing::{debug, error};
@@ -22,7 +21,6 @@ use twilight_model::{
     gateway::payload::incoming::InteractionCreate,
 };
 
-mod error;
 pub mod make_service;
 mod signature;
 
@@ -32,46 +30,37 @@ pub mod tests;
 /// Hyper service used to handle the discord webhooks
 #[derive(Clone)]
 pub struct WebhookService {
-    pub config: WebhookConfig,
+    pub config: Webhook,
     pub nats: Client,
 }
 
 impl WebhookService {
-    async fn check_request(req: Request<Body>, pk: PublicKey) -> Result<Bytes, WebhookError> {
+    async fn check_request(req: Request<Body>, pk: PublicKey) -> Result<Bytes, anyhow::Error> {
         if req.method() == Method::POST {
             let signature = if let Some(sig) = req.headers().get("X-Signature-Ed25519") {
-                sig.to_owned()
+                sig.clone()
             } else {
-                return Err(WebhookError::new(
-                    StatusCode::BAD_REQUEST,
-                    "missing signature header",
-                ));
+                bail!("Missing signature header");
             };
 
             let timestamp = if let Some(timestamp) = req.headers().get("X-Signature-Timestamp") {
-                timestamp.to_owned()
+                timestamp.clone()
             } else {
-                return Err(WebhookError::new(
-                    StatusCode::BAD_REQUEST,
-                    "missing timestamp header",
-                ));
+                bail!("Missing timestamp header");
             };
             let data = to_bytes(req.into_body()).await?;
 
-            if validate_signature(
+            if validate(
                 &pk,
                 &[timestamp.as_bytes().to_vec(), data.to_vec()].concat(),
                 signature.to_str()?,
             ) {
                 Ok(data)
             } else {
-                Err(WebhookError::new(
-                    StatusCode::UNAUTHORIZED,
-                    "invalid signature",
-                ))
+                bail!("invalid signature");
             }
         } else {
-            Err(WebhookError::new(StatusCode::NOT_FOUND, "not found"))
+            bail!("not found");
         }
     }
 
@@ -79,69 +68,46 @@ impl WebhookService {
         req: Request<Body>,
         nats: Client,
         pk: PublicKey,
-    ) -> Result<Response<Body>, WebhookError> {
-        match Self::check_request(req, pk).await {
-            Ok(data) => {
-                let utf8 = from_utf8(&data);
-                match utf8 {
-                    Ok(data) => match serde_json::from_str::<Interaction>(data) {
-                        Ok(value) => {
-                            match value.kind {
-                                InteractionType::Ping => Ok(Response::builder()
-                                    .header("Content-Type", "application/json")
-                                    .body(r#"{"type":1}"#.into())
-                                    .unwrap()),
-                                _ => {
-                                    debug!("calling nats");
-                                    // this should hopefully not fail ?
-
-                                    let data = CachePayload {
-                                        data: DispatchEventTagged {
-                                            data: DispatchEvent::InteractionCreate(Box::new(
-                                                InteractionCreate(value),
-                                            )),
-                                        },
-                                    };
-
-                                    let payload = serde_json::to_string(&data).unwrap();
-
-                                    match nats
-                                        .request(
-                                            "nova.cache.dispatch.INTERACTION_CREATE".to_string(),
-                                            Bytes::from(payload),
-                                        )
-                                        .await
-                                    {
-                                        Ok(response) => Ok(Response::builder()
-                                            .header("Content-Type", "application/json")
-                                            .body(Body::from(response.payload))
-                                            .unwrap()),
-
-                                        Err(error) => {
-                                            error!("failed to request nats: {}", error);
-                                            Err(WebhookError::new(
-                                                StatusCode::INTERNAL_SERVER_ERROR,
-                                                "failed to request nats",
-                                            ))
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        Err(error) => {
-                            error!("invalid json body: {}", error);
-                            Err(WebhookError::new(
-                                StatusCode::BAD_REQUEST,
-                                "invalid json body",
-                            ))
-                        }
-                    },
-
-                    Err(_) => Err(WebhookError::new(StatusCode::BAD_REQUEST, "not utf-8 body")),
+    ) -> Result<Response<Body>, anyhow::Error> {
+        let data = Self::check_request(req, pk).await?;
+        let interaction: Interaction = serde_json::from_slice(&data)?;
+
+        if interaction.kind == InteractionType::Ping {
+            Ok(Response::builder()
+                .header("Content-Type", "application/json")
+                .body(r#"{"type":1}"#.into())
+                .unwrap())
+        } else {
+            debug!("calling nats");
+            // serializing the payload below should not fail
+
+            let data = CachePayload {
+                data: DispatchEventTagged {
+                    data: DispatchEvent::InteractionCreate(Box::new(InteractionCreate(
+                        interaction,
+                    ))),
+                },
+            };
+
+            let payload = serde_json::to_string(&data).unwrap();
+
+            match nats
+                .request(
+                    "nova.cache.dispatch.INTERACTION_CREATE".to_string(),
+                    Bytes::from(payload),
+                )
+                .await
+            {
+                Ok(response) => Ok(Response::builder()
+                    .header("Content-Type", "application/json")
+                    .body(Body::from(response.payload))
+                    .unwrap()),
+
+                Err(error) => {
+                    error!("failed to request nats: {}", error);
+                    Err(anyhow::anyhow!("internal error"))
                 }
             }
-            Err(error) => Err(error),
         }
     }
 }
@@ -149,7 +115,7 @@ impl WebhookService {
 /// Implementation of the service
 impl Service<hyper::Request<Body>> for WebhookService {
     type Response = hyper::Response<Body>;
-    type Error = hyper::Error;
+    type Error = anyhow::Error;
     type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
 
     fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
@@ -163,7 +129,7 @@ impl Service<hyper::Request<Body>> for WebhookService {
 
             match response {
                 Ok(r) => Ok(r),
-                Err(e) => Ok(e.into()),
+                Err(e) => Err(e),
             }
         })
     }
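
A minimal sketch of the ping short-circuit in the refactored handler, assuming hyper 0.14's `Response`/`Body`: a Discord `PING` interaction is answered inline with `{"type":1}`.

    use hyper::{Body, Response};

    // Answer a Discord PING with the matching PONG payload.
    fn pong() -> anyhow::Result<Response<Body>> {
        Ok(Response::builder()
            .header("Content-Type", "application/json")
            .body(Body::from(r#"{"type":1}"#))?)
    }

Since the `Service` impl now uses `type Error = anyhow::Error`, failures propagate to hyper instead of being rendered into responses by the removed `WebhookError` type.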
index 05221d3de71b01bce7e966cb1f16790bac0a4274..5a486455bc8b185aa532d3c11c77c0a32db780e9 100644 (file)
@@ -1,14 +1,13 @@
 use ed25519_dalek::{PublicKey, Signature, Verifier};
 
 #[inline]
-pub fn validate_signature(public_key: &PublicKey, data: &[u8], hex_signature: &str) -> bool {
+pub fn validate(public_key: &PublicKey, data: &[u8], hex_signature: &str) -> bool {
     let mut slice: [u8; Signature::BYTE_SIZE] = [0; Signature::BYTE_SIZE];
     let signature_result = hex::decode_to_slice(hex_signature, &mut slice);
 
-    let mut result = false;
     if signature_result.is_ok() {
-        result = public_key.verify(data, &Signature::from(slice)).is_ok();
+        public_key.verify(data, &Signature::from(slice)).is_ok()
+    } else {
+        false
     }
-
-    result
 }
index 0bed86a7544baae04d3eca4c6383beda6965d6e5..4ff52ff33d6a7aacd7eb179abc767264ef742f94 100644 (file)
@@ -1,16 +1,16 @@
-use crate::handler::signature::validate_signature;
+use crate::handler::signature::validate;
 use ed25519_dalek::PublicKey;
 
 #[test]
 fn validate_signature_test() {
     let signature = "543ec3547d57f9ddb1ec4c5c36503ebf288ffda3da3d510764c9a49c2abb57690ef974c63d174771bdd2481de1066966f57abbec12a3ec171b9f6e2373837002";
-    let content = "message de test incroyable".as_bytes().to_vec();
+    let content = b"message de test incroyable";
     let public_key = PublicKey::from_bytes(
         &hex::decode("eefe0c24473737cb2035232e3b4eb91c206f0a14684168f3503f7d8316058d6f").unwrap(),
     )
     .unwrap();
 
-    assert!(validate_signature(&public_key, &content, signature))
+    assert!(validate(&public_key, content, signature));
 }
 
 #[test]
@@ -21,10 +21,8 @@ fn validate_signature_reverse_test() {
     )
     .unwrap();
 
-    let content = "ceci est un test qui ne fonctionnera pas!"
-        .as_bytes()
-        .to_vec();
-    assert!(!validate_signature(&public_key, &content, signature))
+    let content = b"ceci est un test qui ne fonctionnera pas!";
+    assert!(!validate(&public_key, content, signature));
 }
 
 #[test]
@@ -35,8 +33,6 @@ fn invalid_hex() {
     )
     .unwrap();
 
-    let content = "ceci est un test qui ne fonctionnera pas!"
-        .as_bytes()
-        .to_vec();
-    assert!(!validate_signature(&public_key, &content, signature))
+    let content = b"ceci est un test qui ne fonctionnera pas!";
+    assert!(!validate(&public_key, content, signature));
 }
index 057e70f72b7cc2f4d519b68d6b8255dabf816241..933f38ea9a5e7d5d12271a5c735f232eaf29c843 100644 (file)
@@ -1,9 +1,21 @@
+#![deny(
+    clippy::all,
+    clippy::correctness,
+    clippy::suspicious,
+    clippy::style,
+    clippy::complexity,
+    clippy::perf,
+    clippy::pedantic,
+    clippy::nursery,
+    unsafe_code
+)]
+
 mod config;
 mod handler;
 use std::{future::Future, pin::Pin};
 
 use crate::{
-    config::WebhookConfig,
+    config::Webhook,
     handler::{make_service::MakeSvc, WebhookService},
 };
 use async_nats::Client;
@@ -16,7 +28,7 @@ use tracing::info;
 pub struct WebhookServer {}
 
 impl Component for WebhookServer {
-    type Config = WebhookConfig;
+    type Config = Webhook;
     const SERVICE_NAME: &'static str = "webhook";
 
     fn start(
index cd6707572ac82d0170cb1ead2829af749ed835bd..37e2b7c60b2d844130a1a47dab2debd8f7195bae 100644 (file)
@@ -1,3 +1,14 @@
+#![deny(
+    clippy::all,
+    clippy::correctness,
+    clippy::suspicious,
+    clippy::style,
+    clippy::complexity,
+    clippy::perf,
+    clippy::pedantic,
+    clippy::nursery,
+)]
+
 use anyhow::Result;
 use opentelemetry::sdk::propagation::TraceContextPropagator;
 use opentelemetry::sdk::trace::{self};
index adcaf797de3aa5f57e8b7e28e85090c70c690cbf..cdf0bd36db4b8fa4f4cb70617e616ddb3b89688c 100644 (file)
@@ -5,22 +5,24 @@ use std::{env, ops::Deref};
 use tracing::info;
 
 #[derive(Debug, Deserialize, Clone)]
-pub struct Settings<T: Clone + DeserializeOwned + Default> {
+pub struct Settings<T: Clone + DeserializeOwned> {
     #[serde(skip_deserializing)]
     pub config: T,
-    pub nats: crate::nats::NatsConfiguration,
-    pub redis: crate::redis::RedisConfiguration,
+    pub nats: crate::nats::Configuration,
+    pub redis: crate::redis::Configuration,
 }
 
 impl<T: Clone + DeserializeOwned + Default> Settings<T> {
-    pub fn new(service_name: &str) -> Result<Settings<T>> {
+    /// # Errors
+    /// Fails if the config could not be deserialized to `Self::T`
+    pub fn new(service_name: &str) -> Result<Self> {
         let mut builder = Config::builder();
 
         builder = builder.add_source(File::with_name("config/default"));
         let mode = env::var("ENV").unwrap_or_else(|_| "development".into());
         info!("Configuration Environment: {}", mode);
 
-        builder = builder.add_source(File::with_name(&format!("config/{}", mode)).required(false));
+        builder = builder.add_source(File::with_name(&format!("config/{mode}")).required(false));
         builder = builder.add_source(File::with_name("config/local").required(false));
 
         let env = Environment::with_prefix("NOVA").separator("__");
@@ -28,7 +30,7 @@ impl<T: Clone + DeserializeOwned + Default> Settings<T> {
         builder = builder.add_source(env);
 
         let config = builder.build()?;
-        let mut settings: Settings<T> = config.clone().try_deserialize()?;
+        let mut settings: Self = config.clone().try_deserialize()?;
 
         //  try to load the config
         settings.config = config.get::<T>(service_name)?;
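
A sketch of the layered lookup `Settings::new` performs, assuming the `config` crate's builder API as used above: defaults first, then `config/{ENV}`, then `config/local`, then `NOVA__*` environment variables. `Demo` is a hypothetical per-service section; the real one is selected by `service_name`.

    use config::{Config, Environment, File};
    use serde::Deserialize;

    // Hypothetical configuration section used only for this sketch.
    #[derive(Debug, Deserialize, Clone, Default)]
    struct Demo {
        url: Option<String>,
    }

    fn load(mode: &str) -> anyhow::Result<Demo> {
        let config = Config::builder()
            .add_source(File::with_name("config/default"))
            .add_source(File::with_name(&format!("config/{mode}")).required(false))
            .add_source(File::with_name("config/local").required(false))
            .add_source(Environment::with_prefix("NOVA").separator("__"))
            .build()?;
        Ok(config.try_deserialize()?)
    }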
index a714a1bb71f944911fca61c5e1a56bc0a145c5b2..92e7a0591cdb0e93e3879dcf3a292d1bb48ed3a1 100644 (file)
@@ -1,5 +1,14 @@
-/// This crate is all the utilities shared by the nova rust projects
-/// It includes logging, config and protocols.
+#![deny(
+    clippy::all,
+    clippy::correctness,
+    clippy::suspicious,
+    clippy::style,
+    clippy::complexity,
+    clippy::perf,
+    clippy::pedantic,
+    clippy::nursery,
+)]
+
 pub mod config;
 pub mod nats;
 pub mod payloads;
index 3529c462e29a4b61d9f9b6542fa9fe2fce8f3a43..7d4d3d1e9bf0a3ca76290feedfc90ce919b1bf33 100644 (file)
@@ -4,23 +4,12 @@ use async_nats::Client;
 use serde::Deserialize;
 
 #[derive(Clone, Debug, Deserialize)]
-pub struct NatsConfigurationClientCert {
-    pub cert: String,
-    pub key: String,
-}
-
-#[derive(Clone, Debug, Deserialize)]
-pub struct NatsConfigurationTls {
-    pub mtu: Option<usize>,
-}
-
-#[derive(Clone, Debug, Deserialize)]
-pub struct NatsConfiguration {
+pub struct Configuration {
     pub host: String,
 }
 
-impl From<NatsConfiguration> for Pin<Box<dyn Future<Output = anyhow::Result<Client>> + Send>> {
-    fn from(value: NatsConfiguration) -> Self {
+impl From<Configuration> for Pin<Box<dyn Future<Output = anyhow::Result<Client>> + Send>> {
+    fn from(value: Configuration) -> Self {
         Box::pin(async move { Ok(async_nats::connect(value.host).await?) })
     }
 }
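
A sketch of the shared "configuration into boxed connect future" conversion that both the NATS and Redis modules use, with a stand-in `Conn` type instead of a real client.

    use std::{future::Future, pin::Pin};

    #[derive(Clone, Debug)]
    struct Configuration {
        pub host: String,
    }

    // Stand-in connection type instead of a real NATS or Redis client.
    struct Conn;

    impl From<Configuration> for Pin<Box<dyn Future<Output = anyhow::Result<Conn>> + Send>> {
        fn from(value: Configuration) -> Self {
            Box::pin(async move {
                // a real implementation would dial `value.host` here
                let _ = &value.host;
                Ok::<Conn, anyhow::Error>(Conn)
            })
        }
    }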
index 6e51b2d602b4a4a54e35db037db52aef39c1873b..3f183a94f256153c2c61ba3e33cfa8e33cb412ff 100644 (file)
@@ -31,7 +31,7 @@ impl<'de> Deserialize<'de> for DispatchEventTagged {
         let tagged = DispatchEventTaggedSerialized::deserialize(deserializer)?;
         let deserializer_seed = DispatchEventWithTypeDeserializer::new(&tagged.kind);
         let dispatch_event = deserializer_seed.deserialize(tagged.data).unwrap();
-        Ok(DispatchEventTagged {
+        Ok(Self {
             data: dispatch_event,
         })
     }
index a623c2fac42654b64085f40d88649af65db9a8e2..77aa97f78562780153aa3f76da7e49bf4a528249 100644 (file)
@@ -3,14 +3,14 @@ use serde::Deserialize;
 use std::{future::Future, pin::Pin};
 
 #[derive(Clone, Debug, Deserialize)]
-pub struct RedisConfiguration {
+pub struct Configuration {
     pub url: String,
 }
 
-impl From<RedisConfiguration>
+impl From<Configuration>
     for Pin<Box<dyn Future<Output = anyhow::Result<MultiplexedConnection>> + Send>>
 {
-    fn from(value: RedisConfiguration) -> Self {
+    fn from(value: Configuration) -> Self {
         Box::pin(async move {
             let con = Client::open(value.url)?;
             let (multiplex, ready) = con.create_multiplexed_tokio_connection().await?;