curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg && \
# Add docker repository apt source
echo "deb [arch=amd64 signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | tee /etc/apt/sources.list.d/docker.list && \
- # Add bazel repository apt source
- echo "deb [arch=amd64] https://storage.googleapis.com/bazel-apt stable jdk1.8" | tee /etc/apt/sources.list.d/bazel.list && \
# Install docker
apt update -y && apt install docker-ce-cli -y
target/\r
**/local*\r
.ijwb\r
-.idea
\ No newline at end of file
+.idea\r
+config.yml\r
"libc",
]
+[[package]]
+name = "anyhow"
+version = "1.0.68"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61"
+
[[package]]
name = "arc-swap"
version = "1.5.1"
"url",
]
+[[package]]
+name = "async-stream"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dad5c83079eae9969be7fadefe640a1c566901f05ff91ab221de4b6f68d9507e"
+dependencies = [
+ "async-stream-impl",
+ "futures-core",
+]
+
+[[package]]
+name = "async-stream-impl"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "async-trait"
version = "0.1.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
+[[package]]
+name = "axum"
+version = "0.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "08b108ad2665fa3f6e6a517c3d80ec3e77d224c47d605167aefaa5d7ef97fa48"
+dependencies = [
+ "async-trait",
+ "axum-core",
+ "bitflags",
+ "bytes",
+ "futures-util",
+ "http",
+ "http-body",
+ "hyper",
+ "itoa",
+ "matchit",
+ "memchr",
+ "mime",
+ "percent-encoding",
+ "pin-project-lite",
+ "rustversion",
+ "serde",
+ "sync_wrapper",
+ "tower",
+ "tower-http",
+ "tower-layer",
+ "tower-service",
+]
+
+[[package]]
+name = "axum-core"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "79b8558f5a0581152dc94dcd289132a1d377494bdeafcd41869b3258e3e2ad92"
+dependencies = [
+ "async-trait",
+ "bytes",
+ "futures-util",
+ "http",
+ "http-body",
+ "mime",
+ "rustversion",
+ "tower-layer",
+ "tower-service",
+]
+
[[package]]
name = "base64"
version = "0.13.1"
name = "cache"
version = "0.1.0"
dependencies = [
+ "anyhow",
"async-nats",
"futures-util",
"log",
+ "proto",
"redis",
"serde",
"serde_json",
"typenum",
]
-[[package]]
-name = "ctor"
-version = "0.1.26"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096"
-dependencies = [
- "quote",
- "syn",
-]
-
[[package]]
name = "curve25519-dalek"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257"
+[[package]]
+name = "dns-lookup"
+version = "1.0.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "53ecafc952c4528d9b51a458d1a8904b81783feff9fde08ab6ed2545ff396872"
+dependencies = [
+ "cfg-if",
+ "libc",
+ "socket2",
+ "winapi",
+]
+
[[package]]
name = "ed25519"
version = "1.5.2"
"instant",
]
+[[package]]
+name = "fixedbitset"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
+
[[package]]
name = "flate2"
version = "1.0.25"
name = "gateway"
version = "0.1.0"
dependencies = [
+ "anyhow",
"bytes",
"futures",
+ "leash",
+ "proto",
"serde",
"serde_json",
"shared",
"wasi 0.11.0+wasi-snapshot-preview1",
]
+[[package]]
+name = "glob"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574"
+
[[package]]
name = "h2"
version = "0.3.15"
"ahash",
]
+[[package]]
+name = "hashring"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dd0ddd025eccd8a2fff9865e82ef4c8ce00c4a67709036847d95cf3ccffd07a8"
+dependencies = [
+ "siphasher",
+]
+
+[[package]]
+name = "heck"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9"
+
[[package]]
name = "hermit-abi"
version = "0.1.19"
"pin-project-lite",
]
+[[package]]
+name = "http-range-header"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29"
+
[[package]]
name = "httparse"
version = "1.8.0"
"tokio-rustls",
]
+[[package]]
+name = "hyper-timeout"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
+dependencies = [
+ "hyper",
+ "pin-project-lite",
+ "tokio",
+ "tokio-io-timeout",
+]
+
[[package]]
name = "hyper-tls"
version = "0.5.0"
"tokio",
]
+[[package]]
+name = "leash"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "serde",
+ "shared",
+ "tokio",
+]
+
[[package]]
name = "libc"
version = "0.2.139"
"cfg-if",
]
+[[package]]
+name = "matchit"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40"
+
[[package]]
name = "memchr"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
+[[package]]
+name = "mime"
+version = "0.3.16"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
+
[[package]]
name = "minimal-lexical"
version = "0.2.1"
"windows-sys 0.42.0",
]
+[[package]]
+name = "multimap"
+version = "0.8.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
+
[[package]]
name = "native-tls"
version = "0.2.11"
"sha1",
]
+[[package]]
+name = "petgraph"
+version = "0.6.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e6d5014253a1331579ce62aa67443b4a658c5e7dd03d4bc6d302b94474888143"
+dependencies = [
+ "fixedbitset",
+ "indexmap",
+]
+
[[package]]
name = "pin-project"
version = "1.0.12"
"log",
]
+[[package]]
+name = "prettyplease"
+version = "0.1.22"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2c8992a85d8e93a28bdf76137db888d3874e3b230dee5ed8bebac4c9f7617773"
+dependencies = [
+ "proc-macro2",
+ "syn",
+]
+
[[package]]
name = "proc-macro2"
version = "1.0.49"
"thiserror",
]
+[[package]]
+name = "prost"
+version = "0.11.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c01db6702aa05baa3f57dec92b8eeeeb4cb19e894e73996b32a4093289e54592"
+dependencies = [
+ "bytes",
+ "prost-derive",
+]
+
+[[package]]
+name = "prost-build"
+version = "0.11.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cb5320c680de74ba083512704acb90fe00f28f79207286a848e730c45dd73ed6"
+dependencies = [
+ "bytes",
+ "heck",
+ "itertools",
+ "lazy_static",
+ "log",
+ "multimap",
+ "petgraph",
+ "prettyplease",
+ "prost",
+ "prost-types",
+ "regex",
+ "syn",
+ "tempfile",
+ "which",
+]
+
+[[package]]
+name = "prost-derive"
+version = "0.11.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c8842bad1a5419bca14eac663ba798f6bc19c413c2fdceb5f3ba3b0932d96720"
+dependencies = [
+ "anyhow",
+ "itertools",
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "prost-types"
+version = "0.11.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "017f79637768cde62820bc2d4fe0e45daaa027755c323ad077767c6c5f173091"
+dependencies = [
+ "bytes",
+ "prost",
+]
+
[[package]]
name = "proto"
version = "0.1.0"
+dependencies = [
+ "glob",
+ "prost",
+ "tonic",
+ "tonic-build",
+]
[[package]]
name = "protobuf"
"rand_core 0.5.1",
]
+[[package]]
+name = "ratelimit"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "futures-util",
+ "hyper",
+ "proto",
+ "serde",
+ "serde_json",
+ "shared",
+ "tokio",
+ "tokio-stream",
+ "tonic",
+ "tracing",
+ "twilight-http-ratelimiting 0.14.0 (git+https://github.com/MatthieuCoder/twilight.git)",
+]
+
[[package]]
name = "redis"
version = "0.22.1"
name = "rest"
version = "0.1.0"
dependencies = [
+ "anyhow",
+ "dns-lookup",
"futures-util",
+ "hashring",
+ "http",
"hyper",
"hyper-tls",
"lazy_static",
+ "leash",
+ "proto",
"serde",
+ "serde_json",
"shared",
"tokio",
+ "tokio-scoped",
+ "tokio-stream",
+ "tonic",
+ "tracing",
+ "twilight-http-ratelimiting 0.14.0 (git+https://github.com/MatthieuCoder/twilight.git)",
"xxhash-rust",
]
"base64",
]
+[[package]]
+name = "rustversion"
+version = "1.0.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5583e89e108996506031660fe09baa5011b9dd0341b89029313006d1fb508d70"
+
[[package]]
name = "ryu"
version = "1.0.12"
name = "shared"
version = "0.1.0"
dependencies = [
+ "anyhow",
"async-nats",
"config",
"enumflags2",
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c"
+[[package]]
+name = "siphasher"
+version = "0.3.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
+
[[package]]
name = "slab"
version = "0.4.7"
"unicode-ident",
]
+[[package]]
+name = "sync_wrapper"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8"
+
[[package]]
name = "synstructure"
version = "0.12.6"
"windows-sys 0.42.0",
]
+[[package]]
+name = "tokio-io-timeout"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf"
+dependencies = [
+ "pin-project-lite",
+ "tokio",
+]
+
[[package]]
name = "tokio-macros"
version = "1.8.2"
"webpki",
]
+[[package]]
+name = "tokio-scoped"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e4beb8ba13bc53ac53ce1d52b42f02e5d8060f0f42138862869beb769722b256"
+dependencies = [
+ "tokio",
+ "tokio-stream",
+]
+
+[[package]]
+name = "tokio-stream"
+version = "0.1.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce"
+dependencies = [
+ "futures-core",
+ "pin-project-lite",
+ "tokio",
+]
+
[[package]]
name = "tokio-tungstenite"
version = "0.17.2"
"serde",
]
+[[package]]
+name = "tonic"
+version = "0.8.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb"
+dependencies = [
+ "async-stream",
+ "async-trait",
+ "axum",
+ "base64",
+ "bytes",
+ "futures-core",
+ "futures-util",
+ "h2",
+ "http",
+ "http-body",
+ "hyper",
+ "hyper-timeout",
+ "percent-encoding",
+ "pin-project",
+ "prost",
+ "prost-derive",
+ "tokio",
+ "tokio-stream",
+ "tokio-util",
+ "tower",
+ "tower-layer",
+ "tower-service",
+ "tracing",
+ "tracing-futures",
+]
+
+[[package]]
+name = "tonic-build"
+version = "0.8.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5bf5e9b9c0f7e0a7c027dcfaba7b2c60816c7049171f679d99ee2ff65d0de8c4"
+dependencies = [
+ "prettyplease",
+ "proc-macro2",
+ "prost-build",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "tower"
+version = "0.4.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
+dependencies = [
+ "futures-core",
+ "futures-util",
+ "indexmap",
+ "pin-project",
+ "pin-project-lite",
+ "rand 0.8.5",
+ "slab",
+ "tokio",
+ "tokio-util",
+ "tower-layer",
+ "tower-service",
+ "tracing",
+]
+
+[[package]]
+name = "tower-http"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f873044bf02dd1e8239e9c1293ea39dad76dc594ec16185d0a1bf31d8dc8d858"
+dependencies = [
+ "bitflags",
+ "bytes",
+ "futures-core",
+ "futures-util",
+ "http",
+ "http-body",
+ "http-range-header",
+ "pin-project-lite",
+ "tower",
+ "tower-layer",
+ "tower-service",
+]
+
+[[package]]
+name = "tower-layer"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0"
+
[[package]]
name = "tower-service"
version = "0.3.2"
checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
dependencies = [
"cfg-if",
+ "log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
"once_cell",
]
+[[package]]
+name = "tracing-futures"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2"
+dependencies = [
+ "pin-project",
+ "tracing",
+]
+
[[package]]
name = "try-lock"
version = "0.2.3"
"serde_json",
"tokio",
"tracing",
- "twilight-http-ratelimiting",
+ "twilight-http-ratelimiting 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
"twilight-model",
"twilight-validate",
]
"tracing",
]
+[[package]]
+name = "twilight-http-ratelimiting"
+version = "0.14.0"
+source = "git+https://github.com/MatthieuCoder/twilight.git#a7953514373d3e3962435e6a539e0e2504a2c2fd"
+dependencies = [
+ "futures-util",
+ "http",
+ "tokio",
+ "tracing",
+]
+
[[package]]
name = "twilight-model"
version = "0.14.0"
name = "webhook"
version = "0.1.0"
dependencies = [
- "ctor",
+ "anyhow",
"ed25519-dalek",
"hex",
"hyper",
"lazy_static",
- "libc",
- "rand 0.8.5",
+ "leash",
+ "proto",
"serde",
"serde_json",
"shared",
"untrusted",
]
+[[package]]
+name = "which"
+version = "4.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1c831fbbee9e129a8cf93e7747a82da9d95ba8e16621cae60ec2cdc849bacb7b"
+dependencies = [
+ "either",
+ "libc",
+ "once_cell",
+]
+
[[package]]
name = "winapi"
version = "0.3.9"
"exes/gateway/",\r
"exes/rest/",\r
"exes/webhook/",\r
+ "exes/ratelimit/",\r
\r
"libs/proto/",\r
- "libs/shared/"\r
+ "libs/shared/",\r
+ "libs/leash/"\r
]
\ No newline at end of file
version: "3.3"
services:
+ nats:
+ image: bitnami/nats
+ restart: always
+ ports:
+ - 4222:4222
+ - 8222:8222
+ redis:
+ image: redis
+
cache:
image: ghcr.io/discordnova/nova/cache
+ restart: always
build:
context: .
args:
- COMPONENT=cache
+ volumes:
+ - ./config.yml:/config/default.yml
+ environment:
+ - RUST_LOG=info
+ depends_on:
+ - nats
+ - redis
+
gateway:
image: ghcr.io/discordnova/nova/gateway
+ restart: always
build:
context: .
args:
- COMPONENT=gateway
+ volumes:
+ - ./config.yml:/config/default.yml
+ environment:
+ - RUST_LOG=info
+ depends_on:
+ - nats
+ ports:
+ - 9000:9000
+
rest:
image: ghcr.io/discordnova/nova/rest
+ restart: always
build:
context: .
args:
- COMPONENT=rest
+ volumes:
+ - ./config.yml:/config/default.yml
+ environment:
+ - RUST_LOG=info
+ depends_on:
+ - ratelimit
+ ports:
+ - 9001:9000
+ - 8080:8080
+
webhook:
image: ghcr.io/discordnova/nova/webhook
+ restart: always
build:
context: .
args:
- COMPONENT=webhook
+ volumes:
+ - ./config.yml:/config/default.yml
+ environment:
+ - RUST_LOG=info
+ depends_on:
+ - nats
+ ports:
+ - 9002:9000
+ - 8081:8080
+ ratelimit:
+ image: ghcr.io/discordnova/nova/ratelimit
+ restart: always
+ build:
+ context: .
+ args:
+ - COMPONENT=ratelimit
+ volumes:
+ - ./config.yml:/config/default.yml
+ environment:
+ - RUST_LOG=info
+ depends_on:
+ - nats
+ - redis
+ ports:
+ - 9003:9000
+ - 8082:8080
\ No newline at end of file
--- /dev/null
+ratelimit/
\ No newline at end of file
[dependencies]
shared = { path = "../../libs/shared" }
+proto = { path = "../../libs/proto" }
async-nats = "0.25.1"
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0.8", features = ["derive"] }
serde_json = { version = "1.0" }
redis = "*"
futures-util = "*"
-twilight-model = "0.14"
\ No newline at end of file
+twilight-model = "0.14"
+anyhow = "1.0.68"
\ No newline at end of file
use serde::Deserialize;
-
#[derive(Debug, Deserialize, Clone, Default)]
pub struct CacheConfiguration {
pub toggles: Vec<String>
-}
\ No newline at end of file
+}
-use std::error::Error;
+use std::{error::Error, pin::Pin};
use async_nats::{Client, Subscriber};
-use futures_util::stream::StreamExt;
+use futures_util::{stream::StreamExt, Future};
use log::info;
use managers::{
automoderation::Automoderation, bans::Bans, channels::Channels,
}
#[derive(Default)]
-struct MegaCache {
+struct Cache {
automoderation: Automoderation,
channels: Channels,
bans: Bans,
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let settings: Settings<CacheConfiguration> = Settings::new("cache").unwrap();
info!("loaded configuration: {:?}", settings);
-
- let nats: Client = settings.nats.to_client().await?;
+ let nats =
+ Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>>>>>::into(settings.nats).await?;
// let redis: redis::Client = settings.redis.into();
- let mut cache = MegaCache::default();
+ let mut cache = Cache::default();
let mut sub = nats.subscribe("nova.cache.dispatch.*".to_string()).await?;
listen(&mut sub, &mut cache, settings.config.toggles).await;
Ok(())
}
-async fn listen(sub: &mut Subscriber, cache: &mut MegaCache, features: Vec<String>) {
+async fn listen(sub: &mut Subscriber, cache: &mut Cache, features: Vec<String>) {
while let Some(data) = sub.next().await {
let cp: CachePayload = serde_json::from_slice(&data.payload).unwrap();
let event = cp.data.data;
use super::CacheManager;
use std::future::Future;
-
#[derive(Default)]
pub struct Bans {}
impl CacheManager for Bans {
[dependencies]
shared = { path = "../../libs/shared" }
+proto = { path = "../../libs/proto" }
+leash = { path = "../../libs/leash" }
tokio = { version = "1", features = ["full"] }
twilight-gateway = { version = "0.14" }
twilight-model = "0.14"
serde = { version = "1.0.8", features = ["derive"] }
futures = "0.3"
serde_json = { version = "1.0" }
-bytes = "*"
\ No newline at end of file
+bytes = "*"
+anyhow = "*"
\ No newline at end of file
use twilight_gateway::Intents;
#[derive(Serialize, Deserialize, Clone)]
-pub struct Config {
+pub struct GatewayConfig {
pub token: String,
- pub intents: Intents
+ pub intents: Intents,
+ pub shard: u64,
+ pub shard_total: u64,
}
-impl Default for Config {
+impl Default for GatewayConfig {
fn default() -> Self {
- Self { intents: Intents::empty(), token: String::default() }
+ Self {
+ intents: Intents::empty(),
+ token: String::default(),
+ shard_total: 1,
+ shard: 1,
+ }
}
}
-use config::Config;
+use config::GatewayConfig;
+use leash::{ignite, AnyhowResultFuture, Component};
use shared::{
config::Settings,
log::{debug, info},
nats_crate::Client,
payloads::{CachePayload, DispatchEventTagged, Tracing},
};
-use std::{convert::TryFrom, error::Error};
+use std::{convert::TryFrom, pin::Pin};
use twilight_gateway::{Event, Shard};
mod config;
-use futures::StreamExt;
+use futures::{Future, StreamExt};
use twilight_model::gateway::event::DispatchEvent;
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
- let settings: Settings<Config> = Settings::new("gateway").unwrap();
- let (shard, mut events) = Shard::new(settings.config.token, settings.config.intents);
- let nats: Client = settings.nats.to_client().await?;
+struct GatewayServer {}
+impl Component for GatewayServer {
+ type Config = GatewayConfig;
+ const SERVICE_NAME: &'static str = "gateway";
- shard.start().await?;
+ fn start(&self, settings: Settings<Self::Config>) -> AnyhowResultFuture<()> {
+ Box::pin(async move {
+ let (shard, mut events) = Shard::builder(settings.token.to_owned(), settings.intents)
+ .shard(settings.shard, settings.shard_total)?
+ .build();
- while let Some(event) = events.next().await {
- match event {
- Event::Ready(ready) => {
- info!("Logged in as {}", ready.user.name);
- }
+ let nats =
+ Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>>>>>::into(settings.nats)
+ .await?;
+
+ shard.start().await?;
+
+ while let Some(event) = events.next().await {
+ match event {
+ Event::Ready(ready) => {
+ info!("Logged in as {}", ready.user.name);
+ }
- _ => {
- let name = event.kind().name();
- if let Ok(dispatch_event) = DispatchEvent::try_from(event) {
- let data = CachePayload {
- tracing: Tracing {
- node_id: "".to_string(),
- span: None,
- },
- data: DispatchEventTagged {
- data: dispatch_event,
- },
- };
- let value = serde_json::to_string(&data)?;
- debug!("nats send: {}", value);
- let bytes = bytes::Bytes::from(value);
- nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes)
- .await?;
+ _ => {
+ let name = event.kind().name();
+ if let Ok(dispatch_event) = DispatchEvent::try_from(event) {
+ let data = CachePayload {
+ tracing: Tracing {
+ node_id: "".to_string(),
+ span: None,
+ },
+ data: DispatchEventTagged {
+ data: dispatch_event,
+ },
+ };
+ let value = serde_json::to_string(&data)?;
+ debug!("nats send: {}", value);
+ let bytes = bytes::Bytes::from(value);
+ nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes)
+ .await?;
+ }
+ }
}
}
- }
+
+ Ok(())
+ })
}
- Ok(())
+ fn new() -> Self {
+ Self {}
+ }
}
+
+ignite!(GatewayServer);
[dependencies]
shared = { path = "../../libs/shared" }
+proto = { path = "../../libs/proto" }
+leash = { path = "../../libs/leash" }
+
hyper = { version = "0.14", features = ["full"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0.8", features = ["derive"] }
futures-util = "0.3.17"
hyper-tls = "0.5.0"
lazy_static = "1.4.0"
-xxhash-rust = { version = "0.8.2", features = ["xxh32"] }
\ No newline at end of file
+xxhash-rust = { version = "0.8.2", features = ["xxh32"] }
+twilight-http-ratelimiting = { git = "https://github.com/MatthieuCoder/twilight.git" }
+tracing = "0.1.37"
+hashring = "0.3.0"
+anyhow = "*"
+tonic = "0.8.3"
+serde_json = { version = "1.0" }
+http = "0.2.8"
+tokio-stream = "0.1.11"
+dns-lookup = "1.0.8"
+tokio-scoped = "0.2.0"
\ No newline at end of file
+use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use serde::Deserialize;
-#[derive(Debug, Deserialize, Clone, Default)]
+fn default_listening_address() -> SocketAddr {
+ SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080))
+}
+
+#[derive(Debug, Deserialize, Clone)]
pub struct ServerSettings {
- pub port: u16,
- pub address: String,
+ #[serde(default = "default_listening_address")]
+ pub listening_adress: SocketAddr
+}
+impl Default for ServerSettings {
+ fn default() -> Self {
+ Self {
+ listening_adress: default_listening_address(),
+ }
+ }
}
#[derive(Debug, Deserialize, Clone, Default)]
}
#[derive(Debug, Deserialize, Clone, Default)]
-pub struct Config {
+pub struct ReverseProxyConfig {
pub server: ServerSettings,
pub discord: Discord,
}
--- /dev/null
+use std::{
+ collections::hash_map::DefaultHasher,
+ convert::TryFrom,
+ hash::{Hash, Hasher},
+ str::FromStr,
+};
+
+use anyhow::bail;
+use http::{
+ header::{AUTHORIZATION, CONNECTION, HOST, TRANSFER_ENCODING, UPGRADE},
+ HeaderValue, Method as HttpMethod, Request, Response, Uri,
+};
+use hyper::{client::HttpConnector, Body, Client};
+use hyper_tls::HttpsConnector;
+use shared::log::error;
+use twilight_http_ratelimiting::{Method, Path};
+
+use crate::ratelimit_client::RemoteRatelimiter;
+
+/// Normalizes the path
+fn normalize_path(request_path: &str) -> (&str, &str) {
+ if let Some(trimmed_path) = request_path.strip_prefix("/api") {
+ if let Some(maybe_api_version) = trimmed_path.split('/').nth(1) {
+ if let Some(version_number) = maybe_api_version.strip_prefix('v') {
+ if version_number.parse::<u8>().is_ok() {
+ let len = "/api/v".len() + version_number.len();
+ return (&request_path[..len], &request_path[len..]);
+ };
+ };
+ }
+
+ ("/api", trimmed_path)
+ } else {
+ ("/api", request_path)
+ }
+}
+
+pub async fn handle_request(
+ client: Client<HttpsConnector<HttpConnector>, Body>,
+ ratelimiter: RemoteRatelimiter,
+ token: String,
+ mut request: Request<Body>,
+) -> Result<Response<Body>, anyhow::Error> {
+ let (hash, uri_string) = {
+ let method = match *request.method() {
+ HttpMethod::DELETE => Method::Delete,
+ HttpMethod::GET => Method::Get,
+ HttpMethod::PATCH => Method::Patch,
+ HttpMethod::POST => Method::Post,
+ HttpMethod::PUT => Method::Put,
+ _ => {
+ error!("Unsupported HTTP method in request, {}", request.method());
+ bail!("unsupported method");
+ }
+ };
+
+ let request_path = request.uri().path();
+ let (api_path, trimmed_path) = normalize_path(&request_path);
+
+ let mut uri_string = format!("http://192.168.0.27:8000{}{}", api_path, trimmed_path);
+ if let Some(query) = request.uri().query() {
+ uri_string.push('?');
+ uri_string.push_str(query);
+ }
+
+ let mut hash = DefaultHasher::new();
+ match Path::try_from((method, trimmed_path)) {
+ Ok(path) => path,
+ Err(e) => {
+ error!(
+ "Failed to parse path for {:?} {}: {:?}",
+ method, trimmed_path, e
+ );
+ bail!("failed o parse");
+ }
+ }
+ .hash(&mut hash);
+
+ (hash.finish().to_string(), uri_string)
+ };
+
+ let header_sender = match ratelimiter.ticket(hash).await {
+ Ok(sender) => sender,
+ Err(e) => {
+ error!("Failed to receive ticket for ratelimiting: {:?}", e);
+ bail!("failed to reteive ticket");
+ }
+ };
+
+ request.headers_mut().insert(
+ AUTHORIZATION,
+ HeaderValue::from_bytes(token.as_bytes())
+ .expect("strings are guaranteed to be valid utf-8"),
+ );
+ request
+ .headers_mut()
+ .insert(HOST, HeaderValue::from_static("discord.com"));
+
+ // Remove forbidden HTTP/2 headers
+ // https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.2
+ request.headers_mut().remove(CONNECTION);
+ request.headers_mut().remove("keep-alive");
+ request.headers_mut().remove("proxy-connection");
+ request.headers_mut().remove(TRANSFER_ENCODING);
+ request.headers_mut().remove(UPGRADE);
+ request.headers_mut().remove(AUTHORIZATION);
+ request.headers_mut().append(
+ AUTHORIZATION,
+ HeaderValue::from_static(
+ "Bot ODA3MTg4MzM1NzE3Mzg0MjEy.G3sXFM.8gY2sVYDAq2WuPWwDskAAEFLfTg8htooxME-LE",
+ ),
+ );
+
+ let uri = match Uri::from_str(&uri_string) {
+ Ok(uri) => uri,
+ Err(e) => {
+ error!("Failed to create URI for requesting Discord API: {:?}", e);
+ bail!("failed to create uri");
+ }
+ };
+ *request.uri_mut() = uri;
+ let resp = match client.request(request).await {
+ Ok(response) => response,
+ Err(e) => {
+ error!("Error when requesting the Discord API: {:?}", e);
+ bail!("failed to request the discord api");
+ }
+ };
+
+ let ratelimit_headers = resp
+ .headers()
+ .into_iter()
+ .map(|(k, v)| (k.to_string(), v.to_str().unwrap().to_string()))
+ .collect();
+
+ if header_sender.send(ratelimit_headers).is_err() {
+ error!("Error when sending ratelimit headers to ratelimiter");
+ };
+
+ Ok(resp)
+}
-use std::{convert::Infallible, sync::Arc};
+use config::ReverseProxyConfig;
-use crate::{config::Config, ratelimit::Ratelimiter};
-use shared::{
- config::Settings,
- log::{error, info},
- redis_crate::Client,
+use handler::handle_request;
+use hyper::{
+ server::conn::AddrStream,
+ service::{make_service_fn, service_fn},
+ Body, Client, Request, Server,
};
-use hyper::{server::conn::AddrStream, service::make_service_fn, Server};
-use std::net::ToSocketAddrs;
-use tokio::sync::Mutex;
-
-use crate::proxy::ServiceProxy;
+use hyper_tls::HttpsConnector;
+use leash::{ignite, AnyhowResultFuture, Component};
+use shared::config::Settings;
+use std::convert::Infallible;
mod config;
-mod proxy;
-mod ratelimit;
-
-#[tokio::main]
-async fn main() {
- let settings: Settings<Config> = Settings::new("rest").unwrap();
- let config = Arc::new(settings.config);
- let redis_client: Client = settings.redis.into();
- let redis = Arc::new(Mutex::new(
- redis_client.get_async_connection().await.unwrap(),
- ));
- let ratelimiter = Arc::new(Ratelimiter::new(redis));
-
- let addr = format!("{}:{}", config.server.address, config.server.port)
- .to_socket_addrs()
- .unwrap()
- .next()
- .unwrap();
-
- let service_fn = make_service_fn(move |_: &AddrStream| {
- let service_proxy = ServiceProxy::new(config.clone(), ratelimiter.clone());
- async move { Ok::<_, Infallible>(service_proxy) }
- });
-
- let server = Server::bind(&addr).serve(service_fn);
-
- info!("starting ratelimit server");
- if let Err(e) = server.await {
- error!("server error: {}", e);
+mod handler;
+mod ratelimit_client;
+
+struct ReverseProxyServer {}
+impl Component for ReverseProxyServer {
+ type Config = ReverseProxyConfig;
+ const SERVICE_NAME: &'static str = "rest";
+
+ fn start(&self, settings: Settings<Self::Config>) -> AnyhowResultFuture<()> {
+ Box::pin(async move {
+ // Client to the remote ratelimiters
+ let ratelimiter = ratelimit_client::RemoteRatelimiter::new();
+ let client = Client::builder().build(HttpsConnector::new());
+
+ let service_fn = make_service_fn(move |_: &AddrStream| {
+ let client = client.clone();
+ let ratelimiter = ratelimiter.clone();
+ async move {
+ Ok::<_, Infallible>(service_fn(move |request: Request<Body>| {
+ let client = client.clone();
+ let ratelimiter = ratelimiter.clone();
+ async move {
+ handle_request(client, ratelimiter, "token".to_string(), request).await
+ }
+ }))
+ }
+ });
+
+ let server = Server::bind(&settings.config.server.listening_adress).serve(service_fn);
+
+ server.await?;
+
+ Ok(())
+ })
+ }
+
+ fn new() -> Self {
+ Self {}
}
}
+
+ignite!(ReverseProxyServer);
+++ /dev/null
-use crate::{config::Config, ratelimit::Ratelimiter};
-use hyper::{
- client::HttpConnector, header::HeaderValue, http::uri::Parts, service::Service, Body, Client,
- Request, Response, Uri,
-};
-use hyper_tls::HttpsConnector;
-use shared::{
- log::debug,
- prometheus::{labels, opts, register_counter, register_histogram_vec, Counter, HistogramVec},
-};
-use std::{future::Future, pin::Pin, sync::Arc, task::Poll};
-use tokio::sync::Mutex;
-
-lazy_static::lazy_static! {
- static ref HTTP_COUNTER: Counter = register_counter!(opts!(
- "nova_rest_http_requests_total",
- "Number of HTTP requests made.",
- labels! {"handler" => "all",}
- ))
- .unwrap();
-
- static ref HTTP_REQ_HISTOGRAM: HistogramVec = register_histogram_vec!(
- "nova_rest_http_request_duration_seconds",
- "The HTTP request latencies in seconds.",
- &["handler"]
- )
- .unwrap();
-
- static ref HTTP_COUNTER_STATUS: Counter = register_counter!(opts!(
- "nova_rest_http_requests_status",
- "Number of HTTP requests made by status",
- labels! {"" => ""}
- ))
- .unwrap();
-}
-
-#[derive(Clone)]
-pub struct ServiceProxy {
- client: Client<HttpsConnector<HttpConnector>>,
- ratelimiter: Arc<Ratelimiter>,
- config: Arc<Config>,
- fail: Arc<Mutex<i32>>,
-}
-
-impl Service<Request<Body>> for ServiceProxy {
- type Response = Response<Body>;
- type Error = hyper::Error;
- type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
-
- fn poll_ready(
- &mut self,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Result<(), Self::Error>> {
- match self.client.poll_ready(cx) {
- Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
- Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
- Poll::Pending => Poll::Pending,
- }
- }
-
- fn call(&mut self, mut req: Request<hyper::Body>) -> Self::Future {
- HTTP_COUNTER.inc();
-
- let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["all"]).start_timer();
- let host = "discord.com";
- let mut new_parts = Parts::default();
-
- let path = req.uri().path().to_string();
-
- new_parts.scheme = Some("https".parse().unwrap());
- new_parts.authority = Some(host.parse().unwrap());
- new_parts.path_and_query = Some(path.parse().unwrap());
-
- *req.uri_mut() = Uri::from_parts(new_parts).unwrap();
-
- let headers = req.headers_mut();
- headers.remove("user-agent");
- headers.insert("Host", HeaderValue::from_str("discord.com").unwrap());
- headers.insert(
- "Authorization",
- HeaderValue::from_str(&format!("Bot {}", self.config.discord.token)).unwrap(),
- );
-
- println!("{:?}", headers);
-
- let client = self.client.clone();
- let ratelimiter = self.ratelimiter.clone();
- let fail = self.fail.clone();
-
- return Box::pin(async move {
- let resp = match ratelimiter.before_request(&req).await {
- Ok(allowed) => match allowed {
- crate::ratelimit::RatelimiterResponse::Ratelimited => {
- debug!("ratelimited");
- Ok(Response::builder().body("ratelimited".into()).unwrap())
- }
- _ => {
- debug!("forwarding request");
- match client.request(req).await {
- Ok(mut response) => {
- ratelimiter.after_request(&path, &response).await;
- if response.status() != 200 {
- *fail.lock().await += 1
- }
- response.headers_mut().insert(
- "x-fails",
- HeaderValue::from_str(&format!("{}", fail.lock().await))
- .unwrap(),
- );
- Ok(response)
- }
- Err(e) => Err(e),
- }
- }
- },
- Err(e) => Ok(Response::builder()
- .body(format!("server error: {}", e).into())
- .unwrap()),
- };
- timer.observe_duration();
- resp
- });
- }
-}
-
-impl ServiceProxy {
- pub fn new(config: Arc<Config>, ratelimiter: Arc<Ratelimiter>) -> Self {
- let https = HttpsConnector::new();
- let client = Client::builder().build::<_, hyper::Body>(https);
- let fail = Arc::new(Mutex::new(0));
- ServiceProxy {
- client,
- config,
- ratelimiter,
- fail,
- }
- }
-}
+++ /dev/null
-use shared::{
- log::debug,
- redis_crate::{aio::Connection, AsyncCommands}, error::GenericError,
-};
-use hyper::{Body, Request, Response};
-use std::{
- convert::TryInto,
- sync::Arc,
- time::{SystemTime, UNIX_EPOCH},
-};
-use tokio::sync::Mutex;
-use xxhash_rust::xxh32::xxh32;
-
-pub enum RatelimiterResponse {
- NoSuchUrl,
- Ratelimited,
- Pass,
-}
-
-pub struct Ratelimiter {
- redis: Arc<Mutex<Connection>>,
-}
-
-impl Ratelimiter {
- pub fn new(redis: Arc<Mutex<Connection>>) -> Ratelimiter {
- return Ratelimiter { redis };
- }
-
- pub async fn before_request(
- &self,
- request: &Request<Body>,
- ) -> Result<RatelimiterResponse, GenericError> {
- // we lookup if the route hash is stored in the redis table
- let path = request.uri().path();
- let hash = xxh32(path.as_bytes(), 32);
- let mut redis = self.redis.lock().await;
-
- let start = SystemTime::now();
- let since_the_epoch = start
- .duration_since(UNIX_EPOCH)
- .expect("Time went backwards");
-
- // global rate litmit
- match redis
- .get::<String, Option<i32>>(format!(
- "nova:rest:ratelimit:global:{}",
- since_the_epoch.as_secs()
- ))
- .await
- {
- Ok(value) => {
- match value {
- Some(value) => {
- debug!("incr: {}", value);
- if value >= 49 {
- return Ok(RatelimiterResponse::Ratelimited);
- }
- }
- None => {
- let key =
- format!("nova:rest:ratelimit:global:{}", since_the_epoch.as_secs());
- // init global ratelimit
- redis.set_ex::<String, i32, ()>(key, 0, 2).await.unwrap();
- }
- }
- }
- Err(_) => {
- return Err(GenericError::StepFailed("radis ratelimit check".to_string()));
- }
- };
-
- // we lookup the corresponding bucket for this url
- match redis
- .get::<String, Option<String>>(format!("nova:rest:ratelimit:url_bucket:{}", hash))
- .await
- {
- Ok(bucket) => match bucket {
- Some(bucket) => {
- match redis
- .exists::<String, bool>(format!("nova:rest:ratelimit:lock:{}", bucket))
- .await
- {
- Ok(exists) => {
- if exists {
- Ok(RatelimiterResponse::Ratelimited)
- } else {
- Ok(RatelimiterResponse::Pass)
- }
- }
- Err(_) => Err(GenericError::StepFailed("radis ratelimit check".to_string())),
- }
- }
- None => Ok(RatelimiterResponse::NoSuchUrl),
- },
- Err(_) => Err(GenericError::StepFailed("radis ratelimit check".to_string())),
- }
- }
-
- fn parse_headers(&self, response: &Response<Body>) -> Option<(String, i32, i32)> {
- if let Some(bucket) = response.headers().get("X-RateLimit-Bucket") {
- let bucket = bucket.to_str().unwrap().to_string();
-
- let remaining = response.headers().get("X-RateLimit-Remaining").unwrap();
- let reset = response.headers().get("X-RateLimit-Reset-After").unwrap();
-
- let remaining_i32 = remaining.to_str().unwrap().parse::<i32>().unwrap();
- let reset_ms_i32 = reset.to_str().unwrap().parse::<f32>().unwrap().ceil() as i32;
- return Some((bucket, remaining_i32, reset_ms_i32));
- } else {
- None
- }
- }
-
- pub async fn after_request(&self, path: &str, response: &Response<Body>) {
- let hash = xxh32(path.as_bytes(), 32);
- // verified earlier
-
- let mut redis = self.redis.lock().await;
-
- let start = SystemTime::now();
- let since_the_epoch = start
- .duration_since(UNIX_EPOCH)
- .expect("Time went backwards");
-
- redis
- .incr::<String, i32, ()>(
- format!("nova:rest:ratelimit:global:{}", since_the_epoch.as_secs()),
- 1,
- )
- .await
- .unwrap();
- if let Some((bucket, remaining, reset)) = self.parse_headers(response) {
- if remaining <= 1 {
- // we set a lock for the bucket until the timeout passes
- redis
- .set_ex::<String, bool, ()>(
- format!("nova:rest:ratelimit:lock:{}", bucket),
- true,
- reset.try_into().unwrap(),
- )
- .await
- .unwrap();
- }
-
- redis
- .set_ex::<String, String, ()>(
- format!("nova:rest:ratelimit:url_bucket:{}", hash),
- bucket,
- reset.try_into().unwrap(),
- )
- .await
- .unwrap();
- }
- }
-}
--- /dev/null
+use self::remote_hashring::{HashRingWrapper, VNode};
+use futures_util::Future;
+use proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::{Data, Headers};
+use proto::nova::ratelimit::ratelimiter::BucketSubmitTicketRequest;
+use shared::log::debug;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::time::UNIX_EPOCH;
+use std::time::{Duration, SystemTime};
+use tokio::sync::oneshot::{self};
+use tokio::sync::{broadcast, mpsc, RwLock};
+use tokio_stream::wrappers::ReceiverStream;
+
+mod remote_hashring;
+
+#[derive(Clone, Debug)]
+pub struct RemoteRatelimiter {
+ remotes: Arc<RwLock<HashRingWrapper>>,
+ stop: Arc<tokio::sync::broadcast::Sender<()>>,
+}
+
+impl Drop for RemoteRatelimiter {
+ fn drop(&mut self) {
+ self.stop.clone().send(()).unwrap();
+ }
+}
+
+impl RemoteRatelimiter {
+ async fn get_ratelimiters(&self) -> Result<(), anyhow::Error> {
+ // get list of dns responses
+ let responses = dns_lookup::lookup_host("ratelimit")
+ .unwrap()
+ .into_iter()
+ .map(|f| f.to_string());
+
+ let mut write = self.remotes.write().await;
+
+ for ip in responses {
+ let a = VNode::new(ip.into()).await?;
+ write.add(a.clone());
+ }
+
+ return Ok(());
+ }
+
+ #[must_use]
+ pub fn new() -> Self {
+ let (rx, mut tx) = broadcast::channel(1);
+ let obj = Self {
+ remotes: Arc::new(RwLock::new(HashRingWrapper::default())),
+ stop: Arc::new(rx),
+ };
+
+ let obj_clone = obj.clone();
+ // Task to update the ratelimiters in the background
+ tokio::spawn(async move {
+ loop {
+ let sleep = tokio::time::sleep(Duration::from_secs(10));
+ tokio::pin!(sleep);
+
+ debug!("refreshing");
+ obj_clone.get_ratelimiters().await.unwrap();
+ tokio::select! {
+ () = &mut sleep => {
+ println!("timer elapsed");
+ },
+ _ = tx.recv() => {}
+ }
+ }
+ });
+
+ obj
+ }
+
+ pub fn ticket(
+ &self,
+ path: String,
+ ) -> Pin<
+ Box<
+ dyn Future<Output = anyhow::Result<oneshot::Sender<HashMap<String, String>>>>
+ + Send
+ + 'static,
+ >,
+ > {
+ let remotes = self.remotes.clone();
+ let (tx, rx) = oneshot::channel::<HashMap<String, String>>();
+
+ Box::pin(async move {
+ // Get node managing this path
+ let mut node = (*remotes.read().await.get(&path).unwrap()).clone();
+
+ // Buffers for the gRPC streaming channel.
+ let (send, remote) = mpsc::channel(5);
+ let (do_request, wait) = oneshot::channel();
+ // Tonic requires a stream to be used; Since we use a mpsc channel, we can create a stream from it
+ let stream = ReceiverStream::new(remote);
+
+ // Start the grpc streaming
+ let ticket = node.submit_ticket(stream).await?;
+
+ // First, send the request
+ send.send(BucketSubmitTicketRequest {
+ data: Some(Data::Path(path)),
+ })
+ .await?;
+
+ // We continuously listen for events in the channel.
+ tokio::spawn(async move {
+ let message = ticket.into_inner().message().await.unwrap().unwrap();
+
+ if message.accepted == 1 {
+ do_request.send(()).unwrap();
+ let headers = rx.await.unwrap();
+
+ send.send(BucketSubmitTicketRequest {
+ data: Some(Data::Headers(Headers {
+ precise_time: SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("time went backwards")
+ .as_millis() as u64,
+ headers,
+ })),
+ })
+ .await
+ .unwrap();
+ }
+ });
+
+ // Wait for the message to be sent
+ wait.await?;
+
+ Ok(tx)
+ })
+ }
+}
--- /dev/null
+use core::fmt::Debug;
+use proto::nova::ratelimit::ratelimiter::ratelimiter_client::RatelimiterClient;
+use std::hash::Hash;
+use std::ops::Deref;
+use std::ops::DerefMut;
+use tonic::transport::Channel;
+
+#[derive(Debug, Clone)]
+pub struct VNode {
+ address: String,
+
+ client: RatelimiterClient<Channel>,
+}
+
+impl Deref for VNode {
+ type Target = RatelimiterClient<Channel>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.client
+ }
+}
+
+impl DerefMut for VNode {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.client
+ }
+}
+
+impl Hash for VNode {
+ fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+ self.address.hash(state);
+ }
+}
+
+impl VNode {
+ pub async fn new(address: String) -> Result<Self, tonic::transport::Error> {
+ let client = RatelimiterClient::connect(format!("http://{}:8080", address.clone())).await?;
+
+ Ok(VNode { client, address })
+ }
+}
+
+unsafe impl Send for VNode {}
+
+#[repr(transparent)]
+#[derive(Default)]
+pub struct HashRingWrapper(hashring::HashRing<VNode>);
+
+impl Deref for HashRingWrapper {
+ type Target = hashring::HashRing<VNode>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+impl DerefMut for HashRingWrapper {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.0
+ }
+}
+
+impl Debug for HashRingWrapper {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_tuple("HashRing").finish()
+ }
+}
hyper = { version = "0.14", features = ["full"] }
tokio = { version = "1", features = ["full"] }
shared = { path = "../../libs/shared" }
+proto = { path = "../../libs/proto" }
+leash = { path = "../../libs/leash" }
+
serde = { version = "1.0.8", features = ["derive"] }
hex = "0.4.3"
serde_json = { version = "1.0" }
-libc = "0.2.101"
lazy_static = "1.4.0"
-ctor = "0.1.21"
ed25519-dalek = "1"
twilight-model = { version = "0.14" }
-rand = "0.8"
+anyhow = "1.0.68"
[[bin]]
name = "webhook"
-use serde::Deserialize;
+use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
-#[derive(Debug, Deserialize, Clone, Default)]
+use ed25519_dalek::PublicKey;
+use serde::{Deserialize, Deserializer};
+
+fn default_listening_address() -> SocketAddr {
+ SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080))
+}
+
+#[derive(Debug, Deserialize, Clone, Copy)]
pub struct ServerSettings {
- pub port: u16,
- pub address: String,
+ #[serde(default = "default_listening_address")]
+ pub listening_adress: SocketAddr,
+}
+impl Default for ServerSettings {
+ fn default() -> Self {
+ Self {
+ listening_adress: default_listening_address(),
+ }
+ }
+}
+
+fn deserialize_pk<'de, D>(deserializer: D) -> Result<PublicKey, D::Error>
+where
+ D: Deserializer<'de>,
+{
+ let str = String::deserialize(deserializer)?;
+ let public_key = PublicKey::from_bytes(&hex::decode(&str).unwrap()).unwrap();
+ Ok(public_key)
}
-#[derive(Debug, Deserialize, Clone, Default)]
+#[derive(Debug, Deserialize, Clone, Default, Copy)]
pub struct Discord {
- pub public_key: String,
+ #[serde(deserialize_with = "deserialize_pk")]
+ pub public_key: PublicKey,
pub client_id: u32,
}
-#[derive(Debug, Deserialize, Clone, Default)]
-pub struct Config {
+#[derive(Debug, Deserialize, Clone, Default, Copy)]
+pub struct WebhookConfig {
pub server: ServerSettings,
pub discord: Discord,
}
use super::error::WebhookError;
use super::signature::validate_signature;
-use crate::config::Config;
+use crate::config::WebhookConfig;
use ed25519_dalek::PublicKey;
use hyper::{
body::{to_bytes, Bytes},
service::Service,
Body, Method, Request, Response, StatusCode,
};
-use serde::{Deserialize, Serialize};
use shared::nats_crate::Client;
use shared::{
log::{debug, error},
future::Future,
pin::Pin,
str::from_utf8,
- sync::Arc,
task::{Context, Poll},
};
-use twilight_model::gateway::event::{DispatchEvent};
+use twilight_model::gateway::event::DispatchEvent;
use twilight_model::{
application::interaction::{Interaction, InteractionType},
gateway::payload::incoming::InteractionCreate,
/// Hyper service used to handle the discord webhooks
#[derive(Clone)]
-pub struct HandlerService {
- pub config: Arc<Config>,
- pub nats: Arc<Client>,
- pub public_key: Arc<PublicKey>,
+pub struct WebhookService {
+ pub config: WebhookConfig,
+ pub nats: Client,
}
-impl HandlerService {
- async fn check_request(&self, req: Request<Body>) -> Result<Bytes, WebhookError> {
+impl WebhookService {
+ async fn check_request(req: Request<Body>, pk: PublicKey) -> Result<Bytes, WebhookError> {
if req.method() == Method::POST {
let signature = if let Some(sig) = req.headers().get("X-Signature-Ed25519") {
sig.to_owned()
let data = to_bytes(req.into_body()).await?;
if validate_signature(
- &self.public_key,
+ &pk,
&[timestamp.as_bytes().to_vec(), data.to_vec()].concat(),
signature.to_str()?,
) {
}
async fn process_request(
- &mut self,
req: Request<Body>,
+ nats: Client,
+ pk: PublicKey,
) -> Result<Response<Body>, WebhookError> {
- match self.check_request(req).await {
+ match Self::check_request(req, pk).await {
Ok(data) => {
let utf8 = from_utf8(&data);
match utf8 {
match value.kind {
InteractionType::Ping => Ok(Response::builder()
.header("Content-Type", "application/json")
- .body(serde_json::to_string(&Ping { t: 1 }).unwrap().into())
+ .body(r#"{"t":1}"#.into())
.unwrap()),
_ => {
debug!("calling nats");
let payload = serde_json::to_string(&data).unwrap();
- match self.nats.request(
- "nova.cache.dispatch.INTERACTION_CREATE".to_string(),
- Bytes::from(payload),
- ).await {
+ match nats
+ .request(
+ "nova.cache.dispatch.INTERACTION_CREATE".to_string(),
+ Bytes::from(payload),
+ )
+ .await
+ {
Ok(response) => Ok(Response::builder()
.header("Content-Type", "application/json")
.body(Body::from(response.reply.unwrap()))
}
}
-#[derive(Debug, Serialize, Deserialize)]
-pub struct Ping {
- #[serde(rename = "type")]
- t: i32,
-}
-
/// Implementation of the service
-impl Service<Request<Body>> for HandlerService {
- type Response = Response<Body>;
+impl Service<hyper::Request<Body>> for WebhookService {
+ type Response = hyper::Response<Body>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
}
fn call(&mut self, req: Request<Body>) -> Self::Future {
- let mut clone = self.clone();
+ let future =
+ Self::process_request(req, self.nats.clone(), self.config.discord.public_key);
Box::pin(async move {
- let response = clone.process_request(req).await;
+ let response = future.await;
match response {
Ok(r) => Ok(r),
-use super::handler::HandlerService;
-use crate::config::Config;
use hyper::service::Service;
-use shared::nats_crate::Client;
use std::{
future::{ready, Ready},
- sync::Arc,
task::{Context, Poll},
};
-use ed25519_dalek::PublicKey;
-pub struct MakeSvc {
- pub settings: Arc<Config>,
- pub nats: Arc<Client>,
- pub public_key: Arc<PublicKey>
+pub struct MakeSvc<T: Clone> {
+ pub service: T,
}
-impl<T> Service<T> for MakeSvc {
- type Response = HandlerService;
+impl<T, V: Clone> Service<T> for MakeSvc<V> {
+ type Response = V;
type Error = std::io::Error;
type Future = Ready<Result<Self::Response, Self::Error>>;
}
fn call(&mut self, _: T) -> Self::Future {
- ready(Ok(HandlerService {
- config: self.settings.clone(),
- nats: self.nats.clone(),
- public_key: self.public_key.clone()
- }))
+ ready(Ok(self.service.clone()))
+ }
+}
+
+impl<T: Clone> MakeSvc<T> {
+ pub fn new(service: T) -> Self {
+ Self { service }
}
}
mod error;
-mod handler;
+pub mod handler;
pub mod make_service;
mod signature;
fn validate_signature_test() {
let signature = "543ec3547d57f9ddb1ec4c5c36503ebf288ffda3da3d510764c9a49c2abb57690ef974c63d174771bdd2481de1066966f57abbec12a3ec171b9f6e2373837002";
let content = "message de test incroyable".as_bytes().to_vec();
- let public_key = PublicKey::from_bytes(&hex::decode("eefe0c24473737cb2035232e3b4eb91c206f0a14684168f3503f7d8316058d6f").unwrap()).unwrap();
+ let public_key = PublicKey::from_bytes(
+ &hex::decode("eefe0c24473737cb2035232e3b4eb91c206f0a14684168f3503f7d8316058d6f").unwrap(),
+ )
+ .unwrap();
assert!(validate_signature(&public_key, &content, signature))
}
#[test]
fn validate_signature_reverse_test() {
let signature = "543ec3547d57f9ddb1ec4c5c36503ebf288ffda3da3d510764c9a49c2abb57690ef974c63d174771bdd2481de1066966f57abbec12a3ec171b9f6e2373837002";
- let public_key = PublicKey::from_bytes(&hex::decode("c029eea18437292c87c62aec34e7d1bd4e38fe6126f3f7c446de6375dc666044").unwrap()).unwrap();
+ let public_key = PublicKey::from_bytes(
+ &hex::decode("c029eea18437292c87c62aec34e7d1bd4e38fe6126f3f7c446de6375dc666044").unwrap(),
+ )
+ .unwrap();
let content = "ceci est un test qui ne fonctionnera pas!"
.as_bytes()
#[test]
fn invalid_hex() {
let signature = "zzz";
- let public_key = PublicKey::from_bytes(&hex::decode("c029eea18437292c87c62aec34e7d1bd4e38fe6126f3f7c446de6375dc666044").unwrap()).unwrap();
+ let public_key = PublicKey::from_bytes(
+ &hex::decode("c029eea18437292c87c62aec34e7d1bd4e38fe6126f3f7c446de6375dc666044").unwrap(),
+ )
+ .unwrap();
let content = "ceci est un test qui ne fonctionnera pas!"
.as_bytes()
.to_vec();
assert!(!validate_signature(&public_key, &content, signature))
-}
\ No newline at end of file
+}
-use std::{net::ToSocketAddrs, sync::Arc};
mod config;
mod handler;
-use crate::handler::make_service::MakeSvc;
+use std::{future::Future, pin::Pin};
-use crate::config::Config;
-use ed25519_dalek::PublicKey;
+use crate::{
+ config::WebhookConfig,
+ handler::{handler::WebhookService, make_service::MakeSvc},
+};
use hyper::Server;
-use shared::config::Settings;
-use shared::log::{error, info};
+use leash::{ignite, AnyhowResultFuture, Component};
+use shared::{config::Settings, log::info, nats_crate::Client};
-#[tokio::main]
-async fn main() {
- let settings: Settings<Config> = Settings::new("webhook").unwrap();
- start(settings).await;
-}
+#[derive(Clone, Copy)]
+struct WebhookServer {}
+
+impl Component for WebhookServer {
+ type Config = WebhookConfig;
+ const SERVICE_NAME: &'static str = "webhook";
+
+ fn start(&self, settings: Settings<Self::Config>) -> AnyhowResultFuture<()> {
+ Box::pin(async move {
+ info!("Starting server on {}", settings.server.listening_adress);
+
+ let bind = settings.server.listening_adress;
+ let nats =
+ Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>>>>>::into(settings.nats)
+ .await?;
+
+ let make_service = MakeSvc::new(WebhookService {
+ config: settings.config,
+ nats: nats.clone(),
+ });
-async fn start(settings: Settings<Config>) {
- let addr = format!(
- "{}:{}",
- settings.config.server.address, settings.config.server.port
- )
- .to_socket_addrs()
- .unwrap()
- .next()
- .unwrap();
-
- info!(
- "Starting server on {}:{}",
- settings.config.server.address, settings.config.server.port
- );
-
- let config = Arc::new(settings.config);
- let public_key =
- Arc::new(PublicKey::from_bytes(&hex::decode(&config.discord.public_key).unwrap()).unwrap());
- let server = Server::bind(&addr).serve(MakeSvc {
- settings: config,
- nats: Arc::new(settings.nats.to_client().await.unwrap()),
- public_key: public_key,
- });
-
- if let Err(e) = server.await {
- error!("server error: {}", e);
+ let server = Server::bind(&bind).serve(make_service);
+
+ server.await?;
+
+ Ok(())
+ })
+ }
+
+ fn new() -> Self {
+ Self {}
}
}
+
+ignite!(WebhookServer);
--- /dev/null
+[package]
+name = "leash"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+shared = { path = "../shared" }
+anyhow = "1.0.68"
+tokio = { version = "1.23.0", features = ["full"] }
+
+serde = "1.0.152"
\ No newline at end of file
--- /dev/null
+use anyhow::Result;
+use serde::de::DeserializeOwned;
+use shared::config::Settings;
+use std::{future::Future, pin::Pin};
+
+pub type AnyhowResultFuture<T> = Pin<Box<dyn Future<Output = Result<T>>>>;
+pub trait Component: Send + Sync + 'static + Sized {
+ type Config: Default + Clone + DeserializeOwned;
+
+ const SERVICE_NAME: &'static str;
+ fn start(&self, settings: Settings<Self::Config>) -> AnyhowResultFuture<()>;
+ fn new() -> Self;
+
+ fn _internal_start(self) -> AnyhowResultFuture<()> {
+ Box::pin(async move {
+ let settings = Settings::<Self::Config>::new(Self::SERVICE_NAME);
+
+ // Start the grpc healthcheck
+ tokio::spawn(async move {});
+
+ // Start the prometheus monitoring job
+ tokio::spawn(async move {});
+
+ self.start(settings?).await
+ })
+ }
+}
+
+#[macro_export]
+macro_rules! ignite {
+ ($c:ty) => {
+ #[allow(dead_code)]
+ fn main() -> anyhow::Result<()> {
+ let rt = tokio::runtime::Runtime::new()?;
+ rt.block_on(Box::new(<$c as Component>::new())._internal_start())?;
+ Ok(())
+ }
+ };
+}
+
+#[cfg(test)]
+mod test {
+ use serde::Deserialize;
+
+ use crate::Component;
+
+ #[derive(Clone, Copy)]
+ struct TestComponent {}
+
+ #[derive(Default, Clone, Deserialize, Copy)]
+ struct TestComponentConfig {}
+
+ impl Component for TestComponent {
+ type Config = TestComponentConfig;
+ const SERVICE_NAME: &'static str = "test_component";
+
+ fn start(
+ &self,
+ _settings: shared::config::Settings<Self::Config>,
+ ) -> crate::AnyhowResultFuture<()> {
+ Box::pin(async move { Ok(()) })
+ }
+
+ fn new() -> Self {
+ Self {}
+ }
+ }
+
+ ignite!(TestComponent);
+}
edition = "2018"
[dependencies]
+tonic = "0.8.3"
+prost = "0.11.5"
+
+[build-dependencies]
+tonic-build = "0.8.4"
+glob = "0.3.0"
\ No newline at end of file
--- /dev/null
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+ let paths: Vec<String> = glob::glob("../../proto/nova/**/*.proto")?
+ .map(|f| f.unwrap().to_str().unwrap().to_string())
+ .collect();
+
+ tonic_build::configure()
+ .include_file("genproto.rs")
+ .compile(&paths, &["../../proto"])?;
+
+ Ok(())
+}
+include!(concat!(env!("OUT_DIR"), concat!("/", "genproto.rs")));
serde_json = { version = "1.0" }
thiserror = "1.0.38"
inner = "0.1.1"
+anyhow = "1.0.68"
[dependencies.redis]
version = "*"
-use std::env;
+use std::{env, ops::Deref};
use config::{Config, Environment, File};
use log::info;
-use serde::Deserialize;
+use serde::{Deserialize, de::DeserializeOwned};
use crate::error::GenericError;
-
-/// Settings<T> is the base structure for all the nova's component config
-/// you can specify a type T and the name of the component. the "config"
-/// field will be equals to the key named after the given component name
-/// and will be of type T
#[derive(Debug, Deserialize, Clone)]
-#[serde(bound(deserialize = "T: Deserialize<'de> + std::default::Default + Clone"))]
-pub struct Settings<T> {
+pub struct Settings<T: Clone + DeserializeOwned + Default> {
#[serde(skip_deserializing)]
pub config: T,
pub monitoring: crate::monitoring::MonitoringConfiguration,
pub redis: crate::redis::RedisConfiguration,
}
-///
-impl<T> Settings<T>
-where
- T: Deserialize<'static> + std::default::Default + Clone,
+impl<'de, T: Clone + DeserializeOwned + Default> Settings<T>
{
-
- /// Initializes a new configuration like the other components of nova
- /// And starts the prometheus metrics server if needed.
pub fn new(service_name: &str) -> Result<Settings<T>, GenericError> {
- pretty_env_logger::init();
-
let mut builder = Config::builder();
- // this file my be shared with all the components
builder = builder.add_source(File::with_name("config/default"));
let mode = env::var("ENV").unwrap_or_else(|_| "development".into());
info!("Configuration Environment: {}", mode);
// try to load the config
settings.config = config.get::<T>(service_name)?;
-
- // start the monitoring system if needed
- crate::monitoring::start_monitoring(&settings.monitoring);
+
Ok(settings)
}
}
-pub fn test_init() {
- pretty_env_logger::init();
-}
+impl<T: Clone + DeserializeOwned + Default> Deref for Settings<T> {
+ type Target = T;
+
+ fn deref(&self) -> &Self::Target {
+ &self.config
+ }
+}
\ No newline at end of file
+use std::{future::Future, pin::Pin};
+
use async_nats::Client;
use serde::Deserialize;
-use std::future::Future;
-
-use crate::error::GenericError;
#[derive(Clone, Debug, Deserialize)]
pub struct NatsConfigurationClientCert {
pub host: String,
}
-// todo: Prefer From since it automatically gives a free Into implementation
-// Allows the configuration to directly create a nats connection
-impl NatsConfiguration {
- pub async fn to_client(self) -> Result<Client, GenericError> {
- Ok(async_nats::connect(self.host).await?)
+impl From<NatsConfiguration> for Pin<Box<dyn Future<Output = anyhow::Result<Client>>>> {
+ fn from(value: NatsConfiguration) -> Self {
+ Box::pin(async move { Ok(async_nats::connect(value.host).await?) })
}
-}
\ No newline at end of file
+}
-use redis::Client;
+use redis::{aio::MultiplexedConnection, Client};
use serde::Deserialize;
-
+use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, Deserialize)]
pub struct RedisConfiguration {
redis::Client::open(self.url).unwrap()
}
}
+
+impl From<RedisConfiguration>
+ for Pin<Box<dyn Future<Output = anyhow::Result<MultiplexedConnection>>>>
+{
+ fn from(value: RedisConfiguration) -> Self {
+ Box::pin(async move {
+ let con = Client::open(value.url)?;
+ let (multiplex, ready) = con.create_multiplexed_tokio_connection().await?;
+
+ tokio::spawn(ready);
+
+ Ok(multiplex)
+ })
+ }
+}
+++ /dev/null
-syntax = "proto3";
-package nova.management.v1alpha;
-
-message Empty {}
-
-// Represents the status of a shard
-enum ShardStatus {
- DISCONNECTED = 0;
- RUNNING = 1;
- RECONNECTING = 2;
-}
-
-// represents the state of a nova shard
-message ShardStatusResponse {
- // Status of the shard in the cluster
- ShardStatus status = 1;
- // Index of the discord shard
- int64 identifier = 2;
- // If the cluster have a node assigned
- string cluster = 3;
- // the websocket latency of the shard
- int64 latency = 4;
-}
-
-message ShardStatusRequest {
- // the id of the shard
- int64 identifier = 1;
-}
-
-// represents the status of a cluster
-// (an instance of the gateway which holds multiple shards)
-message ClusterStatusResponse {
- // the unique id of the cluster
- string id = 1;
- // the node the cluster is running on
- string node = 2;
- // the average latency of the cluster
- int64 average_latency = 3;
- // list of all the shards on the cluster
- repeated ShardStatusResponse shards = 4;
-}
-
-message ClusterStatusRequest {
- string id = 1;
-}
-
-// Represents the status of all the nova clusters & shards
-message GlobalClusterStatusResponse {
- int64 size = 1;
- repeated ClusterStatusResponse shards = 2;
-}
-
-// used by the cli to interact with the nova manager
-service ManagementService {
- rpc GetGlobalClusterStatus (Empty) returns (GlobalClusterStatusResponse);
- rpc GetClusterStatus (ClusterStatusRequest) returns (ClusterStatusResponse);
- rpc GetShardStatus (ShardStatusRequest) returns (ShardStatusResponse);
-}
\ No newline at end of file
+++ /dev/null
-syntax = "proto3";
-
-import "common/management/nova.management.v1alpha.proto";
-package nova.management.rpc.v1alpha;
-
--- /dev/null
+syntax = "proto3";
+
+package nova.ratelimit.ratelimiter;
+
+service Ratelimiter {
+ rpc GetBucketInformation(BucketInformationRequest) returns (BucketInformationResponse);
+ rpc SubmitTicket(stream BucketSubmitTicketRequest) returns (stream BucketSubmitTicketResponse);
+}
+
+message BucketInformationRequest {
+ string path = 1;
+}
+
+message BucketInformationResponse {
+ uint64 limit = 1;
+ uint64 remaining = 2;
+ uint64 reset_after = 3;
+ uint64 started_at = 4;
+}
+
+
+message BucketSubmitTicketRequest {
+ oneof data {
+ string path = 1;
+ Headers headers = 2;
+ }
+
+ message Headers {
+ map<string, string> headers = 1;
+ uint64 precise_time = 2;
+ }
+
+}
+
+message BucketSubmitTicketResponse {
+ int64 accepted = 1;
+}