]> git.puffer.fish Git - matthieu/nova.git/commitdiff
restructure project
authorMatthieuCoder <matthieu@matthieu-dev.xyz>
Mon, 2 Jan 2023 14:59:03 +0000 (18:59 +0400)
committerMatthieuCoder <matthieu@matthieu-dev.xyz>
Mon, 2 Jan 2023 14:59:03 +0000 (18:59 +0400)
40 files changed:
.devcontainer/Dockerfile
.gitignore
Cargo.lock
Cargo.toml
docker-compose.yaml
exes/.gitignore [new file with mode: 0644]
exes/cache/Cargo.toml
exes/cache/src/config.rs
exes/cache/src/main.rs
exes/cache/src/managers/bans.rs
exes/gateway/Cargo.toml
exes/gateway/src/config.rs
exes/gateway/src/main.rs
exes/rest/Cargo.toml
exes/rest/src/config.rs
exes/rest/src/handler.rs [new file with mode: 0644]
exes/rest/src/main.rs
exes/rest/src/proxy/mod.rs [deleted file]
exes/rest/src/ratelimit/mod.rs [deleted file]
exes/rest/src/ratelimit_client/mod.rs [new file with mode: 0644]
exes/rest/src/ratelimit_client/remote_hashring.rs [new file with mode: 0644]
exes/webhook/Cargo.toml
exes/webhook/src/config.rs
exes/webhook/src/handler/handler.rs
exes/webhook/src/handler/make_service.rs
exes/webhook/src/handler/mod.rs
exes/webhook/src/handler/tests/signature.rs
exes/webhook/src/main.rs
libs/leash/Cargo.toml [new file with mode: 0644]
libs/leash/src/lib.rs [new file with mode: 0644]
libs/proto/Cargo.toml
libs/proto/build.rs [new file with mode: 0644]
libs/proto/src/lib.rs
libs/shared/Cargo.toml
libs/shared/src/config.rs
libs/shared/src/nats.rs
libs/shared/src/redis.rs
proto/nova/management/nova.management.v1alpha.proto [deleted file]
proto/nova/management/rpc/nova.management.rpc.v1alpha.proto [deleted file]
proto/nova/ratelimit/ratelimiter.proto [new file with mode: 0644]

index a0da8ab90b9f2a92a6c373387dd79e725c8e41af..06771a3740ba3d2a94c8d69f83bf73ac6609d244 100644 (file)
@@ -7,8 +7,6 @@ RUN apt update -y && apt install libssl-dev pkg-config apt-transport-https curl
     curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg && \
     # Add docker repository apt source
     echo "deb [arch=amd64 signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | tee /etc/apt/sources.list.d/docker.list && \
-    # Add bazel repository apt source
-    echo "deb [arch=amd64] https://storage.googleapis.com/bazel-apt stable jdk1.8" | tee /etc/apt/sources.list.d/bazel.list && \
     # Install docker
     apt update -y && apt install docker-ce-cli -y
 
index 22904a147af6657f9ef93cfd04d5ad9b12061809..cc5c15c7e7a097a91fb6a510b7785c81abab2b11 100644 (file)
@@ -2,4 +2,5 @@
 target/\r
 **/local*\r
 .ijwb\r
-.idea
\ No newline at end of file
+.idea\r
+config.yml\r
index 943bb096c85ea7e9e7bf3858da82e087d9a7b74b..c76107e2ad78b4e2e2eaa0234a623d797cdd05a2 100644 (file)
@@ -37,6 +37,12 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "anyhow"
+version = "1.0.68"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61"
+
 [[package]]
 name = "arc-swap"
 version = "1.5.1"
@@ -77,6 +83,27 @@ dependencies = [
  "url",
 ]
 
+[[package]]
+name = "async-stream"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dad5c83079eae9969be7fadefe640a1c566901f05ff91ab221de4b6f68d9507e"
+dependencies = [
+ "async-stream-impl",
+ "futures-core",
+]
+
+[[package]]
+name = "async-stream-impl"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "async-trait"
 version = "0.1.60"
@@ -105,6 +132,52 @@ version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
 
+[[package]]
+name = "axum"
+version = "0.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "08b108ad2665fa3f6e6a517c3d80ec3e77d224c47d605167aefaa5d7ef97fa48"
+dependencies = [
+ "async-trait",
+ "axum-core",
+ "bitflags",
+ "bytes",
+ "futures-util",
+ "http",
+ "http-body",
+ "hyper",
+ "itoa",
+ "matchit",
+ "memchr",
+ "mime",
+ "percent-encoding",
+ "pin-project-lite",
+ "rustversion",
+ "serde",
+ "sync_wrapper",
+ "tower",
+ "tower-http",
+ "tower-layer",
+ "tower-service",
+]
+
+[[package]]
+name = "axum-core"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "79b8558f5a0581152dc94dcd289132a1d377494bdeafcd41869b3258e3e2ad92"
+dependencies = [
+ "async-trait",
+ "bytes",
+ "futures-util",
+ "http",
+ "http-body",
+ "mime",
+ "rustversion",
+ "tower-layer",
+ "tower-service",
+]
+
 [[package]]
 name = "base64"
 version = "0.13.1"
