diff options
Diffstat (limited to 'libs/all_in_one')
| -rw-r--r-- | libs/all_in_one/Cargo.toml | 38 | ||||
| -rw-r--r-- | libs/all_in_one/build.rs | 27 | ||||
| -rw-r--r-- | libs/all_in_one/src/errors.rs | 53 | ||||
| -rw-r--r-- | libs/all_in_one/src/ffi.rs | 143 | ||||
| -rw-r--r-- | libs/all_in_one/src/lib.rs | 14 | ||||
| -rw-r--r-- | libs/all_in_one/src/utils.rs | 82 |
6 files changed, 357 insertions, 0 deletions
diff --git a/libs/all_in_one/Cargo.toml b/libs/all_in_one/Cargo.toml new file mode 100644 index 0000000..b8c5134 --- /dev/null +++ b/libs/all_in_one/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "all_in_one" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +libc = "0.2.139" +leash = { path = "../../libs/leash" } +shared = { path = "../../libs/shared" } + +cache = { path = "../../exes/cache" } +gateway = { path = "../../exes/gateway" } +ratelimit = { path = "../../exes/ratelimit" } +rest = { path = "../../exes/rest" } +webhook = { path = "../../exes/webhook" } +ctrlc = "3.2.4" + +tokio = { version = "1.23.1", features = ["rt"] } +serde = "1.0.152" +serde_json = "1.0.91" +anyhow = { version = "1.0.68", features = ["backtrace"] } + +tracing = "0.1.37" + +config = "0.13.3" + +tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } +tracing-opentelemetry = "0.18.0" +opentelemetry = { version ="0.18.0", features = ["rt-tokio"] } +opentelemetry-otlp = { version = "0.11.0" } + +[lib] +crate-type = ["staticlib", "rlib"] + +[build-dependencies] +cbindgen = "0.24.3"
\ No newline at end of file diff --git a/libs/all_in_one/build.rs b/libs/all_in_one/build.rs new file mode 100644 index 0000000..25a4b99 --- /dev/null +++ b/libs/all_in_one/build.rs @@ -0,0 +1,27 @@ +extern crate cbindgen; + +use cbindgen::{Config, Language}; +use std::env; +use std::error::Error; +use std::path::PathBuf; + +/// Generates the headers for the go program. +fn main() -> Result<(), Box<dyn Error>> { + let crate_dir = env::var("CARGO_MANIFEST_DIR")?; + let package_name = env::var("CARGO_PKG_NAME")?; + + // We export the header file to build/{package_name}.h + let output_file = PathBuf::from("../../internal/pkg/all-in-one") + .join(format!("{}.h", package_name)) + .display() + .to_string(); + + let config = Config { + language: Language::C, + ..Default::default() + }; + + cbindgen::generate_with_config(crate_dir, config)?.write_to_file(output_file); + + Ok(()) +} diff --git a/libs/all_in_one/src/errors.rs b/libs/all_in_one/src/errors.rs new file mode 100644 index 0000000..d2c7444 --- /dev/null +++ b/libs/all_in_one/src/errors.rs @@ -0,0 +1,53 @@ +use std::cell::RefCell; + +use anyhow::Result; +use tracing::error; + +thread_local! { + pub static ERROR_HANDLER: std::cell::RefCell<Option<unsafe extern "C" fn(libc::c_int, *mut libc::c_char)>> = RefCell::new(None); +} + +/// Update the most recent error, clearing whatever may have been there before. +#[must_use] pub fn stacktrace(err: &anyhow::Error) -> String { + format!("{err}") +} + +pub fn wrap_result<T, F>(func: F) -> Option<T> +where + F: Fn() -> Result<T>, +{ + let result = func(); + + match result { + Ok(ok) => Some(ok), + Err(error) => { + // Call the handler + handle_error(&error); + None + } + } +} + +/// # Panics +/// Panics if the stacktrace size is > than an i32 +pub fn handle_error(error: &anyhow::Error) { + ERROR_HANDLER.with(|val| { + let mut stacktrace = stacktrace(error); + + error!("Error emitted: {}", stacktrace); + if let Some(func) = *val.borrow() { + // Call the error handler + unsafe { + func( + (stacktrace.len() + 1).try_into().unwrap(), + stacktrace.as_mut_ptr().cast::<libc::c_char>(), + ); + } + } + }); +} + +#[cfg(test)] +mod tests { + // todo +} diff --git a/libs/all_in_one/src/ffi.rs b/libs/all_in_one/src/ffi.rs new file mode 100644 index 0000000..b4c853d --- /dev/null +++ b/libs/all_in_one/src/ffi.rs @@ -0,0 +1,143 @@ +#![allow(clippy::missing_safety_doc)] +use std::{ + ffi::{c_char, c_int, CString}, + mem::take, + ptr, + str::FromStr, + time::Duration, +}; + +use gateway::GatewayServer; +use opentelemetry::{global::set_text_map_propagator, sdk::propagation::TraceContextPropagator}; +use ratelimit::RatelimiterServerComponent; +use rest::ReverseProxyServer; +use tokio::{runtime::Runtime, sync::mpsc}; +use tracing::{debug, error}; +use tracing_subscriber::{ + filter::Directive, fmt, prelude::__tracing_subscriber_SubscriberExt, util::SubscriberInitExt, + EnvFilter, +}; +use webhook::WebhookServer; + +use crate::{ + errors::{handle_error, wrap_result, ERROR_HANDLER}, + utils::{load_config_file, start_component, AllInOneInstance}, +}; + +#[no_mangle] +pub unsafe extern "C" fn set_error_handler(func: unsafe extern "C" fn(c_int, *mut c_char)) { + debug!("Setting error handler"); + ERROR_HANDLER.with(|prev| { + *prev.borrow_mut() = Some(func); + }); +} + +#[no_mangle] +/// Loads the config json using the nova shared config loader +pub extern "C" fn load_config() -> *mut c_char { + wrap_result(move || { + let config = serde_json::to_string(&load_config_file()?)?; + let c_str_song = CString::new(config)?; + Ok(c_str_song.into_raw()) + }) + .or(Some(ptr::null::<c_char>() as *mut c_char)) + .expect("something has gone terribly wrong") +} + +#[no_mangle] +pub unsafe extern "C" fn stop_instance(instance: *mut AllInOneInstance) { + wrap_result(move || { + let mut instance = Box::from_raw(instance); + let handles = take(&mut instance.handles); + instance.runtime.block_on(async move { + for (name, sender, join) in handles { + debug!("Halting component {}", name); + let _ = sender + .send(()) + .map_err(|_| error!("Component {} is not online", name)); + match join.await { + Ok(_) => {} + Err(error) => error!("Task for component {} panic'ed {}", name, error), + }; + debug!("Component {} halted", name); + } + }); + + instance.runtime.shutdown_timeout(Duration::from_secs(5)); + + Ok(()) + }); +} + +/// # Panics +/// Panics if an incorrect `RUST_LOG` variables is specified. +#[no_mangle] +pub unsafe extern "C" fn create_instance(config: *mut c_char) -> *mut AllInOneInstance { + // Returning a null pointer (unaligned) is expected. + #[allow(clippy::cast_ptr_alignment)] + wrap_result(move || { + let value = CString::from_raw(config); + let json = value.to_str()?; + + // Main stop signal for this instance + let (error_sender, mut errors) = mpsc::channel(50); + let mut handles = vec![]; + + let runtime = Runtime::new()?; + + // Setup the tracing system + set_text_map_propagator(TraceContextPropagator::new()); + tracing_subscriber::registry() + .with(fmt::layer()) + .with( + EnvFilter::builder() + .with_default_directive(Directive::from_str("info").expect("")) + .from_env() + .unwrap(), + ) + .init(); + + // Error handling task + runtime.spawn(async move { + while let Some(error) = errors.recv().await { + handle_error(&error); + } + }); + + handles.push(start_component::<GatewayServer>( + json, + error_sender.clone(), + &runtime, + )?); + + std::thread::sleep(Duration::from_secs(1)); + + handles.push(start_component::<RatelimiterServerComponent>( + json, + error_sender.clone(), + &runtime, + )?); + + std::thread::sleep(Duration::from_secs(1)); + + handles.push(start_component::<ReverseProxyServer>( + json, + error_sender.clone(), + &runtime, + )?); + + std::thread::sleep(Duration::from_secs(1)); + + handles.push(start_component::<WebhookServer>( + json, + error_sender, + &runtime, + )?); + + let all_in_one = Box::into_raw(Box::new(AllInOneInstance { runtime, handles })); + + Ok(all_in_one) + }) + .or(Some(ptr::null::<AllInOneInstance>() as *mut AllInOneInstance)) + .expect("something has gone terribly wrong") +} diff --git a/libs/all_in_one/src/lib.rs b/libs/all_in_one/src/lib.rs new file mode 100644 index 0000000..493bf46 --- /dev/null +++ b/libs/all_in_one/src/lib.rs @@ -0,0 +1,14 @@ +#![deny( + clippy::all, + clippy::correctness, + clippy::suspicious, + clippy::style, + clippy::complexity, + clippy::perf, + clippy::pedantic, + clippy::nursery, +)] + +pub mod errors; +pub mod ffi; +pub mod utils; diff --git a/libs/all_in_one/src/utils.rs b/libs/all_in_one/src/utils.rs new file mode 100644 index 0000000..159d98d --- /dev/null +++ b/libs/all_in_one/src/utils.rs @@ -0,0 +1,82 @@ +use anyhow::Result; +use config::{Config, Environment, File}; +use leash::Component; +use serde::de::DeserializeOwned; +use serde_json::Value; +use shared::config::Settings; +use tokio::{ + runtime::Runtime, + sync::{mpsc, oneshot::Sender}, + task::JoinHandle, +}; +use tracing::{ + debug, + log::{error, info}, +}; + +/// Represents a all in one instance +pub struct AllInOneInstance { + pub runtime: Runtime, + pub(crate) handles: Vec<(&'static str, Sender<()>, JoinHandle<()>)>, +} + +/// Loads the settings from a component using a string +fn load_settings_for<T: Default + DeserializeOwned + Clone>( + settings: &str, + name: &str, +) -> Result<Settings<T>> { + let value: Value = serde_json::from_str(settings)?; + let section: T = serde_json::from_value(value.get(name).unwrap().clone())?; + let mut settings: Settings<T> = serde_json::from_value(value)?; + settings.config = section; + + Ok(settings) +} + +pub(crate) fn start_component<T: Component>( + json: &str, + error_sender: mpsc::Sender<anyhow::Error>, + runtime: &Runtime, +) -> Result<(&'static str, Sender<()>, JoinHandle<()>)> { + let name = T::SERVICE_NAME; + let instance = T::new(); + + // We setup stop signals + let (stop, signal) = tokio::sync::oneshot::channel(); + let settings = load_settings_for(json, name)?; + + let handle = runtime.spawn(async move { + debug!("starting component {}", name); + match instance.start(settings, signal).await { + Ok(_) => info!("Component {} gracefully exited", name), + Err(error) => { + error!("Component {} exited with error {}", name, error); + error_sender + .send(error) + .await + .expect("Couldn't send the error notification to the error mpsc"); + } + } + }); + + Ok((name, stop, handle)) +} + +pub(crate) fn load_config_file() -> Result<Value> { + let mut builder = Config::builder(); + + builder = builder.add_source(File::with_name("config/default")); + let mode = std::env::var("ENV").unwrap_or_else(|_| "development".into()); + info!("Configuration Environment: {}", mode); + + builder = builder.add_source(File::with_name(&format!("config/{mode}")).required(false)); + builder = builder.add_source(File::with_name("config/local").required(false)); + + let env = Environment::with_prefix("NOVA").separator("__"); + // we can configure each component using environment variables + builder = builder.add_source(env); + + let config: Value = builder.build()?.try_deserialize()?; + + Ok(config) +} |
