"webhook",
]
+[[package]]
+name = "anes"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299"
+
[[package]]
name = "anyhow"
version = "1.0.68"
"twilight-model",
]
+[[package]]
+name = "cast"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
+
[[package]]
name = "cbindgen"
version = "0.24.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
+[[package]]
+name = "ciborium"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b0c137568cc60b904a7724001b35ce2630fd00d5d84805fbb608ab89509d788f"
+dependencies = [
+ "ciborium-io",
+ "ciborium-ll",
+ "serde",
+]
+
+[[package]]
+name = "ciborium-io"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "346de753af073cc87b52b2083a506b38ac176a44cfb05497b622e27be899b369"
+
+[[package]]
+name = "ciborium-ll"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "213030a2b5a4e0c0892b6652260cf6ccac84827b83a85a534e178e3906c4cf1b"
+dependencies = [
+ "ciborium-io",
+ "half",
+]
+
[[package]]
name = "clap"
version = "3.2.23"
"cfg-if",
]
+[[package]]
+name = "criterion"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e7c76e09c1aae2bc52b3d2f29e13c6572553b30c4aa1b8a49fd70de6412654cb"
+dependencies = [
+ "anes",
+ "atty",
+ "cast",
+ "ciborium",
+ "clap",
+ "criterion-plot",
+ "futures",
+ "itertools",
+ "lazy_static",
+ "num-traits",
+ "oorandom",
+ "plotters",
+ "rayon",
+ "regex",
+ "serde",
+ "serde_derive",
+ "serde_json",
+ "tinytemplate",
+ "tokio",
+ "walkdir",
+]
+
+[[package]]
+name = "criterion-plot"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1"
+dependencies = [
+ "cast",
+ "itertools",
+]
+
[[package]]
name = "crossbeam-channel"
version = "0.5.6"
"crossbeam-utils",
]
+[[package]]
+name = "crossbeam-deque"
+version = "0.8.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "715e8152b692bba2d374b53d4875445368fdf21a94751410af607a5ac677d1fc"
+dependencies = [
+ "cfg-if",
+ "crossbeam-epoch",
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-epoch"
+version = "0.9.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "01a9af1f4c2ef74bb8aa1f7e19706bc72d03598c8a570bb5de72243c7a9d9d5a"
+dependencies = [
+ "autocfg",
+ "cfg-if",
+ "crossbeam-utils",
+ "memoffset",
+ "scopeguard",
+]
+
[[package]]
name = "crossbeam-utils"
version = "0.8.14"
"tracing",
]
+[[package]]
+name = "half"
+version = "1.8.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7"
+
[[package]]
name = "hashbrown"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
+[[package]]
+name = "memoffset"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4"
+dependencies = [
+ "autocfg",
+]
+
[[package]]
name = "mime"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f61fba1741ea2b3d6a1e3178721804bb716a68a6aeba1149b5d52e3d464ea66"
+[[package]]
+name = "oorandom"
+version = "11.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
+
[[package]]
name = "opaque-debug"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160"
+[[package]]
+name = "plotters"
+version = "0.3.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2538b639e642295546c50fcd545198c9d64ee2a38620a628724a3b266d5fbf97"
+dependencies = [
+ "num-traits",
+ "plotters-backend",
+ "plotters-svg",
+ "wasm-bindgen",
+ "web-sys",
+]
+
+[[package]]
+name = "plotters-backend"
+version = "0.3.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "193228616381fecdc1224c62e96946dfbc73ff4384fba576e052ff8c1bea8142"
+
+[[package]]
+name = "plotters-svg"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f9a81d2759aae1dae668f783c308bc5c8ebd191ff4184aaa1b37f65a6ae5a56f"
+dependencies = [
+ "plotters-backend",
+]
+
[[package]]
name = "ppv-lite86"
version = "0.2.17"
version = "0.1.0"
dependencies = [
"anyhow",
+ "criterion",
+ "env_logger 0.7.1",
"hyper",
"leash",
"opentelemetry",
"redis",
"serde",
"shared",
+ "test-log",
"tokio",
"tokio-stream",
+ "tokio-test",
"tonic",
"tracing",
"tracing-opentelemetry",
+ "tracing-subscriber",
+ "tracing-test",
"twilight-http-ratelimiting 0.14.0 (git+https://github.com/MatthieuCoder/twilight.git)",
]
+[[package]]
+name = "rayon"
+version = "1.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6db3a213adf02b3bcfd2d3846bb41cb22857d131789e01df434fb7e7bc0759b7"
+dependencies = [
+ "either",
+ "rayon-core",
+]
+
+[[package]]
+name = "rayon-core"
+version = "1.10.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cac410af5d00ab6884528b4ab69d1e8e146e8d471201800fa1b4524126de6ad3"
+dependencies = [
+ "crossbeam-channel",
+ "crossbeam-deque",
+ "crossbeam-utils",
+ "num_cpus",
+]
+
[[package]]
name = "redis"
version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b4b9743ed687d4b4bcedf9ff5eaa7398495ae14e61cba0a295704edbc7decde"
+[[package]]
+name = "same-file"
+version = "1.0.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
+dependencies = [
+ "winapi-util",
+]
+
[[package]]
name = "schannel"
version = "0.1.20"
"winapi-util",
]
+[[package]]
+name = "test-log"
+version = "0.2.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "38f0c854faeb68a048f0f2dc410c5ddae3bf83854ef0e4977d58306a5edef50e"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "textwrap"
version = "0.16.0"
"time-core",
]
+[[package]]
+name = "tinytemplate"
+version = "1.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc"
+dependencies = [
+ "serde",
+ "serde_json",
+]
+
[[package]]
name = "tinyvec"
version = "1.6.0"
"tokio",
]
+[[package]]
+name = "tokio-test"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "53474327ae5e166530d17f2d956afcb4f8a004de581b3cae10f12006bc8163e3"
+dependencies = [
+ "async-stream",
+ "bytes",
+ "futures-core",
+ "tokio",
+ "tokio-stream",
+]
+
[[package]]
name = "tokio-tungstenite"
version = "0.17.2"
"tracing-log",
]
+[[package]]
+name = "tracing-test"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9e3d272c44878d2bbc9f4a20ad463724f03e19dbc667c6e84ac433ab7ffcc70b"
+dependencies = [
+ "lazy_static",
+ "tracing-core",
+ "tracing-subscriber",
+ "tracing-test-macro",
+]
+
+[[package]]
+name = "tracing-test-macro"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "744324b12d69a9fc1edea4b38b7b1311295b662d161ad5deac17bb1358224a08"
+dependencies = [
+ "lazy_static",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "try-lock"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
+[[package]]
+name = "walkdir"
+version = "2.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56"
+dependencies = [
+ "same-file",
+ "winapi",
+ "winapi-util",
+]
+
[[package]]
name = "want"
version = "0.3.0"
--- /dev/null
+# Nova architecture
+
+The nova architecture is composed of multiple components, each of which is horizontally scalable.
+
+```
+ ┌──────────────────┐
+ │ │
+ ┌─────────────┤ Discord API ├──────────────┐
+ │ │ │ │
+ │ └────────┬─────────┘ │
+ │ │ │
+ │ │ │
+ │ │ │
+ ┌─────────┴────────┐ ┌────────┴─────────┐ ┌─────────┴────────┐
+ │ │ │ │ │ │
+ │ Rest Proxy │ │ Gateway client │ │ Webhook Server │
+ │ │ │ │ │ │
+ └─────────┬──┬─────┘ └────────┬─────────┘ └─────────┬────────┘
+ │ │ │ │
+ │ │ │ │
+ │ │ │ │
+ │ │ │ │
+ │ │ │ │
+ │ │ │ │
+ │ └───────┐ │ │
+┌────────────────┐ ┌────────┴───────┐ │ ┌───────┴────────┐ │
+│ │ │ │ │ │ ├───────────────┘
+│ Redis ├───┤ Ratelimit │ │ │ Nats broker │
+│ │ │ │ │ │ ├──────────────────┐
+└────────────────┘ └────────────────┘ │ └───────┬────────┘ │
+ │ │ │
+ │ │ │
+ │ ┌───────┴────────┐ ┌──────┴─────┐
+ │ │ │ │ │
+ │ │ Cache manager ├───────────┤ User │
+ │ │ │ │ │
+ │ └────────────────┘ └──────┬─────┘
+ └───────────────────────────────────────┘
+```
+
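+Every component is built on the `leash::Component` trait: it declares a typed
+configuration section and an async `start` entry point that runs until a stop
+signal is received. A minimal sketch, assuming the trait shape used by the
+gateway component; the `Example*` names, the `&self` receiver, and the `new`
+constructor are illustrative assumptions:
+
+```rust
+use leash::{AnyhowResultFuture, Component};
+use serde::Deserialize;
+use shared::config::Settings;
+use tokio::sync::oneshot;
+
+#[derive(Clone, Debug, Default, Deserialize)]
+pub struct ExampleConfig {}
+
+pub struct ExampleServer {}
+
+impl Component for ExampleServer {
+    type Config = ExampleConfig;
+    const SERVICE_NAME: &'static str = "example";
+
+    fn start(
+        &self,
+        settings: Settings<Self::Config>,
+        stop: oneshot::Receiver<()>,
+    ) -> AnyhowResultFuture<()> {
+        Box::pin(async move {
+            // A real component would `select!` between its work and `stop`,
+            // like the gateway does with its event stream.
+            let _config = settings.config;
+            let _ = stop.await;
+            Ok(())
+        })
+    }
+
+    fn new() -> Self {
+        Self {}
+    }
+}
+```
+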
+## Rest Proxy
+
# 5 Minutes quickstart
-This page shows you how to start a new nova project
-under five minutes using a typescript project,
-hold tight this is going to be fast.
-
-## Requirements
-
-* A discord bot application available
-* [Docker](https://docker.io/) (or alternatives) available to you.
-* A domain name / [ngrok.io](https://ngrok.com/) domain (for webhooks only)
-
-> If you are deploying nova to production, consider following the
-> production guide instead.
-
-## Setting up a nova instance
-
-Clone the [example repo](https://github.com/discordnova/nova-quickstart.git) like so
-
-`git clone https://github.com/discordnova/nova-quickstart.git`,
-
-In this folder, find the `config.yml.example` file and rename it to match `config.yml`
-
-Next, you need to fill all the environment variables to match your discord bot.
+++ /dev/null
-clean:
- rm ./build/*
-
-library:
- cargo build --release
-
-build: library
- cp ../../target/release/liball_in_one.a ./build
- go build -a -ldflags '-s' -o build/all-in-one
-
-all: library build
-
-run: all-in-one
- ./build/all-in-one
-
+++ /dev/null
-package main
-
-/*
-#cgo LDFLAGS: -L./build -lall_in_one -lz -lm
-#include "./build/all-in-one.h"
-*/
-import "C"
-
-import (
- "fmt"
- "os"
- "os/signal"
- "syscall"
- "time"
-
- "github.com/Jeffail/gabs"
- "github.com/alicebob/miniredis/v2"
-
- server "github.com/nats-io/nats-server/v2/server"
-)
-
-func main() {
-	// Initialize the Rust library's logging
- C.init_logs()
-	// Load the configuration
- str := C.GoString(C.load_config())
-
-	// Start a MiniRedis instance
- mr := miniredis.NewMiniRedis()
- err := mr.Start()
-
- if err != nil {
- panic(err)
- }
-
-	// Start a Nats server
- opts := &server.Options{}
- opts.Host = "0.0.0.0"
- ns, err := server.NewServer(opts)
-
- if err != nil {
- panic(err)
- }
-
- go ns.Start()
-
- if !ns.ReadyForConnections(4 * time.Second) {
- panic("not ready for connection")
- }
-
-	// Edit the given configuration JSON
-	// and inject the Nats and MiniRedis server configuration
- json, _ := gabs.ParseJSON([]byte(str))
- json.Set(fmt.Sprintf("redis://%s", mr.Addr()), "redis", "url")
- json.Set("localhost", "nats", "host")
- json.Set(1, "webhook", "discord", "client_id")
-
-	// Start a nova instance
- instance := C.start_instance(C.CString(json.String()))
-
- // Wait for a SIGINT
- c := make(chan os.Signal, 1)
- signal.Notify(c,
- syscall.SIGHUP,
- syscall.SIGINT,
- syscall.SIGTERM,
- syscall.SIGQUIT)
- <-c
-
-	println("Stopping nova all-in-one")
- C.stop_instance(instance)
-}
use std::cell::RefCell;
use anyhow::Result;
-use libc::c_int;
use tracing::error;
thread_local! {
}
/// Update the most recent error, clearing whatever may have been there before.
-pub fn stacktrace(err: anyhow::Error) -> String {
+#[must_use] pub fn stacktrace(err: &anyhow::Error) -> String {
format!("{err}")
}
Ok(ok) => Some(ok),
Err(error) => {
// Call the handler
- handle_error(error);
+ handle_error(&error);
None
}
}
}
-pub fn handle_error(error: anyhow::Error) {
+/// # Panics
+/// Panics if the stacktrace length does not fit in an `i32`.
+pub fn handle_error(error: &anyhow::Error) {
ERROR_HANDLER.with(|val| {
let mut stacktrace = stacktrace(error);
// Call the error handler
unsafe {
func(
- stacktrace.len() as c_int + 1,
- stacktrace.as_mut_ptr() as *mut i8,
+ (stacktrace.len() + 1).try_into().unwrap(),
+ stacktrace.as_mut_ptr().cast::<i8>(),
);
}
}
});
}
+/// # Panics
+/// Panics if an invalid `RUST_LOG` variable is specified.
#[no_mangle]
pub unsafe extern "C" fn create_instance(config: *mut c_char) -> *mut AllInOneInstance {
+ // Returning a null pointer (unaligned) is expected.
+ #[allow(clippy::cast_ptr_alignment)]
wrap_result(move || {
let value = CString::from_raw(config);
let json = value.to_str()?;
.with(fmt::layer())
.with(
EnvFilter::builder()
- .with_default_directive(Directive::from_str("info").unwrap())
+                    .with_default_directive(Directive::from_str("info").expect("default directive should parse"))
.from_env()
.unwrap(),
)
// Error handling task
runtime.spawn(async move {
while let Some(error) = errors.recv().await {
- handle_error(error)
+ handle_error(&error);
}
});
+#![deny(
+ clippy::all,
+ clippy::correctness,
+ clippy::suspicious,
+ clippy::style,
+ clippy::complexity,
+ clippy::perf,
+ clippy::pedantic,
+ clippy::nursery,
+)]
+
pub mod errors;
pub mod ffi;
pub mod utils;
name: &str,
) -> Result<Settings<T>> {
let value: Value = serde_json::from_str(settings)?;
- let section: T = serde_json::from_value(value.get(name).unwrap().to_owned())?;
+ let section: T = serde_json::from_value(value.get(name).unwrap().clone())?;
let mut settings: Settings<T> = serde_json::from_value(value)?;
settings.config = section;
let mode = std::env::var("ENV").unwrap_or_else(|_| "development".into());
info!("Configuration Environment: {}", mode);
- builder = builder.add_source(File::with_name(&format!("config/{}", mode)).required(false));
+ builder = builder.add_source(File::with_name(&format!("config/{mode}")).required(false));
builder = builder.add_source(File::with_name("config/local").required(false));
let env = Environment::with_prefix("NOVA").separator("__");
name = "cache"
version = "0.1.0"
edition = "2018"
-
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+description = "Stores the data from Discord if needed"
+readme = "README.md"
+repository = "https://github.com/discordnova/nova.git"
+keywords = ["discord", "scaleable", "cache"]
+categories = ["microservices", "nova"]
+license = "Apache-2.0"
[dependencies]
shared = { path = "../../libs/shared" }
--- /dev/null
+# Cache
+
+Stores the data from Discord if needed
\ No newline at end of file
use twilight_gateway::Intents;
#[derive(Serialize, Deserialize, Clone)]
-pub struct GatewayConfig {
+pub struct Gateway {
pub token: String,
pub intents: Intents,
pub shard: u64,
pub shard_total: u64,
}
-impl Default for GatewayConfig {
+impl Default for Gateway {
fn default() -> Self {
Self {
intents: Intents::empty(),
+#![deny(
+ clippy::all,
+ clippy::correctness,
+ clippy::suspicious,
+ clippy::style,
+ clippy::complexity,
+ clippy::perf,
+ clippy::pedantic,
+ clippy::nursery,
+ unsafe_code
+)]
+#![allow(clippy::redundant_pub_crate)]
use async_nats::{Client, HeaderMap, HeaderValue};
-use config::GatewayConfig;
+use config::Gateway;
use leash::{AnyhowResultFuture, Component};
use opentelemetry::{global, propagation::Injector};
use shared::{
use tracing_opentelemetry::OpenTelemetrySpanExt;
use twilight_gateway::{Event, Shard};
pub mod config;
-use tracing::{debug, info, trace_span};
+use tracing::{debug, error, info, trace_span};
use twilight_model::gateway::event::DispatchEvent;
struct MetadataMap<'a>(&'a mut HeaderMap);
impl<'a> Injector for MetadataMap<'a> {
fn set(&mut self, key: &str, value: String) {
- self.0.insert(key, HeaderValue::from_str(&value).unwrap())
+ self.0.insert(key, HeaderValue::from_str(&value).unwrap());
}
}
pub struct GatewayServer {}
impl Component for GatewayServer {
- type Config = GatewayConfig;
+ type Config = Gateway;
const SERVICE_NAME: &'static str = "gateway";
fn start(
mut stop: oneshot::Receiver<()>,
) -> AnyhowResultFuture<()> {
Box::pin(async move {
- let (shard, mut events) = Shard::builder(settings.token.to_owned(), settings.intents)
+ let (shard, mut events) = Shard::builder(settings.token.clone(), settings.intents)
.shard(settings.shard, settings.shard_total)?
.build();
loop {
select! {
event = events.next() => {
-
if let Some(event) = event {
- match event {
- Event::Ready(ready) => {
- info!("Logged in as {}", ready.user.name);
- },
-
- _ => {
-
- let name = event.kind().name();
- if let Ok(dispatch_event) = DispatchEvent::try_from(event) {
- debug!("handling event {}", name.unwrap());
-
- let data = CachePayload {
- data: DispatchEventTagged {
- data: dispatch_event,
- },
- };
- let value = serde_json::to_string(&data)?;
- let bytes = bytes::Bytes::from(value);
-
- let span = trace_span!("nats send");
-
- let mut header_map = HeaderMap::new();
- let context = span.context();
- global::get_text_map_propagator(|propagator| {
- propagator.inject_context(&context, &mut MetadataMap(&mut header_map))
- });
-
- nats.publish_with_headers(format!("nova.cache.dispatch.{}", name.unwrap()), header_map, bytes)
- .await?;
- }
- }
- }
+ let _ = handle_event(event, &nats)
+ .await
+ .map_err(|err| error!(error = ?err, "event publish failed"));
} else {
break
}
Self {}
}
}
+
+async fn handle_event(event: Event, nats: &Client) -> anyhow::Result<()> {
+ if let Event::Ready(ready) = event {
+ info!("Logged in as {}", ready.user.name);
+ } else {
+ let name = event.kind().name();
+ if let Ok(dispatch_event) = DispatchEvent::try_from(event) {
+ debug!("handling event {}", name.unwrap());
+
+ let data = CachePayload {
+ data: DispatchEventTagged {
+ data: dispatch_event,
+ },
+ };
+ let value = serde_json::to_string(&data)?;
+ let bytes = bytes::Bytes::from(value);
+
+ let span = trace_span!("nats send");
+
+ let mut header_map = HeaderMap::new();
+ let context = span.context();
+ global::get_text_map_propagator(|propagator| {
+ propagator.inject_context(&context, &mut MetadataMap(&mut header_map));
+ });
+
+ nats.publish_with_headers(
+ format!("nova.cache.dispatch.{}", name.unwrap()),
+ header_map,
+ bytes,
+ )
+ .await?;
+ }
+ }
+
+ Ok(())
+}
tokio-stream = "0.1.11"
-redis = { version = "0.22.1", features = ["cluster", "connection-manager", "tokio-comp"] }
\ No newline at end of file
+redis = { version = "0.22.1", features = ["cluster", "connection-manager", "tokio-comp"] }
+
+[dev-dependencies]
+criterion = { version = "0.4", features = ["async_tokio"] }
+tokio-test = "*"
+tracing-test = "0.2.3"
+tracing-subscriber = "*"
+env_logger = "*"
+test-log = { version = "0.2.11", features = ["log", "trace"] }
+
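+# Criterion benches define their own `main`, so the default libtest bench
+# harness must be disabled for this target.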
+[[bench]]
+name = "bucket"
+harness = false
--- /dev/null
+use std::ops::Add;
+use std::time::{Duration, SystemTime};
+
+use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
+use ratelimit::buckets::bucket::Bucket;
+use tokio::runtime::Runtime;
+use twilight_http_ratelimiting::RatelimitHeaders;
+
+pub fn acquire_ticket(c: &mut Criterion) {
+ let rt = Runtime::new().unwrap();
+
+ let bucket = rt.block_on(async move {
+ let bucket = Bucket::new();
+
+ let mreset = SystemTime::now()
+ .add(Duration::from_secs(3600))
+ .duration_since(SystemTime::UNIX_EPOCH)
+ .unwrap()
+ .as_millis()
+ .to_string();
+ let headers = [
+ (
+ "x-ratelimit-bucket",
+ "d721dea6054f6322373d361f98e5c38b".as_bytes(),
+ ),
+ ("x-ratelimit-limit", "100".as_bytes()),
+ ("x-ratelimit-remaining", "1".as_bytes()),
+ ("x-ratelimit-reset", mreset.as_bytes()),
+ ("x-ratelimit-reset-after", "100000.000".as_bytes()),
+ ];
+ if let RatelimitHeaders::Present(present) =
+ RatelimitHeaders::from_pairs(headers.into_iter()).unwrap()
+ {
+ bucket.update(
+ &present,
+ SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)
+ .unwrap()
+ .as_millis() as u64,
+ );
+ }
+ bucket
+ });
+
+ let size: usize = 1024;
+ c.bench_with_input(BenchmarkId::new("input_example", size), &size, |b, _| {
+ // Insert a call to `to_async` to convert the bencher to async mode.
+ // The timing loops are the same as with the normal bencher.
+ b.to_async(&rt).iter(|| async {
+ bucket.ticket().await.unwrap();
+ });
+ });
+}
+
+criterion_group!(benches, acquire_ticket);
+criterion_main!(benches);
time::{Duration, SystemTime, UNIX_EPOCH},
};
+use tracing::debug;
+
#[derive(Default, Debug)]
pub struct AtomicInstant(AtomicU64);
impl AtomicInstant {
+ #[must_use]
pub const fn empty() -> Self {
Self(AtomicU64::new(0))
}
pub fn elapsed(&self) -> Duration {
+ // Truncation is expected
+ #[allow(clippy::cast_possible_truncation)]
Duration::from_millis(
SystemTime::now()
.duration_since(UNIX_EPOCH)
- .unwrap()
+ .expect("time went backwards")
.as_millis() as u64
- - self.0.load(Ordering::SeqCst),
+ - self.0.load(Ordering::Relaxed),
)
}
pub fn as_millis(&self) -> u64 {
- self.0.load(Ordering::SeqCst)
+ self.0.load(Ordering::Relaxed)
}
pub fn set_millis(&self, millis: u64) {
- self.0.store(millis, Ordering::SeqCst);
+ // get address of struct
+ let b = self as *const _ as usize;
+        debug!(millis, this = ?b, "setting instant millis");
+ self.0.store(millis, Ordering::Relaxed);
}
pub fn is_empty(&self) -> bool {
- self.as_millis() == 0
+ let millis = self.as_millis();
+ // get address of struct
+ let b = self as *const _ as usize;
+        debug!(empty = (millis == 0), millis, this = ?b, "instant empty check");
+ millis == 0
}
}
+use super::{async_queue::AsyncQueue, atomic_instant::AtomicInstant};
use std::{
sync::{
atomic::{AtomicU64, Ordering},
use tracing::{debug, trace};
use twilight_http_ratelimiting::headers::Present;
-use super::{async_queue::AsyncQueue, atomic_instant::AtomicInstant, redis_lock::RedisLock};
-
#[derive(Clone, Debug)]
pub enum TimeRemaining {
Finished,
Some(Duration),
}
+/// A `Bucket` is a simple atomic implementation of a ratelimit bucket.
+/// It can be updated dynamically based on the Discord API responses.
+///
+/// # Usage
+/// ```no_run
+/// use ratelimit::buckets::bucket::Bucket;
+/// use twilight_http_ratelimiting::RatelimitHeaders;
+/// use std::time::SystemTime;
+///
+/// let bucket = Bucket::new();
+///
+/// // Feed the header information into the bucket to update it
+/// let headers = [
+/// ( "x-ratelimit-bucket", "bucket id".as_bytes()),
+/// ("x-ratelimit-limit", "100".as_bytes()),
+/// ("x-ratelimit-remaining", "0".as_bytes()),
+/// ("x-ratelimit-reset", "".as_bytes()),
+/// ("x-ratelimit-reset-after", "10.000".as_bytes()),
+/// ];
+///
+/// // Parse the headers
+/// let present = if let Ok(RatelimitHeaders::Present(present)) = RatelimitHeaders::from_pairs(headers.into_iter()) { present } else { todo!() };
+///
+/// // this should ideally be the time of the request
+/// let current_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis() as u64;
+///
+/// bucket.update(&present, current_time);
+///
+/// ```
+///
+/// # Async
+/// You need to call this struct's `new` method in a tokio 1.x async runtime.
#[derive(Debug)]
pub struct Bucket {
+ /// Limits of tickets that can be accepted
pub limit: AtomicU64,
- /// Queue associated with this bucket.
- pub queue: AsyncQueue,
- /// Number of tickets remaining.
+ /// Remaining requests that can be executed
pub remaining: AtomicU64,
- /// Duration after the [`Self::last_update`] time the bucket will refresh.
+ /// Time to wait after [`Self::last_update`] before accepting new tickets.
pub reset_after: AtomicU64,
- /// When the bucket's ratelimit refresh countdown started (unix millis)
+ /// Last update got from the discord upstream
pub last_update: AtomicInstant,
- pub tasks: Vec<JoinHandle<()>>,
+ /// List of tasks that dequeue tasks from [`Self::queue`]
+ tasks: Vec<JoinHandle<()>>,
+ /// Queue of tickets to be processed.
+ queue: AsyncQueue,
}
impl Drop for Bucket {
+    /// Simply abort the dequeue tasks to avoid leaking memory via `Arc`s
fn drop(&mut self) {
for join in &self.tasks {
join.abort();
}
impl Bucket {
- /// Create a new bucket for the specified [`Path`].
- pub fn new(global: Arc<RedisLock>) -> Arc<Self> {
+ /// Creates a new bucket with four dequeue tasks
+ /// # Async
+    /// This function **must** be called in a tokio 1.x runtime, otherwise it *will* panic.
+ #[must_use]
+ pub fn new() -> Arc<Self> {
let tasks = vec![];
let this = Arc::new(Self {
// Run with 4 dequeue tasks
for _ in 0..4 {
let this = this.clone();
- let global = global.clone();
tokio::spawn(async move {
+                // Continuously wait for elements in the queue and process them sequentially.
+                // Multiple parallel dequeue tasks are used for (hopefully) better throughput.
while let Some(element) = this.queue.pop().await {
- // we need to wait
- if let Some(duration) = global.locked_for().await {
- tokio::time::sleep(duration).await;
- }
-
if this.remaining() == 0 {
debug!("0 tickets remaining, we have to wait.");
match this.time_remaining() {
TimeRemaining::Finished => {
+ debug!("waiting seems finished.");
this.try_reset();
}
TimeRemaining::Some(duration) => {
this.try_reset();
}
- TimeRemaining::NotStarted => {}
+ TimeRemaining::NotStarted => {
+ debug!("we should not wait");
+ }
}
}
this
}
- /// Total number of tickets allotted in a cycle.
+ /// Total number of tickets allowed in a cycle.
pub fn limit(&self) -> u64 {
self.limit.load(Ordering::Relaxed)
}
- /// Number of tickets remaining.
+ /// Number of tickets remaining in the current cycle.
pub fn remaining(&self) -> u64 {
self.remaining.load(Ordering::Relaxed)
}
- /// Duration after the [`started_at`] time the bucket will refresh.
- ///
- /// [`started_at`]: Self::started_at
+ /// Duration after the [`Self::last_update`] time the bucket will refresh.
pub fn reset_after(&self) -> u64 {
self.reset_after.load(Ordering::Relaxed)
}
let last_update = &self.last_update;
if last_update.is_empty() {
+ debug!("last update is empty");
+
+ TimeRemaining::NotStarted
+ } else {
let elapsed = last_update.elapsed();
if elapsed > Duration::from_millis(reset_after) {
}
TimeRemaining::Some(Duration::from_millis(reset_after) - elapsed)
- } else {
- TimeRemaining::NotStarted
}
}
- /// Try to reset this bucket's [`started_at`] value if it has finished.
+ /// Try to reset this bucket's [`Self::last_update`] value if it has finished.
///
/// Returns whether resetting was possible.
- ///
- /// [`started_at`]: Self::started_at
pub fn try_reset(&self) -> bool {
if self.last_update.is_empty() {
return false;
}
/// Update this bucket's ratelimit data after a request has been made.
- pub fn update(&self, ratelimits: Present, time: u64) {
+ /// The time of the request should be given.
+ pub fn update(&self, ratelimits: &Present, time: u64) {
let bucket_limit = self.limit();
if self.last_update.is_empty() {
+ debug!(millis = time, "updated the last update time");
self.last_update.set_millis(time);
}
.store(ratelimits.remaining(), Ordering::Relaxed);
}
- pub async fn ticket(&self) -> oneshot::Receiver<()> {
+    /// Submits a ticket to the queue.
+    /// The returned oneshot receiver resolves once the ticket is accepted.
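+    ///
+    /// A short usage sketch (hypothetical caller, not part of the crate):
+    ///
+    /// ```no_run
+    /// # async fn demo(bucket: std::sync::Arc<ratelimit::buckets::bucket::Bucket>) {
+    /// let rx = bucket.ticket();
+    /// // Resolves once one of the dequeue tasks accepts the ticket.
+    /// rx.await.unwrap();
+    /// # }
+    /// ```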
+ pub fn ticket(&self) -> oneshot::Receiver<()> {
let (tx, rx) = oneshot::channel();
self.queue.push(tx);
rx
}
}
+
+#[cfg(test)]
+mod tests {
+ use std::{
+ ops::Add,
+ time::{Duration, Instant, SystemTime},
+ };
+
+ use tokio::time::timeout;
+ use tracing::info;
+ use twilight_http_ratelimiting::RatelimitHeaders;
+
+ use super::Bucket;
+
+ #[test_log::test(tokio::test)]
+ async fn should_ratelimit() {
+ let bucket = Bucket::new();
+
+        // Initialize a bucket with one remaining ticket
+        // that resets in 100 seconds
+ let mreset = SystemTime::now()
+ .add(Duration::from_secs(100))
+ .duration_since(SystemTime::UNIX_EPOCH)
+ .unwrap()
+ .as_millis()
+ .to_string();
+ let headers: [(&str, &[u8]); 5] = [
+ ("x-ratelimit-bucket", b"123"),
+ ("x-ratelimit-limit", b"100"),
+ ("x-ratelimit-remaining", b"1"),
+ ("x-ratelimit-reset", mreset.as_bytes()),
+ ("x-ratelimit-reset-after", b"100.000"),
+ ];
+ if let RatelimitHeaders::Present(present) =
+ RatelimitHeaders::from_pairs(headers.into_iter()).unwrap()
+ {
+ // Integer truncating is expected
+ #[allow(clippy::cast_possible_truncation)]
+ bucket.update(
+ &present,
+ SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)
+ .unwrap()
+ .as_millis() as u64,
+ );
+ }
+
+ let ticket = bucket.ticket();
+
+ info!("first request");
+ // We should accept one ticket
+ let respo = timeout(Duration::from_secs(10), ticket).await;
+ assert!(respo.is_ok());
+
+ info!("second request");
+
+ let ticket = bucket.ticket();
+ // We should accept one ticket
+ let respo = timeout(Duration::from_secs(1), ticket).await;
+
+        // the ticket should not have resolved because no tickets remain in the bucket
+ assert!(respo.is_err());
+ }
+
+ #[test_log::test(tokio::test)]
+ async fn should_block_until_possible() {
+ let bucket = Bucket::new();
+
+        // Initialize a bucket with no remaining tickets
+        // that resets in 100 seconds
+ let mreset = SystemTime::now()
+ .add(Duration::from_secs(100))
+ .duration_since(SystemTime::UNIX_EPOCH)
+ .unwrap()
+ .as_millis()
+ .to_string();
+ let headers: [(&str, &[u8]); 5] = [
+ ("x-ratelimit-bucket", b"123"),
+ ("x-ratelimit-limit", b"100"),
+ ("x-ratelimit-remaining", b"0"),
+ ("x-ratelimit-reset", mreset.as_bytes()),
+ ("x-ratelimit-reset-after", b"100.000"),
+ ];
+
+ if let RatelimitHeaders::Present(present) =
+ RatelimitHeaders::from_pairs(headers.into_iter()).unwrap()
+ {
+ // Integer truncating is expected
+ #[allow(clippy::cast_possible_truncation)]
+ bucket.update(
+ &present,
+ SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)
+ .unwrap()
+ .as_millis() as u64,
+ );
+ }
+
+ let ticket = bucket.ticket();
+ let start = Instant::now();
+
+ // in this case, the ratelimiter should wait 10 seconds
+ let respo = timeout(Duration::from_secs(12), ticket).await;
+ let end = start.elapsed().as_secs();
+
+ // we should have waited 10 seconds (+- 1s)
+ assert_eq!(10, end);
+ // and the ticket should be a success
+ assert!(respo.is_ok());
+ }
+}
+use std::{future::Future, pin::Pin, sync::Arc, time::Duration};
+
pub mod async_queue;
pub mod atomic_instant;
pub mod bucket;
+pub mod noop_lock;
pub mod redis_lock;
+
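+/// Abstraction over the global ratelimit lock used by the buckets.
+/// The methods return boxed futures so the Redis-backed
+/// [`redis_lock::RedisLock`] and the no-op [`noop_lock::NoOpLock`]
+/// implementations can be used interchangeably by callers.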
+pub trait GlobalLock: Send + Sync {
+ fn lock_for<'a>(
+ self: &'a Arc<Self>,
+ duration: Duration,
+ ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
+ fn is_locked<'a>(
+ self: &'a Arc<Self>,
+ ) -> Pin<Box<dyn Future<Output = Option<Duration>> + Send + 'a>>;
+}
--- /dev/null
+use super::GlobalLock;
+
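+/// A [`GlobalLock`] that never locks: `lock_for` returns immediately and
+/// `is_locked` always reports `None`. Useful when no cross-process global
+/// ratelimit coordination is needed (for example in tests or benchmarks).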
+pub struct NoOpLock;
+impl GlobalLock for NoOpLock {
+ fn lock_for<'a>(
+ self: &'a std::sync::Arc<Self>,
+ _duration: std::time::Duration,
+ ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'a>> {
+ Box::pin(async move {})
+ }
+
+ fn is_locked<'a>(
+ self: &'a std::sync::Arc<Self>,
+ ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Option<std::time::Duration>> + Send + 'a>>
+ {
+ Box::pin(async move { None })
+ }
+}
use std::{
+ future::Future,
+ pin::Pin,
sync::{atomic::AtomicU64, Arc},
time::{Duration, SystemTime},
};
use tokio::sync::Mutex;
use tracing::debug;
-/// This is flawed and needs to be replaced sometime with the real RedisLock algorithm
+use super::GlobalLock;
+
+/// This is flawed and needs to be replaced sometime with the real `RedisLock` algorithm
#[derive(Debug)]
pub struct RedisLock {
redis: Mutex<MultiplexedConnection>,
}
impl RedisLock {
- /// Set the global ratelimit as exhausted.
- pub async fn lock_for(self: &Arc<Self>, duration: Duration) {
- debug!("locking globally for {}", duration.as_secs());
- let _: () = self
- .redis
- .lock()
- .await
- .set_ex(
- "nova:rls:lock",
- 1,
- (duration.as_secs() + 1).try_into().unwrap(),
- )
- .await
- .unwrap();
-
- self.is_locked.store(
- (SystemTime::now() + duration)
- .duration_since(SystemTime::UNIX_EPOCH)
- .unwrap()
- .as_millis() as u64,
- std::sync::atomic::Ordering::Relaxed,
- );
+ #[must_use]
+ pub fn new(redis: MultiplexedConnection) -> Arc<Self> {
+ Arc::new(Self {
+ redis: Mutex::new(redis),
+ is_locked: AtomicU64::new(0),
+ })
}
+}
+
+impl GlobalLock for RedisLock {
+ fn lock_for<'a>(
+ self: &'a Arc<Self>,
+ duration: Duration,
+ ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
+ Box::pin(async move {
+ debug!("locking globally for {}", duration.as_secs());
+ let _: () = self
+ .redis
+ .lock()
+ .await
+ .set_ex(
+ "nova:rls:lock",
+ 1,
+ (duration.as_secs() + 1).try_into().unwrap(),
+ )
+ .await
+ .unwrap();
- pub async fn locked_for(self: &Arc<Self>) -> Option<Duration> {
- let load = self.is_locked.load(std::sync::atomic::Ordering::Relaxed);
- if load != 0 {
- if load
- > SystemTime::now()
+ // Integer truncating is expected
+ #[allow(clippy::cast_possible_truncation)]
+ self.is_locked.store(
+ (SystemTime::now() + duration)
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
- .as_millis() as u64
- {
- return Some(Duration::from_millis(load));
- } else {
+ .as_millis() as u64,
+ std::sync::atomic::Ordering::Relaxed,
+ );
+ })
+ }
+
+ fn is_locked<'a>(
+ self: &'a Arc<Self>,
+ ) -> Pin<Box<dyn Future<Output = Option<Duration>> + Send + 'a>> {
+ Box::pin(async move {
+ let load = self.is_locked.load(std::sync::atomic::Ordering::Relaxed);
+ if load != 0 {
+ // Integer truncating is expected
+ #[allow(clippy::cast_possible_truncation)]
+ if load
+ > SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)
+ .unwrap()
+ .as_millis() as u64
+ {
+ return Some(Duration::from_millis(load));
+ }
self.is_locked
.store(0, std::sync::atomic::Ordering::Relaxed);
}
- }
- let result = self.redis.lock().await.ttl::<_, i64>("nova:rls:lock").await;
- match result {
- Ok(remaining_time) => {
- if remaining_time > 0 {
- let duration = Duration::from_secs(remaining_time as u64);
- debug!("external global lock detected, locking");
- self.lock_for(duration).await;
- Some(duration)
- } else {
- None
+ let result = self.redis.lock().await.ttl::<_, i64>("nova:rls:lock").await;
+ match result {
+ Ok(remaining_time) => {
+ if remaining_time > 0 {
+ // Sign loss is allowed since we know it's a positive number
+                        // because a ttl is always positive when the key exists and has a ttl;
+ // otherwise redis *will* return a negative number, hence the check for
+ // a positive sign.
+ #[allow(clippy::cast_sign_loss)]
+ let duration = Duration::from_secs(remaining_time as u64);
+ debug!("external global lock detected, locking");
+ self.lock_for(duration).await;
+ Some(duration)
+ } else {
+ None
+ }
}
- }
- Err(error) => {
- debug!("redis call failed: {}", error);
+ Err(error) => {
+ debug!("redis call failed: {}", error);
- None
+ None
+ }
}
- }
- }
-
- pub fn new(redis: MultiplexedConnection) -> Arc<Self> {
- Arc::new(Self {
- redis: Mutex::new(redis),
- is_locked: AtomicU64::new(0),
})
}
}
}
#[derive(Debug, Deserialize, Clone, Default)]
-pub struct RatelimitServerConfig {
+pub struct Ratelimit {
pub server: ServerSettings,
}
use crate::buckets::bucket::Bucket;
use crate::buckets::redis_lock::RedisLock;
+use crate::buckets::GlobalLock;
pub struct RLServer {
global: Arc<RedisLock>,
struct MetadataMap<'a>(&'a tonic::metadata::MetadataMap);
impl<'a> Extractor for MetadataMap<'a> {
- /// Get a value for a key from the MetadataMap. If the value can't be converted to &str, returns None
+ /// Get a value for a key from the `MetadataMap`. If the value can't be converted to &str, returns None
fn get(&self, key: &str) -> Option<&str> {
self.0.get(key).and_then(|metadata| metadata.to_str().ok())
}
- /// Collect all the keys from the MetadataMap.
+ /// Collect all the keys from the `MetadataMap`.
fn keys(&self) -> Vec<&str> {
self.0
.keys()
)
.unwrap();
+ if let Some(duration) = self.global.is_locked().await {
+ tokio::time::sleep(duration).await;
+ }
+
let bucket: Arc<Bucket> = if self.buckets.read().await.contains_key(&data.path) {
self.buckets
.read()
.expect("impossible")
.clone()
} else {
- let bucket = Bucket::new(self.global.clone());
+ let bucket = Bucket::new();
self.buckets.write().await.insert(data.path, bucket.clone());
bucket
};
global.retry_after()
);
self.global
+ .clone()
.lock_for(Duration::from_secs(global.retry_after()))
.await;
}
RatelimitHeaders::None => {}
RatelimitHeaders::Present(present) => {
// we should update the bucket.
- bucket.update(present, data.precise_time);
+ bucket.update(&present, data.precise_time);
}
_ => unreachable!(),
};
.expect("impossible")
.clone()
} else {
- let bucket = Bucket::new(self.global.clone());
+ let bucket = Bucket::new();
self.buckets.write().await.insert(data.path, bucket.clone());
bucket
};
// wait for the ticket to be accepted
- bucket.ticket().await;
+ let _ = bucket.ticket().await;
Ok(Response::new(()))
}
+#![deny(
+ clippy::all,
+ clippy::correctness,
+ clippy::suspicious,
+ clippy::style,
+ clippy::complexity,
+ clippy::perf,
+ clippy::pedantic,
+ clippy::nursery,
+ unsafe_code
+)]
+
use buckets::redis_lock::RedisLock;
-use config::RatelimitServerConfig;
+use config::Ratelimit;
use grpc::RLServer;
use leash::{AnyhowResultFuture, Component};
use proto::nova::ratelimit::ratelimiter::ratelimiter_server::RatelimiterServer;
use tokio::sync::oneshot;
use tonic::transport::Server;
-mod buckets;
+pub mod buckets;
mod config;
mod grpc;
pub struct RatelimiterServerComponent {}
impl Component for RatelimiterServerComponent {
- type Config = RatelimitServerConfig;
+ type Config = Ratelimit;
const SERVICE_NAME: &'static str = "ratelimiter";
fn start(
}
#[derive(Debug, Deserialize, Clone, Default)]
-pub struct ReverseProxyConfig {
+pub struct ReverseProxy {
pub server: ServerSettings,
pub discord: Discord,
pub ratelimiter_address: String,
let request_path = request.uri().path();
let (api_path, trimmed_path) = normalize_path(request_path);
- let mut uri_string = format!("http://127.0.0.1:9999{}{}", api_path, trimmed_path);
+ let mut uri_string = format!("http://127.0.0.1:9999{api_path}{trimmed_path}");
if let Some(query) = request.uri().query() {
uri_string.push('?');
uri_string.push_str(query);
.expect("Failed to check header")
.starts_with("Bot")
{
- *auth = HeaderValue::from_str(&format!("Bot {}", token))?;
+ *auth = HeaderValue::from_str(&format!("Bot {token}"))?;
}
} else {
request.headers_mut().insert(
AUTHORIZATION,
- HeaderValue::from_str(&format!("Bot {}", token))?,
+ HeaderValue::from_str(&format!("Bot {token}"))?,
);
}
let headers = resp
.headers()
.into_iter()
- .map(|(k, v)| (k.to_string(), v.to_str().map(|f| f.to_string())))
+ .map(|(k, v)| (k.to_string(), v.to_str().map(std::string::ToString::to_string)))
.filter(|f| f.1.is_ok())
.map(|f| (f.0, f.1.expect("errors should be filtered")))
.collect();
- let _ = ratelimiter
+ let _submit_headers = ratelimiter
.submit_headers(hash, headers)
.instrument(info_span!("submitting headers"))
.await;
-use config::ReverseProxyConfig;
+#![deny(
+ clippy::all,
+ clippy::correctness,
+ clippy::suspicious,
+ clippy::style,
+ clippy::complexity,
+ clippy::perf,
+ clippy::pedantic,
+ clippy::nursery,
+ unsafe_code
+)]
+
+use config::ReverseProxy;
use handler::handle_request;
use hyper::{
pub struct ReverseProxyServer {}
impl Component for ReverseProxyServer {
- type Config = ReverseProxyConfig;
+ type Config = ReverseProxy;
const SERVICE_NAME: &'static str = "rest";
fn start(
-use crate::config::ReverseProxyConfig;
+use crate::config::ReverseProxy;
use self::remote_hashring::{HashRingWrapper, MetadataMap, VNode};
use anyhow::anyhow;
current_remotes: Vec<String>,
stop: Arc<tokio::sync::broadcast::Sender<()>>,
- config: ReverseProxyConfig,
+ config: ReverseProxy,
}
impl Drop for RemoteRatelimiter {
// get list of dns responses
let responses: Vec<String> = dns_lookup::lookup_host(&self.config.ratelimiter_address)?
.into_iter()
- .filter(|address| address.is_ipv4())
+ .filter(std::net::IpAddr::is_ipv4)
.map(|address| address.to_string())
.collect();
let mut write = self.remotes.write().await;
for ip in &responses {
- if !self.current_remotes.contains(&ip) {
- let a = VNode::new(ip.to_owned(), self.config.ratelimiter_port).await?;
+ if !self.current_remotes.contains(ip) {
+ let a = VNode::new(ip.clone(), self.config.ratelimiter_port).await?;
write.add(a.clone());
}
}
}
#[must_use]
- pub fn new(config: ReverseProxyConfig) -> Self {
+ pub fn new(config: ReverseProxy) -> Self {
let (rx, mut tx) = broadcast::channel(1);
let obj = Self {
remotes: Arc::new(RwLock::new(HashRingWrapper::default())),
stop: Arc::new(rx),
config,
- current_remotes: vec![]
+ current_remotes: vec![],
};
let obj_clone = obj.clone();
match obj_clone.get_ratelimiters().await {
Ok(_) => {
- debug!("refreshed ratelimiting servers")
+ debug!("refreshed ratelimiting servers");
}
Err(err) => {
error!("refreshing ratelimiting servers failed {}", err);
let context = span.context();
let mut request = Request::new(BucketSubmitTicketRequest { path });
global::get_text_map_propagator(|propagator| {
- propagator.inject_context(&context, &mut MetadataMap(request.metadata_mut()))
+ propagator.inject_context(&context, &mut MetadataMap(request.metadata_mut()));
});
// Requesting
let time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)?
.as_millis();
+ // truncation is expected
+ #[allow(clippy::cast_possible_truncation)]
let mut request = Request::new(HeadersSubmitRequest {
path,
precise_time: time as u64,
headers,
});
global::get_text_map_propagator(|propagator| {
- propagator.inject_context(&context, &mut MetadataMap(request.metadata_mut()))
+ propagator.inject_context(&context, &mut MetadataMap(request.metadata_mut()));
});
node.submit_headers(request).await?;
pub struct MetadataMap<'a>(pub &'a mut tonic::metadata::MetadataMap);
impl<'a> Injector for MetadataMap<'a> {
- /// Set a key and value in the MetadataMap. Does nothing if the key or value are not valid inputs
+ /// Set a key and value in the `MetadataMap`. Does nothing if the key or value are not valid inputs
fn set(&mut self, key: &str, value: String) {
if let Ok(key) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) {
if let Ok(val) = tonic::metadata::MetadataValue::try_from(&value) {
impl VNode {
pub async fn new(address: String, port: u16) -> Result<Self, tonic::transport::Error> {
- let host = format!("http://{}:{}", address.clone(), port);
+        let host = format!("http://{address}:{port}");
debug!("connecting to {}", host);
let client = RatelimiterClient::connect(host).await?;
- Ok(VNode { client, address })
+ Ok(Self { address, client })
}
}
}
#[derive(Debug, Deserialize, Clone, Default, Copy)]
-pub struct WebhookConfig {
+pub struct Webhook {
pub server: ServerSettings,
pub discord: Discord,
}
+++ /dev/null
-use hyper::{header::ToStrError, Body, Response, StatusCode};
-
-pub struct WebhookError {
- pub code: StatusCode,
- pub message: String,
-}
-
-impl WebhookError {
- pub fn new(code: StatusCode, message: &str) -> WebhookError {
- WebhookError {
- code,
- message: message.to_string(),
- }
- }
-}
-
-impl From<WebhookError> for Response<Body> {
- fn from(value: WebhookError) -> Self {
- Response::builder()
- .status(value.code)
- .body(value.message.into())
- .unwrap()
- }
-}
-
-impl From<hyper::Error> for WebhookError {
- fn from(_: hyper::Error) -> Self {
- WebhookError::new(StatusCode::BAD_REQUEST, "invalid request")
- }
-}
-
-impl From<ToStrError> for WebhookError {
- fn from(_: ToStrError) -> Self {
- WebhookError::new(StatusCode::BAD_REQUEST, "invalid request")
- }
-}
}
impl<T: Clone> MakeSvc<T> {
- pub fn new(service: T) -> Self {
+ pub const fn new(service: T) -> Self {
Self { service }
}
}
-use crate::config::WebhookConfig;
+use crate::config::Webhook;
+use anyhow::bail;
use async_nats::Client;
use ed25519_dalek::PublicKey;
-use error::WebhookError;
use hyper::{
body::{to_bytes, Bytes},
service::Service,
- Body, Method, Request, Response, StatusCode,
+ Body, Method, Request, Response,
};
use shared::payloads::{CachePayload, DispatchEventTagged};
-use signature::validate_signature;
+use signature::validate;
use std::{
future::Future,
pin::Pin,
- str::from_utf8,
task::{Context, Poll},
};
use tracing::{debug, error};
gateway::payload::incoming::InteractionCreate,
};
-mod error;
pub mod make_service;
mod signature;
/// Hyper service used to handle the discord webhooks
#[derive(Clone)]
pub struct WebhookService {
- pub config: WebhookConfig,
+ pub config: Webhook,
pub nats: Client,
}
impl WebhookService {
- async fn check_request(req: Request<Body>, pk: PublicKey) -> Result<Bytes, WebhookError> {
+ async fn check_request(req: Request<Body>, pk: PublicKey) -> Result<Bytes, anyhow::Error> {
if req.method() == Method::POST {
let signature = if let Some(sig) = req.headers().get("X-Signature-Ed25519") {
- sig.to_owned()
+ sig.clone()
} else {
- return Err(WebhookError::new(
- StatusCode::BAD_REQUEST,
- "missing signature header",
- ));
+ bail!("Missing signature header");
};
let timestamp = if let Some(timestamp) = req.headers().get("X-Signature-Timestamp") {
- timestamp.to_owned()
+ timestamp.clone()
} else {
- return Err(WebhookError::new(
- StatusCode::BAD_REQUEST,
- "missing timestamp header",
- ));
+ bail!("Missing timestamp header");
};
let data = to_bytes(req.into_body()).await?;
- if validate_signature(
+ if validate(
&pk,
&[timestamp.as_bytes().to_vec(), data.to_vec()].concat(),
signature.to_str()?,
) {
Ok(data)
} else {
- Err(WebhookError::new(
- StatusCode::UNAUTHORIZED,
- "invalid signature",
- ))
+ bail!("invalid signature");
}
} else {
- Err(WebhookError::new(StatusCode::NOT_FOUND, "not found"))
+ bail!("not found");
}
}
req: Request<Body>,
nats: Client,
pk: PublicKey,
- ) -> Result<Response<Body>, WebhookError> {
- match Self::check_request(req, pk).await {
- Ok(data) => {
- let utf8 = from_utf8(&data);
- match utf8 {
- Ok(data) => match serde_json::from_str::<Interaction>(data) {
- Ok(value) => {
- match value.kind {
- InteractionType::Ping => Ok(Response::builder()
- .header("Content-Type", "application/json")
- .body(r#"{"type":1}"#.into())
- .unwrap()),
- _ => {
- debug!("calling nats");
- // this should hopefully not fail ?
-
- let data = CachePayload {
- data: DispatchEventTagged {
- data: DispatchEvent::InteractionCreate(Box::new(
- InteractionCreate(value),
- )),
- },
- };
-
- let payload = serde_json::to_string(&data).unwrap();
-
- match nats
- .request(
- "nova.cache.dispatch.INTERACTION_CREATE".to_string(),
- Bytes::from(payload),
- )
- .await
- {
- Ok(response) => Ok(Response::builder()
- .header("Content-Type", "application/json")
- .body(Body::from(response.payload))
- .unwrap()),
-
- Err(error) => {
- error!("failed to request nats: {}", error);
- Err(WebhookError::new(
- StatusCode::INTERNAL_SERVER_ERROR,
- "failed to request nats",
- ))
- }
- }
- }
- }
- }
-
- Err(error) => {
- error!("invalid json body: {}", error);
- Err(WebhookError::new(
- StatusCode::BAD_REQUEST,
- "invalid json body",
- ))
- }
- },
-
- Err(_) => Err(WebhookError::new(StatusCode::BAD_REQUEST, "not utf-8 body")),
+ ) -> Result<Response<Body>, anyhow::Error> {
+ let data = Self::check_request(req, pk).await?;
+ let interaction: Interaction = serde_json::from_slice(&data)?;
+
+ if interaction.kind == InteractionType::Ping {
+ Ok(Response::builder()
+ .header("Content-Type", "application/json")
+ .body(r#"{"type":1}"#.into())
+ .unwrap())
+ } else {
+ debug!("calling nats");
+            // this should hopefully not fail
+
+ let data = CachePayload {
+ data: DispatchEventTagged {
+ data: DispatchEvent::InteractionCreate(Box::new(InteractionCreate(
+ interaction,
+ ))),
+ },
+ };
+
+ let payload = serde_json::to_string(&data).unwrap();
+
+ match nats
+ .request(
+ "nova.cache.dispatch.INTERACTION_CREATE".to_string(),
+ Bytes::from(payload),
+ )
+ .await
+ {
+ Ok(response) => Ok(Response::builder()
+ .header("Content-Type", "application/json")
+ .body(Body::from(response.payload))
+ .unwrap()),
+
+ Err(error) => {
+ error!("failed to request nats: {}", error);
+ Err(anyhow::anyhow!("internal error"))
}
}
- Err(error) => Err(error),
}
}
}
/// Implementation of the service
impl Service<hyper::Request<Body>> for WebhookService {
type Response = hyper::Response<Body>;
- type Error = hyper::Error;
+ type Error = anyhow::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
match response {
Ok(r) => Ok(r),
- Err(e) => Ok(e.into()),
+ Err(e) => Err(e),
}
})
}
use ed25519_dalek::{PublicKey, Signature, Verifier};
#[inline]
-pub fn validate_signature(public_key: &PublicKey, data: &[u8], hex_signature: &str) -> bool {
+pub fn validate(public_key: &PublicKey, data: &[u8], hex_signature: &str) -> bool {
let mut slice: [u8; Signature::BYTE_SIZE] = [0; Signature::BYTE_SIZE];
let signature_result = hex::decode_to_slice(hex_signature, &mut slice);
- let mut result = false;
if signature_result.is_ok() {
- result = public_key.verify(data, &Signature::from(slice)).is_ok();
+ public_key.verify(data, &Signature::from(slice)).is_ok()
+ } else {
+ false
}
-
- result
}
-use crate::handler::signature::validate_signature;
+use crate::handler::signature::validate;
use ed25519_dalek::PublicKey;
#[test]
fn validate_signature_test() {
let signature = "543ec3547d57f9ddb1ec4c5c36503ebf288ffda3da3d510764c9a49c2abb57690ef974c63d174771bdd2481de1066966f57abbec12a3ec171b9f6e2373837002";
- let content = "message de test incroyable".as_bytes().to_vec();
+ let content = b"message de test incroyable";
let public_key = PublicKey::from_bytes(
&hex::decode("eefe0c24473737cb2035232e3b4eb91c206f0a14684168f3503f7d8316058d6f").unwrap(),
)
.unwrap();
- assert!(validate_signature(&public_key, &content, signature))
+ assert!(validate(&public_key, content, signature));
}
#[test]
)
.unwrap();
- let content = "ceci est un test qui ne fonctionnera pas!"
- .as_bytes()
- .to_vec();
- assert!(!validate_signature(&public_key, &content, signature))
+ let content = b"ceci est un test qui ne fonctionnera pas!";
+ assert!(!validate(&public_key, content, signature));
}
#[test]
)
.unwrap();
- let content = "ceci est un test qui ne fonctionnera pas!"
- .as_bytes()
- .to_vec();
- assert!(!validate_signature(&public_key, &content, signature))
+ let content = b"ceci est un test qui ne fonctionnera pas!";
+ assert!(!validate(&public_key, content, signature));
}
+#![deny(
+ clippy::all,
+ clippy::correctness,
+ clippy::suspicious,
+ clippy::style,
+ clippy::complexity,
+ clippy::perf,
+ clippy::pedantic,
+ clippy::nursery,
+ unsafe_code
+)]
+
mod config;
mod handler;
use std::{future::Future, pin::Pin};
use crate::{
- config::WebhookConfig,
+ config::Webhook,
handler::{make_service::MakeSvc, WebhookService},
};
use async_nats::Client;
pub struct WebhookServer {}
impl Component for WebhookServer {
- type Config = WebhookConfig;
+ type Config = Webhook;
const SERVICE_NAME: &'static str = "webhook";
fn start(
+#![deny(
+ clippy::all,
+ clippy::correctness,
+ clippy::suspicious,
+ clippy::style,
+ clippy::complexity,
+ clippy::perf,
+ clippy::pedantic,
+ clippy::nursery,
+)]
+
use anyhow::Result;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::sdk::trace::{self};
use tracing::info;
#[derive(Debug, Deserialize, Clone)]
-pub struct Settings<T: Clone + DeserializeOwned + Default> {
+pub struct Settings<T: Clone + DeserializeOwned> {
#[serde(skip_deserializing)]
pub config: T,
- pub nats: crate::nats::NatsConfiguration,
- pub redis: crate::redis::RedisConfiguration,
+ pub nats: crate::nats::Configuration,
+ pub redis: crate::redis::Configuration,
}
impl<T: Clone + DeserializeOwned + Default> Settings<T> {
- pub fn new(service_name: &str) -> Result<Settings<T>> {
+ /// # Errors
+    /// Fails if the config could not be deserialized into `T`
+ pub fn new(service_name: &str) -> Result<Self> {
let mut builder = Config::builder();
builder = builder.add_source(File::with_name("config/default"));
let mode = env::var("ENV").unwrap_or_else(|_| "development".into());
info!("Configuration Environment: {}", mode);
- builder = builder.add_source(File::with_name(&format!("config/{}", mode)).required(false));
+ builder = builder.add_source(File::with_name(&format!("config/{mode}")).required(false));
builder = builder.add_source(File::with_name("config/local").required(false));
let env = Environment::with_prefix("NOVA").separator("__");
builder = builder.add_source(env);
let config = builder.build()?;
- let mut settings: Settings<T> = config.clone().try_deserialize()?;
+ let mut settings: Self = config.clone().try_deserialize()?;
// try to load the config
settings.config = config.get::<T>(service_name)?;
-/// This crate is all the utilities shared by the nova rust projects
-/// It includes logging, config and protocols.
+#![deny(
+ clippy::all,
+ clippy::correctness,
+ clippy::suspicious,
+ clippy::style,
+ clippy::complexity,
+ clippy::perf,
+ clippy::pedantic,
+ clippy::nursery,
+)]
+
pub mod config;
pub mod nats;
pub mod payloads;
use serde::Deserialize;
#[derive(Clone, Debug, Deserialize)]
-pub struct NatsConfigurationClientCert {
- pub cert: String,
- pub key: String,
-}
-
-#[derive(Clone, Debug, Deserialize)]
-pub struct NatsConfigurationTls {
- pub mtu: Option<usize>,
-}
-
-#[derive(Clone, Debug, Deserialize)]
-pub struct NatsConfiguration {
+pub struct Configuration {
pub host: String,
}
-impl From<NatsConfiguration> for Pin<Box<dyn Future<Output = anyhow::Result<Client>> + Send>> {
- fn from(value: NatsConfiguration) -> Self {
+impl From<Configuration> for Pin<Box<dyn Future<Output = anyhow::Result<Client>> + Send>> {
+ fn from(value: Configuration) -> Self {
Box::pin(async move { Ok(async_nats::connect(value.host).await?) })
}
}
let tagged = DispatchEventTaggedSerialized::deserialize(deserializer)?;
let deserializer_seed = DispatchEventWithTypeDeserializer::new(&tagged.kind);
let dispatch_event = deserializer_seed.deserialize(tagged.data).unwrap();
- Ok(DispatchEventTagged {
+ Ok(Self {
data: dispatch_event,
})
}
use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, Deserialize)]
-pub struct RedisConfiguration {
+pub struct Configuration {
pub url: String,
}
-impl From<RedisConfiguration>
+impl From<Configuration>
for Pin<Box<dyn Future<Output = anyhow::Result<MultiplexedConnection>> + Send>>
{
- fn from(value: RedisConfiguration) -> Self {
+ fn from(value: Configuration) -> Self {
Box::pin(async move {
let con = Client::open(value.url)?;
let (multiplex, ready) = con.create_multiplexed_tokio_connection().await?;