.idea\r
config.yml\r
\r
-config/
\ No newline at end of file
+config/*\r
+build/\r
+*.yml\r
# It is not intended for manual editing.
version = 3
+[[package]]
+name = "addr2line"
+version = "0.19.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a76fd60b23679b7d19bd066031410fb7e458ccc5e958eb5c325888ce4baedc97"
+dependencies = [
+ "gimli",
+]
+
[[package]]
name = "adler"
version = "1.0.2"
]
[[package]]
-name = "all"
+name = "all-in-one"
version = "0.1.0"
dependencies = [
"anyhow",
"cache",
"cbindgen",
"config",
+ "ctrlc",
"gateway",
"leash",
"libc",
version = "1.0.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61"
+dependencies = [
+ "backtrace",
+]
[[package]]
name = "arc-swap"
"tower-service",
]
+[[package]]
+name = "backtrace"
+version = "0.3.67"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "233d376d6d185f2a3093e58f283f60f880315b6c60075b01f36b3b85154564ca"
+dependencies = [
+ "addr2line",
+ "cc",
+ "cfg-if",
+ "libc",
+ "miniz_oxide",
+ "object",
+ "rustc-demangle",
+]
+
[[package]]
name = "base64"
version = "0.13.1"
"typenum",
]
+[[package]]
+name = "ctrlc"
+version = "3.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1631ca6e3c59112501a9d87fd86f21591ff77acd31331e8a73f8d80a65bbdd71"
+dependencies = [
+ "nix",
+ "windows-sys 0.42.0",
+]
+
[[package]]
name = "curve25519-dalek"
version = "3.2.0"
"wasi 0.11.0+wasi-snapshot-preview1",
]
+[[package]]
+name = "gimli"
+version = "0.27.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dec7af912d60cdbd3677c1af9352ebae6fb8394d165568a2234df0fa00f87793"
+
[[package]]
name = "glob"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
+[[package]]
+name = "nix"
+version = "0.26.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "46a58d1d356c6597d08cde02c2f09d785b09e28711837b1ed667dc652c08a694"
+dependencies = [
+ "bitflags",
+ "cfg-if",
+ "libc",
+ "static_assertions",
+]
+
[[package]]
name = "nkeys"
version = "0.2.0"
"libc",
]
+[[package]]
+name = "object"
+version = "0.30.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2b8c786513eb403643f2a88c244c2aaa270ef2153f55094587d0c48a3cf22a83"
+dependencies = [
+ "memchr",
+]
+
[[package]]
name = "once_cell"
version = "1.17.0"
"ordered-multimap",
]
+[[package]]
+name = "rustc-demangle"
+version = "0.1.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342"
+
[[package]]
name = "rustix"
version = "0.36.6"
"der",
]
+[[package]]
+name = "static_assertions"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
+
[[package]]
name = "strsim"
version = "0.10.0"
"exes/rest/",\r
"exes/webhook/",\r
"exes/ratelimit/",\r
- "exes/all",\r
+ "exes/all-in-one/",\r
\r
"libs/proto/",\r
"libs/shared/",\r
--- /dev/null
+# Creates the bin folder for build artifacts
+build/{bin,lib}:
+ @mkdir -p build/{lib,bin}
+
+# Builds all rust targets
+build/lib/liball_in_one.a build/bin/{cache,gateway,ratelimit,rest,webhook}: build/{bin,lib}
+ @echo "Building rust project"
+ cargo build --release
+ @cp target/release/liball_in_one.a build/lib
+ @cp target/release/{cache,gateway,ratelimit,rest,webhook} build/bin
+
+# Generated by a rust build script.
+internal/pkg/all-in-one/all-in-one.h: build/lib/liball_in_one.a
+
+# All in one program build
+build/bin/nova: build/lib/liball_in_one.a internal/pkg/all-in-one/all-in-one.h
+ go build -a -ldflags '-s' -o build/bin/nova cmd/nova/nova.go
+
+all: build/{lib,bin}/nova
+
+.PHONY: all
\ No newline at end of file
--- /dev/null
+package main
+
+import (
+ "os"
+ "os/signal"
+ "syscall"
+
+ allinone "github.com/discordnova/nova/internal/pkg/all-in-one"
+)
+
+func main() {
+ allInOne, err := allinone.NewAllInOne()
+ if err != nil {
+ panic(err)
+ }
+ err = allInOne.Start()
+ if err != nil {
+ panic(err)
+ }
+ // Wait for a SIGINT
+ c := make(chan os.Signal, 1)
+ signal.Notify(c,
+ syscall.SIGHUP,
+ syscall.SIGINT,
+ syscall.SIGTERM,
+ syscall.SIGQUIT)
+ <-c
+
+ allInOne.Stop()
+
+ println("Arret de nova all in one")
+}
+++ /dev/null
-gateway:
- token: ODA3MTg4MzM1NzE3Mzg0MjEy.Gtk5vu.Ejt9d70tnB9W_tXYMUATsBU24nqwQjlUZy7QUo
- intents: 3276799
- shard: 0
- shard_total: 1
-
-rest:
- discord:
- token: ODA3MTg4MzM1NzE3Mzg0MjEy.Gtk5vu.Ejt9d70tnB9W_tXYMUATsBU24nqwQjlUZy7QUo
- server:
- listening_adress: 0.0.0.0:8090
-
-webhook:
- discord:
- public_key: a3d9368eda990e11ca501655d219a2d88591de93d32037f62f453a8587a46ff5
- client_id: 807188335717384212
- server:
- listening_adress: 0.0.0.0:8091
-
-cache:
- toggles:
- - channels_cache
- - guilds_cache
- - guild_schedules_cache
- - stage_instances_cache
- - integrations_cache
- - members_cache
- - bans_cache
- - reactions_cache
- - messages_cache
- - threads_cache
- - invites_cache
- - roles_cache
- - automoderation_cache
- - voice_states_cache
-
-ratelimiter:
-
-
-# Prometheus monitoring configuration
-monitoring:
- enabled: false
- address: 0.0.0.0
- port: 9000
-
-# Nats broker configuration
-nats:
- host: nats
-
-redis:
- url: redis://redis
redis:
image: redis
+ ports:
+ - 6379:6379
cache:
image: ghcr.io/discordnova/nova/cache
--- /dev/null
+[package]
+name = "all-in-one"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+libc = "0.2.139"
+leash = { path = "../../libs/leash" }
+shared = { path = "../../libs/shared" }
+
+cache = { path = "../cache" }
+gateway = { path = "../gateway" }
+ratelimit = { path = "../ratelimit" }
+rest = { path = "../rest" }
+webhook = { path = "../webhook" }
+ctrlc = "3.2.4"
+
+tokio = { version = "1.23.1", features = ["rt"] }
+serde = "1.0.152"
+serde_json = "1.0.91"
+anyhow = { version = "1.0.68", features = ["backtrace"] }
+
+tracing = "0.1.37"
+
+config = "0.13.3"
+
+tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
+tracing-opentelemetry = "0.18.0"
+opentelemetry = { version ="0.18.0", features = ["rt-tokio"] }
+opentelemetry-otlp = { version = "0.11.0" }
+
+[lib]
+crate-type = ["staticlib", "rlib"]
+
+[build-dependencies]
+cbindgen = "0.24.3"
\ No newline at end of file
--- /dev/null
+clean:
+ rm ./build/*
+
+library:
+ cargo build --release
+
+build: library
+ cp ../../target/release/liball_in_one.a ./build
+ go build -a -ldflags '-s' -o build/all-in-one
+
+all: library build
+
+run: all-in-one
+ ./build/all-in-one
+
--- /dev/null
+extern crate cbindgen;
+
+use cbindgen::{Config, Language};
+use std::env;
+use std::error::Error;
+use std::path::PathBuf;
+
+/// Generates the headers for the go program.
+fn main() -> Result<(), Box<dyn Error>> {
+ let crate_dir = env::var("CARGO_MANIFEST_DIR")?;
+ let package_name = env::var("CARGO_PKG_NAME")?;
+
+ // We export the header file to build/{package_name}.h
+ let output_file = PathBuf::from("../../internal/pkg/all-in-one")
+ .join(format!("{}.h", package_name))
+ .display()
+ .to_string();
+
+ let config = Config {
+ language: Language::C,
+ ..Default::default()
+ };
+
+ cbindgen::generate_with_config(crate_dir, config)?.write_to_file(output_file);
+
+ Ok(())
+}
--- /dev/null
+package main
+
+/*
+#cgo LDFLAGS: -L./build -lall_in_one -lz -lm
+#include "./build/all-in-one.h"
+*/
+import "C"
+
+import (
+ "fmt"
+ "os"
+ "os/signal"
+ "syscall"
+ "time"
+
+ "github.com/Jeffail/gabs"
+ "github.com/alicebob/miniredis/v2"
+
+ server "github.com/nats-io/nats-server/v2/server"
+)
+
+func main() {
+ // Intialise les logs de la librarie Rust
+ C.init_logs()
+ // Charge la configuration
+ str := C.GoString(C.load_config())
+
+ // Démarre une instance MiniRedis
+ mr := miniredis.NewMiniRedis()
+ err := mr.Start()
+
+ if err != nil {
+ panic(err)
+ }
+
+ // Démarre un serveur Nats
+ opts := &server.Options{}
+ opts.Host = "0.0.0.0"
+ ns, err := server.NewServer(opts)
+
+ if err != nil {
+ panic(err)
+ }
+
+ go ns.Start()
+
+ if !ns.ReadyForConnections(4 * time.Second) {
+ panic("not ready for connection")
+ }
+
+ // Edite le json de configuration donné
+ // Et injecte la configuration des servers Nats et MiniRedis
+ json, _ := gabs.ParseJSON([]byte(str))
+ json.Set(fmt.Sprintf("redis://%s", mr.Addr()), "redis", "url")
+ json.Set("localhost", "nats", "host")
+ json.Set(1, "webhook", "discord", "client_id")
+
+ // Démarre une instance de nova
+ instance := C.start_instance(C.CString(json.String()))
+
+ // Wait for a SIGINT
+ c := make(chan os.Signal, 1)
+ signal.Notify(c,
+ syscall.SIGHUP,
+ syscall.SIGINT,
+ syscall.SIGTERM,
+ syscall.SIGQUIT)
+ <-c
+
+ println("Arret de nova all in one")
+ C.stop_instance(instance)
+}
--- /dev/null
+use std::cell::RefCell;
+
+use anyhow::Result;
+use libc::c_int;
+use tracing::error;
+
+thread_local! {
+ pub static ERROR_HANDLER: std::cell::RefCell<Option<unsafe extern "C" fn(libc::c_int, *mut libc::c_char)>> = RefCell::new(None);
+}
+
+/// Update the most recent error, clearing whatever may have been there before.
+pub fn stacktrace(err: anyhow::Error) -> String {
+ format!("{err}")
+}
+
+pub fn wrap_result<T, F>(func: F) -> Option<T>
+where
+ F: Fn() -> Result<T>,
+{
+ let result = func();
+
+ match result {
+ Ok(ok) => Some(ok),
+ Err(error) => {
+ // Call the handler
+ handle_error(error);
+ None
+ }
+ }
+}
+
+pub fn handle_error(error: anyhow::Error) {
+ ERROR_HANDLER.with(|val| {
+ let mut stacktrace = stacktrace(error);
+
+ error!("Error emitted: {}", stacktrace);
+ if let Some(func) = *val.borrow() {
+
+ // Call the error handler
+ unsafe {
+ func(
+ stacktrace.len() as c_int + 1,
+ stacktrace.as_mut_ptr() as *mut i8,
+ );
+ }
+ }
+ });
+}
+
+#[cfg(test)]
+mod tests {
+ // todo
+}
--- /dev/null
+use std::{
+ ffi::{c_char, c_int, CString},
+ mem::take,
+ ptr,
+ str::FromStr,
+ time::Duration,
+};
+
+use gateway::GatewayServer;
+use opentelemetry::{global::set_text_map_propagator, sdk::propagation::TraceContextPropagator};
+use ratelimit::RatelimiterServerComponent;
+use rest::ReverseProxyServer;
+use tokio::{runtime::Runtime, sync::mpsc};
+use tracing::{debug, error};
+use tracing_subscriber::{
+ filter::Directive, fmt, prelude::__tracing_subscriber_SubscriberExt, util::SubscriberInitExt,
+ EnvFilter,
+};
+use webhook::WebhookServer;
+
+use crate::{
+ errors::{handle_error, wrap_result, ERROR_HANDLER},
+ utils::{load_config_file, start_component, AllInOneInstance},
+};
+
+#[no_mangle]
+pub unsafe extern "C" fn set_error_handler(func: unsafe extern "C" fn(c_int, *mut c_char)) {
+ debug!("Setting error handler");
+ ERROR_HANDLER.with(|prev| {
+ *prev.borrow_mut() = Some(func);
+ });
+}
+
+#[no_mangle]
+/// Loads the config json using the nova shared config loader
+pub extern "C" fn load_config() -> *mut c_char {
+ wrap_result(move || {
+ let config = serde_json::to_string(&load_config_file()?)?;
+ let c_str_song = CString::new(config)?;
+ Ok(c_str_song.into_raw())
+ })
+ .or(Some(ptr::null::<i8>() as *mut i8))
+ .expect("something has gone terribly wrong")
+}
+
+#[no_mangle]
+pub extern "C" fn stop_instance(instance: *mut AllInOneInstance) {
+ wrap_result(move || {
+ let mut instance = unsafe { Box::from_raw(instance) };
+ let handles = take(&mut instance.handles);
+ instance.runtime.block_on(async move {
+ for (name, sender, join) in handles {
+ debug!("Halting component {}", name);
+ let _ = sender
+ .send(())
+ .or_else(|_| Err(error!("Component {} is not online", name)));
+ match join.await {
+ Ok(_) => {}
+ Err(error) => error!("Task for component {} panic'ed {}", name, error),
+ };
+ debug!("Component {} halted", name);
+ }
+ });
+
+ instance.runtime.shutdown_timeout(Duration::from_secs(5));
+
+ Ok(())
+ });
+}
+
+#[no_mangle]
+pub extern "C" fn create_instance(config: *mut c_char) -> *mut AllInOneInstance {
+ wrap_result(move || {
+ let value = unsafe { CString::from_raw(config) };
+ let json = value.to_str()?;
+
+ // Main stop signal for this instance
+ let (error_sender, mut errors) = mpsc::channel(50);
+ let mut handles = vec![];
+
+ let runtime = Runtime::new()?;
+
+ // Setup the tracing system
+ set_text_map_propagator(TraceContextPropagator::new());
+ tracing_subscriber::registry()
+ .with(fmt::layer())
+ .with(
+ EnvFilter::builder()
+ .with_default_directive(Directive::from_str("info").unwrap())
+ .from_env()
+ .unwrap(),
+ )
+ .init();
+
+ // Error handling task
+ runtime.spawn(async move {
+ while let Some(error) = errors.recv().await {
+ handle_error(error)
+ }
+ });
+
+ handles.push(start_component::<GatewayServer>(
+ json,
+ error_sender.clone(),
+ &runtime,
+ )?);
+
+ std::thread::sleep(Duration::from_secs(1));
+
+ handles.push(start_component::<RatelimiterServerComponent>(
+ json,
+ error_sender.clone(),
+ &runtime,
+ )?);
+
+ std::thread::sleep(Duration::from_secs(1));
+
+ handles.push(start_component::<ReverseProxyServer>(
+ json,
+ error_sender.clone(),
+ &runtime,
+ )?);
+
+ std::thread::sleep(Duration::from_secs(1));
+
+ handles.push(start_component::<WebhookServer>(
+ json,
+ error_sender.clone(),
+ &runtime,
+ )?);
+
+ let all_in_one = Box::into_raw(Box::new(AllInOneInstance { runtime, handles }));
+
+ Ok(all_in_one)
+ })
+ .or(Some(ptr::null::<i8>() as *mut AllInOneInstance))
+ .expect("something has gone terribly wrong")
+}
--- /dev/null
+pub mod utils;
+pub mod errors;
+pub mod ffi;
--- /dev/null
+use all_in_one::ffi::{create_instance, load_config, stop_instance};
+use std::sync::mpsc::channel;
+use ctrlc;
+
+fn main() {
+ let c = load_config();
+ let comp = create_instance(c);
+
+ // wait for signal
+ let (tx, rx) = channel();
+
+ ctrlc::set_handler(move || tx.send(()).expect("Could not send signal on channel."))
+ .expect("Error setting Ctrl-C handler");
+
+ rx.recv().unwrap();
+
+ println!("Exiting.");
+
+ stop_instance(comp);
+}
\ No newline at end of file
--- /dev/null
+use anyhow::Result;
+use config::{Config, Environment, File};
+use leash::Component;
+use serde::de::DeserializeOwned;
+use serde_json::Value;
+use shared::config::Settings;
+use tokio::{
+ runtime::Runtime,
+ sync::{mpsc, oneshot::Sender},
+ task::JoinHandle,
+};
+use tracing::{
+ debug,
+ log::{error, info},
+};
+
+/// Represents a all in one instance
+pub struct AllInOneInstance {
+ pub runtime: Runtime,
+ pub(crate) handles: Vec<(&'static str, Sender<()>, JoinHandle<()>)>,
+}
+
+/// Loads the settings from a component using a string
+fn load_settings_for<T: Default + DeserializeOwned + Clone>(
+ settings: &str,
+ name: &str,
+) -> Result<Settings<T>> {
+ let value: Value = serde_json::from_str(settings)?;
+ let section: T = serde_json::from_value(value.get(name).unwrap().to_owned())?;
+ let mut settings: Settings<T> = serde_json::from_value(value)?;
+ settings.config = section;
+
+ Ok(settings)
+}
+
+pub(crate) fn start_component<T: Component>(
+ json: &str,
+ error_sender: mpsc::Sender<anyhow::Error>,
+ runtime: &Runtime,
+) -> Result<(&'static str, Sender<()>, JoinHandle<()>)> {
+ let name = T::SERVICE_NAME;
+ let instance = T::new();
+
+ // We setup stop signals
+ let (stop, signal) = tokio::sync::oneshot::channel();
+ let settings = load_settings_for(json, name)?;
+
+ let handle = runtime.spawn(async move {
+ debug!("starting component {}", name);
+ match instance.start(settings, signal).await {
+ Ok(_) => info!("Component {} gracefully exited", name),
+ Err(error) => {
+ error!("Component {} exited with error {}", name, error);
+ error_sender
+ .send(error)
+ .await
+ .expect("Couldn't send the error notification to the error mpsc");
+ }
+ }
+ });
+
+ Ok((name, stop, handle))
+}
+
+pub(crate) fn load_config_file() -> Result<Value> {
+ let mut builder = Config::builder();
+
+ builder = builder.add_source(File::with_name("config/default"));
+ let mode = std::env::var("ENV").unwrap_or_else(|_| "development".into());
+ info!("Configuration Environment: {}", mode);
+
+ builder = builder.add_source(File::with_name(&format!("config/{}", mode)).required(false));
+ builder = builder.add_source(File::with_name("config/local").required(false));
+
+ let env = Environment::with_prefix("NOVA").separator("__");
+ // we can configure each component using environment variables
+ builder = builder.add_source(env);
+
+ let config: Value = builder.build()?.try_deserialize()?;
+
+ Ok(config)
+}
+++ /dev/null
-build/*
-!build/.gitkeep
-config/
\ No newline at end of file
+++ /dev/null
-[package]
-name = "all"
-version = "0.1.0"
-edition = "2021"
-
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
-
-[dependencies]
-libc = "0.2.139"
-leash = { path = "../../libs/leash" }
-shared = { path = "../../libs/shared" }
-
-cache = { path = "../cache" }
-gateway = { path = "../gateway" }
-ratelimit = { path = "../ratelimit" }
-rest = { path = "../rest" }
-webhook = { path = "../webhook" }
-
-tokio = { version = "1.23.1", features = ["rt"] }
-serde = "1.0.152"
-serde_json = "1.0.91"
-anyhow = "1.0.68"
-
-tracing = "0.1.37"
-
-config = "0.13.3"
-
-tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
-tracing-opentelemetry = "0.18.0"
-opentelemetry = { version ="0.18.0", features = ["rt-tokio"] }
-opentelemetry-otlp = { version = "0.11.0" }
-
-[lib]
-crate-type = ["staticlib"]
-
-[build-dependencies]
-cbindgen = "0.24.3"
\ No newline at end of file
+++ /dev/null
-clean:
- rm ./build/*
-
-library:
- cargo build --release
-
-build: library
- cp ../../target/release/liball.a ./build
- go build -a -ldflags '-s' -o build/all
-
-all: library build
-
-run: all
- ./build/all
+++ /dev/null
-extern crate cbindgen;
-
-use cbindgen::{Config, Language};
-use std::env;
-use std::path::PathBuf;
-
-fn main() {
- let crate_dir = env::var("CARGO_MANIFEST_DIR").unwrap();
-
- let package_name = env::var("CARGO_PKG_NAME").unwrap();
- let output_file = PathBuf::from("./build")
- .join(format!("{}.h", package_name))
- .display()
- .to_string();
-
- let config = Config {
- language: Language::C,
- ..Default::default()
- };
-
- cbindgen::generate_with_config(crate_dir, config)
- .unwrap()
- .write_to_file(output_file);
-}
+++ /dev/null
-package main
-
-/*
-#cgo LDFLAGS: -L./build -lall -lz -lm
-#include "./build/all.h"
-*/
-import "C"
-
-import (
- "fmt"
- "os"
- "os/signal"
- "syscall"
- "time"
-
- "github.com/Jeffail/gabs"
- "github.com/alicebob/miniredis/v2"
-
- server "github.com/nats-io/nats-server/v2/server"
-)
-
-func main() {
- // Intialise les logs de la librarie Rust
- C.init_logs()
- // Charge la configuration
- str := C.GoString(C.load_config())
-
- // Démarre une instance MiniRedis
- mr := miniredis.NewMiniRedis()
- err := mr.Start()
-
- if err != nil {
- panic(err)
- }
-
- // Démarre un serveur Nats
- opts := &server.Options{}
- opts.Host = "0.0.0.0"
- ns, err := server.NewServer(opts)
-
- if err != nil {
- panic(err)
- }
-
- go ns.Start()
-
- if !ns.ReadyForConnections(4 * time.Second) {
- panic("not ready for connection")
- }
-
- // Edite le json de configuration donné
- // Et injecte la configuration des servers Nats et MiniRedis
- json, _ := gabs.ParseJSON([]byte(str))
- json.Set(fmt.Sprintf("redis://%s", mr.Addr()), "redis", "url")
- json.Set("localhost", "nats", "host")
- json.Set(1, "webhook", "discord", "client_id")
-
- // Démarre une instance de nova
- instance := C.start_instance(C.CString(json.String()))
-
- // Wait for a SIGINT
- c := make(chan os.Signal, 1)
- signal.Notify(c,
- syscall.SIGHUP,
- syscall.SIGINT,
- syscall.SIGTERM,
- syscall.SIGQUIT)
- <-c
-
- println("Arret de nova all in one")
- C.stop_instance(instance)
-}
+++ /dev/null
-#![allow(clippy::missing_safety_doc)]
-
-extern crate libc;
-use anyhow::Result;
-use config::{Config, Environment, File};
-use gateway::GatewayServer;
-use leash::Component;
-use opentelemetry::{
- global,
- sdk::{propagation::TraceContextPropagator, trace, Resource},
- KeyValue,
-};
-use opentelemetry_otlp::WithExportConfig;
-use ratelimit::RatelimiterServerComponent;
-use rest::ReverseProxyServer;
-use serde::de::DeserializeOwned;
-use serde_json::Value;
-use shared::config::Settings;
-use std::{
- env,
- ffi::{CStr, CString},
- str::FromStr,
- time::Duration,
-};
-use tokio::{
- runtime::Runtime,
- sync::oneshot::{self, Sender},
- task::JoinHandle,
-};
-use tracing::info;
-use tracing_subscriber::{
- filter::Directive, fmt, prelude::__tracing_subscriber_SubscriberExt, util::SubscriberInitExt,
- EnvFilter,
-};
-use webhook::WebhookServer;
-
-pub struct AllInOneInstance {
- pub stop: Sender<Sender<()>>,
- pub runtime: Runtime,
-}
-
-fn load_settings_for<T: Default + DeserializeOwned + Clone>(
- settings: &str,
- name: &str,
-) -> Result<Settings<T>> {
- let value: Value = serde_json::from_str(settings)?;
- let section: T = serde_json::from_value(value.get(name).unwrap().to_owned())?;
- let mut settings: Settings<T> = serde_json::from_value(value)?;
- settings.config = section;
-
- Ok(settings)
-}
-
-// Start a component
-async fn start_component<T: Component>(
- settings: String,
- aio: &mut Vec<Sender<()>>,
-) -> JoinHandle<()> {
- let name = T::SERVICE_NAME;
- let instance = T::new();
-
- let (stop, signal) = oneshot::channel();
-
- aio.push(stop);
-
- tokio::spawn(async move {
- let config = load_settings_for::<<T as Component>::Config>(&settings, name).unwrap();
- instance.start(config, signal).await.unwrap();
- })
-}
-
-#[no_mangle]
-/// Loads the config json using the nova shared config loader
-pub extern "C" fn load_config() -> *const libc::c_char {
- let mut builder = Config::builder();
-
- builder = builder.add_source(File::with_name("config/default"));
- let mode = env::var("ENV").unwrap_or_else(|_| "development".into());
- info!("Configuration Environment: {}", mode);
-
- builder = builder.add_source(File::with_name(&format!("config/{}", mode)).required(false));
- builder = builder.add_source(File::with_name("config/local").required(false));
-
- let env = Environment::with_prefix("NOVA").separator("__");
- // we can configure each component using environment variables
- builder = builder.add_source(env);
-
- let config: Value = builder.build().unwrap().try_deserialize().unwrap();
- let s = serde_json::to_string(&config).unwrap();
-
- let c_str_song = CString::new(s).unwrap();
- c_str_song.into_raw()
-}
-
-#[no_mangle]
-/// Initialise les logs des composants de nova
-/// Utilise la crate `pretty_log_env`
-pub extern "C" fn init_logs() {}
-
-#[no_mangle]
-/// Stops a nova instance
-pub unsafe extern "C" fn stop_instance(instance: *mut AllInOneInstance) {
- let instance = Box::from_raw(instance);
- let (tell_ready, ready) = oneshot::channel();
- instance.stop.send(tell_ready).unwrap();
- ready.blocking_recv().unwrap();
- instance.runtime.shutdown_timeout(Duration::from_secs(5));
-}
-
-#[no_mangle]
-/// Initialized a new nova instance and an async runtime (tokio reactor)
-/// Dont forget to stop this instance using `stop_instance`
-pub unsafe extern "C" fn start_instance(config: *const libc::c_char) -> *mut AllInOneInstance {
- let buf_name = unsafe { CStr::from_ptr(config).to_bytes() };
- let settings = String::from_utf8(buf_name.to_vec()).unwrap();
- let (stop, trigger_stop) = oneshot::channel();
-
- // Initialize a tokio runtime
- let rt = Runtime::new().unwrap();
- rt.block_on(async move {
- global::set_text_map_propagator(TraceContextPropagator::new());
- let tracer =
- opentelemetry_otlp::new_pipeline()
- .tracing()
- .with_trace_config(trace::config().with_resource(Resource::new(vec![
- KeyValue::new("service.name", "all-in-one"),
- ])))
- .with_exporter(opentelemetry_otlp::new_exporter().tonic().with_env())
- .install_batch(opentelemetry::runtime::Tokio)
- .unwrap();
-
- let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
-
- tracing_subscriber::registry()
- .with(fmt::layer())
- .with(telemetry)
- .with(
- EnvFilter::builder()
- .with_default_directive(Directive::from_str("info").unwrap())
- .from_env()
- .unwrap(),
- )
- .init();
- // Start the gateway server
-
- let mut aio = vec![];
- let mut handles = vec![];
-
- // Start components
- handles.push(start_component::<GatewayServer>(settings.clone(), &mut aio).await);
- handles
- .push(start_component::<RatelimiterServerComponent>(settings.clone(), &mut aio).await);
- handles.push(start_component::<ReverseProxyServer>(settings.clone(), &mut aio).await);
- handles.push(start_component::<WebhookServer>(settings.clone(), &mut aio).await);
-
- // wait for exit
- let done: Sender<()> = trigger_stop.await.unwrap();
-
- // Tell all the threads to stop.
- while let Some(stop_signal) = aio.pop() {
- stop_signal.send(()).unwrap();
- }
-
- // Wait for all the threads to finish.
- while let Some(handle) = handles.pop() {
- handle.await.unwrap();
- }
-
- done.send(()).unwrap();
- });
- Box::into_raw(Box::new(AllInOneInstance { stop, runtime: rt }))
-}
}
pub struct GatewayServer {}
+
impl Component for GatewayServer {
type Config = GatewayConfig;
const SERVICE_NAME: &'static str = "gateway";
--- /dev/null
+use tokio::sync::{
+ mpsc::{self, UnboundedReceiver, UnboundedSender},
+ oneshot::Sender,
+ Mutex,
+};
+
+/// Queue of ratelimit requests for a bucket.
+#[derive(Debug)]
+pub struct AsyncQueue {
+ /// Receiver for the ratelimit requests.
+ rx: Mutex<UnboundedReceiver<Sender<()>>>,
+ /// Sender for the ratelimit requests.
+ tx: UnboundedSender<Sender<()>>,
+}
+
+impl AsyncQueue {
+ /// Add a new ratelimit request to the queue.
+ pub fn push(&self, tx: Sender<()>) {
+ let _sent = self.tx.send(tx);
+ }
+
+ /// Receive the first incoming ratelimit request.
+ pub async fn pop(&self) -> Option<Sender<()>> {
+ let mut rx = self.rx.lock().await;
+
+ rx.recv().await
+ }
+}
+
+impl Default for AsyncQueue {
+ fn default() -> Self {
+ let (tx, rx) = mpsc::unbounded_channel();
+
+ Self {
+ rx: Mutex::new(rx),
+ tx,
+ }
+ }
+}
--- /dev/null
+use std::{
+ sync::atomic::{AtomicU64, Ordering},
+ time::{Duration, SystemTime, UNIX_EPOCH},
+};
+
+#[derive(Default, Debug)]
+pub struct AtomicInstant(AtomicU64);
+
+impl AtomicInstant {
+ pub const fn empty() -> Self {
+ Self(AtomicU64::new(0))
+ }
+
+ pub fn elapsed(&self) -> Duration {
+ Duration::from_millis(
+ SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap()
+ .as_millis() as u64 as u64
+ - self.0.load(Ordering::SeqCst),
+ )
+ }
+
+ pub fn as_millis(&self) -> u64 {
+ self.0.load(Ordering::SeqCst)
+ }
+
+ pub fn set_millis(&self, millis: u64) {
+ self.0.store(millis, Ordering::SeqCst);
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.as_millis() == 0
+ }
+}
--- /dev/null
+use std::{
+ sync::{
+ atomic::{AtomicU64, Ordering},
+ Arc,
+ },
+ time::Duration,
+};
+use tokio::{sync::oneshot, task::JoinHandle};
+use tracing::debug;
+use twilight_http_ratelimiting::headers::Present;
+
+use super::{async_queue::AsyncQueue, atomic_instant::AtomicInstant, redis_lock::RedisLock};
+
+#[derive(Clone, Debug)]
+pub enum TimeRemaining {
+ Finished,
+ NotStarted,
+ Some(Duration),
+}
+
+#[derive(Debug)]
+pub struct Bucket {
+ pub limit: AtomicU64,
+ /// Queue associated with this bucket.
+ pub queue: AsyncQueue,
+ /// Number of tickets remaining.
+ pub remaining: AtomicU64,
+ /// Duration after the [`Self::last_update`] time the bucket will refresh.
+ pub reset_after: AtomicU64,
+ /// When the bucket's ratelimit refresh countdown started (unix millis)
+ pub last_update: AtomicInstant,
+
+ pub tasks: Vec<JoinHandle<()>>,
+}
+
+impl Drop for Bucket {
+ fn drop(&mut self) {
+ for join in &self.tasks {
+ join.abort();
+ }
+ }
+}
+
+impl Bucket {
+ /// Create a new bucket for the specified [`Path`].
+ pub fn new(global: Arc<RedisLock>) -> Arc<Self> {
+ let tasks = vec![];
+
+ let this = Arc::new(Self {
+ limit: AtomicU64::new(u64::max_value()),
+ queue: AsyncQueue::default(),
+ remaining: AtomicU64::new(u64::max_value()),
+ reset_after: AtomicU64::new(u64::max_value()),
+ last_update: AtomicInstant::empty(),
+ tasks,
+ });
+
+ // Run with 4 dequeue tasks
+ for _ in 0..4 {
+ let this = this.clone();
+ let global = global.clone();
+ tokio::spawn(async move {
+ while let Some(element) = this.queue.pop().await {
+ // we need to wait
+ if let Some(duration) = global.locked_for().await {
+ tokio::time::sleep(duration).await;
+ }
+
+ if this.remaining() == 0 {
+ debug!("0 tickets remaining, we have to wait.");
+
+ match this.time_remaining() {
+ TimeRemaining::Finished => {
+ this.try_reset();
+ }
+ TimeRemaining::Some(duration) => {
+ debug!(milliseconds=%duration.as_millis(), "waiting for ratelimit");
+ tokio::time::sleep(duration).await;
+
+ this.try_reset();
+ }
+ TimeRemaining::NotStarted => {}
+ }
+ }
+
+ this.remaining.fetch_sub(1, Ordering::Relaxed);
+ let _ = element.send(()).map_err(|_| { debug!("response channel was closed.") });
+ }
+ });
+ }
+
+ this
+ }
+
+ /// Total number of tickets allotted in a cycle.
+ pub fn limit(&self) -> u64 {
+ self.limit.load(Ordering::Relaxed)
+ }
+
+ /// Number of tickets remaining.
+ pub fn remaining(&self) -> u64 {
+ self.remaining.load(Ordering::Relaxed)
+ }
+
+ /// Duration after the [`started_at`] time the bucket will refresh.
+ ///
+ /// [`started_at`]: Self::started_at
+ pub fn reset_after(&self) -> u64 {
+ self.reset_after.load(Ordering::Relaxed)
+ }
+
+ /// Time remaining until this bucket will reset.
+ pub fn time_remaining(&self) -> TimeRemaining {
+ let reset_after = self.reset_after();
+ let last_update = &self.last_update;
+
+ if last_update.is_empty() {
+ let elapsed = last_update.elapsed();
+
+ if elapsed > Duration::from_millis(reset_after) {
+ return TimeRemaining::Finished;
+ }
+
+ TimeRemaining::Some(Duration::from_millis(reset_after) - elapsed)
+ } else {
+ return TimeRemaining::NotStarted;
+ }
+ }
+
+ /// Try to reset this bucket's [`started_at`] value if it has finished.
+ ///
+ /// Returns whether resetting was possible.
+ ///
+ /// [`started_at`]: Self::started_at
+ pub fn try_reset(&self) -> bool {
+ if self.last_update.is_empty() {
+ return false;
+ }
+
+ if let TimeRemaining::Finished = self.time_remaining() {
+ self.remaining.store(self.limit(), Ordering::Relaxed);
+ self.last_update.set_millis(0);
+
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Update this bucket's ratelimit data after a request has been made.
+ pub fn update(&self, ratelimits: Present, time: u64) {
+ let bucket_limit = self.limit();
+
+ if self.last_update.is_empty() {
+ self.last_update.set_millis(time);
+ }
+
+ if bucket_limit != ratelimits.limit() && bucket_limit == u64::max_value() {
+ self.reset_after
+ .store(ratelimits.reset_after(), Ordering::SeqCst);
+ self.limit.store(ratelimits.limit(), Ordering::SeqCst);
+ }
+
+ self.remaining
+ .store(ratelimits.remaining(), Ordering::Relaxed);
+ }
+
+ pub async fn ticket(&self) -> oneshot::Receiver<()> {
+ let (tx, rx) = oneshot::channel();
+ self.queue.push(tx);
+ rx
+ }
+}
--- /dev/null
+pub mod bucket;
+pub mod redis_lock;
+pub mod atomic_instant;
+pub mod async_queue;
\ No newline at end of file
--- /dev/null
+use std::{
+ sync::{atomic::AtomicU64, Arc},
+ time::{Duration, SystemTime},
+};
+
+use redis::{aio::MultiplexedConnection, AsyncCommands};
+use tokio::sync::Mutex;
+use tracing::debug;
+
+/// This is flawed and needs to be replaced sometime with the real RedisLock algorithm
+#[derive(Debug)]
+pub struct RedisLock {
+ redis: Mutex<MultiplexedConnection>,
+ is_locked: AtomicU64,
+}
+
+impl RedisLock {
+ /// Set the global ratelimit as exhausted.
+ pub async fn lock_for(self: &Arc<Self>, duration: Duration) {
+ debug!("locking globally for {}", duration.as_secs());
+ let _: () = self
+ .redis
+ .lock()
+ .await
+ .set_ex(
+ "nova:rls:lock",
+ 1,
+ (duration.as_secs() + 1).try_into().unwrap(),
+ )
+ .await
+ .unwrap();
+
+ self.is_locked.store(
+ (SystemTime::now() + duration)
+ .duration_since(SystemTime::UNIX_EPOCH)
+ .unwrap()
+ .as_millis() as u64,
+ std::sync::atomic::Ordering::Relaxed,
+ );
+ }
+
+ pub async fn locked_for(self: &Arc<Self>) -> Option<Duration> {
+ let load = self.is_locked.load(std::sync::atomic::Ordering::Relaxed);
+ if load != 0 {
+ if load
+ > SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)
+ .unwrap()
+ .as_millis() as u64
+ {
+ return Some(Duration::from_millis(load));
+ } else {
+ self.is_locked
+ .store(0, std::sync::atomic::Ordering::Relaxed);
+ }
+ }
+
+ let result = self.redis.lock().await.ttl::<_, i64>("nova:rls:lock").await;
+ match result {
+ Ok(remaining_time) => {
+ if remaining_time > 0 {
+ let duration = Duration::from_secs(remaining_time as u64);
+ debug!("external global lock detected, locking");
+ self.lock_for(duration).await;
+ Some(duration)
+ } else {
+ None
+ }
+ }
+ Err(error) => {
+ debug!("redis call failed: {}", error);
+
+ None
+ }
+ }
+ }
+
+ pub fn new(redis: MultiplexedConnection) -> Arc<Self> {
+ Arc::new(Self {
+ redis: Mutex::new(redis),
+ is_locked: AtomicU64::new(0),
+ })
+ }
+}
-use opentelemetry::{global, propagation::Extractor};
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use opentelemetry::global;
+use opentelemetry::propagation::Extractor;
+use proto::nova::ratelimit::ratelimiter::HeadersSubmitRequest;
use proto::nova::ratelimit::ratelimiter::{
- ratelimiter_server::Ratelimiter, BucketSubmitTicketRequest, BucketSubmitTicketResponse,
+ ratelimiter_server::Ratelimiter, BucketSubmitTicketRequest,
};
-use std::pin::Pin;
-use tokio::sync::mpsc;
-use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
-use tonic::{Request, Response, Status, Streaming};
-use tracing::{debug, debug_span, info, Instrument};
+use tokio::sync::RwLock;
+use tonic::Response;
+use tracing::debug;
use tracing_opentelemetry::OpenTelemetrySpanExt;
-use twilight_http_ratelimiting::{ticket::TicketReceiver, RatelimitHeaders};
+use twilight_http_ratelimiting::RatelimitHeaders;
-use crate::redis_global_local_bucket_ratelimiter::RedisGlobalLocalBucketRatelimiter;
+use crate::buckets::bucket::Bucket;
+use crate::buckets::redis_lock::RedisLock;
pub struct RLServer {
- ratelimiter: RedisGlobalLocalBucketRatelimiter,
+ global: Arc<RedisLock>,
+ buckets: RwLock<HashMap<String, Arc<Bucket>>>,
}
impl RLServer {
- pub fn new(ratelimiter: RedisGlobalLocalBucketRatelimiter) -> Self {
- Self { ratelimiter }
+ pub fn new(redis_lock: Arc<RedisLock>) -> Self {
+ Self {
+ global: redis_lock,
+ buckets: RwLock::new(HashMap::new()),
+ }
}
}
#[tonic::async_trait]
impl Ratelimiter for RLServer {
- type SubmitTicketStream =
- Pin<Box<dyn Stream<Item = Result<BucketSubmitTicketResponse, Status>> + Send>>;
-
- async fn submit_ticket(
+ async fn submit_headers(
&self,
- req: Request<Streaming<BucketSubmitTicketRequest>>,
- ) -> Result<Response<Self::SubmitTicketStream>, Status> {
+ request: tonic::Request<HeadersSubmitRequest>,
+ ) -> Result<tonic::Response<()>, tonic::Status> {
let parent_cx =
- global::get_text_map_propagator(|prop| prop.extract(&MetadataMap(req.metadata())));
+ global::get_text_map_propagator(|prop| prop.extract(&MetadataMap(request.metadata())));
// Generate a tracing span as usual
let span = tracing::span!(tracing::Level::INFO, "request process");
-
- // Assign parent trace from external context
span.set_parent(parent_cx);
- let mut in_stream = req.into_inner();
- let (tx, rx) = mpsc::channel(128);
- let imrl = self.ratelimiter.clone();
-
- // this spawn here is required if you want to handle connection error.
- // If we just map `in_stream` and write it back as `out_stream` the `out_stream`
- // will be drooped when connection error occurs and error will never be propagated
- // to mapped version of `in_stream`.
- tokio::spawn(async move {
- let mut receiver: Option<TicketReceiver> = None;
- while let Some(result) = in_stream.next().await {
- let result = result.unwrap();
-
- match result.data.unwrap() {
- proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::Data::Path(path) => {
- let span = debug_span!("requesting ticket");
- let a = imrl.ticket(path).instrument(span).await.unwrap();
- receiver = Some(a);
-
- tx.send(Ok(BucketSubmitTicketResponse {
- accepted: 1
- })).await.unwrap();
- },
- proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::Data::Headers(b) => {
- if let Some(recv) = receiver {
- let span = debug_span!("waiting for headers data");
- let recv = recv.instrument(span).await.unwrap();
- let rheaders = RatelimitHeaders::from_pairs(b.headers.iter().map(|f| (f.0.as_str(), f.1.as_bytes()))).unwrap();
-
- recv.headers(Some(rheaders)).unwrap();
- break;
- }
- },
- }
+ let data = request.into_inner();
+
+ let ratelimit_headers = RatelimitHeaders::from_pairs(
+ data.headers.iter().map(|f| (f.0 as &str, f.1.as_bytes())),
+ )
+ .unwrap();
+
+ let bucket: Arc<Bucket> = if self.buckets.read().await.contains_key(&data.path) {
+ self.buckets
+ .read()
+ .await
+ .get(&data.path)
+ .expect("impossible")
+ .clone()
+ } else {
+ let bucket = Bucket::new(self.global.clone());
+ self.buckets.write().await.insert(data.path, bucket.clone());
+ bucket
+ };
+
+ match ratelimit_headers {
+ RatelimitHeaders::Global(global) => {
+ // If we are globally ratelimited, we lock using the redis lock
+ // This is using redis because a global ratelimit should be executed in all
+ // ratelimit workers.
+ debug!("global ratelimit headers detected: {}", global.retry_after());
+ self.global
+ .lock_for(Duration::from_secs(global.retry_after()))
+ .await;
+ }
+ RatelimitHeaders::None => {}
+ RatelimitHeaders::Present(present) => {
+ // we should update the bucket.
+ bucket.update(present, data.precise_time);
}
+ _ => unreachable!(),
+ };
- debug!("\tstream ended");
- info!("request terminated");
- }.instrument(span));
+ Ok(Response::new(()))
+ }
- // echo just write the same data that was received
- let out_stream = ReceiverStream::new(rx);
+ async fn submit_ticket(
+ &self,
+ request: tonic::Request<BucketSubmitTicketRequest>,
+ ) -> Result<tonic::Response<()>, tonic::Status> {
+ let parent_cx =
+ global::get_text_map_propagator(|prop| prop.extract(&MetadataMap(request.metadata())));
+ // Generate a tracing span as usual
+ let span = tracing::span!(tracing::Level::INFO, "request process");
+ span.set_parent(parent_cx);
- Ok(Response::new(
- Box::pin(out_stream) as Self::SubmitTicketStream
- ))
+ let data = request.into_inner();
+
+ let bucket: Arc<Bucket> = if self.buckets.read().await.contains_key(&data.path) {
+ self.buckets
+ .read()
+ .await
+ .get(&data.path)
+ .expect("impossible")
+ .clone()
+ } else {
+ let bucket = Bucket::new(self.global.clone());
+ self.buckets.write().await.insert(data.path, bucket.clone());
+ bucket
+ };
+
+ // wait for the ticket to be accepted
+ bucket.ticket().await;
+
+ Ok(Response::new(()))
}
}
+use buckets::redis_lock::RedisLock;
use config::RatelimitServerConfig;
use grpc::RLServer;
use leash::{AnyhowResultFuture, Component};
use proto::nova::ratelimit::ratelimiter::ratelimiter_server::RatelimiterServer;
use redis::aio::MultiplexedConnection;
-use redis_global_local_bucket_ratelimiter::RedisGlobalLocalBucketRatelimiter;
use shared::config::Settings;
use std::future::Future;
-use std::{net::ToSocketAddrs, pin::Pin};
+use std::{pin::Pin};
use tokio::sync::oneshot;
use tonic::transport::Server;
mod config;
mod grpc;
-mod redis_global_local_bucket_ratelimiter;
+mod buckets;
pub struct RatelimiterServerComponent {}
impl Component for RatelimiterServerComponent {
>::into(settings.redis)
.await?;
- let server = RLServer::new(RedisGlobalLocalBucketRatelimiter::new(redis));
+ let server = RLServer::new(RedisLock::new(redis));
Server::builder()
.add_service(RatelimiterServer::new(server))
+++ /dev/null
-//! [`Bucket`] management used by the [`super::InMemoryRatelimiter`] internally.
-//! Each bucket has an associated [`BucketQueue`] to queue an API request, which is
-//! consumed by the [`BucketQueueTask`] that manages the ratelimit for the bucket
-//! and respects the global ratelimit.
-
-use super::RedisLockPair;
-use std::{
- collections::HashMap,
- mem,
- sync::{
- atomic::{AtomicU64, Ordering},
- Arc, Mutex,
- },
- time::{Duration, Instant},
-};
-use tokio::{
- sync::{
- mpsc::{self, UnboundedReceiver, UnboundedSender},
- Mutex as AsyncMutex,
- },
- time::{sleep, timeout},
-};
-use twilight_http_ratelimiting::{headers::RatelimitHeaders, ticket::TicketNotifier};
-
-/// Time remaining until a bucket will reset.
-#[derive(Clone, Debug)]
-pub enum TimeRemaining {
- /// Bucket has already reset.
- Finished,
- /// Bucket's ratelimit refresh countdown has not started yet.
- NotStarted,
- /// Amount of time until the bucket resets.
- Some(Duration),
-}
-
-/// Ratelimit information for a bucket used in the [`super::InMemoryRatelimiter`].
-///
-/// A generic version not specific to this ratelimiter is [`crate::Bucket`].
-#[derive(Debug)]
-pub struct Bucket {
- /// Total number of tickets allotted in a cycle.
- pub limit: AtomicU64,
- /// Path this ratelimit applies to.
- pub path: String,
- /// Queue associated with this bucket.
- pub queue: BucketQueue,
- /// Number of tickets remaining.
- pub remaining: AtomicU64,
- /// Duration after the [`Self::started_at`] time the bucket will refresh.
- pub reset_after: AtomicU64,
- /// When the bucket's ratelimit refresh countdown started.
- pub started_at: Mutex<Option<Instant>>,
-}
-
-impl Bucket {
- /// Create a new bucket for the specified [`Path`].
- pub fn new(path: String) -> Self {
- Self {
- limit: AtomicU64::new(u64::max_value()),
- path,
- queue: BucketQueue::default(),
- remaining: AtomicU64::new(u64::max_value()),
- reset_after: AtomicU64::new(u64::max_value()),
- started_at: Mutex::new(None),
- }
- }
-
- /// Total number of tickets allotted in a cycle.
- pub fn limit(&self) -> u64 {
- self.limit.load(Ordering::Relaxed)
- }
-
- /// Number of tickets remaining.
- pub fn remaining(&self) -> u64 {
- self.remaining.load(Ordering::Relaxed)
- }
-
- /// Duration after the [`started_at`] time the bucket will refresh.
- ///
- /// [`started_at`]: Self::started_at
- pub fn reset_after(&self) -> u64 {
- self.reset_after.load(Ordering::Relaxed)
- }
-
- /// Time remaining until this bucket will reset.
- pub fn time_remaining(&self) -> TimeRemaining {
- let reset_after = self.reset_after();
- let maybe_started_at = *self.started_at.lock().expect("bucket poisoned");
-
- let started_at = if let Some(started_at) = maybe_started_at {
- started_at
- } else {
- return TimeRemaining::NotStarted;
- };
-
- let elapsed = started_at.elapsed();
-
- if elapsed > Duration::from_millis(reset_after) {
- return TimeRemaining::Finished;
- }
-
- TimeRemaining::Some(Duration::from_millis(reset_after) - elapsed)
- }
-
- /// Try to reset this bucket's [`started_at`] value if it has finished.
- ///
- /// Returns whether resetting was possible.
- ///
- /// [`started_at`]: Self::started_at
- pub fn try_reset(&self) -> bool {
- if self.started_at.lock().expect("bucket poisoned").is_none() {
- return false;
- }
-
- if let TimeRemaining::Finished = self.time_remaining() {
- self.remaining.store(self.limit(), Ordering::Relaxed);
- *self.started_at.lock().expect("bucket poisoned") = None;
-
- true
- } else {
- false
- }
- }
-
- /// Update this bucket's ratelimit data after a request has been made.
- pub fn update(&self, ratelimits: Option<(u64, u64, u64)>) {
- let bucket_limit = self.limit();
-
- {
- let mut started_at = self.started_at.lock().expect("bucket poisoned");
-
- if started_at.is_none() {
- started_at.replace(Instant::now());
- }
- }
-
- if let Some((limit, remaining, reset_after)) = ratelimits {
- if bucket_limit != limit && bucket_limit == u64::max_value() {
- self.reset_after.store(reset_after, Ordering::SeqCst);
- self.limit.store(limit, Ordering::SeqCst);
- }
-
- self.remaining.store(remaining, Ordering::Relaxed);
- } else {
- self.remaining.fetch_sub(1, Ordering::Relaxed);
- }
- }
-}
-
-/// Queue of ratelimit requests for a bucket.
-#[derive(Debug)]
-pub struct BucketQueue {
- /// Receiver for the ratelimit requests.
- rx: AsyncMutex<UnboundedReceiver<TicketNotifier>>,
- /// Sender for the ratelimit requests.
- tx: UnboundedSender<TicketNotifier>,
-}
-
-impl BucketQueue {
- /// Add a new ratelimit request to the queue.
- pub fn push(&self, tx: TicketNotifier) {
- let _sent = self.tx.send(tx);
- }
-
- /// Receive the first incoming ratelimit request.
- pub async fn pop(&self, timeout_duration: Duration) -> Option<TicketNotifier> {
- let mut rx = self.rx.lock().await;
-
- timeout(timeout_duration, rx.recv()).await.ok().flatten()
- }
-}
-
-impl Default for BucketQueue {
- fn default() -> Self {
- let (tx, rx) = mpsc::unbounded_channel();
-
- Self {
- rx: AsyncMutex::new(rx),
- tx,
- }
- }
-}
-
-/// A background task that handles ratelimit requests to a [`Bucket`]
-/// and processes them in order, keeping track of both the global and
-/// the [`Path`]-specific ratelimits.
-pub(super) struct BucketQueueTask {
- /// The [`Bucket`] managed by this task.
- bucket: Arc<Bucket>,
- /// All buckets managed by the associated [`super::InMemoryRatelimiter`].
- buckets: Arc<Mutex<HashMap<String, Arc<Bucket>>>>,
- /// Global ratelimit data.
- global: Arc<RedisLockPair>,
- /// The [`Path`] this [`Bucket`] belongs to.
- path: String,
-}
-
-impl BucketQueueTask {
- /// Timeout to wait for response headers after initiating a request.
- const WAIT: Duration = Duration::from_secs(10);
-
- /// Create a new task to manage the ratelimit for a [`Bucket`].
- pub fn new(
- bucket: Arc<Bucket>,
- buckets: Arc<Mutex<HashMap<String, Arc<Bucket>>>>,
- global: Arc<RedisLockPair>,
- path: String,
- ) -> Self {
- Self {
- bucket,
- buckets,
- global,
- path,
- }
- }
-
- /// Process incoming ratelimit requests to this bucket and update the state
- /// based on received [`RatelimitHeaders`].
- #[tracing::instrument(name = "background queue task", skip(self), fields(path = ?self.path))]
- pub async fn run(self) {
- while let Some(queue_tx) = self.next().await {
- if self.global.is_locked().await {
- mem::drop(self.global.0.lock().await);
- }
-
- let ticket_headers = if let Some(ticket_headers) = queue_tx.available() {
- ticket_headers
- } else {
- continue;
- };
-
- tracing::debug!("starting to wait for response headers");
-
- match timeout(Self::WAIT, ticket_headers).await {
- Ok(Ok(Some(headers))) => self.handle_headers(&headers).await,
- Ok(Ok(None)) => {
- tracing::debug!("request aborted");
- }
- Ok(Err(_)) => {
- tracing::debug!("ticket channel closed");
- }
- Err(_) => {
- tracing::debug!("receiver timed out");
- }
- }
- }
-
- tracing::debug!("bucket appears finished, removing");
-
- self.buckets
- .lock()
- .expect("ratelimit buckets poisoned")
- .remove(&self.path);
- }
-
- /// Update the bucket's ratelimit state.
- async fn handle_headers(&self, headers: &RatelimitHeaders) {
- let ratelimits = match headers {
- RatelimitHeaders::Global(global) => {
- self.lock_global(Duration::from_secs(global.retry_after()))
- .await;
-
- None
- }
- RatelimitHeaders::None => return,
- RatelimitHeaders::Present(present) => {
- Some((present.limit(), present.remaining(), present.reset_after()))
- }
- _ => unreachable!(),
- };
-
- tracing::debug!(path=?self.path, "updating bucket");
- self.bucket.update(ratelimits);
- }
-
- /// Lock the global ratelimit for a specified duration.
- async fn lock_global(&self, wait: Duration) {
- tracing::debug!(path=?self.path, "request got global ratelimited");
- self.global.lock_for(wait).await;
- }
-
- /// Get the next [`TicketNotifier`] in the queue.
- async fn next(&self) -> Option<TicketNotifier> {
- tracing::debug!(path=?self.path, "starting to get next in queue");
-
- self.wait_if_needed().await;
-
- self.bucket.queue.pop(Self::WAIT).await
- }
-
- /// Wait for this bucket to refresh if it isn't ready yet.
- #[tracing::instrument(name = "waiting for bucket to refresh", skip(self), fields(path = ?self.path))]
- async fn wait_if_needed(&self) {
- let wait = {
- if self.bucket.remaining() > 0 {
- return;
- }
-
- tracing::debug!("0 tickets remaining, may have to wait");
-
- match self.bucket.time_remaining() {
- TimeRemaining::Finished => {
- self.bucket.try_reset();
-
- return;
- }
- TimeRemaining::NotStarted => return,
- TimeRemaining::Some(dur) => dur,
- }
- };
-
- tracing::debug!(
- milliseconds=%wait.as_millis(),
- "waiting for ratelimit to pass",
- );
-
- sleep(wait).await;
-
- tracing::debug!("done waiting for ratelimit to pass");
-
- self.bucket.try_reset();
- }
-}
+++ /dev/null
-use self::bucket::{Bucket, BucketQueueTask};
-use redis::aio::MultiplexedConnection;
-use redis::AsyncCommands;
-use tokio::sync::Mutex;
-use twilight_http_ratelimiting::ticket::{self, TicketNotifier};
-use twilight_http_ratelimiting::GetTicketFuture;
-mod bucket;
-use std::future;
-use std::{
- collections::hash_map::{Entry, HashMap},
- sync::Arc,
- time::Duration,
-};
-
-#[derive(Debug)]
-struct RedisLockPair(Mutex<MultiplexedConnection>);
-
-impl RedisLockPair {
- /// Set the global ratelimit as exhausted.
- pub async fn lock_for(&self, duration: Duration) {
- let _: () = self
- .0
- .lock()
- .await
- .set_ex(
- "nova:rls:lock",
- 1,
- (duration.as_secs() + 1).try_into().unwrap(),
- )
- .await
- .unwrap();
- }
-
- pub async fn is_locked(&self) -> bool {
- self.0.lock().await.exists("nova:rls:lock").await.unwrap()
- }
-}
-
-#[derive(Clone, Debug)]
-pub struct RedisGlobalLocalBucketRatelimiter {
- buckets: Arc<std::sync::Mutex<HashMap<String, Arc<Bucket>>>>,
-
- global: Arc<RedisLockPair>,
-}
-
-impl RedisGlobalLocalBucketRatelimiter {
- #[must_use]
- pub fn new(redis: MultiplexedConnection) -> Self {
- Self {
- buckets: Arc::default(),
- global: Arc::new(RedisLockPair(Mutex::new(redis))),
- }
- }
-
- fn entry(&self, path: String, tx: TicketNotifier) -> Option<Arc<Bucket>> {
- let mut buckets = self.buckets.lock().expect("buckets poisoned");
-
- match buckets.entry(path.clone()) {
- Entry::Occupied(bucket) => {
- tracing::debug!("got existing bucket: {path:?}");
-
- bucket.get().queue.push(tx);
-
- tracing::debug!("added request into bucket queue: {path:?}");
-
- None
- }
- Entry::Vacant(entry) => {
- tracing::debug!("making new bucket for path: {path:?}");
-
- let bucket = Bucket::new(path);
- bucket.queue.push(tx);
-
- let bucket = Arc::new(bucket);
- entry.insert(Arc::clone(&bucket));
-
- Some(bucket)
- }
- }
- }
-
- pub fn ticket(&self, path: String) -> GetTicketFuture {
- tracing::debug!("getting bucket for path: {path:?}");
-
- let (tx, rx) = ticket::channel();
-
- if let Some(bucket) = self.entry(path.clone(), tx) {
- tokio::spawn(
- BucketQueueTask::new(
- bucket,
- Arc::clone(&self.buckets),
- Arc::clone(&self.global),
- path,
- )
- .run(),
- );
- }
-
- Box::pin(future::ready(Ok(rx)))
- }
-}
convert::TryFrom,
hash::{Hash, Hasher},
str::FromStr,
+ sync::Arc,
};
-use tracing::{debug_span, error, instrument, Instrument};
+use tracing::{debug_span, error, info_span, instrument, Instrument};
use twilight_http_ratelimiting::{Method, Path};
use crate::ratelimit_client::RemoteRatelimiter;
#[instrument]
pub async fn handle_request(
client: Client<HttpsConnector<HttpConnector>, Body>,
- ratelimiter: RemoteRatelimiter,
- token: &str,
+ ratelimiter: Arc<RemoteRatelimiter>,
+ token: String,
mut request: Request<Body>,
) -> Result<Response<Body>, anyhow::Error> {
let (hash, uri_string) = {
(hash.finish().to_string(), uri_string)
};
+ // waits for the request to be authorized
+ ratelimiter
+ .ticket(hash.clone())
+ .instrument(debug_span!("ticket validation request"))
+ .await?;
- let span = debug_span!("ticket validation request");
- let header_sender = match span
- .in_scope(|| ratelimiter.ticket(hash))
- .await
- {
- Ok(sender) => sender,
- Err(e) => {
- error!("Failed to receive ticket for ratelimiting: {:?}", e);
- bail!("failed to reteive ticket");
- }
- };
-
request
.headers_mut()
.insert(HOST, HeaderValue::from_static("discord.com"));
}
};
- let ratelimit_headers = resp
+ let headers = resp
.headers()
.into_iter()
- .map(|(k, v)| (k.to_string(), v.to_str().unwrap().to_string()))
+ .map(|(k, v)| (k.to_string(), v.to_str().map(|f| f.to_string())))
+ .filter(|f| f.1.is_ok())
+ .map(|f| (f.0, f.1.expect("errors should be filtered")))
.collect();
- if header_sender.send(ratelimit_headers).is_err() {
- error!("Error when sending ratelimit headers to ratelimiter");
- };
+ let _ = ratelimiter
+ .submit_headers(hash, headers)
+ .instrument(info_span!("submitting headers"))
+ .await;
Ok(resp)
}
Body, Client, Request, Server,
};
use leash::{AnyhowResultFuture, Component};
-use opentelemetry::{global, trace::Tracer};
+use opentelemetry::{global};
use opentelemetry_http::HeaderExtractor;
use shared::config::Settings;
use std::{convert::Infallible, sync::Arc};
use tokio::sync::oneshot;
+use tracing_opentelemetry::OpenTelemetrySpanExt;
mod config;
mod handler;
) -> AnyhowResultFuture<()> {
Box::pin(async move {
// Client to the remote ratelimiters
- let ratelimiter = ratelimit_client::RemoteRatelimiter::new(settings.config.clone());
+ let ratelimiter = Arc::new(ratelimit_client::RemoteRatelimiter::new(
+ settings.config.clone(),
+ ));
let https = hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
.https_only()
.build();
let client: Client<_, hyper::Body> = Client::builder().build(https);
- let token = Arc::new(settings.discord.token.clone());
+ let token = settings.config.discord.token.clone();
+
let service_fn = make_service_fn(move |_: &AddrStream| {
let client = client.clone();
let ratelimiter = ratelimiter.clone();
let token = token.clone();
async move {
Ok::<_, Infallible>(service_fn(move |request: Request<Body>| {
+ let token = token.clone();
let parent_cx = global::get_text_map_propagator(|propagator| {
propagator.extract(&HeaderExtractor(request.headers()))
});
- let _span =
- global::tracer("").start_with_context("handle_request", &parent_cx);
+
+ let span = tracing::span!(tracing::Level::INFO, "request process");
+ span.set_parent(parent_cx);
let client = client.clone();
let ratelimiter = ratelimiter.clone();
- let token = token.clone();
+
async move {
- let token = token.as_str();
+ let token = token.clone();
+ let ratelimiter = ratelimiter.clone();
handle_request(client, ratelimiter, token, request).await
}
}))
}
});
- let server = Server::bind(&settings.config.server.listening_adress).http1_only(true).serve(service_fn);
+ let server = Server::bind(&settings.config.server.listening_adress)
+ .http1_only(true)
+ .serve(service_fn);
server
.with_graceful_shutdown(async {
use crate::config::ReverseProxyConfig;
use self::remote_hashring::{HashRingWrapper, MetadataMap, VNode};
+use anyhow::anyhow;
use opentelemetry::global;
-use proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::{Data, Headers};
-use proto::nova::ratelimit::ratelimiter::BucketSubmitTicketRequest;
+use proto::nova::ratelimit::ratelimiter::{BucketSubmitTicketRequest, HeadersSubmitRequest};
use std::collections::HashMap;
use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
-use std::time::UNIX_EPOCH;
use std::time::{Duration, SystemTime};
-use tokio::sync::oneshot::{self};
-use tokio::sync::{broadcast, mpsc, RwLock};
-use tokio_stream::wrappers::ReceiverStream;
+use tokio::sync::{broadcast, RwLock};
use tonic::Request;
-use tracing::{debug, debug_span, instrument, Instrument, Span};
+use tracing::{debug, error, info_span, instrument, trace_span, Instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;
mod remote_hashring;
impl Drop for RemoteRatelimiter {
fn drop(&mut self) {
- self.stop.clone().send(()).unwrap();
+ let _ = self
+ .stop
+ .clone()
+ .send(())
+ .map_err(|_| error!("ratelimiter was already stopped"));
}
}
-type IssueTicket = Pin<
- Box<
- dyn Future<Output = anyhow::Result<oneshot::Sender<HashMap<String, String>>>>
- + Send
- + 'static,
- >,
->;
-
impl RemoteRatelimiter {
async fn get_ratelimiters(&self) -> Result<(), anyhow::Error> {
// get list of dns responses
- let responses = dns_lookup::lookup_host(&self.config.ratelimiter_address)
- .unwrap()
+ let responses = dns_lookup::lookup_host(&self.config.ratelimiter_address)?
.into_iter()
- .map(|f| f.to_string());
+ .filter(|address| address.is_ipv4())
+ .map(|address| address.to_string());
let mut write = self.remotes.write().await;
// Task to update the ratelimiters in the background
tokio::spawn(async move {
loop {
+ debug!("refreshing");
+
+ match obj_clone.get_ratelimiters().await {
+ Ok(_) => {
+ debug!("refreshed ratelimiting servers")
+ }
+ Err(err) => {
+ error!("refreshing ratelimiting servers failed {}", err);
+ }
+ }
+
let sleep = tokio::time::sleep(Duration::from_secs(10));
tokio::pin!(sleep);
-
- debug!("refreshing");
- obj_clone.get_ratelimiters().await.unwrap();
tokio::select! {
() = &mut sleep => {
debug!("timer elapsed");
}
#[instrument(name = "ticket task")]
- pub fn ticket(&self, path: String) -> IssueTicket {
+ pub fn ticket(
+ &self,
+ path: String,
+ ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'static>> {
let remotes = self.remotes.clone();
- let (tx, rx) = oneshot::channel::<HashMap<String, String>>();
Box::pin(
async move {
- // Get node managing this path
- let mut node = (*remotes.read().await.get(&path).unwrap()).clone();
-
- // Buffers for the gRPC streaming channel.
- let (send, remote) = mpsc::channel(5);
- let (do_request, wait) = oneshot::channel();
- // Tonic requires a stream to be used; Since we use a mpsc channel, we can create a stream from it
- let stream = ReceiverStream::new(remote);
-
- let mut request = Request::new(stream);
-
- let span = debug_span!("remote request");
+ // Getting the node managing this path
+ let mut node = remotes
+ .write()
+ .instrument(trace_span!("acquiring ring lock"))
+ .await
+ .get(&path)
+ .and_then(|node| Some(node.clone()))
+ .ok_or_else(|| {
+ anyhow!(
+ "did not compute ratelimit because no ratelimiter nodes are detected"
+ )
+ })?;
+
+ // Initialize span for tracing (headers injection)
+ let span = info_span!("remote request");
let context = span.context();
+ let mut request = Request::new(BucketSubmitTicketRequest { path });
global::get_text_map_propagator(|propagator| {
propagator.inject_context(&context, &mut MetadataMap(request.metadata_mut()))
});
- // Start the grpc streaming
- let ticket = node.submit_ticket(request).await?;
-
- // First, send the request
- send.send(BucketSubmitTicketRequest {
- data: Some(Data::Path(path)),
- })
- .await?;
-
- // We continuously listen for events in the channel.
- let span = debug_span!("stream worker");
- tokio::spawn(
- async move {
- let span = debug_span!("waiting for ticket upstream");
- let message = ticket
- .into_inner()
- .message()
- .instrument(span)
- .await
- .unwrap()
- .unwrap();
-
- if message.accepted == 1 {
- debug!("request ticket was accepted");
- do_request.send(()).unwrap();
- let span = debug_span!("waiting for response headers");
- let headers = rx.instrument(span).await.unwrap();
-
- send.send(BucketSubmitTicketRequest {
- data: Some(Data::Headers(Headers {
- precise_time: SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("time went backwards")
- .as_millis()
- as u64,
- headers,
- })),
- })
- .await
- .unwrap();
- }
- }
- .instrument(span),
- );
-
- // Wait for the message to be sent
- wait.await?;
+ // Requesting
+ node.submit_ticket(request)
+ .instrument(info_span!("waiting for ticket response"))
+ .await?;
- Ok(tx)
+ Ok(())
}
.instrument(Span::current()),
)
}
+
+ pub fn submit_headers(
+ &self,
+ path: String,
+ headers: HashMap<String, String>,
+ ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'static>> {
+ let remotes = self.remotes.clone();
+ Box::pin(async move {
+ let mut node = remotes
+ .write()
+ .instrument(trace_span!("acquiring ring lock"))
+ .await
+ .get(&path)
+ .and_then(|node| Some(node.clone()))
+ .ok_or_else(|| {
+ anyhow!("did not compute ratelimit because no ratelimiter nodes are detected")
+ })?;
+
+ let span = info_span!("remote request");
+ let context = span.context();
+ let time = SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)?
+ .as_millis();
+ let mut request = Request::new(HeadersSubmitRequest {
+ path,
+ precise_time: time as u64,
+ headers,
+ });
+ global::get_text_map_propagator(|propagator| {
+ propagator.inject_context(&context, &mut MetadataMap(request.metadata_mut()))
+ });
+
+ node.submit_headers(request).await?;
+
+ Ok(())
+ })
+ }
}
use std::ops::Deref;
use std::ops::DerefMut;
use tonic::transport::Channel;
+use tracing::debug;
#[derive(Debug, Clone)]
pub struct VNode {
impl VNode {
pub async fn new(address: String, port: u16) -> Result<Self, tonic::transport::Error> {
- let client =
- RatelimiterClient::connect(format!("http://{}:{}", address.clone(), port)).await?;
+ let host = format!("http://{}:{}", address.clone(), port);
+ debug!("connecting to {}", host);
+ let client = RatelimiterClient::connect(host).await?;
Ok(VNode { client, address })
}
}
-unsafe impl Send for VNode {}
-
#[repr(transparent)]
#[derive(Default)]
pub struct HashRingWrapper(hashring::HashRing<VNode>);
pub struct Discord {
#[serde(deserialize_with = "deserialize_pk")]
pub public_key: PublicKey,
- pub client_id: u32,
}
#[derive(Debug, Deserialize, Clone, Default, Copy)]
use ed25519_dalek::{PublicKey, Signature, Verifier};
+#[inline]
pub fn validate_signature(public_key: &PublicKey, data: &[u8], hex_signature: &str) -> bool {
let mut slice: [u8; Signature::BYTE_SIZE] = [0; Signature::BYTE_SIZE];
let signature_result = hex::decode_to_slice(hex_signature, &mut slice);
--- /dev/null
+all-in-one.h
--- /dev/null
+package allinone
+
+/*
+#cgo LDFLAGS: -L../../../build/lib -lall_in_one -lz -lm
+#include "./all-in-one.h"
+#include "./error_handler.h"
+*/
+import "C"
+import (
+ "fmt"
+ "time"
+ "unsafe"
+
+ "github.com/Jeffail/gabs"
+ "github.com/alicebob/miniredis/v2"
+ "github.com/nats-io/nats-server/v2/server"
+)
+
+type AllInOne struct {
+ redis *miniredis.Miniredis
+ nats *server.Server
+ instance *C.AllInOneInstance
+}
+
+//export goErrorHandler
+func goErrorHandler(size C.int, start *C.char) {
+ dest := make([]byte, size)
+ copy(dest, (*(*[1024]byte)(unsafe.Pointer(start)))[:size:size])
+
+ println("Error from all in one runner: %s", string(dest))
+}
+
+func NewAllInOne() (*AllInOne, error) {
+ redis := miniredis.NewMiniRedis()
+ nats, err := server.NewServer(&server.Options{})
+
+ if err != nil {
+ return nil, err
+ }
+
+ return &AllInOne{
+ redis: redis,
+ nats: nats,
+ }, nil
+}
+
+func (s *AllInOne) Start() error {
+ err := s.redis.Start()
+ if err != nil {
+ return err
+ }
+
+ go s.nats.Start()
+
+ if !s.nats.ReadyForConnections(5 * time.Second) {
+ return fmt.Errorf("nats server didn't start after 5 seconds, please check if there is another service listening on the same port as nats")
+ }
+ handler := C.ErrorHandler(C.allInOneErrorHandler)
+ // Set the error handler
+ C.set_error_handler(handler)
+ config := C.GoString(C.load_config())
+
+ json, _ := gabs.ParseJSON([]byte(config))
+ json.Set(fmt.Sprintf("redis://%s", s.redis.Addr()), "redis", "url")
+ json.Set("localhost", "nats", "host")
+ json.Set(1, "webhook", "discord", "client_id")
+
+ a := ""
+ a += ("Starting nova All-in-one!\n")
+ a += fmt.Sprintf(" * Rest proxy running on : http://%s\n", json.Path("rest.server.listening_adress").Data().(string))
+ a += fmt.Sprintf(" * Webhook server running on : http://%s\n", json.Path("webhook.server.listening_adress").Data().(string))
+ a += fmt.Sprintf(" * Ratelimiter server running on : grpc://%s\n", json.Path("ratelimiter.server.listening_adress").Data().(string))
+ a += (" * The gateway server should be running\n")
+ a += (" * The cache server should be running\n")
+ a += (" * Servers\n")
+ a += fmt.Sprintf(" * Running MiniREDIS on %s\n", s.redis.Addr())
+ a += fmt.Sprintf(" * Running NATS on %s\n", s.nats.ClientURL())
+ s.instance = C.create_instance(C.CString(json.String()))
+
+ print(a)
+
+ return nil
+}
+
+func (s *AllInOne) Stop() {
+ C.stop_instance(s.instance)
+}
--- /dev/null
+extern void goErrorHandler(int, char*);
+
+typedef void (*ErrorHandler)(int, char*);
+
+__attribute__((weak))
+void allInOneErrorHandler(int size, char* string) {
+ goErrorHandler(size, string);
+}
\ No newline at end of file
use serde::{de::DeserializeOwned, Deserialize};
use std::{env, ops::Deref};
use tracing::info;
+use anyhow::Result;
-use crate::error::GenericError;
#[derive(Debug, Deserialize, Clone)]
pub struct Settings<T: Clone + DeserializeOwned + Default> {
#[serde(skip_deserializing)]
}
impl<T: Clone + DeserializeOwned + Default> Settings<T> {
- pub fn new(service_name: &str) -> Result<Settings<T>, GenericError> {
+ pub fn new(service_name: &str) -> Result<Settings<T>> {
let mut builder = Config::builder();
builder = builder.add_source(File::with_name("config/default"));
+++ /dev/null
-use config::ConfigError;
-use std::{fmt::Debug, io};
-use thiserror::Error;
-
-#[derive(Debug, Error)]
-pub enum GenericError {
- #[error("invalid configuration")]
- InvalidConfiguration(#[from] ConfigError),
-
- #[error("invalid parameter `{0}`")]
- InvalidParameter(String),
-
- #[error("step `{0}` failed")]
- StepFailed(String),
-
- #[error("io error")]
- Io(#[from] io::Error),
-}
/// This crate is all the utilities shared by the nova rust projects
/// It includes logging, config and protocols.
pub mod config;
-pub mod error;
pub mod nats;
pub mod payloads;
pub mod redis;
syntax = "proto3";
+import "google/protobuf/empty.proto";
+
package nova.ratelimit.ratelimiter;
service Ratelimiter {
- rpc SubmitTicket(stream BucketSubmitTicketRequest) returns (stream BucketSubmitTicketResponse);
+ rpc SubmitTicket(BucketSubmitTicketRequest) returns (google.protobuf.Empty);
+ rpc SubmitHeaders(HeadersSubmitRequest) returns (google.protobuf.Empty);
}
message BucketSubmitTicketRequest {
- oneof data {
- string path = 1;
- Headers headers = 2;
- }
-
- message Headers {
- map<string, string> headers = 1;
- uint64 precise_time = 2;
- }
-
+ string path = 1;
}
-message BucketSubmitTicketResponse {
- int64 accepted = 1;
-}
+message HeadersSubmitRequest {
+ map<string, string> headers = 1;
+ uint64 precise_time = 2;
+ string path = 3;
+}
\ No newline at end of file