From: MatthieuCoder Date: Mon, 2 Jan 2023 14:59:03 +0000 (+0400) Subject: restructure project X-Git-Tag: v0.1~32 X-Git-Url: https://git.puffer.fish/?a=commitdiff_plain;h=f8c2a144e2f3e47371f5e8352e7a7a0b6707bf88;p=matthieu%2Fnova.git restructure project --- diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index a0da8ab..06771a3 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -7,8 +7,6 @@ RUN apt update -y && apt install libssl-dev pkg-config apt-transport-https curl curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg && \ # Add docker repository apt source echo "deb [arch=amd64 signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | tee /etc/apt/sources.list.d/docker.list && \ - # Add bazel repository apt source - echo "deb [arch=amd64] https://storage.googleapis.com/bazel-apt stable jdk1.8" | tee /etc/apt/sources.list.d/bazel.list && \ # Install docker apt update -y && apt install docker-ce-cli -y diff --git a/.gitignore b/.gitignore index 22904a1..cc5c15c 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ target/ **/local* .ijwb -.idea \ No newline at end of file +.idea +config.yml diff --git a/Cargo.lock b/Cargo.lock index 943bb09..c76107e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -37,6 +37,12 @@ dependencies = [ "libc", ] +[[package]] +name = "anyhow" +version = "1.0.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61" + [[package]] name = "arc-swap" version = "1.5.1" @@ -77,6 +83,27 @@ dependencies = [ "url", ] +[[package]] +name = "async-stream" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dad5c83079eae9969be7fadefe640a1c566901f05ff91ab221de4b6f68d9507e" +dependencies = [ + "async-stream-impl", + "futures-core", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-trait" version = "0.1.60" @@ -105,6 +132,52 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08b108ad2665fa3f6e6a517c3d80ec3e77d224c47d605167aefaa5d7ef97fa48" +dependencies = [ + "async-trait", + "axum-core", + "bitflags", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower", + "tower-http", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79b8558f5a0581152dc94dcd289132a1d377494bdeafcd41869b3258e3e2ad92" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "base64" version = "0.13.1" @@ -183,9 +256,11 @@ checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c" name = "cache" version = "0.1.0" dependencies = [ + "anyhow", "async-nats", "futures-util", "log", + "proto", "redis", "serde", "serde_json", @@ -321,16 +396,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "ctor" -version = "0.1.26" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096" -dependencies = [ - "quote", - "syn", -] - [[package]] name = "curve25519-dalek" version = "3.2.0" @@ -464,6 +529,18 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257" +[[package]] +name = "dns-lookup" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53ecafc952c4528d9b51a458d1a8904b81783feff9fde08ab6ed2545ff396872" +dependencies = [ + "cfg-if", + "libc", + "socket2", + "winapi", +] + [[package]] name = "ed25519" version = "1.5.2" @@ -557,6 +634,12 @@ dependencies = [ "instant", ] +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "flate2" version = "1.0.25" @@ -691,8 +774,11 @@ dependencies = [ name = "gateway" version = "0.1.0" dependencies = [ + "anyhow", "bytes", "futures", + "leash", + "proto", "serde", "serde_json", "shared", @@ -733,6 +819,12 @@ dependencies = [ "wasi 0.11.0+wasi-snapshot-preview1", ] +[[package]] +name = "glob" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" + [[package]] name = "h2" version = "0.3.15" @@ -761,6 +853,21 @@ dependencies = [ "ahash", ] +[[package]] +name = "hashring" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd0ddd025eccd8a2fff9865e82ef4c8ce00c4a67709036847d95cf3ccffd07a8" +dependencies = [ + "siphasher", +] + +[[package]] +name = "heck" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -816,6 +923,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-range-header" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" + [[package]] name = "httparse" version = "1.8.0" @@ -875,6 +988,18 @@ dependencies = [ "tokio-rustls", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -1013,6 +1138,16 @@ dependencies = [ "tokio", ] +[[package]] +name = "leash" +version = "0.1.0" +dependencies = [ + "anyhow", + "serde", + "shared", + "tokio", +] + [[package]] name = "libc" version = "0.2.139" @@ -1070,12 +1205,24 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "matchit" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40" + [[package]] name = "memchr" version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +[[package]] +name = "mime" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -1103,6 +1250,12 @@ dependencies = [ "windows-sys 0.42.0", ] +[[package]] +name = "multimap" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" + [[package]] name = "native-tls" version = "0.2.11" @@ -1349,6 +1502,16 @@ dependencies = [ "sha1", ] +[[package]] +name = "petgraph" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6d5014253a1331579ce62aa67443b4a658c5e7dd03d4bc6d302b94474888143" +dependencies = [ + "fixedbitset", + "indexmap", +] + [[package]] name = "pin-project" version = "1.0.12" @@ -1415,6 +1578,16 @@ dependencies = [ "log", ] +[[package]] +name = "prettyplease" +version = "0.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c8992a85d8e93a28bdf76137db888d3874e3b230dee5ed8bebac4c9f7617773" +dependencies = [ + "proc-macro2", + "syn", +] + [[package]] name = "proc-macro2" version = "1.0.49" @@ -1454,9 +1627,70 @@ dependencies = [ "thiserror", ] +[[package]] +name = "prost" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c01db6702aa05baa3f57dec92b8eeeeb4cb19e894e73996b32a4093289e54592" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb5320c680de74ba083512704acb90fe00f28f79207286a848e730c45dd73ed6" +dependencies = [ + "bytes", + "heck", + "itertools", + "lazy_static", + "log", + "multimap", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn", + "tempfile", + "which", +] + +[[package]] +name = "prost-derive" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8842bad1a5419bca14eac663ba798f6bc19c413c2fdceb5f3ba3b0932d96720" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "prost-types" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "017f79637768cde62820bc2d4fe0e45daaa027755c323ad077767c6c5f173091" +dependencies = [ + "bytes", + "prost", +] + [[package]] name = "proto" version = "0.1.0" +dependencies = [ + "glob", + "prost", + "tonic", + "tonic-build", +] [[package]] name = "protobuf" @@ -1550,6 +1784,24 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "ratelimit" +version = "0.1.0" +dependencies = [ + "anyhow", + "futures-util", + "hyper", + "proto", + "serde", + "serde_json", + "shared", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "twilight-http-ratelimiting 0.14.0 (git+https://github.com/MatthieuCoder/twilight.git)", +] + [[package]] name = "redis" version = "0.22.1" @@ -1613,13 +1865,25 @@ dependencies = [ name = "rest" version = "0.1.0" dependencies = [ + "anyhow", + "dns-lookup", "futures-util", + "hashring", + "http", "hyper", "hyper-tls", "lazy_static", + "leash", + "proto", "serde", + "serde_json", "shared", "tokio", + "tokio-scoped", + "tokio-stream", + "tonic", + "tracing", + "twilight-http-ratelimiting 0.14.0 (git+https://github.com/MatthieuCoder/twilight.git)", "xxhash-rust", ] @@ -1706,6 +1970,12 @@ dependencies = [ "base64", ] +[[package]] +name = "rustversion" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5583e89e108996506031660fe09baa5011b9dd0341b89029313006d1fb508d70" + [[package]] name = "ryu" version = "1.0.12" @@ -1906,6 +2176,7 @@ dependencies = [ name = "shared" version = "0.1.0" dependencies = [ + "anyhow", "async-nats", "config", "enumflags2", @@ -1951,6 +2222,12 @@ version = "1.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c" +[[package]] +name = "siphasher" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" + [[package]] name = "slab" version = "0.4.7" @@ -2023,6 +2300,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8" + [[package]] name = "synstructure" version = "0.12.6" @@ -2168,6 +2451,16 @@ dependencies = [ "windows-sys 0.42.0", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "1.8.2" @@ -2211,6 +2504,27 @@ dependencies = [ "webpki", ] +[[package]] +name = "tokio-scoped" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4beb8ba13bc53ac53ce1d52b42f02e5d8060f0f42138862869beb769722b256" +dependencies = [ + "tokio", + "tokio-stream", +] + +[[package]] +name = "tokio-stream" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-tungstenite" version = "0.17.2" @@ -2250,6 +2564,96 @@ dependencies = [ "serde", ] +[[package]] +name = "tonic" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64", + "bytes", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "prost-derive", + "tokio", + "tokio-stream", + "tokio-util", + "tower", + "tower-layer", + "tower-service", + "tracing", + "tracing-futures", +] + +[[package]] +name = "tonic-build" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bf5e9b9c0f7e0a7c027dcfaba7b2c60816c7049171f679d99ee2ff65d0de8c4" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "quote", + "syn", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap", + "pin-project", + "pin-project-lite", + "rand 0.8.5", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f873044bf02dd1e8239e9c1293ea39dad76dc594ec16185d0a1bf31d8dc8d858" +dependencies = [ + "bitflags", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.2" @@ -2263,6 +2667,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if", + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -2288,6 +2693,16 @@ dependencies = [ "once_cell", ] +[[package]] +name = "tracing-futures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" +dependencies = [ + "pin-project", + "tracing", +] + [[package]] name = "try-lock" version = "0.2.3" @@ -2362,7 +2777,7 @@ dependencies = [ "serde_json", "tokio", "tracing", - "twilight-http-ratelimiting", + "twilight-http-ratelimiting 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", "twilight-model", "twilight-validate", ] @@ -2379,6 +2794,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "twilight-http-ratelimiting" +version = "0.14.0" +source = "git+https://github.com/MatthieuCoder/twilight.git#a7953514373d3e3962435e6a539e0e2504a2c2fd" +dependencies = [ + "futures-util", + "http", + "tokio", + "tracing", +] + [[package]] name = "twilight-model" version = "0.14.0" @@ -2578,13 +3004,13 @@ dependencies = [ name = "webhook" version = "0.1.0" dependencies = [ - "ctor", + "anyhow", "ed25519-dalek", "hex", "hyper", "lazy_static", - "libc", - "rand 0.8.5", + "leash", + "proto", "serde", "serde_json", "shared", @@ -2602,6 +3028,17 @@ dependencies = [ "untrusted", ] +[[package]] +name = "which" +version = "4.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c831fbbee9e129a8cf93e7747a82da9d95ba8e16621cae60ec2cdc849bacb7b" +dependencies = [ + "either", + "libc", + "once_cell", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 501a768..785bb95 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,9 @@ members = [ "exes/gateway/", "exes/rest/", "exes/webhook/", + "exes/ratelimit/", "libs/proto/", - "libs/shared/" + "libs/shared/", + "libs/leash/" ] \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml index f671f9b..2472e45 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,26 +1,92 @@ version: "3.3" services: + nats: + image: bitnami/nats + restart: always + ports: + - 4222:4222 + - 8222:8222 + redis: + image: redis + cache: image: ghcr.io/discordnova/nova/cache + restart: always build: context: . args: - COMPONENT=cache + volumes: + - ./config.yml:/config/default.yml + environment: + - RUST_LOG=info + depends_on: + - nats + - redis + gateway: image: ghcr.io/discordnova/nova/gateway + restart: always build: context: . args: - COMPONENT=gateway + volumes: + - ./config.yml:/config/default.yml + environment: + - RUST_LOG=info + depends_on: + - nats + ports: + - 9000:9000 + rest: image: ghcr.io/discordnova/nova/rest + restart: always build: context: . args: - COMPONENT=rest + volumes: + - ./config.yml:/config/default.yml + environment: + - RUST_LOG=info + depends_on: + - ratelimit + ports: + - 9001:9000 + - 8080:8080 + webhook: image: ghcr.io/discordnova/nova/webhook + restart: always build: context: . args: - COMPONENT=webhook + volumes: + - ./config.yml:/config/default.yml + environment: + - RUST_LOG=info + depends_on: + - nats + ports: + - 9002:9000 + - 8081:8080 + ratelimit: + image: ghcr.io/discordnova/nova/ratelimit + restart: always + build: + context: . + args: + - COMPONENT=ratelimit + volumes: + - ./config.yml:/config/default.yml + environment: + - RUST_LOG=info + depends_on: + - nats + - redis + ports: + - 9003:9000 + - 8082:8080 \ No newline at end of file diff --git a/exes/.gitignore b/exes/.gitignore new file mode 100644 index 0000000..0f02d23 --- /dev/null +++ b/exes/.gitignore @@ -0,0 +1 @@ +ratelimit/ \ No newline at end of file diff --git a/exes/cache/Cargo.toml b/exes/cache/Cargo.toml index 61bb449..9a7f9ee 100644 --- a/exes/cache/Cargo.toml +++ b/exes/cache/Cargo.toml @@ -7,6 +7,7 @@ edition = "2018" [dependencies] shared = { path = "../../libs/shared" } +proto = { path = "../../libs/proto" } async-nats = "0.25.1" tokio = { version = "1", features = ["full"] } serde = { version = "1.0.8", features = ["derive"] } @@ -14,4 +15,5 @@ log = { version = "0.4", features = ["std"] } serde_json = { version = "1.0" } redis = "*" futures-util = "*" -twilight-model = "0.14" \ No newline at end of file +twilight-model = "0.14" +anyhow = "1.0.68" \ No newline at end of file diff --git a/exes/cache/src/config.rs b/exes/cache/src/config.rs index 2aa71a8..3d9f5e2 100644 --- a/exes/cache/src/config.rs +++ b/exes/cache/src/config.rs @@ -1,6 +1,5 @@ use serde::Deserialize; - #[derive(Debug, Deserialize, Clone, Default)] pub struct CacheConfiguration { pub toggles: Vec -} \ No newline at end of file +} diff --git a/exes/cache/src/main.rs b/exes/cache/src/main.rs index a74cfa6..312f960 100644 --- a/exes/cache/src/main.rs +++ b/exes/cache/src/main.rs @@ -1,7 +1,7 @@ -use std::error::Error; +use std::{error::Error, pin::Pin}; use async_nats::{Client, Subscriber}; -use futures_util::stream::StreamExt; +use futures_util::{stream::StreamExt, Future}; use log::info; use managers::{ automoderation::Automoderation, bans::Bans, channels::Channels, @@ -22,7 +22,7 @@ pub enum CacheSourcedEvents { } #[derive(Default)] -struct MegaCache { +struct Cache { automoderation: Automoderation, channels: Channels, bans: Bans, @@ -42,18 +42,18 @@ struct MegaCache { async fn main() -> Result<(), Box> { let settings: Settings = Settings::new("cache").unwrap(); info!("loaded configuration: {:?}", settings); - - let nats: Client = settings.nats.to_client().await?; + let nats = + Into::>>>>::into(settings.nats).await?; // let redis: redis::Client = settings.redis.into(); - let mut cache = MegaCache::default(); + let mut cache = Cache::default(); let mut sub = nats.subscribe("nova.cache.dispatch.*".to_string()).await?; listen(&mut sub, &mut cache, settings.config.toggles).await; Ok(()) } -async fn listen(sub: &mut Subscriber, cache: &mut MegaCache, features: Vec) { +async fn listen(sub: &mut Subscriber, cache: &mut Cache, features: Vec) { while let Some(data) = sub.next().await { let cp: CachePayload = serde_json::from_slice(&data.payload).unwrap(); let event = cp.data.data; diff --git a/exes/cache/src/managers/bans.rs b/exes/cache/src/managers/bans.rs index 27e6a34..24a3fcd 100644 --- a/exes/cache/src/managers/bans.rs +++ b/exes/cache/src/managers/bans.rs @@ -5,7 +5,6 @@ use crate::CacheSourcedEvents; use super::CacheManager; use std::future::Future; - #[derive(Default)] pub struct Bans {} impl CacheManager for Bans { diff --git a/exes/gateway/Cargo.toml b/exes/gateway/Cargo.toml index 7ef3d98..d71ed4a 100644 --- a/exes/gateway/Cargo.toml +++ b/exes/gateway/Cargo.toml @@ -5,10 +5,13 @@ edition = "2018" [dependencies] shared = { path = "../../libs/shared" } +proto = { path = "../../libs/proto" } +leash = { path = "../../libs/leash" } tokio = { version = "1", features = ["full"] } twilight-gateway = { version = "0.14" } twilight-model = "0.14" serde = { version = "1.0.8", features = ["derive"] } futures = "0.3" serde_json = { version = "1.0" } -bytes = "*" \ No newline at end of file +bytes = "*" +anyhow = "*" \ No newline at end of file diff --git a/exes/gateway/src/config.rs b/exes/gateway/src/config.rs index 4c62de6..923ab30 100644 --- a/exes/gateway/src/config.rs +++ b/exes/gateway/src/config.rs @@ -2,13 +2,20 @@ use shared::serde::{Deserialize, Serialize}; use twilight_gateway::Intents; #[derive(Serialize, Deserialize, Clone)] -pub struct Config { +pub struct GatewayConfig { pub token: String, - pub intents: Intents + pub intents: Intents, + pub shard: u64, + pub shard_total: u64, } -impl Default for Config { +impl Default for GatewayConfig { fn default() -> Self { - Self { intents: Intents::empty(), token: String::default() } + Self { + intents: Intents::empty(), + token: String::default(), + shard_total: 1, + shard: 1, + } } } diff --git a/exes/gateway/src/main.rs b/exes/gateway/src/main.rs index 6968fe4..7957b08 100644 --- a/exes/gateway/src/main.rs +++ b/exes/gateway/src/main.rs @@ -1,51 +1,69 @@ -use config::Config; +use config::GatewayConfig; +use leash::{ignite, AnyhowResultFuture, Component}; use shared::{ config::Settings, log::{debug, info}, nats_crate::Client, payloads::{CachePayload, DispatchEventTagged, Tracing}, }; -use std::{convert::TryFrom, error::Error}; +use std::{convert::TryFrom, pin::Pin}; use twilight_gateway::{Event, Shard}; mod config; -use futures::StreamExt; +use futures::{Future, StreamExt}; use twilight_model::gateway::event::DispatchEvent; -#[tokio::main] -async fn main() -> Result<(), Box> { - let settings: Settings = Settings::new("gateway").unwrap(); - let (shard, mut events) = Shard::new(settings.config.token, settings.config.intents); - let nats: Client = settings.nats.to_client().await?; +struct GatewayServer {} +impl Component for GatewayServer { + type Config = GatewayConfig; + const SERVICE_NAME: &'static str = "gateway"; - shard.start().await?; + fn start(&self, settings: Settings) -> AnyhowResultFuture<()> { + Box::pin(async move { + let (shard, mut events) = Shard::builder(settings.token.to_owned(), settings.intents) + .shard(settings.shard, settings.shard_total)? + .build(); - while let Some(event) = events.next().await { - match event { - Event::Ready(ready) => { - info!("Logged in as {}", ready.user.name); - } + let nats = + Into::>>>>::into(settings.nats) + .await?; + + shard.start().await?; + + while let Some(event) = events.next().await { + match event { + Event::Ready(ready) => { + info!("Logged in as {}", ready.user.name); + } - _ => { - let name = event.kind().name(); - if let Ok(dispatch_event) = DispatchEvent::try_from(event) { - let data = CachePayload { - tracing: Tracing { - node_id: "".to_string(), - span: None, - }, - data: DispatchEventTagged { - data: dispatch_event, - }, - }; - let value = serde_json::to_string(&data)?; - debug!("nats send: {}", value); - let bytes = bytes::Bytes::from(value); - nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes) - .await?; + _ => { + let name = event.kind().name(); + if let Ok(dispatch_event) = DispatchEvent::try_from(event) { + let data = CachePayload { + tracing: Tracing { + node_id: "".to_string(), + span: None, + }, + data: DispatchEventTagged { + data: dispatch_event, + }, + }; + let value = serde_json::to_string(&data)?; + debug!("nats send: {}", value); + let bytes = bytes::Bytes::from(value); + nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes) + .await?; + } + } } } - } + + Ok(()) + }) } - Ok(()) + fn new() -> Self { + Self {} + } } + +ignite!(GatewayServer); diff --git a/exes/rest/Cargo.toml b/exes/rest/Cargo.toml index 7b5b2b5..f4c5ecc 100644 --- a/exes/rest/Cargo.toml +++ b/exes/rest/Cargo.toml @@ -7,10 +7,23 @@ edition = "2018" [dependencies] shared = { path = "../../libs/shared" } +proto = { path = "../../libs/proto" } +leash = { path = "../../libs/leash" } + hyper = { version = "0.14", features = ["full"] } tokio = { version = "1", features = ["full"] } serde = { version = "1.0.8", features = ["derive"] } futures-util = "0.3.17" hyper-tls = "0.5.0" lazy_static = "1.4.0" -xxhash-rust = { version = "0.8.2", features = ["xxh32"] } \ No newline at end of file +xxhash-rust = { version = "0.8.2", features = ["xxh32"] } +twilight-http-ratelimiting = { git = "https://github.com/MatthieuCoder/twilight.git" } +tracing = "0.1.37" +hashring = "0.3.0" +anyhow = "*" +tonic = "0.8.3" +serde_json = { version = "1.0" } +http = "0.2.8" +tokio-stream = "0.1.11" +dns-lookup = "1.0.8" +tokio-scoped = "0.2.0" \ No newline at end of file diff --git a/exes/rest/src/config.rs b/exes/rest/src/config.rs index 559929f..9261de2 100644 --- a/exes/rest/src/config.rs +++ b/exes/rest/src/config.rs @@ -1,9 +1,21 @@ +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use serde::Deserialize; -#[derive(Debug, Deserialize, Clone, Default)] +fn default_listening_address() -> SocketAddr { + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)) +} + +#[derive(Debug, Deserialize, Clone)] pub struct ServerSettings { - pub port: u16, - pub address: String, + #[serde(default = "default_listening_address")] + pub listening_adress: SocketAddr +} +impl Default for ServerSettings { + fn default() -> Self { + Self { + listening_adress: default_listening_address(), + } + } } #[derive(Debug, Deserialize, Clone, Default)] @@ -12,7 +24,7 @@ pub struct Discord { } #[derive(Debug, Deserialize, Clone, Default)] -pub struct Config { +pub struct ReverseProxyConfig { pub server: ServerSettings, pub discord: Discord, } diff --git a/exes/rest/src/handler.rs b/exes/rest/src/handler.rs new file mode 100644 index 0000000..8b0dd52 --- /dev/null +++ b/exes/rest/src/handler.rs @@ -0,0 +1,141 @@ +use std::{ + collections::hash_map::DefaultHasher, + convert::TryFrom, + hash::{Hash, Hasher}, + str::FromStr, +}; + +use anyhow::bail; +use http::{ + header::{AUTHORIZATION, CONNECTION, HOST, TRANSFER_ENCODING, UPGRADE}, + HeaderValue, Method as HttpMethod, Request, Response, Uri, +}; +use hyper::{client::HttpConnector, Body, Client}; +use hyper_tls::HttpsConnector; +use shared::log::error; +use twilight_http_ratelimiting::{Method, Path}; + +use crate::ratelimit_client::RemoteRatelimiter; + +/// Normalizes the path +fn normalize_path(request_path: &str) -> (&str, &str) { + if let Some(trimmed_path) = request_path.strip_prefix("/api") { + if let Some(maybe_api_version) = trimmed_path.split('/').nth(1) { + if let Some(version_number) = maybe_api_version.strip_prefix('v') { + if version_number.parse::().is_ok() { + let len = "/api/v".len() + version_number.len(); + return (&request_path[..len], &request_path[len..]); + }; + }; + } + + ("/api", trimmed_path) + } else { + ("/api", request_path) + } +} + +pub async fn handle_request( + client: Client, Body>, + ratelimiter: RemoteRatelimiter, + token: String, + mut request: Request, +) -> Result, anyhow::Error> { + let (hash, uri_string) = { + let method = match *request.method() { + HttpMethod::DELETE => Method::Delete, + HttpMethod::GET => Method::Get, + HttpMethod::PATCH => Method::Patch, + HttpMethod::POST => Method::Post, + HttpMethod::PUT => Method::Put, + _ => { + error!("Unsupported HTTP method in request, {}", request.method()); + bail!("unsupported method"); + } + }; + + let request_path = request.uri().path(); + let (api_path, trimmed_path) = normalize_path(&request_path); + + let mut uri_string = format!("http://192.168.0.27:8000{}{}", api_path, trimmed_path); + if let Some(query) = request.uri().query() { + uri_string.push('?'); + uri_string.push_str(query); + } + + let mut hash = DefaultHasher::new(); + match Path::try_from((method, trimmed_path)) { + Ok(path) => path, + Err(e) => { + error!( + "Failed to parse path for {:?} {}: {:?}", + method, trimmed_path, e + ); + bail!("failed o parse"); + } + } + .hash(&mut hash); + + (hash.finish().to_string(), uri_string) + }; + + let header_sender = match ratelimiter.ticket(hash).await { + Ok(sender) => sender, + Err(e) => { + error!("Failed to receive ticket for ratelimiting: {:?}", e); + bail!("failed to reteive ticket"); + } + }; + + request.headers_mut().insert( + AUTHORIZATION, + HeaderValue::from_bytes(token.as_bytes()) + .expect("strings are guaranteed to be valid utf-8"), + ); + request + .headers_mut() + .insert(HOST, HeaderValue::from_static("discord.com")); + + // Remove forbidden HTTP/2 headers + // https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.2 + request.headers_mut().remove(CONNECTION); + request.headers_mut().remove("keep-alive"); + request.headers_mut().remove("proxy-connection"); + request.headers_mut().remove(TRANSFER_ENCODING); + request.headers_mut().remove(UPGRADE); + request.headers_mut().remove(AUTHORIZATION); + request.headers_mut().append( + AUTHORIZATION, + HeaderValue::from_static( + "Bot ODA3MTg4MzM1NzE3Mzg0MjEy.G3sXFM.8gY2sVYDAq2WuPWwDskAAEFLfTg8htooxME-LE", + ), + ); + + let uri = match Uri::from_str(&uri_string) { + Ok(uri) => uri, + Err(e) => { + error!("Failed to create URI for requesting Discord API: {:?}", e); + bail!("failed to create uri"); + } + }; + *request.uri_mut() = uri; + let resp = match client.request(request).await { + Ok(response) => response, + Err(e) => { + error!("Error when requesting the Discord API: {:?}", e); + bail!("failed to request the discord api"); + } + }; + + let ratelimit_headers = resp + .headers() + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_str().unwrap().to_string())) + .collect(); + + if header_sender.send(ratelimit_headers).is_err() { + error!("Error when sending ratelimit headers to ratelimiter"); + }; + + Ok(resp) +} diff --git a/exes/rest/src/main.rs b/exes/rest/src/main.rs index 9fa6ce7..8d014ab 100644 --- a/exes/rest/src/main.rs +++ b/exes/rest/src/main.rs @@ -1,46 +1,56 @@ -use std::{convert::Infallible, sync::Arc}; +use config::ReverseProxyConfig; -use crate::{config::Config, ratelimit::Ratelimiter}; -use shared::{ - config::Settings, - log::{error, info}, - redis_crate::Client, +use handler::handle_request; +use hyper::{ + server::conn::AddrStream, + service::{make_service_fn, service_fn}, + Body, Client, Request, Server, }; -use hyper::{server::conn::AddrStream, service::make_service_fn, Server}; -use std::net::ToSocketAddrs; -use tokio::sync::Mutex; - -use crate::proxy::ServiceProxy; +use hyper_tls::HttpsConnector; +use leash::{ignite, AnyhowResultFuture, Component}; +use shared::config::Settings; +use std::convert::Infallible; mod config; -mod proxy; -mod ratelimit; - -#[tokio::main] -async fn main() { - let settings: Settings = Settings::new("rest").unwrap(); - let config = Arc::new(settings.config); - let redis_client: Client = settings.redis.into(); - let redis = Arc::new(Mutex::new( - redis_client.get_async_connection().await.unwrap(), - )); - let ratelimiter = Arc::new(Ratelimiter::new(redis)); - - let addr = format!("{}:{}", config.server.address, config.server.port) - .to_socket_addrs() - .unwrap() - .next() - .unwrap(); - - let service_fn = make_service_fn(move |_: &AddrStream| { - let service_proxy = ServiceProxy::new(config.clone(), ratelimiter.clone()); - async move { Ok::<_, Infallible>(service_proxy) } - }); - - let server = Server::bind(&addr).serve(service_fn); - - info!("starting ratelimit server"); - if let Err(e) = server.await { - error!("server error: {}", e); +mod handler; +mod ratelimit_client; + +struct ReverseProxyServer {} +impl Component for ReverseProxyServer { + type Config = ReverseProxyConfig; + const SERVICE_NAME: &'static str = "rest"; + + fn start(&self, settings: Settings) -> AnyhowResultFuture<()> { + Box::pin(async move { + // Client to the remote ratelimiters + let ratelimiter = ratelimit_client::RemoteRatelimiter::new(); + let client = Client::builder().build(HttpsConnector::new()); + + let service_fn = make_service_fn(move |_: &AddrStream| { + let client = client.clone(); + let ratelimiter = ratelimiter.clone(); + async move { + Ok::<_, Infallible>(service_fn(move |request: Request| { + let client = client.clone(); + let ratelimiter = ratelimiter.clone(); + async move { + handle_request(client, ratelimiter, "token".to_string(), request).await + } + })) + } + }); + + let server = Server::bind(&settings.config.server.listening_adress).serve(service_fn); + + server.await?; + + Ok(()) + }) + } + + fn new() -> Self { + Self {} } } + +ignite!(ReverseProxyServer); diff --git a/exes/rest/src/proxy/mod.rs b/exes/rest/src/proxy/mod.rs deleted file mode 100644 index 65d77aa..0000000 --- a/exes/rest/src/proxy/mod.rs +++ /dev/null @@ -1,138 +0,0 @@ -use crate::{config::Config, ratelimit::Ratelimiter}; -use hyper::{ - client::HttpConnector, header::HeaderValue, http::uri::Parts, service::Service, Body, Client, - Request, Response, Uri, -}; -use hyper_tls::HttpsConnector; -use shared::{ - log::debug, - prometheus::{labels, opts, register_counter, register_histogram_vec, Counter, HistogramVec}, -}; -use std::{future::Future, pin::Pin, sync::Arc, task::Poll}; -use tokio::sync::Mutex; - -lazy_static::lazy_static! { - static ref HTTP_COUNTER: Counter = register_counter!(opts!( - "nova_rest_http_requests_total", - "Number of HTTP requests made.", - labels! {"handler" => "all",} - )) - .unwrap(); - - static ref HTTP_REQ_HISTOGRAM: HistogramVec = register_histogram_vec!( - "nova_rest_http_request_duration_seconds", - "The HTTP request latencies in seconds.", - &["handler"] - ) - .unwrap(); - - static ref HTTP_COUNTER_STATUS: Counter = register_counter!(opts!( - "nova_rest_http_requests_status", - "Number of HTTP requests made by status", - labels! {"" => ""} - )) - .unwrap(); -} - -#[derive(Clone)] -pub struct ServiceProxy { - client: Client>, - ratelimiter: Arc, - config: Arc, - fail: Arc>, -} - -impl Service> for ServiceProxy { - type Response = Response; - type Error = hyper::Error; - type Future = Pin> + Send>>; - - fn poll_ready( - &mut self, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - match self.client.poll_ready(cx) { - Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - Poll::Ready(Err(e)) => Poll::Ready(Err(e)), - Poll::Pending => Poll::Pending, - } - } - - fn call(&mut self, mut req: Request) -> Self::Future { - HTTP_COUNTER.inc(); - - let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["all"]).start_timer(); - let host = "discord.com"; - let mut new_parts = Parts::default(); - - let path = req.uri().path().to_string(); - - new_parts.scheme = Some("https".parse().unwrap()); - new_parts.authority = Some(host.parse().unwrap()); - new_parts.path_and_query = Some(path.parse().unwrap()); - - *req.uri_mut() = Uri::from_parts(new_parts).unwrap(); - - let headers = req.headers_mut(); - headers.remove("user-agent"); - headers.insert("Host", HeaderValue::from_str("discord.com").unwrap()); - headers.insert( - "Authorization", - HeaderValue::from_str(&format!("Bot {}", self.config.discord.token)).unwrap(), - ); - - println!("{:?}", headers); - - let client = self.client.clone(); - let ratelimiter = self.ratelimiter.clone(); - let fail = self.fail.clone(); - - return Box::pin(async move { - let resp = match ratelimiter.before_request(&req).await { - Ok(allowed) => match allowed { - crate::ratelimit::RatelimiterResponse::Ratelimited => { - debug!("ratelimited"); - Ok(Response::builder().body("ratelimited".into()).unwrap()) - } - _ => { - debug!("forwarding request"); - match client.request(req).await { - Ok(mut response) => { - ratelimiter.after_request(&path, &response).await; - if response.status() != 200 { - *fail.lock().await += 1 - } - response.headers_mut().insert( - "x-fails", - HeaderValue::from_str(&format!("{}", fail.lock().await)) - .unwrap(), - ); - Ok(response) - } - Err(e) => Err(e), - } - } - }, - Err(e) => Ok(Response::builder() - .body(format!("server error: {}", e).into()) - .unwrap()), - }; - timer.observe_duration(); - resp - }); - } -} - -impl ServiceProxy { - pub fn new(config: Arc, ratelimiter: Arc) -> Self { - let https = HttpsConnector::new(); - let client = Client::builder().build::<_, hyper::Body>(https); - let fail = Arc::new(Mutex::new(0)); - ServiceProxy { - client, - config, - ratelimiter, - fail, - } - } -} diff --git a/exes/rest/src/ratelimit/mod.rs b/exes/rest/src/ratelimit/mod.rs deleted file mode 100644 index 132bfd3..0000000 --- a/exes/rest/src/ratelimit/mod.rs +++ /dev/null @@ -1,155 +0,0 @@ -use shared::{ - log::debug, - redis_crate::{aio::Connection, AsyncCommands}, error::GenericError, -}; -use hyper::{Body, Request, Response}; -use std::{ - convert::TryInto, - sync::Arc, - time::{SystemTime, UNIX_EPOCH}, -}; -use tokio::sync::Mutex; -use xxhash_rust::xxh32::xxh32; - -pub enum RatelimiterResponse { - NoSuchUrl, - Ratelimited, - Pass, -} - -pub struct Ratelimiter { - redis: Arc>, -} - -impl Ratelimiter { - pub fn new(redis: Arc>) -> Ratelimiter { - return Ratelimiter { redis }; - } - - pub async fn before_request( - &self, - request: &Request, - ) -> Result { - // we lookup if the route hash is stored in the redis table - let path = request.uri().path(); - let hash = xxh32(path.as_bytes(), 32); - let mut redis = self.redis.lock().await; - - let start = SystemTime::now(); - let since_the_epoch = start - .duration_since(UNIX_EPOCH) - .expect("Time went backwards"); - - // global rate litmit - match redis - .get::>(format!( - "nova:rest:ratelimit:global:{}", - since_the_epoch.as_secs() - )) - .await - { - Ok(value) => { - match value { - Some(value) => { - debug!("incr: {}", value); - if value >= 49 { - return Ok(RatelimiterResponse::Ratelimited); - } - } - None => { - let key = - format!("nova:rest:ratelimit:global:{}", since_the_epoch.as_secs()); - // init global ratelimit - redis.set_ex::(key, 0, 2).await.unwrap(); - } - } - } - Err(_) => { - return Err(GenericError::StepFailed("radis ratelimit check".to_string())); - } - }; - - // we lookup the corresponding bucket for this url - match redis - .get::>(format!("nova:rest:ratelimit:url_bucket:{}", hash)) - .await - { - Ok(bucket) => match bucket { - Some(bucket) => { - match redis - .exists::(format!("nova:rest:ratelimit:lock:{}", bucket)) - .await - { - Ok(exists) => { - if exists { - Ok(RatelimiterResponse::Ratelimited) - } else { - Ok(RatelimiterResponse::Pass) - } - } - Err(_) => Err(GenericError::StepFailed("radis ratelimit check".to_string())), - } - } - None => Ok(RatelimiterResponse::NoSuchUrl), - }, - Err(_) => Err(GenericError::StepFailed("radis ratelimit check".to_string())), - } - } - - fn parse_headers(&self, response: &Response) -> Option<(String, i32, i32)> { - if let Some(bucket) = response.headers().get("X-RateLimit-Bucket") { - let bucket = bucket.to_str().unwrap().to_string(); - - let remaining = response.headers().get("X-RateLimit-Remaining").unwrap(); - let reset = response.headers().get("X-RateLimit-Reset-After").unwrap(); - - let remaining_i32 = remaining.to_str().unwrap().parse::().unwrap(); - let reset_ms_i32 = reset.to_str().unwrap().parse::().unwrap().ceil() as i32; - return Some((bucket, remaining_i32, reset_ms_i32)); - } else { - None - } - } - - pub async fn after_request(&self, path: &str, response: &Response) { - let hash = xxh32(path.as_bytes(), 32); - // verified earlier - - let mut redis = self.redis.lock().await; - - let start = SystemTime::now(); - let since_the_epoch = start - .duration_since(UNIX_EPOCH) - .expect("Time went backwards"); - - redis - .incr::( - format!("nova:rest:ratelimit:global:{}", since_the_epoch.as_secs()), - 1, - ) - .await - .unwrap(); - if let Some((bucket, remaining, reset)) = self.parse_headers(response) { - if remaining <= 1 { - // we set a lock for the bucket until the timeout passes - redis - .set_ex::( - format!("nova:rest:ratelimit:lock:{}", bucket), - true, - reset.try_into().unwrap(), - ) - .await - .unwrap(); - } - - redis - .set_ex::( - format!("nova:rest:ratelimit:url_bucket:{}", hash), - bucket, - reset.try_into().unwrap(), - ) - .await - .unwrap(); - } - } -} diff --git a/exes/rest/src/ratelimit_client/mod.rs b/exes/rest/src/ratelimit_client/mod.rs new file mode 100644 index 0000000..8263d15 --- /dev/null +++ b/exes/rest/src/ratelimit_client/mod.rs @@ -0,0 +1,137 @@ +use self::remote_hashring::{HashRingWrapper, VNode}; +use futures_util::Future; +use proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::{Data, Headers}; +use proto::nova::ratelimit::ratelimiter::BucketSubmitTicketRequest; +use shared::log::debug; +use std::collections::HashMap; +use std::fmt::Debug; +use std::pin::Pin; +use std::sync::Arc; +use std::time::UNIX_EPOCH; +use std::time::{Duration, SystemTime}; +use tokio::sync::oneshot::{self}; +use tokio::sync::{broadcast, mpsc, RwLock}; +use tokio_stream::wrappers::ReceiverStream; + +mod remote_hashring; + +#[derive(Clone, Debug)] +pub struct RemoteRatelimiter { + remotes: Arc>, + stop: Arc>, +} + +impl Drop for RemoteRatelimiter { + fn drop(&mut self) { + self.stop.clone().send(()).unwrap(); + } +} + +impl RemoteRatelimiter { + async fn get_ratelimiters(&self) -> Result<(), anyhow::Error> { + // get list of dns responses + let responses = dns_lookup::lookup_host("ratelimit") + .unwrap() + .into_iter() + .map(|f| f.to_string()); + + let mut write = self.remotes.write().await; + + for ip in responses { + let a = VNode::new(ip.into()).await?; + write.add(a.clone()); + } + + return Ok(()); + } + + #[must_use] + pub fn new() -> Self { + let (rx, mut tx) = broadcast::channel(1); + let obj = Self { + remotes: Arc::new(RwLock::new(HashRingWrapper::default())), + stop: Arc::new(rx), + }; + + let obj_clone = obj.clone(); + // Task to update the ratelimiters in the background + tokio::spawn(async move { + loop { + let sleep = tokio::time::sleep(Duration::from_secs(10)); + tokio::pin!(sleep); + + debug!("refreshing"); + obj_clone.get_ratelimiters().await.unwrap(); + tokio::select! { + () = &mut sleep => { + println!("timer elapsed"); + }, + _ = tx.recv() => {} + } + } + }); + + obj + } + + pub fn ticket( + &self, + path: String, + ) -> Pin< + Box< + dyn Future>>> + + Send + + 'static, + >, + > { + let remotes = self.remotes.clone(); + let (tx, rx) = oneshot::channel::>(); + + Box::pin(async move { + // Get node managing this path + let mut node = (*remotes.read().await.get(&path).unwrap()).clone(); + + // Buffers for the gRPC streaming channel. + let (send, remote) = mpsc::channel(5); + let (do_request, wait) = oneshot::channel(); + // Tonic requires a stream to be used; Since we use a mpsc channel, we can create a stream from it + let stream = ReceiverStream::new(remote); + + // Start the grpc streaming + let ticket = node.submit_ticket(stream).await?; + + // First, send the request + send.send(BucketSubmitTicketRequest { + data: Some(Data::Path(path)), + }) + .await?; + + // We continuously listen for events in the channel. + tokio::spawn(async move { + let message = ticket.into_inner().message().await.unwrap().unwrap(); + + if message.accepted == 1 { + do_request.send(()).unwrap(); + let headers = rx.await.unwrap(); + + send.send(BucketSubmitTicketRequest { + data: Some(Data::Headers(Headers { + precise_time: SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("time went backwards") + .as_millis() as u64, + headers, + })), + }) + .await + .unwrap(); + } + }); + + // Wait for the message to be sent + wait.await?; + + Ok(tx) + }) + } +} diff --git a/exes/rest/src/ratelimit_client/remote_hashring.rs b/exes/rest/src/ratelimit_client/remote_hashring.rs new file mode 100644 index 0000000..b9f7800 --- /dev/null +++ b/exes/rest/src/ratelimit_client/remote_hashring.rs @@ -0,0 +1,67 @@ +use core::fmt::Debug; +use proto::nova::ratelimit::ratelimiter::ratelimiter_client::RatelimiterClient; +use std::hash::Hash; +use std::ops::Deref; +use std::ops::DerefMut; +use tonic::transport::Channel; + +#[derive(Debug, Clone)] +pub struct VNode { + address: String, + + client: RatelimiterClient, +} + +impl Deref for VNode { + type Target = RatelimiterClient; + + fn deref(&self) -> &Self::Target { + &self.client + } +} + +impl DerefMut for VNode { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.client + } +} + +impl Hash for VNode { + fn hash(&self, state: &mut H) { + self.address.hash(state); + } +} + +impl VNode { + pub async fn new(address: String) -> Result { + let client = RatelimiterClient::connect(format!("http://{}:8080", address.clone())).await?; + + Ok(VNode { client, address }) + } +} + +unsafe impl Send for VNode {} + +#[repr(transparent)] +#[derive(Default)] +pub struct HashRingWrapper(hashring::HashRing); + +impl Deref for HashRingWrapper { + type Target = hashring::HashRing; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for HashRingWrapper { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl Debug for HashRingWrapper { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("HashRing").finish() + } +} diff --git a/exes/webhook/Cargo.toml b/exes/webhook/Cargo.toml index 1d6a5a7..12a6608 100644 --- a/exes/webhook/Cargo.toml +++ b/exes/webhook/Cargo.toml @@ -7,15 +7,16 @@ edition = "2018" hyper = { version = "0.14", features = ["full"] } tokio = { version = "1", features = ["full"] } shared = { path = "../../libs/shared" } +proto = { path = "../../libs/proto" } +leash = { path = "../../libs/leash" } + serde = { version = "1.0.8", features = ["derive"] } hex = "0.4.3" serde_json = { version = "1.0" } -libc = "0.2.101" lazy_static = "1.4.0" -ctor = "0.1.21" ed25519-dalek = "1" twilight-model = { version = "0.14" } -rand = "0.8" +anyhow = "1.0.68" [[bin]] name = "webhook" diff --git a/exes/webhook/src/config.rs b/exes/webhook/src/config.rs index a054d33..68f6a5f 100644 --- a/exes/webhook/src/config.rs +++ b/exes/webhook/src/config.rs @@ -1,19 +1,43 @@ -use serde::Deserialize; +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; -#[derive(Debug, Deserialize, Clone, Default)] +use ed25519_dalek::PublicKey; +use serde::{Deserialize, Deserializer}; + +fn default_listening_address() -> SocketAddr { + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)) +} + +#[derive(Debug, Deserialize, Clone, Copy)] pub struct ServerSettings { - pub port: u16, - pub address: String, + #[serde(default = "default_listening_address")] + pub listening_adress: SocketAddr, +} +impl Default for ServerSettings { + fn default() -> Self { + Self { + listening_adress: default_listening_address(), + } + } +} + +fn deserialize_pk<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let str = String::deserialize(deserializer)?; + let public_key = PublicKey::from_bytes(&hex::decode(&str).unwrap()).unwrap(); + Ok(public_key) } -#[derive(Debug, Deserialize, Clone, Default)] +#[derive(Debug, Deserialize, Clone, Default, Copy)] pub struct Discord { - pub public_key: String, + #[serde(deserialize_with = "deserialize_pk")] + pub public_key: PublicKey, pub client_id: u32, } -#[derive(Debug, Deserialize, Clone, Default)] -pub struct Config { +#[derive(Debug, Deserialize, Clone, Default, Copy)] +pub struct WebhookConfig { pub server: ServerSettings, pub discord: Discord, } diff --git a/exes/webhook/src/handler/handler.rs b/exes/webhook/src/handler/handler.rs index b2ef44c..af79185 100644 --- a/exes/webhook/src/handler/handler.rs +++ b/exes/webhook/src/handler/handler.rs @@ -1,13 +1,12 @@ use super::error::WebhookError; use super::signature::validate_signature; -use crate::config::Config; +use crate::config::WebhookConfig; use ed25519_dalek::PublicKey; use hyper::{ body::{to_bytes, Bytes}, service::Service, Body, Method, Request, Response, StatusCode, }; -use serde::{Deserialize, Serialize}; use shared::nats_crate::Client; use shared::{ log::{debug, error}, @@ -17,10 +16,9 @@ use std::{ future::Future, pin::Pin, str::from_utf8, - sync::Arc, task::{Context, Poll}, }; -use twilight_model::gateway::event::{DispatchEvent}; +use twilight_model::gateway::event::DispatchEvent; use twilight_model::{ application::interaction::{Interaction, InteractionType}, gateway::payload::incoming::InteractionCreate, @@ -28,14 +26,13 @@ use twilight_model::{ /// Hyper service used to handle the discord webhooks #[derive(Clone)] -pub struct HandlerService { - pub config: Arc, - pub nats: Arc, - pub public_key: Arc, +pub struct WebhookService { + pub config: WebhookConfig, + pub nats: Client, } -impl HandlerService { - async fn check_request(&self, req: Request) -> Result { +impl WebhookService { + async fn check_request(req: Request, pk: PublicKey) -> Result { if req.method() == Method::POST { let signature = if let Some(sig) = req.headers().get("X-Signature-Ed25519") { sig.to_owned() @@ -57,7 +54,7 @@ impl HandlerService { let data = to_bytes(req.into_body()).await?; if validate_signature( - &self.public_key, + &pk, &[timestamp.as_bytes().to_vec(), data.to_vec()].concat(), signature.to_str()?, ) { @@ -74,10 +71,11 @@ impl HandlerService { } async fn process_request( - &mut self, req: Request, + nats: Client, + pk: PublicKey, ) -> Result, WebhookError> { - match self.check_request(req).await { + match Self::check_request(req, pk).await { Ok(data) => { let utf8 = from_utf8(&data); match utf8 { @@ -86,7 +84,7 @@ impl HandlerService { match value.kind { InteractionType::Ping => Ok(Response::builder() .header("Content-Type", "application/json") - .body(serde_json::to_string(&Ping { t: 1 }).unwrap().into()) + .body(r#"{"t":1}"#.into()) .unwrap()), _ => { debug!("calling nats"); @@ -106,10 +104,13 @@ impl HandlerService { let payload = serde_json::to_string(&data).unwrap(); - match self.nats.request( - "nova.cache.dispatch.INTERACTION_CREATE".to_string(), - Bytes::from(payload), - ).await { + match nats + .request( + "nova.cache.dispatch.INTERACTION_CREATE".to_string(), + Bytes::from(payload), + ) + .await + { Ok(response) => Ok(Response::builder() .header("Content-Type", "application/json") .body(Body::from(response.reply.unwrap())) @@ -144,15 +145,9 @@ impl HandlerService { } } -#[derive(Debug, Serialize, Deserialize)] -pub struct Ping { - #[serde(rename = "type")] - t: i32, -} - /// Implementation of the service -impl Service> for HandlerService { - type Response = Response; +impl Service> for WebhookService { + type Response = hyper::Response; type Error = hyper::Error; type Future = Pin> + Send>>; @@ -161,9 +156,10 @@ impl Service> for HandlerService { } fn call(&mut self, req: Request) -> Self::Future { - let mut clone = self.clone(); + let future = + Self::process_request(req, self.nats.clone(), self.config.discord.public_key); Box::pin(async move { - let response = clone.process_request(req).await; + let response = future.await; match response { Ok(r) => Ok(r), diff --git a/exes/webhook/src/handler/make_service.rs b/exes/webhook/src/handler/make_service.rs index 48672a1..b51494a 100644 --- a/exes/webhook/src/handler/make_service.rs +++ b/exes/webhook/src/handler/make_service.rs @@ -1,22 +1,15 @@ -use super::handler::HandlerService; -use crate::config::Config; use hyper::service::Service; -use shared::nats_crate::Client; use std::{ future::{ready, Ready}, - sync::Arc, task::{Context, Poll}, }; -use ed25519_dalek::PublicKey; -pub struct MakeSvc { - pub settings: Arc, - pub nats: Arc, - pub public_key: Arc +pub struct MakeSvc { + pub service: T, } -impl Service for MakeSvc { - type Response = HandlerService; +impl Service for MakeSvc { + type Response = V; type Error = std::io::Error; type Future = Ready>; @@ -25,10 +18,12 @@ impl Service for MakeSvc { } fn call(&mut self, _: T) -> Self::Future { - ready(Ok(HandlerService { - config: self.settings.clone(), - nats: self.nats.clone(), - public_key: self.public_key.clone() - })) + ready(Ok(self.service.clone())) + } +} + +impl MakeSvc { + pub fn new(service: T) -> Self { + Self { service } } } diff --git a/exes/webhook/src/handler/mod.rs b/exes/webhook/src/handler/mod.rs index 20a977a..e4cf35a 100644 --- a/exes/webhook/src/handler/mod.rs +++ b/exes/webhook/src/handler/mod.rs @@ -1,5 +1,5 @@ mod error; -mod handler; +pub mod handler; pub mod make_service; mod signature; diff --git a/exes/webhook/src/handler/tests/signature.rs b/exes/webhook/src/handler/tests/signature.rs index 490143b..0bed86a 100644 --- a/exes/webhook/src/handler/tests/signature.rs +++ b/exes/webhook/src/handler/tests/signature.rs @@ -5,7 +5,10 @@ use ed25519_dalek::PublicKey; fn validate_signature_test() { let signature = "543ec3547d57f9ddb1ec4c5c36503ebf288ffda3da3d510764c9a49c2abb57690ef974c63d174771bdd2481de1066966f57abbec12a3ec171b9f6e2373837002"; let content = "message de test incroyable".as_bytes().to_vec(); - let public_key = PublicKey::from_bytes(&hex::decode("eefe0c24473737cb2035232e3b4eb91c206f0a14684168f3503f7d8316058d6f").unwrap()).unwrap(); + let public_key = PublicKey::from_bytes( + &hex::decode("eefe0c24473737cb2035232e3b4eb91c206f0a14684168f3503f7d8316058d6f").unwrap(), + ) + .unwrap(); assert!(validate_signature(&public_key, &content, signature)) } @@ -13,7 +16,10 @@ fn validate_signature_test() { #[test] fn validate_signature_reverse_test() { let signature = "543ec3547d57f9ddb1ec4c5c36503ebf288ffda3da3d510764c9a49c2abb57690ef974c63d174771bdd2481de1066966f57abbec12a3ec171b9f6e2373837002"; - let public_key = PublicKey::from_bytes(&hex::decode("c029eea18437292c87c62aec34e7d1bd4e38fe6126f3f7c446de6375dc666044").unwrap()).unwrap(); + let public_key = PublicKey::from_bytes( + &hex::decode("c029eea18437292c87c62aec34e7d1bd4e38fe6126f3f7c446de6375dc666044").unwrap(), + ) + .unwrap(); let content = "ceci est un test qui ne fonctionnera pas!" .as_bytes() @@ -24,10 +30,13 @@ fn validate_signature_reverse_test() { #[test] fn invalid_hex() { let signature = "zzz"; - let public_key = PublicKey::from_bytes(&hex::decode("c029eea18437292c87c62aec34e7d1bd4e38fe6126f3f7c446de6375dc666044").unwrap()).unwrap(); + let public_key = PublicKey::from_bytes( + &hex::decode("c029eea18437292c87c62aec34e7d1bd4e38fe6126f3f7c446de6375dc666044").unwrap(), + ) + .unwrap(); let content = "ceci est un test qui ne fonctionnera pas!" .as_bytes() .to_vec(); assert!(!validate_signature(&public_key, &content, signature)) -} \ No newline at end of file +} diff --git a/exes/webhook/src/main.rs b/exes/webhook/src/main.rs index 336dd82..efd4147 100644 --- a/exes/webhook/src/main.rs +++ b/exes/webhook/src/main.rs @@ -1,45 +1,47 @@ -use std::{net::ToSocketAddrs, sync::Arc}; mod config; mod handler; -use crate::handler::make_service::MakeSvc; +use std::{future::Future, pin::Pin}; -use crate::config::Config; -use ed25519_dalek::PublicKey; +use crate::{ + config::WebhookConfig, + handler::{handler::WebhookService, make_service::MakeSvc}, +}; use hyper::Server; -use shared::config::Settings; -use shared::log::{error, info}; +use leash::{ignite, AnyhowResultFuture, Component}; +use shared::{config::Settings, log::info, nats_crate::Client}; -#[tokio::main] -async fn main() { - let settings: Settings = Settings::new("webhook").unwrap(); - start(settings).await; -} +#[derive(Clone, Copy)] +struct WebhookServer {} + +impl Component for WebhookServer { + type Config = WebhookConfig; + const SERVICE_NAME: &'static str = "webhook"; + + fn start(&self, settings: Settings) -> AnyhowResultFuture<()> { + Box::pin(async move { + info!("Starting server on {}", settings.server.listening_adress); + + let bind = settings.server.listening_adress; + let nats = + Into::>>>>::into(settings.nats) + .await?; + + let make_service = MakeSvc::new(WebhookService { + config: settings.config, + nats: nats.clone(), + }); -async fn start(settings: Settings) { - let addr = format!( - "{}:{}", - settings.config.server.address, settings.config.server.port - ) - .to_socket_addrs() - .unwrap() - .next() - .unwrap(); - - info!( - "Starting server on {}:{}", - settings.config.server.address, settings.config.server.port - ); - - let config = Arc::new(settings.config); - let public_key = - Arc::new(PublicKey::from_bytes(&hex::decode(&config.discord.public_key).unwrap()).unwrap()); - let server = Server::bind(&addr).serve(MakeSvc { - settings: config, - nats: Arc::new(settings.nats.to_client().await.unwrap()), - public_key: public_key, - }); - - if let Err(e) = server.await { - error!("server error: {}", e); + let server = Server::bind(&bind).serve(make_service); + + server.await?; + + Ok(()) + }) + } + + fn new() -> Self { + Self {} } } + +ignite!(WebhookServer); diff --git a/libs/leash/Cargo.toml b/libs/leash/Cargo.toml new file mode 100644 index 0000000..5cd54a5 --- /dev/null +++ b/libs/leash/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "leash" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +shared = { path = "../shared" } +anyhow = "1.0.68" +tokio = { version = "1.23.0", features = ["full"] } + +serde = "1.0.152" \ No newline at end of file diff --git a/libs/leash/src/lib.rs b/libs/leash/src/lib.rs new file mode 100644 index 0000000..360db12 --- /dev/null +++ b/libs/leash/src/lib.rs @@ -0,0 +1,70 @@ +use anyhow::Result; +use serde::de::DeserializeOwned; +use shared::config::Settings; +use std::{future::Future, pin::Pin}; + +pub type AnyhowResultFuture = Pin>>>; +pub trait Component: Send + Sync + 'static + Sized { + type Config: Default + Clone + DeserializeOwned; + + const SERVICE_NAME: &'static str; + fn start(&self, settings: Settings) -> AnyhowResultFuture<()>; + fn new() -> Self; + + fn _internal_start(self) -> AnyhowResultFuture<()> { + Box::pin(async move { + let settings = Settings::::new(Self::SERVICE_NAME); + + // Start the grpc healthcheck + tokio::spawn(async move {}); + + // Start the prometheus monitoring job + tokio::spawn(async move {}); + + self.start(settings?).await + }) + } +} + +#[macro_export] +macro_rules! ignite { + ($c:ty) => { + #[allow(dead_code)] + fn main() -> anyhow::Result<()> { + let rt = tokio::runtime::Runtime::new()?; + rt.block_on(Box::new(<$c as Component>::new())._internal_start())?; + Ok(()) + } + }; +} + +#[cfg(test)] +mod test { + use serde::Deserialize; + + use crate::Component; + + #[derive(Clone, Copy)] + struct TestComponent {} + + #[derive(Default, Clone, Deserialize, Copy)] + struct TestComponentConfig {} + + impl Component for TestComponent { + type Config = TestComponentConfig; + const SERVICE_NAME: &'static str = "test_component"; + + fn start( + &self, + _settings: shared::config::Settings, + ) -> crate::AnyhowResultFuture<()> { + Box::pin(async move { Ok(()) }) + } + + fn new() -> Self { + Self {} + } + } + + ignite!(TestComponent); +} diff --git a/libs/proto/Cargo.toml b/libs/proto/Cargo.toml index c4f2f2a..2556dfe 100644 --- a/libs/proto/Cargo.toml +++ b/libs/proto/Cargo.toml @@ -4,3 +4,9 @@ version = "0.1.0" edition = "2018" [dependencies] +tonic = "0.8.3" +prost = "0.11.5" + +[build-dependencies] +tonic-build = "0.8.4" +glob = "0.3.0" \ No newline at end of file diff --git a/libs/proto/build.rs b/libs/proto/build.rs new file mode 100644 index 0000000..80c3a55 --- /dev/null +++ b/libs/proto/build.rs @@ -0,0 +1,11 @@ +fn main() -> Result<(), Box> { + let paths: Vec = glob::glob("../../proto/nova/**/*.proto")? + .map(|f| f.unwrap().to_str().unwrap().to_string()) + .collect(); + + tonic_build::configure() + .include_file("genproto.rs") + .compile(&paths, &["../../proto"])?; + + Ok(()) +} diff --git a/libs/proto/src/lib.rs b/libs/proto/src/lib.rs index e69de29..01dc7bc 100644 --- a/libs/proto/src/lib.rs +++ b/libs/proto/src/lib.rs @@ -0,0 +1 @@ +include!(concat!(env!("OUT_DIR"), concat!("/", "genproto.rs"))); diff --git a/libs/shared/Cargo.toml b/libs/shared/Cargo.toml index 6d6b6f6..ab19ce8 100644 --- a/libs/shared/Cargo.toml +++ b/libs/shared/Cargo.toml @@ -19,6 +19,7 @@ twilight-model = "0.14" serde_json = { version = "1.0" } thiserror = "1.0.38" inner = "0.1.1" +anyhow = "1.0.68" [dependencies.redis] version = "*" diff --git a/libs/shared/src/config.rs b/libs/shared/src/config.rs index 52137a3..4387dfb 100644 --- a/libs/shared/src/config.rs +++ b/libs/shared/src/config.rs @@ -1,17 +1,11 @@ -use std::env; +use std::{env, ops::Deref}; use config::{Config, Environment, File}; use log::info; -use serde::Deserialize; +use serde::{Deserialize, de::DeserializeOwned}; use crate::error::GenericError; - -/// Settings is the base structure for all the nova's component config -/// you can specify a type T and the name of the component. the "config" -/// field will be equals to the key named after the given component name -/// and will be of type T #[derive(Debug, Deserialize, Clone)] -#[serde(bound(deserialize = "T: Deserialize<'de> + std::default::Default + Clone"))] -pub struct Settings { +pub struct Settings { #[serde(skip_deserializing)] pub config: T, pub monitoring: crate::monitoring::MonitoringConfiguration, @@ -19,20 +13,11 @@ pub struct Settings { pub redis: crate::redis::RedisConfiguration, } -/// -impl Settings -where - T: Deserialize<'static> + std::default::Default + Clone, +impl<'de, T: Clone + DeserializeOwned + Default> Settings { - - /// Initializes a new configuration like the other components of nova - /// And starts the prometheus metrics server if needed. pub fn new(service_name: &str) -> Result, GenericError> { - pretty_env_logger::init(); - let mut builder = Config::builder(); - // this file my be shared with all the components builder = builder.add_source(File::with_name("config/default")); let mode = env::var("ENV").unwrap_or_else(|_| "development".into()); info!("Configuration Environment: {}", mode); @@ -49,13 +34,15 @@ where // try to load the config settings.config = config.get::(service_name)?; - - // start the monitoring system if needed - crate::monitoring::start_monitoring(&settings.monitoring); + Ok(settings) } } -pub fn test_init() { - pretty_env_logger::init(); -} +impl Deref for Settings { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.config + } +} \ No newline at end of file diff --git a/libs/shared/src/nats.rs b/libs/shared/src/nats.rs index 05953cc..dc922d5 100644 --- a/libs/shared/src/nats.rs +++ b/libs/shared/src/nats.rs @@ -1,8 +1,7 @@ +use std::{future::Future, pin::Pin}; + use async_nats::Client; use serde::Deserialize; -use std::future::Future; - -use crate::error::GenericError; #[derive(Clone, Debug, Deserialize)] pub struct NatsConfigurationClientCert { @@ -20,10 +19,8 @@ pub struct NatsConfiguration { pub host: String, } -// todo: Prefer From since it automatically gives a free Into implementation -// Allows the configuration to directly create a nats connection -impl NatsConfiguration { - pub async fn to_client(self) -> Result { - Ok(async_nats::connect(self.host).await?) +impl From for Pin>>> { + fn from(value: NatsConfiguration) -> Self { + Box::pin(async move { Ok(async_nats::connect(value.host).await?) }) } -} \ No newline at end of file +} diff --git a/libs/shared/src/redis.rs b/libs/shared/src/redis.rs index a196f8d..5753fb6 100644 --- a/libs/shared/src/redis.rs +++ b/libs/shared/src/redis.rs @@ -1,6 +1,6 @@ -use redis::Client; +use redis::{aio::MultiplexedConnection, Client}; use serde::Deserialize; - +use std::{future::Future, pin::Pin}; #[derive(Clone, Debug, Deserialize)] pub struct RedisConfiguration { @@ -13,3 +13,18 @@ impl Into for RedisConfiguration { redis::Client::open(self.url).unwrap() } } + +impl From + for Pin>>> +{ + fn from(value: RedisConfiguration) -> Self { + Box::pin(async move { + let con = Client::open(value.url)?; + let (multiplex, ready) = con.create_multiplexed_tokio_connection().await?; + + tokio::spawn(ready); + + Ok(multiplex) + }) + } +} diff --git a/proto/nova/management/nova.management.v1alpha.proto b/proto/nova/management/nova.management.v1alpha.proto deleted file mode 100644 index d0d6baf..0000000 --- a/proto/nova/management/nova.management.v1alpha.proto +++ /dev/null @@ -1,58 +0,0 @@ -syntax = "proto3"; -package nova.management.v1alpha; - -message Empty {} - -// Represents the status of a shard -enum ShardStatus { - DISCONNECTED = 0; - RUNNING = 1; - RECONNECTING = 2; -} - -// represents the state of a nova shard -message ShardStatusResponse { - // Status of the shard in the cluster - ShardStatus status = 1; - // Index of the discord shard - int64 identifier = 2; - // If the cluster have a node assigned - string cluster = 3; - // the websocket latency of the shard - int64 latency = 4; -} - -message ShardStatusRequest { - // the id of the shard - int64 identifier = 1; -} - -// represents the status of a cluster -// (an instance of the gateway which holds multiple shards) -message ClusterStatusResponse { - // the unique id of the cluster - string id = 1; - // the node the cluster is running on - string node = 2; - // the average latency of the cluster - int64 average_latency = 3; - // list of all the shards on the cluster - repeated ShardStatusResponse shards = 4; -} - -message ClusterStatusRequest { - string id = 1; -} - -// Represents the status of all the nova clusters & shards -message GlobalClusterStatusResponse { - int64 size = 1; - repeated ClusterStatusResponse shards = 2; -} - -// used by the cli to interact with the nova manager -service ManagementService { - rpc GetGlobalClusterStatus (Empty) returns (GlobalClusterStatusResponse); - rpc GetClusterStatus (ClusterStatusRequest) returns (ClusterStatusResponse); - rpc GetShardStatus (ShardStatusRequest) returns (ShardStatusResponse); -} \ No newline at end of file diff --git a/proto/nova/management/rpc/nova.management.rpc.v1alpha.proto b/proto/nova/management/rpc/nova.management.rpc.v1alpha.proto deleted file mode 100644 index 1ec0168..0000000 --- a/proto/nova/management/rpc/nova.management.rpc.v1alpha.proto +++ /dev/null @@ -1,5 +0,0 @@ -syntax = "proto3"; - -import "common/management/nova.management.v1alpha.proto"; -package nova.management.rpc.v1alpha; - diff --git a/proto/nova/ratelimit/ratelimiter.proto b/proto/nova/ratelimit/ratelimiter.proto new file mode 100644 index 0000000..34d5b6f --- /dev/null +++ b/proto/nova/ratelimit/ratelimiter.proto @@ -0,0 +1,37 @@ +syntax = "proto3"; + +package nova.ratelimit.ratelimiter; + +service Ratelimiter { + rpc GetBucketInformation(BucketInformationRequest) returns (BucketInformationResponse); + rpc SubmitTicket(stream BucketSubmitTicketRequest) returns (stream BucketSubmitTicketResponse); +} + +message BucketInformationRequest { + string path = 1; +} + +message BucketInformationResponse { + uint64 limit = 1; + uint64 remaining = 2; + uint64 reset_after = 3; + uint64 started_at = 4; +} + + +message BucketSubmitTicketRequest { + oneof data { + string path = 1; + Headers headers = 2; + } + + message Headers { + map headers = 1; + uint64 precise_time = 2; + } + +} + +message BucketSubmitTicketResponse { + int64 accepted = 1; +}