diff options
40 files changed, 1323 insertions, 601 deletions
diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index a0da8ab..06771a3 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -7,8 +7,6 @@ RUN apt update -y && apt install libssl-dev pkg-config apt-transport-https curl      curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg && \      # Add docker repository apt source      echo "deb [arch=amd64 signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | tee /etc/apt/sources.list.d/docker.list && \ -    # Add bazel repository apt source -    echo "deb [arch=amd64] https://storage.googleapis.com/bazel-apt stable jdk1.8" | tee /etc/apt/sources.list.d/bazel.list && \      # Install docker      apt update -y && apt install docker-ce-cli -y @@ -2,4 +2,5 @@  target/
  **/local*
  .ijwb
 -.idea
\ No newline at end of file +.idea
 +config.yml
 @@ -38,6 +38,12 @@ dependencies = [  ]  [[package]] +name = "anyhow" +version = "1.0.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61" + +[[package]]  name = "arc-swap"  version = "1.5.1"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -78,6 +84,27 @@ dependencies = [  ]  [[package]] +name = "async-stream" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dad5c83079eae9969be7fadefe640a1c566901f05ff91ab221de4b6f68d9507e" +dependencies = [ + "async-stream-impl", + "futures-core", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]]  name = "async-trait"  version = "0.1.60"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -106,6 +133,52 @@ source = "registry+https://github.com/rust-lang/crates.io-index"  checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"  [[package]] +name = "axum" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08b108ad2665fa3f6e6a517c3d80ec3e77d224c47d605167aefaa5d7ef97fa48" +dependencies = [ + "async-trait", + "axum-core", + "bitflags", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower", + "tower-http", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79b8558f5a0581152dc94dcd289132a1d377494bdeafcd41869b3258e3e2ad92" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + +[[package]]  name = "base64"  version = "0.13.1"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -183,9 +256,11 @@ checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c"  name = "cache"  version = "0.1.0"  dependencies = [ + "anyhow",   "async-nats",   "futures-util",   "log", + "proto",   "redis",   "serde",   "serde_json", @@ -322,16 +397,6 @@ dependencies = [  ]  [[package]] -name = "ctor" -version = "0.1.26" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096" -dependencies = [ - "quote", - "syn", -] - -[[package]]  name = "curve25519-dalek"  version = "3.2.0"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -465,6 +530,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"  checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257"  [[package]] +name = "dns-lookup" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53ecafc952c4528d9b51a458d1a8904b81783feff9fde08ab6ed2545ff396872" +dependencies = [ + "cfg-if", + "libc", + "socket2", + "winapi", +] + +[[package]]  name = "ed25519"  version = "1.5.2"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -558,6 +635,12 @@ dependencies = [  ]  [[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + +[[package]]  name = "flate2"  version = "1.0.25"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -691,8 +774,11 @@ dependencies = [  name = "gateway"  version = "0.1.0"  dependencies = [ + "anyhow",   "bytes",   "futures", + "leash", + "proto",   "serde",   "serde_json",   "shared", @@ -734,6 +820,12 @@ dependencies = [  ]  [[package]] +name = "glob" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" + +[[package]]  name = "h2"  version = "0.3.15"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -762,6 +854,21 @@ dependencies = [  ]  [[package]] +name = "hashring" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd0ddd025eccd8a2fff9865e82ef4c8ce00c4a67709036847d95cf3ccffd07a8" +dependencies = [ + "siphasher", +] + +[[package]] +name = "heck" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" + +[[package]]  name = "hermit-abi"  version = "0.1.19"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -817,6 +924,12 @@ dependencies = [  ]  [[package]] +name = "http-range-header" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" + +[[package]]  name = "httparse"  version = "1.8.0"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -876,6 +989,18 @@ dependencies = [  ]  [[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + +[[package]]  name = "hyper-tls"  version = "0.5.0"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1014,6 +1139,16 @@ dependencies = [  ]  [[package]] +name = "leash" +version = "0.1.0" +dependencies = [ + "anyhow", + "serde", + "shared", + "tokio", +] + +[[package]]  name = "libc"  version = "0.2.139"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1071,12 +1206,24 @@ dependencies = [  ]  [[package]] +name = "matchit" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40" + +[[package]]  name = "memchr"  version = "2.5.0"  source = "registry+https://github.com/rust-lang/crates.io-index"  checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"  [[package]] +name = "mime" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" + +[[package]]  name = "minimal-lexical"  version = "0.2.1"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1104,6 +1251,12 @@ dependencies = [  ]  [[package]] +name = "multimap" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" + +[[package]]  name = "native-tls"  version = "0.2.11"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1350,6 +1503,16 @@ dependencies = [  ]  [[package]] +name = "petgraph" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6d5014253a1331579ce62aa67443b4a658c5e7dd03d4bc6d302b94474888143" +dependencies = [ + "fixedbitset", + "indexmap", +] + +[[package]]  name = "pin-project"  version = "1.0.12"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1416,6 +1579,16 @@ dependencies = [  ]  [[package]] +name = "prettyplease" +version = "0.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c8992a85d8e93a28bdf76137db888d3874e3b230dee5ed8bebac4c9f7617773" +dependencies = [ + "proc-macro2", + "syn", +] + +[[package]]  name = "proc-macro2"  version = "1.0.49"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1455,8 +1628,69 @@ dependencies = [  ]  [[package]] +name = "prost" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c01db6702aa05baa3f57dec92b8eeeeb4cb19e894e73996b32a4093289e54592" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb5320c680de74ba083512704acb90fe00f28f79207286a848e730c45dd73ed6" +dependencies = [ + "bytes", + "heck", + "itertools", + "lazy_static", + "log", + "multimap", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn", + "tempfile", + "which", +] + +[[package]] +name = "prost-derive" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8842bad1a5419bca14eac663ba798f6bc19c413c2fdceb5f3ba3b0932d96720" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "prost-types" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "017f79637768cde62820bc2d4fe0e45daaa027755c323ad077767c6c5f173091" +dependencies = [ + "bytes", + "prost", +] + +[[package]]  name = "proto"  version = "0.1.0" +dependencies = [ + "glob", + "prost", + "tonic", + "tonic-build", +]  [[package]]  name = "protobuf" @@ -1551,6 +1785,24 @@ dependencies = [  ]  [[package]] +name = "ratelimit" +version = "0.1.0" +dependencies = [ + "anyhow", + "futures-util", + "hyper", + "proto", + "serde", + "serde_json", + "shared", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "twilight-http-ratelimiting 0.14.0 (git+https://github.com/MatthieuCoder/twilight.git)", +] + +[[package]]  name = "redis"  version = "0.22.1"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1613,13 +1865,25 @@ dependencies = [  name = "rest"  version = "0.1.0"  dependencies = [ + "anyhow", + "dns-lookup",   "futures-util", + "hashring", + "http",   "hyper",   "hyper-tls",   "lazy_static", + "leash", + "proto",   "serde", + "serde_json",   "shared",   "tokio", + "tokio-scoped", + "tokio-stream", + "tonic", + "tracing", + "twilight-http-ratelimiting 0.14.0 (git+https://github.com/MatthieuCoder/twilight.git)",   "xxhash-rust",  ] @@ -1707,6 +1971,12 @@ dependencies = [  ]  [[package]] +name = "rustversion" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5583e89e108996506031660fe09baa5011b9dd0341b89029313006d1fb508d70" + +[[package]]  name = "ryu"  version = "1.0.12"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1906,6 +2176,7 @@ dependencies = [  name = "shared"  version = "0.1.0"  dependencies = [ + "anyhow",   "async-nats",   "config",   "enumflags2", @@ -1952,6 +2223,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"  checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c"  [[package]] +name = "siphasher" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" + +[[package]]  name = "slab"  version = "0.4.7"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2024,6 +2301,12 @@ dependencies = [  ]  [[package]] +name = "sync_wrapper" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8" + +[[package]]  name = "synstructure"  version = "0.12.6"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2169,6 +2452,16 @@ dependencies = [  ]  [[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + +[[package]]  name = "tokio-macros"  version = "1.8.2"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2212,6 +2505,27 @@ dependencies = [  ]  [[package]] +name = "tokio-scoped" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4beb8ba13bc53ac53ce1d52b42f02e5d8060f0f42138862869beb769722b256" +dependencies = [ + "tokio", + "tokio-stream", +] + +[[package]] +name = "tokio-stream" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]]  name = "tokio-tungstenite"  version = "0.17.2"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2251,6 +2565,96 @@ dependencies = [  ]  [[package]] +name = "tonic" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64", + "bytes", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "prost-derive", + "tokio", + "tokio-stream", + "tokio-util", + "tower", + "tower-layer", + "tower-service", + "tracing", + "tracing-futures", +] + +[[package]] +name = "tonic-build" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bf5e9b9c0f7e0a7c027dcfaba7b2c60816c7049171f679d99ee2ff65d0de8c4" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "quote", + "syn", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap", + "pin-project", + "pin-project-lite", + "rand 0.8.5", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f873044bf02dd1e8239e9c1293ea39dad76dc594ec16185d0a1bf31d8dc8d858" +dependencies = [ + "bitflags", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + +[[package]]  name = "tower-service"  version = "0.3.2"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2263,6 +2667,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"  checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"  dependencies = [   "cfg-if", + "log",   "pin-project-lite",   "tracing-attributes",   "tracing-core", @@ -2289,6 +2694,16 @@ dependencies = [  ]  [[package]] +name = "tracing-futures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" +dependencies = [ + "pin-project", + "tracing", +] + +[[package]]  name = "try-lock"  version = "0.2.3"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2362,7 +2777,7 @@ dependencies = [   "serde_json",   "tokio",   "tracing", - "twilight-http-ratelimiting", + "twilight-http-ratelimiting 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",   "twilight-model",   "twilight-validate",  ] @@ -2380,6 +2795,17 @@ dependencies = [  ]  [[package]] +name = "twilight-http-ratelimiting" +version = "0.14.0" +source = "git+https://github.com/MatthieuCoder/twilight.git#a7953514373d3e3962435e6a539e0e2504a2c2fd" +dependencies = [ + "futures-util", + "http", + "tokio", + "tracing", +] + +[[package]]  name = "twilight-model"  version = "0.14.0"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2578,13 +3004,13 @@ dependencies = [  name = "webhook"  version = "0.1.0"  dependencies = [ - "ctor", + "anyhow",   "ed25519-dalek",   "hex",   "hyper",   "lazy_static", - "libc", - "rand 0.8.5", + "leash", + "proto",   "serde",   "serde_json",   "shared", @@ -2603,6 +3029,17 @@ dependencies = [  ]  [[package]] +name = "which" +version = "4.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c831fbbee9e129a8cf93e7747a82da9d95ba8e16621cae60ec2cdc849bacb7b" +dependencies = [ + "either", + "libc", + "once_cell", +] + +[[package]]  name = "winapi"  version = "0.3.9"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -4,7 +4,9 @@ members = [      "exes/gateway/",
      "exes/rest/",
      "exes/webhook/",
 +    "exes/ratelimit/",
      "libs/proto/",
 -    "libs/shared/"
 +    "libs/shared/",
 +    "libs/leash/"
  ]
\ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml index f671f9b..2472e45 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,26 +1,92 @@  version: "3.3"  services: +  nats: +    image: bitnami/nats +    restart: always +    ports: +      - 4222:4222 +      - 8222:8222 +  redis: +    image: redis +    cache:      image: ghcr.io/discordnova/nova/cache +    restart: always      build:         context: .        args:          - COMPONENT=cache +    volumes: +      - ./config.yml:/config/default.yml +    environment: +      - RUST_LOG=info +    depends_on: +      - nats +      - redis +      gateway:      image: ghcr.io/discordnova/nova/gateway +    restart: always      build:         context: .        args:          - COMPONENT=gateway +    volumes: +      - ./config.yml:/config/default.yml +    environment: +      - RUST_LOG=info +    depends_on: +      - nats +    ports: +      - 9000:9000 +    rest:      image: ghcr.io/discordnova/nova/rest +    restart: always      build:         context: .        args:          - COMPONENT=rest +    volumes: +      - ./config.yml:/config/default.yml +    environment: +      - RUST_LOG=info +    depends_on: +      - ratelimit +    ports: +      - 9001:9000 +      - 8080:8080 +    webhook:      image: ghcr.io/discordnova/nova/webhook +    restart: always      build:         context: .        args:          - COMPONENT=webhook +    volumes: +      - ./config.yml:/config/default.yml +    environment: +      - RUST_LOG=info +    depends_on: +      - nats +    ports: +      - 9002:9000 +      - 8081:8080 +  ratelimit: +    image: ghcr.io/discordnova/nova/ratelimit +    restart: always +    build:  +      context: . +      args: +        - COMPONENT=ratelimit +    volumes: +      - ./config.yml:/config/default.yml +    environment: +      - RUST_LOG=info +    depends_on: +      - nats +      - redis +    ports: +      - 9003:9000 +      - 8082:8080
\ No newline at end of file diff --git a/exes/.gitignore b/exes/.gitignore new file mode 100644 index 0000000..0f02d23 --- /dev/null +++ b/exes/.gitignore @@ -0,0 +1 @@ +ratelimit/
\ No newline at end of file diff --git a/exes/cache/Cargo.toml b/exes/cache/Cargo.toml index 61bb449..9a7f9ee 100644 --- a/exes/cache/Cargo.toml +++ b/exes/cache/Cargo.toml @@ -7,6 +7,7 @@ edition = "2018"  [dependencies]  shared = { path = "../../libs/shared" } +proto = { path = "../../libs/proto" }  async-nats = "0.25.1"  tokio = { version = "1", features = ["full"] }  serde = { version = "1.0.8", features = ["derive"] } @@ -14,4 +15,5 @@ log = { version = "0.4", features = ["std"] }  serde_json = { version = "1.0" }  redis = "*"  futures-util = "*" -twilight-model = "0.14"
\ No newline at end of file +twilight-model = "0.14" +anyhow = "1.0.68"
\ No newline at end of file diff --git a/exes/cache/src/config.rs b/exes/cache/src/config.rs index 2aa71a8..3d9f5e2 100644 --- a/exes/cache/src/config.rs +++ b/exes/cache/src/config.rs @@ -1,6 +1,5 @@  use serde::Deserialize; -  #[derive(Debug, Deserialize, Clone, Default)]  pub struct CacheConfiguration {      pub toggles: Vec<String> -}
\ No newline at end of file +} diff --git a/exes/cache/src/main.rs b/exes/cache/src/main.rs index a74cfa6..312f960 100644 --- a/exes/cache/src/main.rs +++ b/exes/cache/src/main.rs @@ -1,7 +1,7 @@ -use std::error::Error; +use std::{error::Error, pin::Pin};  use async_nats::{Client, Subscriber}; -use futures_util::stream::StreamExt; +use futures_util::{stream::StreamExt, Future};  use log::info;  use managers::{      automoderation::Automoderation, bans::Bans, channels::Channels, @@ -22,7 +22,7 @@ pub enum CacheSourcedEvents {  }  #[derive(Default)] -struct MegaCache { +struct Cache {      automoderation: Automoderation,      channels: Channels,      bans: Bans, @@ -42,18 +42,18 @@ struct MegaCache {  async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {      let settings: Settings<CacheConfiguration> = Settings::new("cache").unwrap();      info!("loaded configuration: {:?}", settings); - -    let nats: Client = settings.nats.to_client().await?; +    let nats = +        Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>>>>>::into(settings.nats).await?;      // let redis: redis::Client = settings.redis.into(); -    let mut cache = MegaCache::default(); +    let mut cache = Cache::default();      let mut sub = nats.subscribe("nova.cache.dispatch.*".to_string()).await?;      listen(&mut sub, &mut cache, settings.config.toggles).await;      Ok(())  } -async fn listen(sub: &mut Subscriber, cache: &mut MegaCache, features: Vec<String>) { +async fn listen(sub: &mut Subscriber, cache: &mut Cache, features: Vec<String>) {      while let Some(data) = sub.next().await {          let cp: CachePayload = serde_json::from_slice(&data.payload).unwrap();          let event = cp.data.data; diff --git a/exes/cache/src/managers/bans.rs b/exes/cache/src/managers/bans.rs index 27e6a34..24a3fcd 100644 --- a/exes/cache/src/managers/bans.rs +++ b/exes/cache/src/managers/bans.rs @@ -5,7 +5,6 @@ use crate::CacheSourcedEvents;  use super::CacheManager;  use std::future::Future; -  #[derive(Default)]  pub struct Bans {}  impl CacheManager for Bans { diff --git a/exes/gateway/Cargo.toml b/exes/gateway/Cargo.toml index 7ef3d98..d71ed4a 100644 --- a/exes/gateway/Cargo.toml +++ b/exes/gateway/Cargo.toml @@ -5,10 +5,13 @@ edition = "2018"  [dependencies]  shared = { path = "../../libs/shared" } +proto = { path = "../../libs/proto" } +leash = { path = "../../libs/leash" }  tokio = { version = "1", features = ["full"] }  twilight-gateway = { version = "0.14" }  twilight-model = "0.14"  serde = { version = "1.0.8", features = ["derive"] }  futures = "0.3"  serde_json = { version = "1.0" } -bytes = "*"
\ No newline at end of file +bytes = "*" +anyhow = "*"
\ No newline at end of file diff --git a/exes/gateway/src/config.rs b/exes/gateway/src/config.rs index 4c62de6..923ab30 100644 --- a/exes/gateway/src/config.rs +++ b/exes/gateway/src/config.rs @@ -2,13 +2,20 @@ use shared::serde::{Deserialize, Serialize};  use twilight_gateway::Intents;  #[derive(Serialize, Deserialize, Clone)] -pub struct Config { +pub struct GatewayConfig {      pub token: String, -    pub intents: Intents +    pub intents: Intents, +    pub shard: u64, +    pub shard_total: u64,  } -impl Default for Config { +impl Default for GatewayConfig {      fn default() -> Self { -        Self { intents: Intents::empty(), token: String::default() } +        Self { +            intents: Intents::empty(), +            token: String::default(), +            shard_total: 1, +            shard: 1, +        }      }  } diff --git a/exes/gateway/src/main.rs b/exes/gateway/src/main.rs index 6968fe4..7957b08 100644 --- a/exes/gateway/src/main.rs +++ b/exes/gateway/src/main.rs @@ -1,51 +1,69 @@ -use config::Config; +use config::GatewayConfig; +use leash::{ignite, AnyhowResultFuture, Component};  use shared::{      config::Settings,      log::{debug, info},      nats_crate::Client,      payloads::{CachePayload, DispatchEventTagged, Tracing},  }; -use std::{convert::TryFrom, error::Error}; +use std::{convert::TryFrom, pin::Pin};  use twilight_gateway::{Event, Shard};  mod config; -use futures::StreamExt; +use futures::{Future, StreamExt};  use twilight_model::gateway::event::DispatchEvent; -#[tokio::main] -async fn main() -> Result<(), Box<dyn Error + Send + Sync>> { -    let settings: Settings<Config> = Settings::new("gateway").unwrap(); -    let (shard, mut events) = Shard::new(settings.config.token, settings.config.intents); -    let nats: Client = settings.nats.to_client().await?; +struct GatewayServer {} +impl Component for GatewayServer { +    type Config = GatewayConfig; +    const SERVICE_NAME: &'static str = "gateway"; -    shard.start().await?; +    fn start(&self, settings: Settings<Self::Config>) -> AnyhowResultFuture<()> { +        Box::pin(async move { +            let (shard, mut events) = Shard::builder(settings.token.to_owned(), settings.intents) +                .shard(settings.shard, settings.shard_total)? +                .build(); -    while let Some(event) = events.next().await { -        match event { -            Event::Ready(ready) => { -                info!("Logged in as {}", ready.user.name); -            } +            let nats = +                Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>>>>>::into(settings.nats) +                    .await?; + +            shard.start().await?; + +            while let Some(event) = events.next().await { +                match event { +                    Event::Ready(ready) => { +                        info!("Logged in as {}", ready.user.name); +                    } -            _ => { -                let name = event.kind().name(); -                if let Ok(dispatch_event) = DispatchEvent::try_from(event) { -                    let data = CachePayload { -                        tracing: Tracing { -                            node_id: "".to_string(), -                            span: None, -                        }, -                        data: DispatchEventTagged { -                            data: dispatch_event, -                        }, -                    }; -                    let value = serde_json::to_string(&data)?; -                    debug!("nats send: {}", value); -                    let bytes = bytes::Bytes::from(value); -                    nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes) -                        .await?; +                    _ => { +                        let name = event.kind().name(); +                        if let Ok(dispatch_event) = DispatchEvent::try_from(event) { +                            let data = CachePayload { +                                tracing: Tracing { +                                    node_id: "".to_string(), +                                    span: None, +                                }, +                                data: DispatchEventTagged { +                                    data: dispatch_event, +                                }, +                            }; +                            let value = serde_json::to_string(&data)?; +                            debug!("nats send: {}", value); +                            let bytes = bytes::Bytes::from(value); +                            nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes) +                                .await?; +                        } +                    }                  }              } -        } + +            Ok(()) +        })      } -    Ok(()) +    fn new() -> Self { +        Self {} +    }  } + +ignite!(GatewayServer); diff --git a/exes/rest/Cargo.toml b/exes/rest/Cargo.toml index 7b5b2b5..f4c5ecc 100644 --- a/exes/rest/Cargo.toml +++ b/exes/rest/Cargo.toml @@ -7,10 +7,23 @@ edition = "2018"  [dependencies]  shared = { path = "../../libs/shared" } +proto = { path = "../../libs/proto" } +leash = { path = "../../libs/leash" } +  hyper = { version = "0.14", features = ["full"] }  tokio = { version = "1", features = ["full"] }  serde = { version = "1.0.8", features = ["derive"] }  futures-util = "0.3.17"  hyper-tls = "0.5.0"  lazy_static = "1.4.0" -xxhash-rust = { version = "0.8.2", features = ["xxh32"] }
\ No newline at end of file +xxhash-rust = { version = "0.8.2", features = ["xxh32"] } +twilight-http-ratelimiting = { git = "https://github.com/MatthieuCoder/twilight.git" } +tracing = "0.1.37" +hashring = "0.3.0" +anyhow = "*" +tonic = "0.8.3" +serde_json = { version = "1.0" } +http = "0.2.8" +tokio-stream = "0.1.11" +dns-lookup = "1.0.8" +tokio-scoped = "0.2.0"
\ No newline at end of file diff --git a/exes/rest/src/config.rs b/exes/rest/src/config.rs index 559929f..9261de2 100644 --- a/exes/rest/src/config.rs +++ b/exes/rest/src/config.rs @@ -1,9 +1,21 @@ +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};  use serde::Deserialize; -#[derive(Debug, Deserialize, Clone, Default)] +fn default_listening_address() -> SocketAddr { +    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)) +} + +#[derive(Debug, Deserialize, Clone)]  pub struct ServerSettings { -    pub port: u16, -    pub address: String, +    #[serde(default = "default_listening_address")] +    pub listening_adress: SocketAddr +} +impl Default for ServerSettings { +    fn default() -> Self { +        Self { +            listening_adress: default_listening_address(), +        } +    }  }  #[derive(Debug, Deserialize, Clone, Default)] @@ -12,7 +24,7 @@ pub struct Discord {  }  #[derive(Debug, Deserialize, Clone, Default)] -pub struct Config { +pub struct ReverseProxyConfig {      pub server: ServerSettings,      pub discord: Discord,  } diff --git a/exes/rest/src/handler.rs b/exes/rest/src/handler.rs new file mode 100644 index 0000000..8b0dd52 --- /dev/null +++ b/exes/rest/src/handler.rs @@ -0,0 +1,141 @@ +use std::{ +    collections::hash_map::DefaultHasher, +    convert::TryFrom, +    hash::{Hash, Hasher}, +    str::FromStr, +}; + +use anyhow::bail; +use http::{ +    header::{AUTHORIZATION, CONNECTION, HOST, TRANSFER_ENCODING, UPGRADE}, +    HeaderValue, Method as HttpMethod, Request, Response, Uri, +}; +use hyper::{client::HttpConnector, Body, Client}; +use hyper_tls::HttpsConnector; +use shared::log::error; +use twilight_http_ratelimiting::{Method, Path}; + +use crate::ratelimit_client::RemoteRatelimiter; + +/// Normalizes the path +fn normalize_path(request_path: &str) -> (&str, &str) { +    if let Some(trimmed_path) = request_path.strip_prefix("/api") { +        if let Some(maybe_api_version) = trimmed_path.split('/').nth(1) { +            if let Some(version_number) = maybe_api_version.strip_prefix('v') { +                if version_number.parse::<u8>().is_ok() { +                    let len = "/api/v".len() + version_number.len(); +                    return (&request_path[..len], &request_path[len..]); +                }; +            }; +        } + +        ("/api", trimmed_path) +    } else { +        ("/api", request_path) +    } +} + +pub async fn handle_request( +    client: Client<HttpsConnector<HttpConnector>, Body>, +    ratelimiter: RemoteRatelimiter, +    token: String, +    mut request: Request<Body>, +) -> Result<Response<Body>, anyhow::Error> { +    let (hash, uri_string) = { +        let method = match *request.method() { +            HttpMethod::DELETE => Method::Delete, +            HttpMethod::GET => Method::Get, +            HttpMethod::PATCH => Method::Patch, +            HttpMethod::POST => Method::Post, +            HttpMethod::PUT => Method::Put, +            _ => { +                error!("Unsupported HTTP method in request, {}", request.method()); +                bail!("unsupported method"); +            } +        }; + +        let request_path = request.uri().path(); +        let (api_path, trimmed_path) = normalize_path(&request_path); + +        let mut uri_string = format!("http://192.168.0.27:8000{}{}", api_path, trimmed_path); +        if let Some(query) = request.uri().query() { +            uri_string.push('?'); +            uri_string.push_str(query); +        } + +        let mut hash = DefaultHasher::new(); +        match Path::try_from((method, trimmed_path)) { +            Ok(path) => path, +            Err(e) => { +                error!( +                    "Failed to parse path for {:?} {}: {:?}", +                    method, trimmed_path, e +                ); +                bail!("failed o parse"); +            } +        } +        .hash(&mut hash); + +        (hash.finish().to_string(), uri_string) +    }; + +    let header_sender = match ratelimiter.ticket(hash).await { +        Ok(sender) => sender, +        Err(e) => { +            error!("Failed to receive ticket for ratelimiting: {:?}", e); +            bail!("failed to reteive ticket"); +        } +    }; + +    request.headers_mut().insert( +        AUTHORIZATION, +        HeaderValue::from_bytes(token.as_bytes()) +            .expect("strings are guaranteed to be valid utf-8"), +    ); +    request +        .headers_mut() +        .insert(HOST, HeaderValue::from_static("discord.com")); + +    // Remove forbidden HTTP/2 headers +    // https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.2 +    request.headers_mut().remove(CONNECTION); +    request.headers_mut().remove("keep-alive"); +    request.headers_mut().remove("proxy-connection"); +    request.headers_mut().remove(TRANSFER_ENCODING); +    request.headers_mut().remove(UPGRADE); +    request.headers_mut().remove(AUTHORIZATION); +    request.headers_mut().append( +        AUTHORIZATION, +        HeaderValue::from_static( +            "Bot ODA3MTg4MzM1NzE3Mzg0MjEy.G3sXFM.8gY2sVYDAq2WuPWwDskAAEFLfTg8htooxME-LE", +        ), +    ); + +    let uri = match Uri::from_str(&uri_string) { +        Ok(uri) => uri, +        Err(e) => { +            error!("Failed to create URI for requesting Discord API: {:?}", e); +            bail!("failed to create uri"); +        } +    }; +    *request.uri_mut() = uri; +    let resp = match client.request(request).await { +        Ok(response) => response, +        Err(e) => { +            error!("Error when requesting the Discord API: {:?}", e); +            bail!("failed to request the discord api"); +        } +    }; + +    let ratelimit_headers = resp +        .headers() +        .into_iter() +        .map(|(k, v)| (k.to_string(), v.to_str().unwrap().to_string())) +        .collect(); + +    if header_sender.send(ratelimit_headers).is_err() { +        error!("Error when sending ratelimit headers to ratelimiter"); +    }; + +    Ok(resp) +} diff --git a/exes/rest/src/main.rs b/exes/rest/src/main.rs index 9fa6ce7..8d014ab 100644 --- a/exes/rest/src/main.rs +++ b/exes/rest/src/main.rs @@ -1,46 +1,56 @@ -use std::{convert::Infallible, sync::Arc}; +use config::ReverseProxyConfig; -use crate::{config::Config, ratelimit::Ratelimiter}; -use shared::{ -    config::Settings, -    log::{error, info}, -    redis_crate::Client, +use handler::handle_request; +use hyper::{ +    server::conn::AddrStream, +    service::{make_service_fn, service_fn}, +    Body, Client, Request, Server,  }; -use hyper::{server::conn::AddrStream, service::make_service_fn, Server}; -use std::net::ToSocketAddrs; -use tokio::sync::Mutex; - -use crate::proxy::ServiceProxy; +use hyper_tls::HttpsConnector; +use leash::{ignite, AnyhowResultFuture, Component}; +use shared::config::Settings; +use std::convert::Infallible;  mod config; -mod proxy; -mod ratelimit; - -#[tokio::main] -async fn main() { -    let settings: Settings<Config> = Settings::new("rest").unwrap(); -    let config = Arc::new(settings.config); -    let redis_client: Client = settings.redis.into(); -    let redis = Arc::new(Mutex::new( -        redis_client.get_async_connection().await.unwrap(), -    )); -    let ratelimiter = Arc::new(Ratelimiter::new(redis)); - -    let addr = format!("{}:{}", config.server.address, config.server.port) -        .to_socket_addrs() -        .unwrap() -        .next() -        .unwrap(); - -    let service_fn = make_service_fn(move |_: &AddrStream| { -        let service_proxy = ServiceProxy::new(config.clone(), ratelimiter.clone()); -        async move { Ok::<_, Infallible>(service_proxy) } -    }); - -    let server = Server::bind(&addr).serve(service_fn); - -    info!("starting ratelimit server"); -    if let Err(e) = server.await { -        error!("server error: {}", e); +mod handler; +mod ratelimit_client; + +struct ReverseProxyServer {} +impl Component for ReverseProxyServer { +    type Config = ReverseProxyConfig; +    const SERVICE_NAME: &'static str = "rest"; + +    fn start(&self, settings: Settings<Self::Config>) -> AnyhowResultFuture<()> { +        Box::pin(async move { +            // Client to the remote ratelimiters +            let ratelimiter = ratelimit_client::RemoteRatelimiter::new(); +            let client = Client::builder().build(HttpsConnector::new()); + +            let service_fn = make_service_fn(move |_: &AddrStream| { +                let client = client.clone(); +                let ratelimiter = ratelimiter.clone(); +                async move { +                    Ok::<_, Infallible>(service_fn(move |request: Request<Body>| { +                        let client = client.clone(); +                        let ratelimiter = ratelimiter.clone(); +                        async move { +                            handle_request(client, ratelimiter, "token".to_string(), request).await +                        } +                    })) +                } +            }); + +            let server = Server::bind(&settings.config.server.listening_adress).serve(service_fn); + +            server.await?; + +            Ok(()) +        }) +    } + +    fn new() -> Self { +        Self {}      }  } + +ignite!(ReverseProxyServer); diff --git a/exes/rest/src/proxy/mod.rs b/exes/rest/src/proxy/mod.rs deleted file mode 100644 index 65d77aa..0000000 --- a/exes/rest/src/proxy/mod.rs +++ /dev/null @@ -1,138 +0,0 @@ -use crate::{config::Config, ratelimit::Ratelimiter}; -use hyper::{ -    client::HttpConnector, header::HeaderValue, http::uri::Parts, service::Service, Body, Client, -    Request, Response, Uri, -}; -use hyper_tls::HttpsConnector; -use shared::{ -    log::debug, -    prometheus::{labels, opts, register_counter, register_histogram_vec, Counter, HistogramVec}, -}; -use std::{future::Future, pin::Pin, sync::Arc, task::Poll}; -use tokio::sync::Mutex; - -lazy_static::lazy_static! { -    static ref HTTP_COUNTER: Counter = register_counter!(opts!( -        "nova_rest_http_requests_total", -        "Number of HTTP requests made.", -        labels! {"handler" => "all",} -    )) -    .unwrap(); - -    static ref HTTP_REQ_HISTOGRAM: HistogramVec = register_histogram_vec!( -        "nova_rest_http_request_duration_seconds", -        "The HTTP request latencies in seconds.", -        &["handler"] -    ) -    .unwrap(); - -    static ref HTTP_COUNTER_STATUS: Counter = register_counter!(opts!( -        "nova_rest_http_requests_status", -        "Number of HTTP requests made by status", -        labels! {"" => ""} -    )) -    .unwrap(); -} - -#[derive(Clone)] -pub struct ServiceProxy { -    client: Client<HttpsConnector<HttpConnector>>, -    ratelimiter: Arc<Ratelimiter>, -    config: Arc<Config>, -    fail: Arc<Mutex<i32>>, -} - -impl Service<Request<Body>> for ServiceProxy { -    type Response = Response<Body>; -    type Error = hyper::Error; -    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>; - -    fn poll_ready( -        &mut self, -        cx: &mut std::task::Context<'_>, -    ) -> std::task::Poll<Result<(), Self::Error>> { -        match self.client.poll_ready(cx) { -            Poll::Ready(Ok(())) => Poll::Ready(Ok(())), -            Poll::Ready(Err(e)) => Poll::Ready(Err(e)), -            Poll::Pending => Poll::Pending, -        } -    } - -    fn call(&mut self, mut req: Request<hyper::Body>) -> Self::Future { -        HTTP_COUNTER.inc(); - -        let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["all"]).start_timer(); -        let host = "discord.com"; -        let mut new_parts = Parts::default(); - -        let path = req.uri().path().to_string(); - -        new_parts.scheme = Some("https".parse().unwrap()); -        new_parts.authority = Some(host.parse().unwrap()); -        new_parts.path_and_query = Some(path.parse().unwrap()); - -        *req.uri_mut() = Uri::from_parts(new_parts).unwrap(); - -        let headers = req.headers_mut(); -        headers.remove("user-agent"); -        headers.insert("Host", HeaderValue::from_str("discord.com").unwrap()); -        headers.insert( -            "Authorization", -            HeaderValue::from_str(&format!("Bot {}", self.config.discord.token)).unwrap(), -        ); - -        println!("{:?}", headers); - -        let client = self.client.clone(); -        let ratelimiter = self.ratelimiter.clone(); -        let fail = self.fail.clone(); - -        return Box::pin(async move { -            let resp = match ratelimiter.before_request(&req).await { -                Ok(allowed) => match allowed { -                    crate::ratelimit::RatelimiterResponse::Ratelimited => { -                        debug!("ratelimited"); -                        Ok(Response::builder().body("ratelimited".into()).unwrap()) -                    } -                    _ => { -                        debug!("forwarding request"); -                        match client.request(req).await { -                            Ok(mut response) => { -                                ratelimiter.after_request(&path, &response).await; -                                if response.status() != 200 { -                                    *fail.lock().await += 1 -                                } -                                response.headers_mut().insert( -                                    "x-fails", -                                    HeaderValue::from_str(&format!("{}", fail.lock().await)) -                                        .unwrap(), -                                ); -                                Ok(response) -                            } -                            Err(e) => Err(e), -                        } -                    } -                }, -                Err(e) => Ok(Response::builder() -                    .body(format!("server error: {}", e).into()) -                    .unwrap()), -            }; -            timer.observe_duration(); -            resp -        }); -    } -} - -impl ServiceProxy { -    pub fn new(config: Arc<Config>, ratelimiter: Arc<Ratelimiter>) -> Self { -        let https = HttpsConnector::new(); -        let client = Client::builder().build::<_, hyper::Body>(https); -        let fail = Arc::new(Mutex::new(0)); -        ServiceProxy { -            client, -            config, -            ratelimiter, -            fail, -        } -    } -} diff --git a/exes/rest/src/ratelimit/mod.rs b/exes/rest/src/ratelimit/mod.rs deleted file mode 100644 index 132bfd3..0000000 --- a/exes/rest/src/ratelimit/mod.rs +++ /dev/null @@ -1,155 +0,0 @@ -use shared::{ -    log::debug, -    redis_crate::{aio::Connection, AsyncCommands}, error::GenericError, -}; -use hyper::{Body, Request, Response}; -use std::{ -    convert::TryInto, -    sync::Arc, -    time::{SystemTime, UNIX_EPOCH}, -}; -use tokio::sync::Mutex; -use xxhash_rust::xxh32::xxh32; - -pub enum RatelimiterResponse { -    NoSuchUrl, -    Ratelimited, -    Pass, -} - -pub struct Ratelimiter { -    redis: Arc<Mutex<Connection>>, -} - -impl Ratelimiter { -    pub fn new(redis: Arc<Mutex<Connection>>) -> Ratelimiter { -        return Ratelimiter { redis }; -    } - -    pub async fn before_request( -        &self, -        request: &Request<Body>, -    ) -> Result<RatelimiterResponse, GenericError> { -        // we lookup if the route hash is stored in the redis table -        let path = request.uri().path(); -        let hash = xxh32(path.as_bytes(), 32); -        let mut redis = self.redis.lock().await; - -        let start = SystemTime::now(); -        let since_the_epoch = start -            .duration_since(UNIX_EPOCH) -            .expect("Time went backwards"); - -        // global rate litmit -        match redis -            .get::<String, Option<i32>>(format!( -                "nova:rest:ratelimit:global:{}", -                since_the_epoch.as_secs() -            )) -            .await -        { -            Ok(value) => { -                match value { -                    Some(value) => { -                        debug!("incr: {}", value); -                        if value >= 49 { -                            return Ok(RatelimiterResponse::Ratelimited); -                        } -                    } -                    None => { -                        let key = -                            format!("nova:rest:ratelimit:global:{}", since_the_epoch.as_secs()); -                        // init global ratelimit -                        redis.set_ex::<String, i32, ()>(key, 0, 2).await.unwrap(); -                    } -                } -            } -            Err(_) => { -                return Err(GenericError::StepFailed("radis ratelimit check".to_string())); -            } -        }; - -        // we lookup the corresponding bucket for this url -        match redis -            .get::<String, Option<String>>(format!("nova:rest:ratelimit:url_bucket:{}", hash)) -            .await -        { -            Ok(bucket) => match bucket { -                Some(bucket) => { -                    match redis -                        .exists::<String, bool>(format!("nova:rest:ratelimit:lock:{}", bucket)) -                        .await -                    { -                        Ok(exists) => { -                            if exists { -                                Ok(RatelimiterResponse::Ratelimited) -                            } else { -                                Ok(RatelimiterResponse::Pass) -                            } -                        } -                        Err(_) =>  Err(GenericError::StepFailed("radis ratelimit check".to_string())), -                    } -                } -                None => Ok(RatelimiterResponse::NoSuchUrl), -            }, -            Err(_) => Err(GenericError::StepFailed("radis ratelimit check".to_string())), -        } -    } - -    fn parse_headers(&self, response: &Response<Body>) -> Option<(String, i32, i32)> { -        if let Some(bucket) = response.headers().get("X-RateLimit-Bucket") { -            let bucket = bucket.to_str().unwrap().to_string(); - -            let remaining = response.headers().get("X-RateLimit-Remaining").unwrap(); -            let reset = response.headers().get("X-RateLimit-Reset-After").unwrap(); - -            let remaining_i32 = remaining.to_str().unwrap().parse::<i32>().unwrap(); -            let reset_ms_i32 = reset.to_str().unwrap().parse::<f32>().unwrap().ceil() as i32; -            return Some((bucket, remaining_i32, reset_ms_i32)); -        } else { -            None -        } -    } - -    pub async fn after_request(&self, path: &str, response: &Response<Body>) { -        let hash = xxh32(path.as_bytes(), 32); -        // verified earlier - -        let mut redis = self.redis.lock().await; - -        let start = SystemTime::now(); -        let since_the_epoch = start -            .duration_since(UNIX_EPOCH) -            .expect("Time went backwards"); - -        redis -            .incr::<String, i32, ()>( -                format!("nova:rest:ratelimit:global:{}", since_the_epoch.as_secs()), -                1, -            ) -            .await -            .unwrap(); -        if let Some((bucket, remaining, reset)) = self.parse_headers(response) { -            if remaining <= 1 { -                // we set a lock for the bucket until the timeout passes -                redis -                    .set_ex::<String, bool, ()>( -                        format!("nova:rest:ratelimit:lock:{}", bucket), -                        true, -                        reset.try_into().unwrap(), -                    ) -                    .await -                    .unwrap(); -            } - -            redis -                .set_ex::<String, String, ()>( -                    format!("nova:rest:ratelimit:url_bucket:{}", hash), -                    bucket, -                    reset.try_into().unwrap(), -                ) -                .await -                .unwrap(); -        } -    } -} diff --git a/exes/rest/src/ratelimit_client/mod.rs b/exes/rest/src/ratelimit_client/mod.rs new file mode 100644 index 0000000..8263d15 --- /dev/null +++ b/exes/rest/src/ratelimit_client/mod.rs @@ -0,0 +1,137 @@ +use self::remote_hashring::{HashRingWrapper, VNode}; +use futures_util::Future; +use proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::{Data, Headers}; +use proto::nova::ratelimit::ratelimiter::BucketSubmitTicketRequest; +use shared::log::debug; +use std::collections::HashMap; +use std::fmt::Debug; +use std::pin::Pin; +use std::sync::Arc; +use std::time::UNIX_EPOCH; +use std::time::{Duration, SystemTime}; +use tokio::sync::oneshot::{self}; +use tokio::sync::{broadcast, mpsc, RwLock}; +use tokio_stream::wrappers::ReceiverStream; + +mod remote_hashring; + +#[derive(Clone, Debug)] +pub struct RemoteRatelimiter { +    remotes: Arc<RwLock<HashRingWrapper>>, +    stop: Arc<tokio::sync::broadcast::Sender<()>>, +} + +impl Drop for RemoteRatelimiter { +    fn drop(&mut self) { +        self.stop.clone().send(()).unwrap(); +    } +} + +impl RemoteRatelimiter { +    async fn get_ratelimiters(&self) -> Result<(), anyhow::Error> { +        // get list of dns responses +        let responses = dns_lookup::lookup_host("ratelimit") +            .unwrap() +            .into_iter() +            .map(|f| f.to_string()); + +        let mut write = self.remotes.write().await; + +        for ip in responses { +            let a = VNode::new(ip.into()).await?; +            write.add(a.clone()); +        } + +        return Ok(()); +    } + +    #[must_use] +    pub fn new() -> Self { +        let (rx, mut tx) = broadcast::channel(1); +        let obj = Self { +            remotes: Arc::new(RwLock::new(HashRingWrapper::default())), +            stop: Arc::new(rx), +        }; + +        let obj_clone = obj.clone(); +        // Task to update the ratelimiters in the background +        tokio::spawn(async move { +            loop { +                let sleep = tokio::time::sleep(Duration::from_secs(10)); +                tokio::pin!(sleep); + +                debug!("refreshing"); +                obj_clone.get_ratelimiters().await.unwrap(); +                tokio::select! { +                    () = &mut sleep => { +                        println!("timer elapsed"); +                    }, +                    _ = tx.recv() => {} +                } +            } +        }); + +        obj +    } + +    pub fn ticket( +        &self, +        path: String, +    ) -> Pin< +        Box< +            dyn Future<Output = anyhow::Result<oneshot::Sender<HashMap<String, String>>>> +                + Send +                + 'static, +        >, +    > { +        let remotes = self.remotes.clone(); +        let (tx, rx) = oneshot::channel::<HashMap<String, String>>(); + +        Box::pin(async move { +            // Get node managing this path +            let mut node = (*remotes.read().await.get(&path).unwrap()).clone(); + +            // Buffers for the gRPC streaming channel. +            let (send, remote) = mpsc::channel(5); +            let (do_request, wait) = oneshot::channel(); +            // Tonic requires a stream to be used; Since we use a mpsc channel, we can create a stream from it +            let stream = ReceiverStream::new(remote); + +            // Start the grpc streaming +            let ticket = node.submit_ticket(stream).await?; + +            // First, send the request +            send.send(BucketSubmitTicketRequest { +                data: Some(Data::Path(path)), +            }) +            .await?; + +            // We continuously listen for events in the channel. +            tokio::spawn(async move { +                let message = ticket.into_inner().message().await.unwrap().unwrap(); + +                if message.accepted == 1 { +                    do_request.send(()).unwrap(); +                    let headers = rx.await.unwrap(); + +                    send.send(BucketSubmitTicketRequest { +                        data: Some(Data::Headers(Headers { +                            precise_time: SystemTime::now() +                                .duration_since(UNIX_EPOCH) +                                .expect("time went backwards") +                                .as_millis() as u64, +                            headers, +                        })), +                    }) +                    .await +                    .unwrap(); +                } +            }); + +            // Wait for the message to be sent +            wait.await?; + +            Ok(tx) +        }) +    } +} diff --git a/exes/rest/src/ratelimit_client/remote_hashring.rs b/exes/rest/src/ratelimit_client/remote_hashring.rs new file mode 100644 index 0000000..b9f7800 --- /dev/null +++ b/exes/rest/src/ratelimit_client/remote_hashring.rs @@ -0,0 +1,67 @@ +use core::fmt::Debug; +use proto::nova::ratelimit::ratelimiter::ratelimiter_client::RatelimiterClient; +use std::hash::Hash; +use std::ops::Deref; +use std::ops::DerefMut; +use tonic::transport::Channel; + +#[derive(Debug, Clone)] +pub struct VNode { +    address: String, + +    client: RatelimiterClient<Channel>, +} + +impl Deref for VNode { +    type Target = RatelimiterClient<Channel>; + +    fn deref(&self) -> &Self::Target { +        &self.client +    } +} + +impl DerefMut for VNode { +    fn deref_mut(&mut self) -> &mut Self::Target { +        &mut self.client +    } +} + +impl Hash for VNode { +    fn hash<H: std::hash::Hasher>(&self, state: &mut H) { +        self.address.hash(state); +    } +} + +impl VNode { +    pub async fn new(address: String) -> Result<Self, tonic::transport::Error> { +        let client = RatelimiterClient::connect(format!("http://{}:8080", address.clone())).await?; + +        Ok(VNode { client, address }) +    } +} + +unsafe impl Send for VNode {} + +#[repr(transparent)] +#[derive(Default)] +pub struct HashRingWrapper(hashring::HashRing<VNode>); + +impl Deref for HashRingWrapper { +    type Target = hashring::HashRing<VNode>; + +    fn deref(&self) -> &Self::Target { +        &self.0 +    } +} + +impl DerefMut for HashRingWrapper { +    fn deref_mut(&mut self) -> &mut Self::Target { +        &mut self.0 +    } +} + +impl Debug for HashRingWrapper { +    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +        f.debug_tuple("HashRing").finish() +    } +} diff --git a/exes/webhook/Cargo.toml b/exes/webhook/Cargo.toml index 1d6a5a7..12a6608 100644 --- a/exes/webhook/Cargo.toml +++ b/exes/webhook/Cargo.toml @@ -7,15 +7,16 @@ edition = "2018"  hyper = { version = "0.14", features = ["full"] }  tokio = { version = "1", features = ["full"] }  shared = { path = "../../libs/shared" } +proto = { path = "../../libs/proto" } +leash = { path = "../../libs/leash" } +  serde = { version = "1.0.8", features = ["derive"] }  hex = "0.4.3"  serde_json = { version = "1.0" } -libc = "0.2.101"  lazy_static = "1.4.0" -ctor = "0.1.21"  ed25519-dalek = "1"  twilight-model = { version = "0.14" } -rand = "0.8" +anyhow = "1.0.68"  [[bin]]  name = "webhook" diff --git a/exes/webhook/src/config.rs b/exes/webhook/src/config.rs index a054d33..68f6a5f 100644 --- a/exes/webhook/src/config.rs +++ b/exes/webhook/src/config.rs @@ -1,19 +1,43 @@ -use serde::Deserialize; +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; -#[derive(Debug, Deserialize, Clone, Default)] +use ed25519_dalek::PublicKey; +use serde::{Deserialize, Deserializer}; + +fn default_listening_address() -> SocketAddr { +    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)) +} + +#[derive(Debug, Deserialize, Clone, Copy)]  pub struct ServerSettings { -    pub port: u16, -    pub address: String, +    #[serde(default = "default_listening_address")] +    pub listening_adress: SocketAddr, +} +impl Default for ServerSettings { +    fn default() -> Self { +        Self { +            listening_adress: default_listening_address(), +        } +    } +} + +fn deserialize_pk<'de, D>(deserializer: D) -> Result<PublicKey, D::Error> +where +    D: Deserializer<'de>, +{ +    let str = String::deserialize(deserializer)?; +    let public_key = PublicKey::from_bytes(&hex::decode(&str).unwrap()).unwrap(); +    Ok(public_key)  } -#[derive(Debug, Deserialize, Clone, Default)] +#[derive(Debug, Deserialize, Clone, Default, Copy)]  pub struct Discord { -    pub public_key: String, +    #[serde(deserialize_with = "deserialize_pk")] +    pub public_key: PublicKey,      pub client_id: u32,  } -#[derive(Debug, Deserialize, Clone, Default)] -pub struct Config { +#[derive(Debug, Deserialize, Clone, Default, Copy)] +pub struct WebhookConfig {      pub server: ServerSettings,      pub discord: Discord,  } diff --git a/exes/webhook/src/handler/handler.rs b/exes/webhook/src/handler/handler.rs index b2ef44c..af79185 100644 --- a/exes/webhook/src/handler/handler.rs +++ b/exes/webhook/src/handler/handler.rs @@ -1,13 +1,12 @@  use super::error::WebhookError;  use super::signature::validate_signature; -use crate::config::Config; +use crate::config::WebhookConfig;  use ed25519_dalek::PublicKey;  use hyper::{      body::{to_bytes, Bytes},      service::Service,      Body, Method, Request, Response, StatusCode,  }; -use serde::{Deserialize, Serialize};  use shared::nats_crate::Client;  use shared::{      log::{debug, error}, @@ -17,10 +16,9 @@ use std::{      future::Future,      pin::Pin,      str::from_utf8, -    sync::Arc,      task::{Context, Poll},  }; -use twilight_model::gateway::event::{DispatchEvent}; +use twilight_model::gateway::event::DispatchEvent;  use twilight_model::{      application::interaction::{Interaction, InteractionType},      gateway::payload::incoming::InteractionCreate, @@ -28,14 +26,13 @@ use twilight_model::{  /// Hyper service used to handle the discord webhooks  #[derive(Clone)] -pub struct HandlerService { -    pub config: Arc<Config>, -    pub nats: Arc<Client>, -    pub public_key: Arc<PublicKey>, +pub struct WebhookService { +    pub config: WebhookConfig, +    pub nats: Client,  } -impl HandlerService { -    async fn check_request(&self, req: Request<Body>) -> Result<Bytes, WebhookError> { +impl WebhookService { +    async fn check_request(req: Request<Body>, pk: PublicKey) -> Result<Bytes, WebhookError> {          if req.method() == Method::POST {              let signature = if let Some(sig) = req.headers().get("X-Signature-Ed25519") {                  sig.to_owned() @@ -57,7 +54,7 @@ impl HandlerService {              let data = to_bytes(req.into_body()).await?;              if validate_signature( -                &self.public_key, +                &pk,                  &[timestamp.as_bytes().to_vec(), data.to_vec()].concat(),                  signature.to_str()?,              ) { @@ -74,10 +71,11 @@ impl HandlerService {      }      async fn process_request( -        &mut self,          req: Request<Body>, +        nats: Client, +        pk: PublicKey,      ) -> Result<Response<Body>, WebhookError> { -        match self.check_request(req).await { +        match Self::check_request(req, pk).await {              Ok(data) => {                  let utf8 = from_utf8(&data);                  match utf8 { @@ -86,7 +84,7 @@ impl HandlerService {                              match value.kind {                                  InteractionType::Ping => Ok(Response::builder()                                      .header("Content-Type", "application/json") -                                    .body(serde_json::to_string(&Ping { t: 1 }).unwrap().into()) +                                    .body(r#"{"t":1}"#.into())                                      .unwrap()),                                  _ => {                                      debug!("calling nats"); @@ -106,10 +104,13 @@ impl HandlerService {                                      let payload = serde_json::to_string(&data).unwrap(); -                                    match self.nats.request( -                                        "nova.cache.dispatch.INTERACTION_CREATE".to_string(), -                                        Bytes::from(payload), -                                    ).await { +                                    match nats +                                        .request( +                                            "nova.cache.dispatch.INTERACTION_CREATE".to_string(), +                                            Bytes::from(payload), +                                        ) +                                        .await +                                    {                                          Ok(response) => Ok(Response::builder()                                              .header("Content-Type", "application/json")                                              .body(Body::from(response.reply.unwrap())) @@ -144,15 +145,9 @@ impl HandlerService {      }  } -#[derive(Debug, Serialize, Deserialize)] -pub struct Ping { -    #[serde(rename = "type")] -    t: i32, -} -  /// Implementation of the service -impl Service<Request<Body>> for HandlerService { -    type Response = Response<Body>; +impl Service<hyper::Request<Body>> for WebhookService { +    type Response = hyper::Response<Body>;      type Error = hyper::Error;      type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>; @@ -161,9 +156,10 @@ impl Service<Request<Body>> for HandlerService {      }      fn call(&mut self, req: Request<Body>) -> Self::Future { -        let mut clone = self.clone(); +        let future = +            Self::process_request(req, self.nats.clone(), self.config.discord.public_key);          Box::pin(async move { -            let response = clone.process_request(req).await; +            let response = future.await;              match response {                  Ok(r) => Ok(r), diff --git a/exes/webhook/src/handler/make_service.rs b/exes/webhook/src/handler/make_service.rs index 48672a1..b51494a 100644 --- a/exes/webhook/src/handler/make_service.rs +++ b/exes/webhook/src/handler/make_service.rs @@ -1,22 +1,15 @@ -use super::handler::HandlerService; -use crate::config::Config;  use hyper::service::Service; -use shared::nats_crate::Client;  use std::{      future::{ready, Ready}, -    sync::Arc,      task::{Context, Poll},  }; -use ed25519_dalek::PublicKey; -pub struct MakeSvc { -    pub settings: Arc<Config>, -    pub nats: Arc<Client>, -    pub public_key: Arc<PublicKey> +pub struct MakeSvc<T: Clone> { +    pub service: T,  } -impl<T> Service<T> for MakeSvc { -    type Response = HandlerService; +impl<T, V: Clone> Service<T> for MakeSvc<V> { +    type Response = V;      type Error = std::io::Error;      type Future = Ready<Result<Self::Response, Self::Error>>; @@ -25,10 +18,12 @@ impl<T> Service<T> for MakeSvc {      }      fn call(&mut self, _: T) -> Self::Future { -        ready(Ok(HandlerService { -            config: self.settings.clone(), -            nats: self.nats.clone(), -            public_key: self.public_key.clone() -        })) +        ready(Ok(self.service.clone())) +    } +} + +impl<T: Clone> MakeSvc<T> { +    pub fn new(service: T) -> Self { +        Self { service }      }  } diff --git a/exes/webhook/src/handler/mod.rs b/exes/webhook/src/handler/mod.rs index 20a977a..e4cf35a 100644 --- a/exes/webhook/src/handler/mod.rs +++ b/exes/webhook/src/handler/mod.rs @@ -1,5 +1,5 @@  mod error; -mod handler; +pub mod handler;  pub mod make_service;  mod signature; diff --git a/exes/webhook/src/handler/tests/signature.rs b/exes/webhook/src/handler/tests/signature.rs index 490143b..0bed86a 100644 --- a/exes/webhook/src/handler/tests/signature.rs +++ b/exes/webhook/src/handler/tests/signature.rs @@ -5,7 +5,10 @@ use ed25519_dalek::PublicKey;  fn validate_signature_test() {      let signature = "543ec3547d57f9ddb1ec4c5c36503ebf288ffda3da3d510764c9a49c2abb57690ef974c63d174771bdd2481de1066966f57abbec12a3ec171b9f6e2373837002";      let content = "message de test incroyable".as_bytes().to_vec(); -    let public_key = PublicKey::from_bytes(&hex::decode("eefe0c24473737cb2035232e3b4eb91c206f0a14684168f3503f7d8316058d6f").unwrap()).unwrap(); +    let public_key = PublicKey::from_bytes( +        &hex::decode("eefe0c24473737cb2035232e3b4eb91c206f0a14684168f3503f7d8316058d6f").unwrap(), +    ) +    .unwrap();      assert!(validate_signature(&public_key, &content, signature))  } @@ -13,7 +16,10 @@ fn validate_signature_test() {  #[test]  fn validate_signature_reverse_test() {      let signature = "543ec3547d57f9ddb1ec4c5c36503ebf288ffda3da3d510764c9a49c2abb57690ef974c63d174771bdd2481de1066966f57abbec12a3ec171b9f6e2373837002"; -    let public_key = PublicKey::from_bytes(&hex::decode("c029eea18437292c87c62aec34e7d1bd4e38fe6126f3f7c446de6375dc666044").unwrap()).unwrap(); +    let public_key = PublicKey::from_bytes( +        &hex::decode("c029eea18437292c87c62aec34e7d1bd4e38fe6126f3f7c446de6375dc666044").unwrap(), +    ) +    .unwrap();      let content = "ceci est un test qui ne fonctionnera pas!"          .as_bytes() @@ -24,10 +30,13 @@ fn validate_signature_reverse_test() {  #[test]  fn invalid_hex() {      let signature = "zzz"; -    let public_key = PublicKey::from_bytes(&hex::decode("c029eea18437292c87c62aec34e7d1bd4e38fe6126f3f7c446de6375dc666044").unwrap()).unwrap(); +    let public_key = PublicKey::from_bytes( +        &hex::decode("c029eea18437292c87c62aec34e7d1bd4e38fe6126f3f7c446de6375dc666044").unwrap(), +    ) +    .unwrap();      let content = "ceci est un test qui ne fonctionnera pas!"          .as_bytes()          .to_vec();      assert!(!validate_signature(&public_key, &content, signature)) -}
\ No newline at end of file +} diff --git a/exes/webhook/src/main.rs b/exes/webhook/src/main.rs index 336dd82..efd4147 100644 --- a/exes/webhook/src/main.rs +++ b/exes/webhook/src/main.rs @@ -1,45 +1,47 @@ -use std::{net::ToSocketAddrs, sync::Arc};  mod config;  mod handler; -use crate::handler::make_service::MakeSvc; +use std::{future::Future, pin::Pin}; -use crate::config::Config; -use ed25519_dalek::PublicKey; +use crate::{ +    config::WebhookConfig, +    handler::{handler::WebhookService, make_service::MakeSvc}, +};  use hyper::Server; -use shared::config::Settings; -use shared::log::{error, info}; +use leash::{ignite, AnyhowResultFuture, Component}; +use shared::{config::Settings, log::info, nats_crate::Client}; -#[tokio::main] -async fn main() { -    let settings: Settings<Config> = Settings::new("webhook").unwrap(); -    start(settings).await; -} +#[derive(Clone, Copy)] +struct WebhookServer {} + +impl Component for WebhookServer { +    type Config = WebhookConfig; +    const SERVICE_NAME: &'static str = "webhook"; + +    fn start(&self, settings: Settings<Self::Config>) -> AnyhowResultFuture<()> { +        Box::pin(async move { +            info!("Starting server on {}", settings.server.listening_adress); + +            let bind = settings.server.listening_adress; +            let nats = +                Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>>>>>::into(settings.nats) +                    .await?; + +            let make_service = MakeSvc::new(WebhookService { +                config: settings.config, +                nats: nats.clone(), +            }); -async fn start(settings: Settings<Config>) { -    let addr = format!( -        "{}:{}", -        settings.config.server.address, settings.config.server.port -    ) -    .to_socket_addrs() -    .unwrap() -    .next() -    .unwrap(); - -    info!( -        "Starting server on {}:{}", -        settings.config.server.address, settings.config.server.port -    ); - -    let config = Arc::new(settings.config); -    let public_key = -        Arc::new(PublicKey::from_bytes(&hex::decode(&config.discord.public_key).unwrap()).unwrap()); -    let server = Server::bind(&addr).serve(MakeSvc { -        settings: config, -        nats: Arc::new(settings.nats.to_client().await.unwrap()), -        public_key: public_key, -    }); - -    if let Err(e) = server.await { -        error!("server error: {}", e); +            let server = Server::bind(&bind).serve(make_service); + +            server.await?; + +            Ok(()) +        }) +    } + +    fn new() -> Self { +        Self {}      }  } + +ignite!(WebhookServer); diff --git a/libs/leash/Cargo.toml b/libs/leash/Cargo.toml new file mode 100644 index 0000000..5cd54a5 --- /dev/null +++ b/libs/leash/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "leash" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +shared = { path = "../shared" } +anyhow = "1.0.68" +tokio = { version = "1.23.0", features = ["full"] } + +serde = "1.0.152"
\ No newline at end of file diff --git a/libs/leash/src/lib.rs b/libs/leash/src/lib.rs new file mode 100644 index 0000000..360db12 --- /dev/null +++ b/libs/leash/src/lib.rs @@ -0,0 +1,70 @@ +use anyhow::Result; +use serde::de::DeserializeOwned; +use shared::config::Settings; +use std::{future::Future, pin::Pin}; + +pub type AnyhowResultFuture<T> = Pin<Box<dyn Future<Output = Result<T>>>>; +pub trait Component: Send + Sync + 'static + Sized { +    type Config: Default + Clone + DeserializeOwned; + +    const SERVICE_NAME: &'static str; +    fn start(&self, settings: Settings<Self::Config>) -> AnyhowResultFuture<()>; +    fn new() -> Self; + +    fn _internal_start(self) -> AnyhowResultFuture<()> { +        Box::pin(async move { +            let settings = Settings::<Self::Config>::new(Self::SERVICE_NAME); + +            // Start the grpc healthcheck +            tokio::spawn(async move {}); + +            // Start the prometheus monitoring job +            tokio::spawn(async move {}); + +            self.start(settings?).await +        }) +    } +} + +#[macro_export] +macro_rules! ignite { +    ($c:ty) => { +        #[allow(dead_code)] +        fn main() -> anyhow::Result<()> { +            let rt = tokio::runtime::Runtime::new()?; +            rt.block_on(Box::new(<$c as Component>::new())._internal_start())?; +            Ok(()) +        } +    }; +} + +#[cfg(test)] +mod test { +    use serde::Deserialize; + +    use crate::Component; + +    #[derive(Clone, Copy)] +    struct TestComponent {} + +    #[derive(Default, Clone, Deserialize, Copy)] +    struct TestComponentConfig {} + +    impl Component for TestComponent { +        type Config = TestComponentConfig; +        const SERVICE_NAME: &'static str = "test_component"; + +        fn start( +            &self, +            _settings: shared::config::Settings<Self::Config>, +        ) -> crate::AnyhowResultFuture<()> { +            Box::pin(async move { Ok(()) }) +        } + +        fn new() -> Self { +            Self {} +        } +    } +     +    ignite!(TestComponent); +} diff --git a/libs/proto/Cargo.toml b/libs/proto/Cargo.toml index c4f2f2a..2556dfe 100644 --- a/libs/proto/Cargo.toml +++ b/libs/proto/Cargo.toml @@ -4,3 +4,9 @@ version = "0.1.0"  edition = "2018"  [dependencies] +tonic = "0.8.3" +prost = "0.11.5" + +[build-dependencies] +tonic-build = "0.8.4" +glob = "0.3.0"
\ No newline at end of file diff --git a/libs/proto/build.rs b/libs/proto/build.rs new file mode 100644 index 0000000..80c3a55 --- /dev/null +++ b/libs/proto/build.rs @@ -0,0 +1,11 @@ +fn main() -> Result<(), Box<dyn std::error::Error>> { +    let paths: Vec<String> = glob::glob("../../proto/nova/**/*.proto")? +        .map(|f| f.unwrap().to_str().unwrap().to_string()) +        .collect(); + +    tonic_build::configure() +        .include_file("genproto.rs") +        .compile(&paths, &["../../proto"])?; + +    Ok(()) +} diff --git a/libs/proto/src/lib.rs b/libs/proto/src/lib.rs index e69de29..01dc7bc 100644 --- a/libs/proto/src/lib.rs +++ b/libs/proto/src/lib.rs @@ -0,0 +1 @@ +include!(concat!(env!("OUT_DIR"), concat!("/", "genproto.rs"))); diff --git a/libs/shared/Cargo.toml b/libs/shared/Cargo.toml index 6d6b6f6..ab19ce8 100644 --- a/libs/shared/Cargo.toml +++ b/libs/shared/Cargo.toml @@ -19,6 +19,7 @@ twilight-model = "0.14"  serde_json = { version = "1.0" }  thiserror = "1.0.38"  inner = "0.1.1" +anyhow = "1.0.68"  [dependencies.redis]  version = "*" diff --git a/libs/shared/src/config.rs b/libs/shared/src/config.rs index 52137a3..4387dfb 100644 --- a/libs/shared/src/config.rs +++ b/libs/shared/src/config.rs @@ -1,17 +1,11 @@ -use std::env; +use std::{env, ops::Deref};  use config::{Config, Environment, File};  use log::info; -use serde::Deserialize; +use serde::{Deserialize, de::DeserializeOwned};  use crate::error::GenericError; - -/// Settings<T> is the base structure for all the nova's component config -/// you can specify a type T and the name of the component. the "config" -/// field will be equals to the key named after the given component name -/// and will be of type T  #[derive(Debug, Deserialize, Clone)] -#[serde(bound(deserialize = "T: Deserialize<'de> + std::default::Default + Clone"))] -pub struct Settings<T> { +pub struct Settings<T: Clone + DeserializeOwned + Default> {      #[serde(skip_deserializing)]      pub config: T,      pub monitoring: crate::monitoring::MonitoringConfiguration, @@ -19,20 +13,11 @@ pub struct Settings<T> {      pub redis: crate::redis::RedisConfiguration,  } -///  -impl<T> Settings<T> -where -    T: Deserialize<'static> + std::default::Default + Clone, +impl<'de, T: Clone + DeserializeOwned + Default> Settings<T>  { - -    /// Initializes a new configuration like the other components of nova -    /// And starts the prometheus metrics server if needed.      pub fn new(service_name: &str) -> Result<Settings<T>, GenericError> { -        pretty_env_logger::init(); -          let mut builder = Config::builder(); -        // this file my be shared with all the components          builder = builder.add_source(File::with_name("config/default"));          let mode = env::var("ENV").unwrap_or_else(|_| "development".into());          info!("Configuration Environment: {}", mode); @@ -49,13 +34,15 @@ where          //  try to load the config          settings.config = config.get::<T>(service_name)?; -         -        // start the monitoring system if needed -        crate::monitoring::start_monitoring(&settings.monitoring); +          Ok(settings)      }  } -pub fn test_init() { -    pretty_env_logger::init(); -} +impl<T: Clone + DeserializeOwned + Default>  Deref for Settings<T> { +    type Target = T; + +    fn deref(&self) -> &Self::Target { +        &self.config +    } +}
\ No newline at end of file diff --git a/libs/shared/src/nats.rs b/libs/shared/src/nats.rs index 05953cc..dc922d5 100644 --- a/libs/shared/src/nats.rs +++ b/libs/shared/src/nats.rs @@ -1,8 +1,7 @@ +use std::{future::Future, pin::Pin}; +  use async_nats::Client;  use serde::Deserialize; -use std::future::Future; - -use crate::error::GenericError;  #[derive(Clone, Debug, Deserialize)]  pub struct NatsConfigurationClientCert { @@ -20,10 +19,8 @@ pub struct NatsConfiguration {      pub host: String,  } -// todo: Prefer From since it automatically gives a free Into implementation -// Allows the configuration to directly create a nats connection -impl NatsConfiguration { -    pub async fn to_client(self) -> Result<Client, GenericError> { -        Ok(async_nats::connect(self.host).await?) +impl From<NatsConfiguration> for Pin<Box<dyn Future<Output = anyhow::Result<Client>>>> { +    fn from(value: NatsConfiguration) -> Self { +        Box::pin(async move { Ok(async_nats::connect(value.host).await?) })      } -}
\ No newline at end of file +} diff --git a/libs/shared/src/redis.rs b/libs/shared/src/redis.rs index a196f8d..5753fb6 100644 --- a/libs/shared/src/redis.rs +++ b/libs/shared/src/redis.rs @@ -1,6 +1,6 @@ -use redis::Client; +use redis::{aio::MultiplexedConnection, Client};  use serde::Deserialize; - +use std::{future::Future, pin::Pin};  #[derive(Clone, Debug, Deserialize)]  pub struct RedisConfiguration { @@ -13,3 +13,18 @@ impl Into<Client> for RedisConfiguration {          redis::Client::open(self.url).unwrap()      }  } + +impl From<RedisConfiguration> +    for Pin<Box<dyn Future<Output = anyhow::Result<MultiplexedConnection>>>> +{ +    fn from(value: RedisConfiguration) -> Self { +        Box::pin(async move { +            let con = Client::open(value.url)?; +            let (multiplex, ready) = con.create_multiplexed_tokio_connection().await?; + +            tokio::spawn(ready); + +            Ok(multiplex) +        }) +    } +} diff --git a/proto/nova/management/nova.management.v1alpha.proto b/proto/nova/management/nova.management.v1alpha.proto deleted file mode 100644 index d0d6baf..0000000 --- a/proto/nova/management/nova.management.v1alpha.proto +++ /dev/null @@ -1,58 +0,0 @@ -syntax = "proto3"; -package nova.management.v1alpha; - -message Empty {} - -// Represents the status of a shard -enum ShardStatus { -    DISCONNECTED = 0; -    RUNNING = 1; -    RECONNECTING = 2; -} - -// represents the state of a nova shard -message ShardStatusResponse { -    // Status of the shard in the cluster -     ShardStatus status = 1; -    // Index of the discord shard -     int64 identifier = 2; -    // If the cluster have a node assigned -    string cluster = 3; -    // the websocket latency of the shard -    int64 latency = 4; -} - -message ShardStatusRequest { -    // the id of the shard -     int64 identifier = 1; -} - -// represents the status of a cluster -// (an instance of the gateway which holds multiple shards) -message ClusterStatusResponse { -    // the unique id of the cluster -     string id = 1; -    // the node the cluster is running on -     string node = 2; -    // the average latency of the cluster -    int64 average_latency = 3; -    // list of all the shards on the cluster -    repeated ShardStatusResponse shards = 4; -} - -message ClusterStatusRequest { -     string id = 1;     -} - -// Represents the status of all the nova clusters & shards -message GlobalClusterStatusResponse { -    int64 size = 1; -    repeated ClusterStatusResponse shards = 2; -} - -// used by the cli to interact with the nova manager -service ManagementService { -    rpc GetGlobalClusterStatus (Empty) returns (GlobalClusterStatusResponse); -    rpc GetClusterStatus (ClusterStatusRequest) returns (ClusterStatusResponse); -    rpc GetShardStatus (ShardStatusRequest) returns (ShardStatusResponse); -}
\ No newline at end of file diff --git a/proto/nova/management/rpc/nova.management.rpc.v1alpha.proto b/proto/nova/management/rpc/nova.management.rpc.v1alpha.proto deleted file mode 100644 index 1ec0168..0000000 --- a/proto/nova/management/rpc/nova.management.rpc.v1alpha.proto +++ /dev/null @@ -1,5 +0,0 @@ -syntax = "proto3"; - -import "common/management/nova.management.v1alpha.proto"; -package nova.management.rpc.v1alpha; - diff --git a/proto/nova/ratelimit/ratelimiter.proto b/proto/nova/ratelimit/ratelimiter.proto new file mode 100644 index 0000000..34d5b6f --- /dev/null +++ b/proto/nova/ratelimit/ratelimiter.proto @@ -0,0 +1,37 @@ +syntax = "proto3"; + +package nova.ratelimit.ratelimiter; + +service Ratelimiter { +    rpc GetBucketInformation(BucketInformationRequest) returns (BucketInformationResponse); +    rpc SubmitTicket(stream BucketSubmitTicketRequest) returns (stream BucketSubmitTicketResponse); +} + +message BucketInformationRequest { +    string path = 1; +} + +message BucketInformationResponse { +    uint64 limit = 1; +    uint64 remaining = 2; +    uint64 reset_after = 3; +    uint64 started_at = 4; +} + + +message BucketSubmitTicketRequest { +    oneof data { +        string path = 1; +        Headers headers = 2; +    } + +    message Headers { +        map<string, string> headers = 1; +        uint64 precise_time = 2; +    } +     +} + +message BucketSubmitTicketResponse { +    int64 accepted = 1; +}  | 