@@ -183,9 +256,11 @@ checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c"
 name = "cache"
 version = "0.1.0"
 dependencies = [
+ "anyhow",
  "async-nats",
  "futures-util",
  "log",
+ "proto",
  "redis",
  "serde",
  "serde_json",
@@ -321,16 +396,6 @@ dependencies = [
  "typenum",
 ]
 
-[[package]]
-name = "ctor"
-version = "0.1.26"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096"
-dependencies = [
- "quote",
- "syn",
-]
-
 [[package]]
 name = "curve25519-dalek"
 version = "3.2.0"
@@ -464,6 +529,18 @@ version = "0.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257"
 
+[[package]]
+name = "dns-lookup"
+version = "1.0.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "53ecafc952c4528d9b51a458d1a8904b81783feff9fde08ab6ed2545ff396872"
+dependencies = [
+ "cfg-if",
+ "libc",
+ "socket2",
+ "winapi",
+]
+
 [[package]]
 name = "ed25519"
 version = "1.5.2"
@@ -557,6 +634,12 @@ dependencies = [
  "instant",
 ]
 
+[[package]]
+name = "fixedbitset"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
+
 [[package]]
 name = "flate2"
 version = "1.0.25"
@@ -691,8 +774,11 @@ dependencies = [
 name = "gateway"
 version = "0.1.0"
 dependencies = [
+ "anyhow",
  "bytes",
  "futures",
+ "leash",
+ "proto",
  "serde",
  "serde_json",
  "shared",
@@ -733,6 +819,12 @@ dependencies = [
  "wasi 0.11.0+wasi-snapshot-preview1",
 ]
 
+[[package]]
+name = "glob"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574"
+
 [[package]]
 name = "h2"
 version = "0.3.15"
@@ -761,6 +853,21 @@ dependencies = [
  "ahash",
 ]
 
+[[package]]
+name = "hashring"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dd0ddd025eccd8a2fff9865e82ef4c8ce00c4a67709036847d95cf3ccffd07a8"
+dependencies = [
+ "siphasher",
+]
+
+[[package]]
+name = "heck"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9"
+
 [[package]]
 name = "hermit-abi"
 version = "0.1.19"
@@ -816,6 +923,12 @@ dependencies = [
  "pin-project-lite",
 ]
 
+[[package]]
+name = "http-range-header"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29"
+
 [[package]]
 name = "httparse"
 version = "1.8.0"
@@ -875,6 +988,18 @@ dependencies = [
  "tokio-rustls",
 ]
 
+[[package]]
+name = "hyper-timeout"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
+dependencies = [
+ "hyper",
+ "pin-project-lite",
+ "tokio",
+ "tokio-io-timeout",
+]
+
 [[package]]
 name = "hyper-tls"
 version = "0.5.0"
@@ -1013,6 +1138,16 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "leash"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "serde",
+ "shared",
+ "tokio",
+]
+
 [[package]]
 name = "libc"
 version = "0.2.139"
@@ -1070,12 +1205,24 @@ dependencies = [
  "cfg-if",
 ]
 
+[[package]]
+name = "matchit"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40"
+
 [[package]]
 name = "memchr"
 version = "2.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
 
+[[package]]
+name = "mime"
+version = "0.3.16"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
+
 [[package]]
 name = "minimal-lexical"
 version = "0.2.1"
@@ -1103,6 +1250,12 @@ dependencies = [
  "windows-sys 0.42.0",
 ]
 
+[[package]]
+name = "multimap"
+version = "0.8.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
+
 [[package]]
 name = "native-tls"
 version = "0.2.11"
@@ -1349,6 +1502,16 @@ dependencies = [
  "sha1",
 ]
 
+[[package]]
+name = "petgraph"
+version = "0.6.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e6d5014253a1331579ce62aa67443b4a658c5e7dd03d4bc6d302b94474888143"
+dependencies = [
+ "fixedbitset",
+ "indexmap",
+]
+
 [[package]]
 name = "pin-project"
 version = "1.0.12"
@@ -1415,6 +1578,16 @@ dependencies = [
  "log",
 ]
 
+[[package]]
+name = "prettyplease"
+version = "0.1.22"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2c8992a85d8e93a28bdf76137db888d3874e3b230dee5ed8bebac4c9f7617773"
+dependencies = [
+ "proc-macro2",
+ "syn",
+]
+
 [[package]]
 name = "proc-macro2"
 version = "1.0.49"
@@ -1454,9 +1627,70 @@ dependencies = [
  "thiserror",
 ]
 
+[[package]]
+name = "prost"
+version = "0.11.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c01db6702aa05baa3f57dec92b8eeeeb4cb19e894e73996b32a4093289e54592"
+dependencies = [
+ "bytes",
+ "prost-derive",
+]
+
+[[package]]
+name = "prost-build"
+version = "0.11.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cb5320c680de74ba083512704acb90fe00f28f79207286a848e730c45dd73ed6"
+dependencies = [
+ "bytes",
+ "heck",
+ "itertools",
+ "lazy_static",
+ "log",
+ "multimap",
+ "petgraph",
+ "prettyplease",
+ "prost",
+ "prost-types",
+ "regex",
+ "syn",
+ "tempfile",
+ "which",
+]
+
+[[package]]
+name = "prost-derive"
+version = "0.11.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c8842bad1a5419bca14eac663ba798f6bc19c413c2fdceb5f3ba3b0932d96720"
+dependencies = [
+ "anyhow",
+ "itertools",
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "prost-types"
+version = "0.11.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "017f79637768cde62820bc2d4fe0e45daaa027755c323ad077767c6c5f173091"
+dependencies = [
+ "bytes",
+ "prost",
+]
+
 [[package]]
 name = "proto"
 version = "0.1.0"
+dependencies = [
+ "glob",
+ "prost",
+ "tonic",
+ "tonic-build",
+]
 
 [[package]]
 name = "protobuf"
@@ -1550,6 +1784,24 @@ dependencies = [
  "rand_core 0.5.1",
 ]
 
+[[package]]
+name = "ratelimit"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "futures-util",
+ "hyper",
+ "proto",
+ "serde",
+ "serde_json",
+ "shared",
+ "tokio",
+ "tokio-stream",
+ "tonic",
+ "tracing",
+ "twilight-http-ratelimiting 0.14.0 (git+https://github.com/MatthieuCoder/twilight.git)",
+]
+
 [[package]]
 name = "redis"
 version = "0.22.1"
@@ -1613,13 +1865,25 @@ dependencies = [
 name = "rest"
 version = "0.1.0"
 dependencies = [
+ "anyhow",
+ "dns-lookup",
  "futures-util",
+ "hashring",
+ "http",
  "hyper",
  "hyper-tls",
  "lazy_static",
+ "leash",
+ "proto",
  "serde",
+ "serde_json",
  "shared",
  "tokio",
+ "tokio-scoped",
+ "tokio-stream",
+ "tonic",
+ "tracing",
+ "twilight-http-ratelimiting 0.14.0 (git+https://github.com/MatthieuCoder/twilight.git)",
  "xxhash-rust",
 ]
 
@@ -1706,6 +1970,12 @@ dependencies = [
  "base64",
 ]
 
+[[package]]
+name = "rustversion"
+version = "1.0.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5583e89e108996506031660fe09baa5011b9dd0341b89029313006d1fb508d70"
+
 [[package]]
 name = "ryu"
 version = "1.0.12"
@@ -1906,6 +2176,7 @@ dependencies = [
 name = "shared"
 version = "0.1.0"
 dependencies = [
+ "anyhow",
  "async-nats",
  "config",
  "enumflags2",
@@ -1951,6 +2222,12 @@ version = "1.6.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c"
 
+[[package]]
+name = "siphasher"
+version = "0.3.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
+
 [[package]]
 name = "slab"
 version = "0.4.7"
@@ -2023,6 +2300,12 @@ dependencies = [
  "unicode-ident",
 ]
 
+[[package]]
+name = "sync_wrapper"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8"
+
 [[package]]
 name = "synstructure"
 version = "0.12.6"
@@ -2168,6 +2451,16 @@ dependencies = [
  "windows-sys 0.42.0",
 ]
 
+[[package]]
+name = "tokio-io-timeout"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf"
+dependencies = [
+ "pin-project-lite",
+ "tokio",
+]
+
 [[package]]
 name = "tokio-macros"
 version = "1.8.2"
@@ -2211,6 +2504,27 @@ dependencies = [
  "webpki",
 ]
 
+[[package]]
+name = "tokio-scoped"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e4beb8ba13bc53ac53ce1d52b42f02e5d8060f0f42138862869beb769722b256"
+dependencies = [
+ "tokio",
+ "tokio-stream",
+]
+
+[[package]]
+name = "tokio-stream"
+version = "0.1.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce"
+dependencies = [
+ "futures-core",
+ "pin-project-lite",
+ "tokio",
+]
+
 [[package]]
 name = "tokio-tungstenite"
 version = "0.17.2"
@@ -2250,6 +2564,96 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "tonic"
+version = "0.8.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb"
+dependencies = [
+ "async-stream",
+ "async-trait",
+ "axum",
+ "base64",
+ "bytes",
+ "futures-core",
+ "futures-util",
+ "h2",
+ "http",
+ "http-body",
+ "hyper",
+ "hyper-timeout",
+ "percent-encoding",
+ "pin-project",
+ "prost",
+ "prost-derive",
+ "tokio",
+ "tokio-stream",
+ "tokio-util",
+ "tower",
+ "tower-layer",
+ "tower-service",
+ "tracing",
+ "tracing-futures",
+]
+
+[[package]]
+name = "tonic-build"
+version = "0.8.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5bf5e9b9c0f7e0a7c027dcfaba7b2c60816c7049171f679d99ee2ff65d0de8c4"
+dependencies = [
+ "prettyplease",
+ "proc-macro2",
+ "prost-build",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "tower"
+version = "0.4.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
+dependencies = [
+ "futures-core",
+ "futures-util",
+ "indexmap",
+ "pin-project",
+ "pin-project-lite",
+ "rand 0.8.5",
+ "slab",
+ "tokio",
+ "tokio-util",
+ "tower-layer",
+ "tower-service",
+ "tracing",
+]
+
+[[package]]
+name = "tower-http"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f873044bf02dd1e8239e9c1293ea39dad76dc594ec16185d0a1bf31d8dc8d858"
+dependencies = [
+ "bitflags",
+ "bytes",
+ "futures-core",
+ "futures-util",
+ "http",
+ "http-body",
+ "http-range-header",
+ "pin-project-lite",
+ "tower",
+ "tower-layer",
+ "tower-service",
+]
+
+[[package]]
+name = "tower-layer"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0"
+
 [[package]]
 name = "tower-service"
 version = "0.3.2"
@@ -2263,6 +2667,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
 dependencies = [
  "cfg-if",
+ "log",
  "pin-project-lite",
  "tracing-attributes",
  "tracing-core",
@@ -2288,6 +2693,16 @@ dependencies = [
  "once_cell",
 ]
 
+[[package]]
+name = "tracing-futures"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2"
+dependencies = [
+ "pin-project",
+ "tracing",
+]
+
 [[package]]
 name = "try-lock"
 version = "0.2.3"
@@ -2362,7 +2777,7 @@ dependencies = [
  "serde_json",
  "tokio",
  "tracing",
- "twilight-http-ratelimiting",
+ "twilight-http-ratelimiting 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "twilight-model",
  "twilight-validate",
 ]
@@ -2379,6 +2794,17 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "twilight-http-ratelimiting"
+version = "0.14.0"
+source = "git+https://github.com/MatthieuCoder/twilight.git#a7953514373d3e3962435e6a539e0e2504a2c2fd"
+dependencies = [
+ "futures-util",
+ "http",
+ "tokio",
+ "tracing",
+]
+
 [[package]]
 name = "twilight-model"
 version = "0.14.0"
@@ -2578,13 +3004,13 @@ dependencies = [
 name = "webhook"
 version = "0.1.0"
 dependencies = [
- "ctor",
+ "anyhow",
  "ed25519-dalek",
  "hex",
  "hyper",
  "lazy_static",
- "libc",
- "rand 0.8.5",
+ "leash",
+ "proto",
  "serde",
  "serde_json",
  "shared",
@@ -2602,6 +3028,17 @@ dependencies = [
  "untrusted",
 ]
 
+[[package]]
+name = "which"
+version = "4.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1c831fbbee9e129a8cf93e7747a82da9d95ba8e16621cae60ec2cdc849bacb7b"
+dependencies = [
+ "either",
+ "libc",
+ "once_cell",
+]
+
 [[package]]
 name = "winapi"
 version = "0.3.9"
index 501a768096b54263c0deb9525fbd44d00dde142b..785bb95fa0e927e0d97be2803f9428c035ff6f85 100644 (file)
@@ -4,7 +4,9 @@ members = [
     "exes/gateway/",\r
     "exes/rest/",\r
     "exes/webhook/",\r
+    "exes/ratelimit/",\r
 \r
     "libs/proto/",\r
-    "libs/shared/"\r
+    "libs/shared/",\r
+    "libs/leash/"\r
 ]
\ No newline at end of file
index f671f9b5c86b55b1d8f4b33793e602f32b899b31..2472e45e8b4dd3888137690fbd8a083f03039685 100644 (file)
@@ -1,26 +1,92 @@
 version: "3.3"
 services:
+  nats:
+    image: bitnami/nats
+    restart: always
+    ports:
+      - 4222:4222
+      - 8222:8222
+  redis:
+    image: redis
+
   cache:
     image: ghcr.io/discordnova/nova/cache
+    restart: always
     build: 
       context: .
       args:
         - COMPONENT=cache
+    volumes:
+      - ./config.yml:/config/default.yml
+    environment:
+      - RUST_LOG=info
+    depends_on:
+      - nats
+      - redis
+  
   gateway:
     image: ghcr.io/discordnova/nova/gateway
+    restart: always
     build: 
       context: .
       args:
         - COMPONENT=gateway
+    volumes:
+      - ./config.yml:/config/default.yml
+    environment:
+      - RUST_LOG=info
+    depends_on:
+      - nats
+    ports:
+      - 9000:9000
+
   rest:
     image: ghcr.io/discordnova/nova/rest
+    restart: always
     build: 
       context: .
       args:
         - COMPONENT=rest
+    volumes:
+      - ./config.yml:/config/default.yml
+    environment:
+      - RUST_LOG=info
+    depends_on:
+      - ratelimit
+    ports:
+      - 9001:9000
+      - 8080:8080
+
   webhook:
     image: ghcr.io/discordnova/nova/webhook
+    restart: always
     build: 
       context: .
       args:
         - COMPONENT=webhook
+    volumes:
+      - ./config.yml:/config/default.yml
+    environment:
+      - RUST_LOG=info
+    depends_on:
+      - nats
+    ports:
+      - 9002:9000
+      - 8081:8080
+  ratelimit:
+    image: ghcr.io/discordnova/nova/ratelimit
+    restart: always
+    build: 
+      context: .
+      args:
+        - COMPONENT=ratelimit
+    volumes:
+      - ./config.yml:/config/default.yml
+    environment:
+      - RUST_LOG=info
+    depends_on:
+      - nats
+      - redis
+    ports:
+      - 9003:9000
+      - 8082:8080
\ No newline at end of file
diff --git a/exes/.gitignore b/exes/.gitignore
new file mode 100644 (file)
index 0000000..0f02d23
--- /dev/null
@@ -0,0 +1 @@
+ratelimit/
\ No newline at end of file
index 61bb449f54217af05fe3724b2a3f96b3602dcc58..9a7f9ee2e93328c1bf6f24d7d4614cc298d1fb85 100644 (file)
@@ -7,6 +7,7 @@ edition = "2018"
 
 [dependencies]
 shared = { path = "../../libs/shared" }
+proto = { path = "../../libs/proto" }
 async-nats = "0.25.1"
 tokio = { version = "1", features = ["full"] }
 serde = { version = "1.0.8", features = ["derive"] }
@@ -14,4 +15,5 @@ log = { version = "0.4", features = ["std"] }
 serde_json = { version = "1.0" }
 redis = "*"
 futures-util = "*"
-twilight-model = "0.14"
\ No newline at end of file
+twilight-model = "0.14"
+anyhow = "1.0.68"
\ No newline at end of file
index 2aa71a8b4229700ae22de0e8da2cf2bf2ec46891..3d9f5e24e61f9194de9d52114c00b2bee42341bb 100644 (file)
@@ -1,6 +1,5 @@
 use serde::Deserialize;
-
 #[derive(Debug, Deserialize, Clone, Default)]
 pub struct CacheConfiguration {
     pub toggles: Vec<String>
-}
\ No newline at end of file
+}
index a74cfa6491d830c15cc753e92f679b69bcf906a6..312f960664094ff3337d5d14aac1d74ce3ea69c6 100644 (file)
@@ -1,7 +1,7 @@
-use std::error::Error;
+use std::{error::Error, pin::Pin};
 
 use async_nats::{Client, Subscriber};
-use futures_util::stream::StreamExt;
+use futures_util::{stream::StreamExt, Future};
 use log::info;
 use managers::{
     automoderation::Automoderation, bans::Bans, channels::Channels,
@@ -22,7 +22,7 @@ pub enum CacheSourcedEvents {
 }
 
 #[derive(Default)]
-struct MegaCache {
+struct Cache {
     automoderation: Automoderation,
     channels: Channels,
     bans: Bans,
@@ -42,18 +42,18 @@ struct MegaCache {
 async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
     let settings: Settings<CacheConfiguration> = Settings::new("cache").unwrap();
     info!("loaded configuration: {:?}", settings);
-
-    let nats: Client = settings.nats.to_client().await?;
+    let nats =
+        Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>>>>>::into(settings.nats).await?;
     // let redis: redis::Client = settings.redis.into();
 
-    let mut cache = MegaCache::default();
+    let mut cache = Cache::default();
 
     let mut sub = nats.subscribe("nova.cache.dispatch.*".to_string()).await?;
     listen(&mut sub, &mut cache, settings.config.toggles).await;
     Ok(())
 }
 
-async fn listen(sub: &mut Subscriber, cache: &mut MegaCache, features: Vec<String>) {
+async fn listen(sub: &mut Subscriber, cache: &mut Cache, features: Vec<String>) {
     while let Some(data) = sub.next().await {
         let cp: CachePayload = serde_json::from_slice(&data.payload).unwrap();
         let event = cp.data.data;
index 27e6a34949968b351b329ae2e00a7b136d7efcaa..24a3fcd11c0d96cb21c54a515e6cf156de530148 100644 (file)
@@ -5,7 +5,6 @@ use crate::CacheSourcedEvents;
 use super::CacheManager;
 use std::future::Future;
 
-
 #[derive(Default)]
 pub struct Bans {}
 impl CacheManager for Bans {
index 7ef3d9882fa4ccb2c0d9e0963e0badb2a22b47e6..d71ed4ac07cb2e3b146539592523eba9c4af65f2 100644 (file)
@@ -5,10 +5,13 @@ edition = "2018"
 
 [dependencies]
 shared = { path = "../../libs/shared" }
+proto = { path = "../../libs/proto" }
+leash = { path = "../../libs/leash" }
 tokio = { version = "1", features = ["full"] }
 twilight-gateway = { version = "0.14" }
 twilight-model = "0.14"
 serde = { version = "1.0.8", features = ["derive"] }
 futures = "0.3"
 serde_json = { version = "1.0" }
-bytes = "*"
\ No newline at end of file
+bytes = "*"
+anyhow = "*"
\ No newline at end of file
index 4c62de636db2fb4694f9511fe8c23814f4660b99..923ab30cebc2f03c6790bc8a2475811439f9b770 100644 (file)
@@ -2,13 +2,20 @@ use shared::serde::{Deserialize, Serialize};
 use twilight_gateway::Intents;
 
 #[derive(Serialize, Deserialize, Clone)]
-pub struct Config {
+pub struct GatewayConfig {
     pub token: String,
-    pub intents: Intents
+    pub intents: Intents,
+    pub shard: u64,
+    pub shard_total: u64,
 }
 
-impl Default for Config {
+impl Default for GatewayConfig {
     fn default() -> Self {
-        Self { intents: Intents::empty(), token: String::default() }
+        Self {
+            intents: Intents::empty(),
+            token: String::default(),
+            shard_total: 1,
+            shard: 1,
+        }
     }
 }
index 6968fe431a5704acb1ddb24750d5e3b7333dcfee..7957b08435c7f4b5e90bb4bcaa8c17d474c289f2 100644 (file)
@@ -1,51 +1,69 @@
-use config::Config;
+use config::GatewayConfig;
+use leash::{ignite, AnyhowResultFuture, Component};
 use shared::{
     config::Settings,
     log::{debug, info},
     nats_crate::Client,
     payloads::{CachePayload, DispatchEventTagged, Tracing},
 };
-use std::{convert::TryFrom, error::Error};
+use std::{convert::TryFrom, pin::Pin};
 use twilight_gateway::{Event, Shard};
 mod config;
-use futures::StreamExt;
+use futures::{Future, StreamExt};
 use twilight_model::gateway::event::DispatchEvent;
 
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
-    let settings: Settings<Config> = Settings::new("gateway").unwrap();
-    let (shard, mut events) = Shard::new(settings.config.token, settings.config.intents);
-    let nats: Client = settings.nats.to_client().await?;
+struct GatewayServer {}
+impl Component for GatewayServer {
+    type Config = GatewayConfig;
+    const SERVICE_NAME: &'static str = "gateway";
 
-    shard.start().await?;
+    fn start(&self, settings: Settings<Self::Config>) -> AnyhowResultFuture<()> {
+        Box::pin(async move {
+            let (shard, mut events) = Shard::builder(settings.token.to_owned(), settings.intents)
+                .shard(settings.shard, settings.shard_total)?
+                .build();
 
-    while let Some(event) = events.next().await {
-        match event {
-            Event::Ready(ready) => {
-                info!("Logged in as {}", ready.user.name);
-            }
+            let nats =
+                Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>>>>>::into(settings.nats)
+                    .await?;
+
+            shard.start().await?;
+
+            while let Some(event) = events.next().await {
+                match event {
+                    Event::Ready(ready) => {
+                        info!("Logged in as {}", ready.user.name);
+                    }
 
-            _ => {
-                let name = event.kind().name();
-                if let Ok(dispatch_event) = DispatchEvent::try_from(event) {
-                    let data = CachePayload {
-                        tracing: Tracing {
-                            node_id: "".to_string(),
-                            span: None,
-                        },
-                        data: DispatchEventTagged {
-                            data: dispatch_event,
-                        },
-                    };
-                    let value = serde_json::to_string(&data)?;
-                    debug!("nats send: {}", value);
-                    let bytes = bytes::Bytes::from(value);
-                    nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes)
-                        .await?;
+                    _ => {
+                        let name = event.kind().name();
+                        if let Ok(dispatch_event) = DispatchEvent::try_from(event) {
+                            let data = CachePayload {
+                                tracing: Tracing {
+                                    node_id: "".to_string(),
+                                    span: None,
+                                },
+                                data: DispatchEventTagged {
+                                    data: dispatch_event,
+                                },
+                            };
+                            let value = serde_json::to_string(&data)?;
+                            debug!("nats send: {}", value);
+                            let bytes = bytes::Bytes::from(value);
+                            nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes)
+                                .await?;
+                        }
+                    }
                 }
             }
-        }
+
+            Ok(())
+        })
     }
 
-    Ok(())
+    fn new() -> Self {
+        Self {}
+    }
 }
+
+ignite!(GatewayServer);
index 7b5b2b51db63144057708b38361a915d787c14f6..f4c5eccc5c2c6a67e37b09b5867a70ed2a4babca 100644 (file)
@@ -7,10 +7,23 @@ edition = "2018"
 
 [dependencies]
 shared = { path = "../../libs/shared" }
+proto = { path = "../../libs/proto" }
+leash = { path = "../../libs/leash" }
+
 hyper = { version = "0.14", features = ["full"] }
 tokio = { version = "1", features = ["full"] }
 serde = { version = "1.0.8", features = ["derive"] }
 futures-util = "0.3.17"
 hyper-tls = "0.5.0"
 lazy_static = "1.4.0"
-xxhash-rust = { version = "0.8.2", features = ["xxh32"] }
\ No newline at end of file
+xxhash-rust = { version = "0.8.2", features = ["xxh32"] }
+twilight-http-ratelimiting = { git = "https://github.com/MatthieuCoder/twilight.git" }
+tracing = "0.1.37"
+hashring = "0.3.0"
+anyhow = "*"
+tonic = "0.8.3"
+serde_json = { version = "1.0" }
+http = "0.2.8"
+tokio-stream = "0.1.11"
+dns-lookup = "1.0.8"
+tokio-scoped = "0.2.0"
\ No newline at end of file
index 559929f0ab25680c8f7d26eb308c9bced66b14ed..9261de268e4c9e6ae95d7405a8e25f9842fd384b 100644 (file)
@@ -1,9 +1,21 @@
+use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
 use serde::Deserialize;
 
-#[derive(Debug, Deserialize, Clone, Default)]
+fn default_listening_address() -> SocketAddr {
+    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080))
+}
+
+#[derive(Debug, Deserialize, Clone)]
 pub struct ServerSettings {
-    pub port: u16,
-    pub address: String,
+    #[serde(default = "default_listening_address")]
+    pub listening_adress: SocketAddr
+}
+impl Default for ServerSettings {
+    fn default() -> Self {
+        Self {
+            listening_adress: default_listening_address(),
+        }
+    }
 }
 
 #[derive(Debug, Deserialize, Clone, Default)]
@@ -12,7 +24,7 @@ pub struct Discord {
 }
 
 #[derive(Debug, Deserialize, Clone, Default)]
-pub struct Config {
+pub struct ReverseProxyConfig {
     pub server: ServerSettings,
     pub discord: Discord,
 }
diff --git a/exes/rest/src/handler.rs b/exes/rest/src/handler.rs
new file mode 100644 (file)
index 0000000..8b0dd52
--- /dev/null
@@ -0,0 +1,141 @@
+use std::{
+    collections::hash_map::DefaultHasher,
+    convert::TryFrom,
+    hash::{Hash, Hasher},
+    str::FromStr,
+};
+
+use anyhow::bail;
+use http::{
+    header::{AUTHORIZATION, CONNECTION, HOST, TRANSFER_ENCODING, UPGRADE},
+    HeaderValue, Method as HttpMethod, Request, Response, Uri,
+};
+use hyper::{client::HttpConnector, Body, Client};
+use hyper_tls::HttpsConnector;
+use shared::log::error;
+use twilight_http_ratelimiting::{Method, Path};
+
+use crate::ratelimit_client::RemoteRatelimiter;
+
+/// Normalizes the path
+fn normalize_path(request_path: &str) -> (&str, &str) {
+    if let Some(trimmed_path) = request_path.strip_prefix("/api") {
+        if let Some(maybe_api_version) = trimmed_path.split('/').nth(1) {
+            if let Some(version_number) = maybe_api_version.strip_prefix('v') {
+                if version_number.parse::<u8>().is_ok() {
+                    let len = "/api/v".len() + version_number.len();
+                    return (&request_path[..len], &request_path[len..]);
+                };
+            };
+        }
+
+        ("/api", trimmed_path)
+    } else {
+        ("/api", request_path)
+    }
+}
+
+pub async fn handle_request(
+    client: Client<HttpsConnector<HttpConnector>, Body>,
+    ratelimiter: RemoteRatelimiter,
+    token: String,
+    mut request: Request<Body>,
+) -> Result<Response<Body>, anyhow::Error> {
+    let (hash, uri_string) = {
+        let method = match *request.method() {
+            HttpMethod::DELETE => Method::Delete,
+            HttpMethod::GET => Method::Get,
+            HttpMethod::PATCH => Method::Patch,
+            HttpMethod::POST => Method::Post,
+            HttpMethod::PUT => Method::Put,
+            _ => {
+                error!("Unsupported HTTP method in request, {}", request.method());
+                bail!("unsupported method");
+            }
+        };
+
+        let request_path = request.uri().path();
+        let (api_path, trimmed_path) = normalize_path(&request_path);
+
+        let mut uri_string = format!("http://192.168.0.27:8000{}{}", api_path, trimmed_path);
+        if let Some(query) = request.uri().query() {
+            uri_string.push('?');
+            uri_string.push_str(query);
+        }
+
+        let mut hash = DefaultHasher::new();
+        match Path::try_from((method, trimmed_path)) {
+            Ok(path) => path,
+            Err(e) => {
+                error!(
+                    "Failed to parse path for {:?} {}: {:?}",
+                    method, trimmed_path, e
+                );
+                bail!("failed o parse");
+            }
+        }
+        .hash(&mut hash);
+
+        (hash.finish().to_string(), uri_string)
+    };
+
+    let header_sender = match ratelimiter.ticket(hash).await {
+        Ok(sender) => sender,
+        Err(e) => {
+            error!("Failed to receive ticket for ratelimiting: {:?}", e);
+            bail!("failed to reteive ticket");
+        }
+    };
+
+    request.headers_mut().insert(
+        AUTHORIZATION,
+        HeaderValue::from_bytes(token.as_bytes())
+            .expect("strings are guaranteed to be valid utf-8"),
+    );
+    request
+        .headers_mut()
+        .insert(HOST, HeaderValue::from_static("discord.com"));
+
+    // Remove forbidden HTTP/2 headers
+    // https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.2
+    request.headers_mut().remove(CONNECTION);
+    request.headers_mut().remove("keep-alive");
+    request.headers_mut().remove("proxy-connection");
+    request.headers_mut().remove(TRANSFER_ENCODING);
+    request.headers_mut().remove(UPGRADE);
+    request.headers_mut().remove(AUTHORIZATION);
+    request.headers_mut().append(
+        AUTHORIZATION,
+        HeaderValue::from_static(
+            "Bot ODA3MTg4MzM1NzE3Mzg0MjEy.G3sXFM.8gY2sVYDAq2WuPWwDskAAEFLfTg8htooxME-LE",
+        ),
+    );
+
+    let uri = match Uri::from_str(&uri_string) {
+        Ok(uri) => uri,
+        Err(e) => {
+            error!("Failed to create URI for requesting Discord API: {:?}", e);
+            bail!("failed to create uri");
+        }
+    };
+    *request.uri_mut() = uri;
+    let resp = match client.request(request).await {
+        Ok(response) => response,
+        Err(e) => {
+            error!("Error when requesting the Discord API: {:?}", e);
+            bail!("failed to request the discord api");
+        }
+    };
+
+    let ratelimit_headers = resp
+        .headers()
+        .into_iter()
+        .map(|(k, v)| (k.to_string(), v.to_str().unwrap().to_string()))
+        .collect();
+
+    if header_sender.send(ratelimit_headers).is_err() {
+        error!("Error when sending ratelimit headers to ratelimiter");
+    };
+
+    Ok(resp)
+}
index 9fa6ce744e49e5e80a03faa2e105176db715d489..8d014ab1f8f3d08852263d392d1820f889b29307 100644 (file)
@@ -1,46 +1,56 @@
-use std::{convert::Infallible, sync::Arc};
+use config::ReverseProxyConfig;
 
-use crate::{config::Config, ratelimit::Ratelimiter};
-use shared::{
-    config::Settings,
-    log::{error, info},
-    redis_crate::Client,
+use handler::handle_request;
+use hyper::{
+    server::conn::AddrStream,
+    service::{make_service_fn, service_fn},
+    Body, Client, Request, Server,
 };
-use hyper::{server::conn::AddrStream, service::make_service_fn, Server};
-use std::net::ToSocketAddrs;
-use tokio::sync::Mutex;
-
-use crate::proxy::ServiceProxy;
+use hyper_tls::HttpsConnector;
+use leash::{ignite, AnyhowResultFuture, Component};
+use shared::config::Settings;
+use std::convert::Infallible;
 
 mod config;
-mod proxy;
-mod ratelimit;
-
-#[tokio::main]
-async fn main() {
-    let settings: Settings<Config> = Settings::new("rest").unwrap();
-    let config = Arc::new(settings.config);
-    let redis_client: Client = settings.redis.into();
-    let redis = Arc::new(Mutex::new(
-        redis_client.get_async_connection().await.unwrap(),
-    ));
-    let ratelimiter = Arc::new(Ratelimiter::new(redis));
-
-    let addr = format!("{}:{}", config.server.address, config.server.port)
-        .to_socket_addrs()
-        .unwrap()
-        .next()
-        .unwrap();
-
-    let service_fn = make_service_fn(move |_: &AddrStream| {
-        let service_proxy = ServiceProxy::new(config.clone(), ratelimiter.clone());
-        async move { Ok::<_, Infallible>(service_proxy) }
-    });
-
-    let server = Server::bind(&addr).serve(service_fn);
-
-    info!("starting ratelimit server");
-    if let Err(e) = server.await {
-        error!("server error: {}", e);
+mod handler;
+mod ratelimit_client;
+
+struct ReverseProxyServer {}
+impl Component for ReverseProxyServer {
+    type Config = ReverseProxyConfig;
+    const SERVICE_NAME: &'static str = "rest";
+
+    fn start(&self, settings: Settings<Self::Config>) -> AnyhowResultFuture<()> {
+        Box::pin(async move {
+            // Client to the remote ratelimiters
+            let ratelimiter = ratelimit_client::RemoteRatelimiter::new();
+            let client = Client::builder().build(HttpsConnector::new());
+
+            let service_fn = make_service_fn(move |_: &AddrStream| {
+                let client = client.clone();
+                let ratelimiter = ratelimiter.clone();
+                async move {
+                    Ok::<_, Infallible>(service_fn(move |request: Request<Body>| {
+                        let client = client.clone();
+                        let ratelimiter = ratelimiter.clone();
+                        async move {
+                            handle_request(client, ratelimiter, "token".to_string(), request).await
+                        }
+                    }))
+                }
+            });
+
+            let server = Server::bind(&settings.config.server.listening_adress).serve(service_fn);
+
+            server.await?;
+
+            Ok(())
+        })
+    }
+
+    fn new() -> Self {
+        Self {}
     }
 }
+
+ignite!(ReverseProxyServer);
diff --git a/exes/rest/src/proxy/mod.rs b/exes/rest/src/proxy/mod.rs
deleted file mode 100644 (file)
index 65d77aa..0000000
+++ /dev/null
@@ -1,138 +0,0 @@
-use crate::{config::Config, ratelimit::Ratelimiter};
-use hyper::{
-    client::HttpConnector, header::HeaderValue, http::uri::Parts, service::Service, Body, Client,
-    Request, Response, Uri,
-};
-use hyper_tls::HttpsConnector;
-use shared::{
-    log::debug,
-    prometheus::{labels, opts, register_counter, register_histogram_vec, Counter, HistogramVec},
-};
-use std::{future::Future, pin::Pin, sync::Arc, task::Poll};
-use tokio::sync::Mutex;
-
-lazy_static::lazy_static! {
-    static ref HTTP_COUNTER: Counter = register_counter!(opts!(
-        "nova_rest_http_requests_total",
-        "Number of HTTP requests made.",
-        labels! {"handler" => "all",}
-    ))
-    .unwrap();
-
-    static ref HTTP_REQ_HISTOGRAM: HistogramVec = register_histogram_vec!(
-        "nova_rest_http_request_duration_seconds",
-        "The HTTP request latencies in seconds.",
-        &["handler"]
-    )
-    .unwrap();
-
-    static ref HTTP_COUNTER_STATUS: Counter = register_counter!(opts!(
-        "nova_rest_http_requests_status",
-        "Number of HTTP requests made by status",
-        labels! {"" => ""}
-    ))
-    .unwrap();
-}
-
-#[derive(Clone)]
-pub struct ServiceProxy {
-    client: Client<HttpsConnector<HttpConnector>>,
-    ratelimiter: Arc<Ratelimiter>,
-    config: Arc<Config>,
-    fail: Arc<Mutex<i32>>,
-}
-
-impl Service<Request<Body>> for ServiceProxy {
-    type Response = Response<Body>;
-    type Error = hyper::Error;
-    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
-
-    fn poll_ready(
-        &mut self,
-        cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Result<(), Self::Error>> {
-        match self.client.poll_ready(cx) {
-            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
-            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
-            Poll::Pending => Poll::Pending,
-        }
-    }
-
-    fn call(&mut self, mut req: Request<hyper::Body>) -> Self::Future {
-        HTTP_COUNTER.inc();
-
-        let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["all"]).start_timer();
-        let host = "discord.com";
-        let mut new_parts = Parts::default();
-
-        let path = req.uri().path().to_string();
-
-        new_parts.scheme = Some("https".parse().unwrap());
-        new_parts.authority = Some(host.parse().unwrap());
-        new_parts.path_and_query = Some(path.parse().unwrap());
-
-        *req.uri_mut() = Uri::from_parts(new_parts).unwrap();
-
-        let headers = req.headers_mut();
-        headers.remove("user-agent");
-        headers.insert("Host", HeaderValue::from_str("discord.com").unwrap());
-        headers.insert(
-            "Authorization",
-            HeaderValue::from_str(&format!("Bot {}", self.config.discord.token)).unwrap(),
-        );
-
-        println!("{:?}", headers);
-
-        let client = self.client.clone();
-        let ratelimiter = self.ratelimiter.clone();
-        let fail = self.fail.clone();
-
-        return Box::pin(async move {
-            let resp = match ratelimiter.before_request(&req).await {
-                Ok(allowed) => match allowed {
-                    crate::ratelimit::RatelimiterResponse::Ratelimited => {
-                        debug!("ratelimited");
-                        Ok(Response::builder().body("ratelimited".into()).unwrap())
-                    }
-                    _ => {
-                        debug!("forwarding request");
-                        match client.request(req).await {
-                            Ok(mut response) => {
-                                ratelimiter.after_request(&path, &response).await;
-                                if response.status() != 200 {
-                                    *fail.lock().await += 1
-                                }
-                                response.headers_mut().insert(
-                                    "x-fails",
-                                    HeaderValue::from_str(&format!("{}", fail.lock().await))
-                                        .unwrap(),
-                                );
-                                Ok(response)
-                            }
-                            Err(e) => Err(e),
-                        }
-                    }
-                },
-                Err(e) => Ok(Response::builder()
-                    .body(format!("server error: {}", e).into())
-                    .unwrap()),
-            };
-            timer.observe_duration();
-            resp
-        });
-    }
-}
-
-impl ServiceProxy {
-    pub fn new(config: Arc<Config>, ratelimiter: Arc<Ratelimiter>) -> Self {
-        let https = HttpsConnector::new();
-        let client = Client::builder().build::<_, hyper::Body>(https);
-        let fail = Arc::new(Mutex::new(0));
-        ServiceProxy {
-            client,
-            config,
-            ratelimiter,
-            fail,
-        }
-    }
-}
diff --git a/exes/rest/src/ratelimit/mod.rs b/exes/rest/src/ratelimit/mod.rs
deleted file mode 100644 (file)
index 132bfd3..0000000
+++ /dev/null
@@ -1,155 +0,0 @@
-use shared::{
-    log::debug,
-    redis_crate::{aio::Connection, AsyncCommands}, error::GenericError,
-};
-use hyper::{Body, Request, Response};
-use std::{
-    convert::TryInto,
-    sync::Arc,
-    time::{SystemTime, UNIX_EPOCH},
-};
-use tokio::sync::Mutex;
-use xxhash_rust::xxh32::xxh32;
-
-pub enum RatelimiterResponse {
-    NoSuchUrl,
-    Ratelimited,
-    Pass,
-}
-
-pub struct Ratelimiter {
-    redis: Arc<Mutex<Connection>>,
-}
-
-impl Ratelimiter {
-    pub fn new(redis: Arc<Mutex<Connection>>) -> Ratelimiter {
-        return Ratelimiter { redis };
-    }
-
-    pub async fn before_request(
-        &self,
-        request: &Request<Body>,
-    ) -> Result<RatelimiterResponse, GenericError> {
-        // we lookup if the route hash is stored in the redis table
-        let path = request.uri().path();
-        let hash = xxh32(path.as_bytes(), 32);
-        let mut redis = self.redis.lock().await;
-
-        let start = SystemTime::now();
-        let since_the_epoch = start
-            .duration_since(UNIX_EPOCH)
-            .expect("Time went backwards");
-
-        // global rate litmit
-        match redis
-            .get::<String, Option<i32>>(format!(
-                "nova:rest:ratelimit:global:{}",
-                since_the_epoch.as_secs()
-            ))
-            .await
-        {
-            Ok(value) => {
-                match value {
-                    Some(value) => {
-                        debug!("incr: {}", value);
-                        if value >= 49 {
-                            return Ok(RatelimiterResponse::Ratelimited);
-                        }
-                    }
-                    None => {
-                        let key =
-                            format!("nova:rest:ratelimit:global:{}", since_the_epoch.as_secs());
-                        // init global ratelimit
-                        redis.set_ex::<String, i32, ()>(key, 0, 2).await.unwrap();
-                    }
-                }
-            }
-            Err(_) => {
-                return Err(GenericError::StepFailed("radis ratelimit check".to_string()));
-            }
-        };
-
-        // we lookup the corresponding bucket for this url
-        match redis
-            .get::<String, Option<String>>(format!("nova:rest:ratelimit:url_bucket:{}", hash))
-            .await
-        {
-            Ok(bucket) => match bucket {
-                Some(bucket) => {
-                    match redis
-                        .exists::<String, bool>(format!("nova:rest:ratelimit:lock:{}", bucket))
-                        .await
-                    {
-                        Ok(exists) => {
-                            if exists {
-                                Ok(RatelimiterResponse::Ratelimited)
-                            } else {
-                                Ok(RatelimiterResponse::Pass)
-                            }
-                        }
-                        Err(_) =>  Err(GenericError::StepFailed("radis ratelimit check".to_string())),
-                    }
-                }
-                None => Ok(RatelimiterResponse::NoSuchUrl),
-            },
-            Err(_) => Err(GenericError::StepFailed("radis ratelimit check".to_string())),
-        }
-    }
-
-    fn parse_headers(&self, response: &Response<Body>) -> Option<(String, i32, i32)> {
-        if let Some(bucket) = response.headers().get("X-RateLimit-Bucket") {
-            let bucket = bucket.to_str().unwrap().to_string();
-
-            let remaining = response.headers().get("X-RateLimit-Remaining").unwrap();
-            let reset = response.headers().get("X-RateLimit-Reset-After").unwrap();
-
-            let remaining_i32 = remaining.to_str().unwrap().parse::<i32>().unwrap();
-            let reset_ms_i32 = reset.to_str().unwrap().parse::<f32>().unwrap().ceil() as i32;
-            return Some((bucket, remaining_i32, reset_ms_i32));
-        } else {
-            None
-        }
-    }
-
-    pub async fn after_request(&self, path: &str, response: &Response<Body>) {
-        let hash = xxh32(path.as_bytes(), 32);
-        // verified earlier
-
-        let mut redis = self.redis.lock().await;
-
-        let start = SystemTime::now();
-        let since_the_epoch = start
-            .duration_since(UNIX_EPOCH)
-            .expect("Time went backwards");
-
-        redis
-            .incr::<String, i32, ()>(
-                format!("nova:rest:ratelimit:global:{}", since_the_epoch.as_secs()),
-                1,
-            )
-            .await
-            .unwrap();
-        if let Some((bucket, remaining, reset)) = self.parse_headers(response) {
-            if remaining <= 1 {
-                // we set a lock for the bucket until the timeout passes
-                redis
-                    .set_ex::<String, bool, ()>(
-                        format!("nova:rest:ratelimit:lock:{}", bucket),
-                        true,
-                        reset.try_into().unwrap(),
-                    )
-                    .await
-                    .unwrap();
-            }
-
-            redis
-                .set_ex::<String, String, ()>(
-                    format!("nova:rest:ratelimit:url_bucket:{}", hash),
-                    bucket,
-                    reset.try_into().unwrap(),
-                )
-                .await
-                .unwrap();
-        }
-    }
-}
diff --git a/exes/rest/src/ratelimit_client/mod.rs b/exes/rest/src/ratelimit_client/mod.rs
new file mode 100644 (file)
index 0000000..8263d15
--- /dev/null
@@ -0,0 +1,137 @@
+use self::remote_hashring::{HashRingWrapper, VNode};
+use futures_util::Future;
+use proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::{Data, Headers};
+use proto::nova::ratelimit::ratelimiter::BucketSubmitTicketRequest;
+use shared::log::debug;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::time::UNIX_EPOCH;
+use std::time::{Duration, SystemTime};
+use tokio::sync::oneshot::{self};
+use tokio::sync::{broadcast, mpsc, RwLock};
+use tokio_stream::wrappers::ReceiverStream;
+
+mod remote_hashring;
+
+#[derive(Clone, Debug)]
+pub struct RemoteRatelimiter {
+    remotes: Arc<RwLock<HashRingWrapper>>,
+    stop: Arc<tokio::sync::broadcast::Sender<()>>,
+}
+
+impl Drop for RemoteRatelimiter {
+    fn drop(&mut self) {
+        self.stop.clone().send(()).unwrap();
+    }
+}
+
+impl RemoteRatelimiter {
+    async fn get_ratelimiters(&self) -> Result<(), anyhow::Error> {
+        // get list of dns responses
+        let responses = dns_lookup::lookup_host("ratelimit")
+            .unwrap()
+            .into_iter()
+            .map(|f| f.to_string());
+
+        let mut write = self.remotes.write().await;
+
+        for ip in responses {
+            let a = VNode::new(ip.into()).await?;
+            write.add(a.clone());
+        }
+
+        return Ok(());
+    }
+
+    #[must_use]
+    pub fn new() -> Self {
+        let (rx, mut tx) = broadcast::channel(1);
+        let obj = Self {
+            remotes: Arc::new(RwLock::new(HashRingWrapper::default())),
+            stop: Arc::new(rx),
+        };
+
+        let obj_clone = obj.clone();
+        // Task to update the ratelimiters in the background
+        tokio::spawn(async move {
+            loop {
+                let sleep = tokio::time::sleep(Duration::from_secs(10));
+                tokio::pin!(sleep);
+
+                debug!("refreshing");
+                obj_clone.get_ratelimiters().await.unwrap();
+                tokio::select! {
+                    () = &mut sleep => {
+                        println!("timer elapsed");
+                    },
+                    _ = tx.recv() => {}
+                }
+            }
+        });
+
+        obj
+    }
+
+    pub fn ticket(
+        &self,
+        path: String,
+    ) -> Pin<
+        Box<
+            dyn Future<Output = anyhow::Result<oneshot::Sender<HashMap<String, String>>>>
+                + Send
+                + 'static,
+        >,
+    > {
+        let remotes = self.remotes.clone();
+        let (tx, rx) = oneshot::channel::<HashMap<String, String>>();
+
+        Box::pin(async move {
+            // Get node managing this path
+            let mut node = (*remotes.read().await.get(&path).unwrap()).clone();
+
+            // Buffers for the gRPC streaming channel.
+            let (send, remote) = mpsc::channel(5);
+            let (do_request, wait) = oneshot::channel();
+            // Tonic requires a stream to be used; Since we use a mpsc channel, we can create a stream from it
+            let stream = ReceiverStream::new(remote);
+
+            // Start the grpc streaming
+            let ticket = node.submit_ticket(stream).await?;
+
+            // First, send the request
+            send.send(BucketSubmitTicketRequest {
+                data: Some(Data::Path(path)),
+            })
+            .await?;
+
+            // We continuously listen for events in the channel.
+            tokio::spawn(async move {
+                let message = ticket.into_inner().message().await.unwrap().unwrap();
+
+                if message.accepted == 1 {
+                    do_request.send(()).unwrap();
+                    let headers = rx.await.unwrap();
+
+                    send.send(BucketSubmitTicketRequest {
+                        data: Some(Data::Headers(Headers {
+                            precise_time: SystemTime::now()
+                                .duration_since(UNIX_EPOCH)
+                                .expect("time went backwards")
+                                .as_millis() as u64,
+                            headers,
+                        })),
+                    })
+                    .await
+                    .unwrap();
+                }
+            });
+
+            // Wait for the message to be sent
+            wait.await?;
+
+            Ok(tx)
+        })
+    }
+}
diff --git a/exes/rest/src/ratelimit_client/remote_hashring.rs b/exes/rest/src/ratelimit_client/remote_hashring.rs
new file mode 100644 (file)
index 0000000..b9f7800
--- /dev/null
@@ -0,0 +1,67 @@
+use core::fmt::Debug;
+use proto::nova::ratelimit::ratelimiter::ratelimiter_client::RatelimiterClient;
+use std::hash::Hash;
+use std::ops::Deref;
+use std::ops::DerefMut;
+use tonic::transport::Channel;
+
+#[derive(Debug, Clone)]
+pub struct VNode {
+    address: String,
+
+    client: RatelimiterClient<Channel>,
+}
+
+impl Deref for VNode {
+    type Target = RatelimiterClient<Channel>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.client
+    }
+}
+
+impl DerefMut for VNode {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.client
+    }
+}
+
+impl Hash for VNode {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.address.hash(state);
+    }
+}
+
+impl VNode {
+    pub async fn new(address: String) -> Result<Self, tonic::transport::Error> {
+        let client = RatelimiterClient::connect(format!("http://{}:8080", address.clone())).await?;
+
+        Ok(VNode { client, address })
+    }
+}
+
+unsafe impl Send for VNode {}
+
+#[repr(transparent)]
+#[derive(Default)]
+pub struct HashRingWrapper(hashring::HashRing<VNode>);
+
+impl Deref for HashRingWrapper {
+    type Target = hashring::HashRing<VNode>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl DerefMut for HashRingWrapper {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.0
+    }
+}
+
+impl Debug for HashRingWrapper {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_tuple("HashRing").finish()
+    }
+}
index 1d6a5a72af10a699c1704a09f7a950104adb86e8..12a66080054f3610a86e36384ae55056afd1aab0 100644 (file)
@@ -7,15 +7,16 @@ edition = "2018"
 hyper = { version = "0.14", features = ["full"] }
 tokio = { version = "1", features = ["full"] }
 shared = { path = "../../libs/shared" }
+proto = { path = "../../libs/proto" }
+leash = { path = "../../libs/leash" }
+
 serde = { version = "1.0.8", features = ["derive"] }
 hex = "0.4.3"
 serde_json = { version = "1.0" }
-libc = "0.2.101"
 lazy_static = "1.4.0"
-ctor = "0.1.21"
 ed25519-dalek = "1"
 twilight-model = { version = "0.14" }
-rand = "0.8"
+anyhow = "1.0.68"
 
 [[bin]]
 name = "webhook"
index a054d33edd3443f7f3faf7fb7904438d2892388e..68f6a5fc8949b0c21f2b611d42fc69d6e8eb25a3 100644 (file)
@@ -1,19 +1,43 @@
-use serde::Deserialize;
+use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
 
-#[derive(Debug, Deserialize, Clone, Default)]
+use ed25519_dalek::PublicKey;
+use serde::{Deserialize, Deserializer};
+
+fn default_listening_address() -> SocketAddr {
+    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080))
+}
+
+#[derive(Debug, Deserialize, Clone, Copy)]
 pub struct ServerSettings {
-    pub port: u16,
-    pub address: String,
+    #[serde(default = "default_listening_address")]
+    pub listening_adress: SocketAddr,
+}
+impl Default for ServerSettings {
+    fn default() -> Self {
+        Self {
+            listening_adress: default_listening_address(),
+        }
+    }
+}
+
+fn deserialize_pk<'de, D>(deserializer: D) -> Result<PublicKey, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    let str = String::deserialize(deserializer)?;
+    let public_key = PublicKey::from_bytes(&hex::decode(&str).unwrap()).unwrap();
+    Ok(public_key)
 }
 
-#[derive(Debug, Deserialize, Clone, Default)]
+#[derive(Debug, Deserialize, Clone, Default, Copy)]
 pub struct Discord {
-    pub public_key: String,
+    #[serde(deserialize_with = "deserialize_pk")]
+    pub public_key: PublicKey,
     pub client_id: u32,
 }
 
-#[derive(Debug, Deserialize, Clone, Default)]
-pub struct Config {
+#[derive(Debug, Deserialize, Clone, Default, Copy)]
+pub struct WebhookConfig {
     pub server: ServerSettings,
     pub discord: Discord,
 }
index b2ef44cf4f04806ee49285e60094182552b5b9df..af79185026c1c7e31d757b5554665f0dcf626755 100644 (file)
@@ -1,13 +1,12 @@
 use super::error::WebhookError;
 use super::signature::validate_signature;
-use crate::config::Config;
+use crate::config::WebhookConfig;
 use ed25519_dalek::PublicKey;
 use hyper::{
     body::{to_bytes, Bytes},
     service::Service,
     Body, Method, Request, Response, StatusCode,
 };
-use serde::{Deserialize, Serialize};
 use shared::nats_crate::Client;
 use shared::{
     log::{debug, error},
@@ -17,10 +16,9 @@ use std::{
     future::Future,
     pin::Pin,
     str::from_utf8,
-    sync::Arc,
     task::{Context, Poll},
 };
-use twilight_model::gateway::event::{DispatchEvent};
+use twilight_model::gateway::event::DispatchEvent;
 use twilight_model::{
     application::interaction::{Interaction, InteractionType},
     gateway::payload::incoming::InteractionCreate,
@@ -28,14 +26,13 @@ use twilight_model::{
 
 /// Hyper service used to handle the discord webhooks
 #[derive(Clone)]
-pub struct HandlerService {
-    pub config: Arc<Config>,
-    pub nats: Arc<Client>,
-    pub public_key: Arc<PublicKey>,
+pub struct WebhookService {
+    pub config: WebhookConfig,
+    pub nats: Client,
 }
 
-impl HandlerService {
-    async fn check_request(&self, req: Request<Body>) -> Result<Bytes, WebhookError> {
+impl WebhookService {
+    async fn check_request(req: Request<Body>, pk: PublicKey) -> Result<Bytes, WebhookError> {
         if req.method() == Method::POST {
             let signature = if let Some(sig) = req.headers().get("X-Signature-Ed25519") {
                 sig.to_owned()
@@ -57,7 +54,7 @@ impl HandlerService {
             let data = to_bytes(req.into_body()).await?;
 
             if validate_signature(
-                &self.public_key,
+                &pk,
                 &[timestamp.as_bytes().to_vec(), data.to_vec()].concat(),
                 signature.to_str()?,
             ) {
@@ -74,10 +71,11 @@ impl HandlerService {
     }
 
     async fn process_request(
-        &mut self,
         req: Request<Body>,
+        nats: Client,
+        pk: PublicKey,
     ) -> Result<Response<Body>, WebhookError> {
-        match self.check_request(req).await {
+        match Self::check_request(req, pk).await {
             Ok(data) => {
                 let utf8 = from_utf8(&data);
                 match utf8 {
@@ -86,7 +84,7 @@ impl HandlerService {
                             match value.kind {
                                 InteractionType::Ping => Ok(Response::builder()
                                     .header("Content-Type", "application/json")
-                                    .body(serde_json::to_string(&Ping { t: 1 }).unwrap().into())
+                                    .body(r#"{"t":1}"#.into())
                                     .unwrap()),
                                 _ => {
                                     debug!("calling nats");
@@ -106,10 +104,13 @@ impl HandlerService {
 
                                     let payload = serde_json::to_string(&data).unwrap();
 
-                                    match self.nats.request(
-                                        "nova.cache.dispatch.INTERACTION_CREATE".to_string(),
-                                        Bytes::from(payload),
-                                    ).await {
+                                    match nats
+                                        .request(
+                                            "nova.cache.dispatch.INTERACTION_CREATE".to_string(),
+                                            Bytes::from(payload),
+                                        )
+                                        .await
+                                    {
                                         Ok(response) => Ok(Response::builder()
                                             .header("Content-Type", "application/json")
                                             .body(Body::from(response.reply.unwrap()))
@@ -144,15 +145,9 @@ impl HandlerService {
     }
 }
 
-#[derive(Debug, Serialize, Deserialize)]
-pub struct Ping {
-    #[serde(rename = "type")]
-    t: i32,
-}
-
 /// Implementation of the service
-impl Service<Request<Body>> for HandlerService {
-    type Response = Response<Body>;
+impl Service<hyper::Request<Body>> for WebhookService {
+    type Response = hyper::Response<Body>;
     type Error = hyper::Error;
     type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
 
@@ -161,9 +156,10 @@ impl Service<Request<Body>> for HandlerService {
     }
 
     fn call(&mut self, req: Request<Body>) -> Self::Future {
-        let mut clone = self.clone();
+        let future =
+            Self::process_request(req, self.nats.clone(), self.config.discord.public_key);
         Box::pin(async move {
-            let response = clone.process_request(req).await;
+            let response = future.await;
 
             match response {
                 Ok(r) => Ok(r),
index 48672a17a386d08002ef0ec32923b665419a0a65..b51494a88280565ef52d6f37e5b70483debffc22 100644 (file)
@@ -1,22 +1,15 @@
-use super::handler::HandlerService;
-use crate::config::Config;
 use hyper::service::Service;
-use shared::nats_crate::Client;
 use std::{
     future::{ready, Ready},
-    sync::Arc,
     task::{Context, Poll},
 };
-use ed25519_dalek::PublicKey;
 
-pub struct MakeSvc {
-    pub settings: Arc<Config>,
-    pub nats: Arc<Client>,
-    pub public_key: Arc<PublicKey>
+pub struct MakeSvc<T: Clone> {
+    pub service: T,
 }
 
-impl<T> Service<T> for MakeSvc {
-    type Response = HandlerService;
+impl<T, V: Clone> Service<T> for MakeSvc<V> {
+    type Response = V;
     type Error = std::io::Error;
     type Future = Ready<Result<Self::Response, Self::Error>>;
 
@@ -25,10 +18,12 @@ impl<T> Service<T> for MakeSvc {
     }
 
     fn call(&mut self, _: T) -> Self::Future {
-        ready(Ok(HandlerService {
-            config: self.settings.clone(),
-            nats: self.nats.clone(),
-            public_key: self.public_key.clone()
-        }))
+        ready(Ok(self.service.clone()))
+    }
+}
+
+impl<T: Clone> MakeSvc<T> {
+    pub fn new(service: T) -> Self {
+        Self { service }
     }
 }
index 20a977a1473fb1d813d60fc8b53d892a082af7d4..e4cf35ae3dfc219a458f881ee5827428fb84c349 100644 (file)
@@ -1,5 +1,5 @@
 mod error;
-mod handler;
+pub mod handler;
 pub mod make_service;
 mod signature;
 
index 490143b6f733d0cf9eb3f2131be33c6090a103ac..0bed86a7544baae04d3eca4c6383beda6965d6e5 100644 (file)
@@ -5,7 +5,10 @@ use ed25519_dalek::PublicKey;
 fn validate_signature_test() {
     let signature = "543ec3547d57f9ddb1ec4c5c36503ebf288ffda3da3d510764c9a49c2abb57690ef974c63d174771bdd2481de1066966f57abbec12a3ec171b9f6e2373837002";
     let content = "message de test incroyable".as_bytes().to_vec();
-    let public_key = PublicKey::from_bytes(&hex::decode("eefe0c24473737cb2035232e3b4eb91c206f0a14684168f3503f7d8316058d6f").unwrap()).unwrap();
+    let public_key = PublicKey::from_bytes(
+        &hex::decode("eefe0c24473737cb2035232e3b4eb91c206f0a14684168f3503f7d8316058d6f").unwrap(),
+    )
+    .unwrap();
 
     assert!(validate_signature(&public_key, &content, signature))
 }
@@ -13,7 +16,10 @@ fn validate_signature_test() {
 #[test]
 fn validate_signature_reverse_test() {
     let signature = "543ec3547d57f9ddb1ec4c5c36503ebf288ffda3da3d510764c9a49c2abb57690ef974c63d174771bdd2481de1066966f57abbec12a3ec171b9f6e2373837002";
-    let public_key = PublicKey::from_bytes(&hex::decode("c029eea18437292c87c62aec34e7d1bd4e38fe6126f3f7c446de6375dc666044").unwrap()).unwrap();
+    let public_key = PublicKey::from_bytes(
+        &hex::decode("c029eea18437292c87c62aec34e7d1bd4e38fe6126f3f7c446de6375dc666044").unwrap(),
+    )
+    .unwrap();
 
     let content = "ceci est un test qui ne fonctionnera pas!"
         .as_bytes()
@@ -24,10 +30,13 @@ fn validate_signature_reverse_test() {
 #[test]
 fn invalid_hex() {
     let signature = "zzz";
-    let public_key = PublicKey::from_bytes(&hex::decode("c029eea18437292c87c62aec34e7d1bd4e38fe6126f3f7c446de6375dc666044").unwrap()).unwrap();
+    let public_key = PublicKey::from_bytes(
+        &hex::decode("c029eea18437292c87c62aec34e7d1bd4e38fe6126f3f7c446de6375dc666044").unwrap(),
+    )
+    .unwrap();
 
     let content = "ceci est un test qui ne fonctionnera pas!"
         .as_bytes()
         .to_vec();
     assert!(!validate_signature(&public_key, &content, signature))
-}
\ No newline at end of file
+}
index 336dd820385242e9ffade082c832cebe13326120..efd41474d438164314be9a6ad70119a8e6a55568 100644 (file)
@@ -1,45 +1,47 @@
-use std::{net::ToSocketAddrs, sync::Arc};
 mod config;
 mod handler;
-use crate::handler::make_service::MakeSvc;
+use std::{future::Future, pin::Pin};
 
-use crate::config::Config;
-use ed25519_dalek::PublicKey;
+use crate::{
+    config::WebhookConfig,
+    handler::{handler::WebhookService, make_service::MakeSvc},
+};
 use hyper::Server;
-use shared::config::Settings;
-use shared::log::{error, info};
+use leash::{ignite, AnyhowResultFuture, Component};
+use shared::{config::Settings, log::info, nats_crate::Client};
 
-#[tokio::main]
-async fn main() {
-    let settings: Settings<Config> = Settings::new("webhook").unwrap();
-    start(settings).await;
-}
+#[derive(Clone, Copy)]
+struct WebhookServer {}
+
+impl Component for WebhookServer {
+    type Config = WebhookConfig;
+    const SERVICE_NAME: &'static str = "webhook";
+
+    fn start(&self, settings: Settings<Self::Config>) -> AnyhowResultFuture<()> {
+        Box::pin(async move {
+            info!("Starting server on {}", settings.server.listening_adress);
+
+            let bind = settings.server.listening_adress;
+            let nats =
+                Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>>>>>::into(settings.nats)
+                    .await?;
+
+            let make_service = MakeSvc::new(WebhookService {
+                config: settings.config,
+                nats: nats.clone(),
+            });
 
-async fn start(settings: Settings<Config>) {
-    let addr = format!(
-        "{}:{}",
-        settings.config.server.address, settings.config.server.port
-    )
-    .to_socket_addrs()
-    .unwrap()
-    .next()
-    .unwrap();
-
-    info!(
-        "Starting server on {}:{}",
-        settings.config.server.address, settings.config.server.port
-    );
-
-    let config = Arc::new(settings.config);
-    let public_key =
-        Arc::new(PublicKey::from_bytes(&hex::decode(&config.discord.public_key).unwrap()).unwrap());
-    let server = Server::bind(&addr).serve(MakeSvc {
-        settings: config,
-        nats: Arc::new(settings.nats.to_client().await.unwrap()),
-        public_key: public_key,
-    });
-
-    if let Err(e) = server.await {
-        error!("server error: {}", e);
+            let server = Server::bind(&bind).serve(make_service);
+
+            server.await?;
+
+            Ok(())
+        })
+    }
+
+    fn new() -> Self {
+        Self {}
     }
 }
+
+ignite!(WebhookServer);
diff --git a/libs/leash/Cargo.toml b/libs/leash/Cargo.toml
new file mode 100644 (file)
index 0000000..5cd54a5
--- /dev/null
@@ -0,0 +1,13 @@
+[package]
+name = "leash"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+shared = { path = "../shared" }
+anyhow = "1.0.68"
+tokio = { version = "1.23.0", features = ["full"] }
+
+serde = "1.0.152"
\ No newline at end of file
diff --git a/libs/leash/src/lib.rs b/libs/leash/src/lib.rs
new file mode 100644 (file)
index 0000000..360db12
--- /dev/null
@@ -0,0 +1,70 @@
+use anyhow::Result;
+use serde::de::DeserializeOwned;
+use shared::config::Settings;
+use std::{future::Future, pin::Pin};
+
+pub type AnyhowResultFuture<T> = Pin<Box<dyn Future<Output = Result<T>>>>;
+pub trait Component: Send + Sync + 'static + Sized {
+    type Config: Default + Clone + DeserializeOwned;
+
+    const SERVICE_NAME: &'static str;
+    fn start(&self, settings: Settings<Self::Config>) -> AnyhowResultFuture<()>;
+    fn new() -> Self;
+
+    fn _internal_start(self) -> AnyhowResultFuture<()> {
+        Box::pin(async move {
+            let settings = Settings::<Self::Config>::new(Self::SERVICE_NAME);
+
+            // Start the grpc healthcheck
+            tokio::spawn(async move {});
+
+            // Start the prometheus monitoring job
+            tokio::spawn(async move {});
+
+            self.start(settings?).await
+        })
+    }
+}
+
+#[macro_export]
+macro_rules! ignite {
+    ($c:ty) => {
+        #[allow(dead_code)]
+        fn main() -> anyhow::Result<()> {
+            let rt = tokio::runtime::Runtime::new()?;
+            rt.block_on(Box::new(<$c as Component>::new())._internal_start())?;
+            Ok(())
+        }
+    };
+}
+
+#[cfg(test)]
+mod test {
+    use serde::Deserialize;
+
+    use crate::Component;
+
+    #[derive(Clone, Copy)]
+    struct TestComponent {}
+
+    #[derive(Default, Clone, Deserialize, Copy)]
+    struct TestComponentConfig {}
+
+    impl Component for TestComponent {
+        type Config = TestComponentConfig;
+        const SERVICE_NAME: &'static str = "test_component";
+
+        fn start(
+            &self,
+            _settings: shared::config::Settings<Self::Config>,
+        ) -> crate::AnyhowResultFuture<()> {
+            Box::pin(async move { Ok(()) })
+        }
+
+        fn new() -> Self {
+            Self {}
+        }
+    }
+    
+    ignite!(TestComponent);
+}
index c4f2f2aa4385ed4f8a7d1f1cf3f1e70195bc7588..2556dfec5a5da9ab1ae9d9b309c14f1780dedc2f 100644 (file)
@@ -4,3 +4,9 @@ version = "0.1.0"
 edition = "2018"
 
 [dependencies]
+tonic = "0.8.3"
+prost = "0.11.5"
+
+[build-dependencies]
+tonic-build = "0.8.4"
+glob = "0.3.0"
\ No newline at end of file
diff --git a/libs/proto/build.rs b/libs/proto/build.rs
new file mode 100644 (file)
index 0000000..80c3a55
--- /dev/null
@@ -0,0 +1,11 @@
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let paths: Vec<String> = glob::glob("../../proto/nova/**/*.proto")?
+        .map(|f| f.unwrap().to_str().unwrap().to_string())
+        .collect();
+
+    tonic_build::configure()
+        .include_file("genproto.rs")
+        .compile(&paths, &["../../proto"])?;
+
+    Ok(())
+}
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..01dc7bcb6aa01a9dca3b1ec4313a68e799006bc5 100644 (file)
@@ -0,0 +1 @@
+include!(concat!(env!("OUT_DIR"), concat!("/", "genproto.rs")));
index 6d6b6f67321ab0fb6e96aee138e9f930f4e9b541..ab19ce8e86c503decbf01676bb9fe6c91774f354 100644 (file)
@@ -19,6 +19,7 @@ twilight-model = "0.14"
 serde_json = { version = "1.0" }
 thiserror = "1.0.38"
 inner = "0.1.1"
+anyhow = "1.0.68"
 
 [dependencies.redis]
 version = "*"
index 52137a3fd7badfdbaaa352e2338f9618559bb460..4387dfb0cc8fcd2321d2e4fca1a18682731b4050 100644 (file)
@@ -1,17 +1,11 @@
-use std::env;
+use std::{env, ops::Deref};
 use config::{Config, Environment, File};
 use log::info;
-use serde::Deserialize;
+use serde::{Deserialize, de::DeserializeOwned};
 
 use crate::error::GenericError;
-
-/// Settings<T> is the base structure for all the nova's component config
-/// you can specify a type T and the name of the component. the "config"
-/// field will be equals to the key named after the given component name
-/// and will be of type T
 #[derive(Debug, Deserialize, Clone)]
-#[serde(bound(deserialize = "T: Deserialize<'de> + std::default::Default + Clone"))]
-pub struct Settings<T> {
+pub struct Settings<T: Clone + DeserializeOwned + Default> {
     #[serde(skip_deserializing)]
     pub config: T,
     pub monitoring: crate::monitoring::MonitoringConfiguration,
@@ -19,20 +13,11 @@ pub struct Settings<T> {
     pub redis: crate::redis::RedisConfiguration,
 }
 
-/// 
-impl<T> Settings<T>
-where
-    T: Deserialize<'static> + std::default::Default + Clone,
+impl<'de, T: Clone + DeserializeOwned + Default> Settings<T>
 {
-
-    /// Initializes a new configuration like the other components of nova
-    /// And starts the prometheus metrics server if needed.
     pub fn new(service_name: &str) -> Result<Settings<T>, GenericError> {
-        pretty_env_logger::init();
-
         let mut builder = Config::builder();
         
-        // this file my be shared with all the components
         builder = builder.add_source(File::with_name("config/default"));
         let mode = env::var("ENV").unwrap_or_else(|_| "development".into());
         info!("Configuration Environment: {}", mode);
@@ -49,13 +34,15 @@ where
 
         //  try to load the config
         settings.config = config.get::<T>(service_name)?;
-        
-        // start the monitoring system if needed
-        crate::monitoring::start_monitoring(&settings.monitoring);
+
         Ok(settings)
     }
 }
 
-pub fn test_init() {
-    pretty_env_logger::init();
-}
+impl<T: Clone + DeserializeOwned + Default>  Deref for Settings<T> {
+    type Target = T;
+
+    fn deref(&self) -> &Self::Target {
+        &self.config
+    }
+}
\ No newline at end of file
index 05953cc6f0c2abcb63f18457d93cd0d40b1f1cb6..dc922d5c01fa236917191ff9d0f1eee0e7170ead 100644 (file)
@@ -1,8 +1,7 @@
+use std::{future::Future, pin::Pin};
+
 use async_nats::Client;
 use serde::Deserialize;
-use std::future::Future;
-
-use crate::error::GenericError;
 
 #[derive(Clone, Debug, Deserialize)]
 pub struct NatsConfigurationClientCert {
@@ -20,10 +19,8 @@ pub struct NatsConfiguration {
     pub host: String,
 }
 
-// todo: Prefer From since it automatically gives a free Into implementation
-// Allows the configuration to directly create a nats connection
-impl NatsConfiguration {
-    pub async fn to_client(self) -> Result<Client, GenericError> {
-        Ok(async_nats::connect(self.host).await?)
+impl From<NatsConfiguration> for Pin<Box<dyn Future<Output = anyhow::Result<Client>>>> {
+    fn from(value: NatsConfiguration) -> Self {
+        Box::pin(async move { Ok(async_nats::connect(value.host).await?) })
     }
-}
\ No newline at end of file
+}
index a196f8d688048c82af12edc8152587df3b7f4bf6..5753fb6649963317d864bd088cfd3dd05b72d0cb 100644 (file)
@@ -1,6 +1,6 @@
-use redis::Client;
+use redis::{aio::MultiplexedConnection, Client};
 use serde::Deserialize;
-
+use std::{future::Future, pin::Pin};
 
 #[derive(Clone, Debug, Deserialize)]
 pub struct RedisConfiguration {
@@ -13,3 +13,18 @@ impl Into<Client> for RedisConfiguration {
         redis::Client::open(self.url).unwrap()
     }
 }
+
+impl From<RedisConfiguration>
+    for Pin<Box<dyn Future<Output = anyhow::Result<MultiplexedConnection>>>>
+{
+    fn from(value: RedisConfiguration) -> Self {
+        Box::pin(async move {
+            let con = Client::open(value.url)?;
+            let (multiplex, ready) = con.create_multiplexed_tokio_connection().await?;
+
+            tokio::spawn(ready);
+
+            Ok(multiplex)
+        })
+    }
+}
diff --git a/proto/nova/management/nova.management.v1alpha.proto b/proto/nova/management/nova.management.v1alpha.proto
deleted file mode 100644 (file)
index d0d6baf..0000000
+++ /dev/null
@@ -1,58 +0,0 @@
-syntax = "proto3";
-package nova.management.v1alpha;
-
-message Empty {}
-
-// Represents the status of a shard
-enum ShardStatus {
-    DISCONNECTED = 0;
-    RUNNING = 1;
-    RECONNECTING = 2;
-}
-
-// represents the state of a nova shard
-message ShardStatusResponse {
-    // Status of the shard in the cluster
-     ShardStatus status = 1;
-    // Index of the discord shard
-     int64 identifier = 2;
-    // If the cluster have a node assigned
-    string cluster = 3;
-    // the websocket latency of the shard
-    int64 latency = 4;
-}
-
-message ShardStatusRequest {
-    // the id of the shard
-     int64 identifier = 1;
-}
-
-// represents the status of a cluster
-// (an instance of the gateway which holds multiple shards)
-message ClusterStatusResponse {
-    // the unique id of the cluster
-     string id = 1;
-    // the node the cluster is running on
-     string node = 2;
-    // the average latency of the cluster
-    int64 average_latency = 3;
-    // list of all the shards on the cluster
-    repeated ShardStatusResponse shards = 4;
-}
-
-message ClusterStatusRequest {
-     string id = 1;    
-}
-
-// Represents the status of all the nova clusters & shards
-message GlobalClusterStatusResponse {
-    int64 size = 1;
-    repeated ClusterStatusResponse shards = 2;
-}
-
-// used by the cli to interact with the nova manager
-service ManagementService {
-    rpc GetGlobalClusterStatus (Empty) returns (GlobalClusterStatusResponse);
-    rpc GetClusterStatus (ClusterStatusRequest) returns (ClusterStatusResponse);
-    rpc GetShardStatus (ShardStatusRequest) returns (ShardStatusResponse);
-}
\ No newline at end of file
diff --git a/proto/nova/management/rpc/nova.management.rpc.v1alpha.proto b/proto/nova/management/rpc/nova.management.rpc.v1alpha.proto
deleted file mode 100644 (file)
index 1ec0168..0000000
+++ /dev/null
@@ -1,5 +0,0 @@
-syntax = "proto3";
-
-import "common/management/nova.management.v1alpha.proto";
-package nova.management.rpc.v1alpha;
-
diff --git a/proto/nova/ratelimit/ratelimiter.proto b/proto/nova/ratelimit/ratelimiter.proto
new file mode 100644 (file)
index 0000000..34d5b6f
--- /dev/null
@@ -0,0 +1,37 @@
+syntax = "proto3";
+
+package nova.ratelimit.ratelimiter;
+
+service Ratelimiter {
+    rpc GetBucketInformation(BucketInformationRequest) returns (BucketInformationResponse);
+    rpc SubmitTicket(stream BucketSubmitTicketRequest) returns (stream BucketSubmitTicketResponse);
+}
+
+message BucketInformationRequest {
+    string path = 1;
+}
+
+message BucketInformationResponse {
+    uint64 limit = 1;
+    uint64 remaining = 2;
+    uint64 reset_after = 3;
+    uint64 started_at = 4;
+}
+
+
+message BucketSubmitTicketRequest {
+    oneof data {
+        string path = 1;
+        Headers headers = 2;
+    }
+
+    message Headers {
+        map<string, string> headers = 1;
+        uint64 precise_time = 2;
+    }
+    
+}
+
+message BucketSubmitTicketResponse {
+    int64 accepted = 1;
+}