diff options
| author | Matthieu <20992787+MatthieuCoder@users.noreply.github.com> | 2021-09-26 01:24:34 +0400 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-09-26 01:24:34 +0400 |
| commit | fe11cf23da7e996613b1d8df503c2a085ac40d31 (patch) | |
| tree | e08ed094a63b14cc79975c1f11492a477970f0d3 /common/rust/src | |
| parent | f2e6047c21b3ee814670b17e5901d12ac52a3508 (diff) | |
| parent | 4ad3510c0552aa8b65866590873c7b13bc2d5243 (diff) | |
Merge pull request #6 from discordnova/webhook-receiver
Webhook receiver
Diffstat (limited to 'common/rust/src')
| -rw-r--r-- | common/rust/src/config.rs | 40 | ||||
| -rw-r--r-- | common/rust/src/error.rs | 12 | ||||
| -rw-r--r-- | common/rust/src/lib.rs | 7 | ||||
| -rw-r--r-- | common/rust/src/monitoring.rs | 60 | ||||
| -rw-r--r-- | common/rust/src/nats.rs | 62 | ||||
| -rw-r--r-- | common/rust/src/payloads.rs | 15 |
6 files changed, 196 insertions, 0 deletions
diff --git a/common/rust/src/config.rs b/common/rust/src/config.rs new file mode 100644 index 0000000..c158a21 --- /dev/null +++ b/common/rust/src/config.rs @@ -0,0 +1,40 @@ +use std::env; + +use config::{Config, ConfigError, Environment, File}; +use log::info; +use serde::{Deserialize}; + + +#[derive(Debug, Deserialize, Clone)] +#[serde(bound(deserialize = "T: Deserialize<'de> + std::default::Default + Clone"))] +pub struct Settings<T> { + #[serde(skip_deserializing)] + pub config: T, + pub monitoring: crate::monitoring::MonitoringConfiguration, + pub nats: crate::nats::NatsConfiguration, +} + +impl<T> Settings<T> where T: Deserialize<'static> + std::default::Default + Clone { + pub fn new(service_name: &str) -> Result<Settings<T>, ConfigError> { + let mut default = Config::default(); + // this file my be shared with all the components + default.merge(File::with_name("config/default"))?; + let mode = env::var("ENV").unwrap_or_else(|_| "development".into()); + info!("Configuration Environment: {}", mode); + + default.merge(File::with_name(&format!("config/{}", mode)).required(false))?; + default.merge(File::with_name("config/local").required(false))?; + + // we can configure each component using environment variables + default.merge(Environment::with_prefix("NOVA").separator("_"))?; + let mut config: Settings<T> = default.clone().try_into().unwrap(); + + // try to load the config + config.config = default.get::<T>(&service_name).unwrap(); + pretty_env_logger::init(); + + // start the monitoring system if needed + crate::monitoring::start_monitoring(&config.monitoring); + Ok(config) + } +} diff --git a/common/rust/src/error.rs b/common/rust/src/error.rs new file mode 100644 index 0000000..b602940 --- /dev/null +++ b/common/rust/src/error.rs @@ -0,0 +1,12 @@ +use std::fmt; + +#[derive(Debug)] +pub struct NovaError { + pub message: String, +} + +impl fmt::Display for NovaError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "An error occured wihind the nova system: {}", self.message) // user-facing output + } +} diff --git a/common/rust/src/lib.rs b/common/rust/src/lib.rs new file mode 100644 index 0000000..943d7cc --- /dev/null +++ b/common/rust/src/lib.rs @@ -0,0 +1,7 @@ +/// This crate contains shared code in all the rust projects +/// It contains utilities such as monitoring, logging and more. +pub mod config; +pub mod monitoring; +pub mod nats; +pub mod payloads; +pub mod error;
\ No newline at end of file diff --git a/common/rust/src/monitoring.rs b/common/rust/src/monitoring.rs new file mode 100644 index 0000000..ded1d95 --- /dev/null +++ b/common/rust/src/monitoring.rs @@ -0,0 +1,60 @@ +use hyper::{ + Response, Body, Request, Server, + header::{CONTENT_TYPE}, + service::{make_service_fn, service_fn}, +}; +use std::net::ToSocketAddrs; +use prometheus::{Encoder, TextEncoder}; +use log::{info,error}; +use serde::Deserialize; + +#[derive(Clone, Debug, Deserialize)] +/// Options for the monitoring service +pub struct MonitoringConfiguration { + enabled: bool, + address: Option<String>, + port: Option<i32>, +} + +/// Handler for the hyper http server +async fn serve_metrics(_request: Request<Body>) -> Result<Response<Body>, hyper::Error> { + let encoder = TextEncoder::new(); + let metrics = prometheus::gather(); + + let mut buffer = vec![]; + encoder.encode(&metrics, &mut buffer).unwrap(); + + let response = Response::builder() + .status(200) + .header(CONTENT_TYPE, encoder.format_type()) + .body(Body::from(buffer)) + .unwrap(); + Ok(response) +} + +/// Starts a monitoring server on the requested port +pub fn start_monitoring(configuration: &MonitoringConfiguration) { + let config = configuration.clone(); + tokio::task::spawn(async move { + if config.enabled { + let address = format!("{}:{}", + config.address.expect("a listening address must be specified for the metrics server"), + config.port.expect("a listening port must be specified for the metrics server") + ); + info!("Starting monitoring server on {}", address); + + let listen_address = address + .to_socket_addrs() + .unwrap() + .next() + .unwrap(); + let server = Server::bind(&listen_address).serve(make_service_fn(|_| async { + Ok::<_, hyper::Error>(service_fn(serve_metrics)) + })); + + if let Err(e) = server.await { + error!("failed to start the monitoring server {}", e); + } + } + }); +}
\ No newline at end of file diff --git a/common/rust/src/nats.rs b/common/rust/src/nats.rs new file mode 100644 index 0000000..59b480c --- /dev/null +++ b/common/rust/src/nats.rs @@ -0,0 +1,62 @@ +use nats::{Options, Connection}; +use serde::Deserialize; + +#[derive(Clone, Debug, Deserialize)] +struct NatsConfigurationClientCert { + cert: String, + key: String +} +#[derive(Clone, Debug, Deserialize)] +struct NatsConfigurationTls { + mtu: Option<usize>, +} + +#[derive(Clone, Debug, Deserialize)] +pub struct NatsConfiguration { + client_cert: Option<NatsConfigurationClientCert>, + root_cert: Option<Vec<String>>, + jetstream_api_prefix: Option<String>, + max_reconnects: Option<usize>, + reconnect_buffer_size: Option<usize>, + tls: Option<NatsConfigurationTls>, + client_name: Option<String>, + tls_required: Option<bool>, + host: String, +} + +/// +impl Into<Connection> for NatsConfiguration { + fn into(self) -> Connection { + let mut options = Options::new(); + + if let Some(client_cert) = self.client_cert { + options = options.client_cert(client_cert.cert, client_cert.key); + } + + if let Some(root_certs) = self.root_cert { + for root_cert in root_certs { + options = options.add_root_certificate(root_cert); + } + } + + if let Some(jetstream_api_prefix) = self.jetstream_api_prefix { + options = options.jetstream_api_prefix(jetstream_api_prefix) + } + + options = options.max_reconnects(self.max_reconnects); + options = options.no_echo(); + options = options.reconnect_buffer_size(self.reconnect_buffer_size.unwrap_or(64 * 1024)); + options = options.tls_required(self.tls_required.unwrap_or(false)); + options = options.with_name(&self.client_name.unwrap_or("Nova".to_string())); + + + if let Some(tls) = self.tls { + let mut config = nats::rustls::ClientConfig::new(); + config.set_mtu(&tls.mtu); + // todo: more options? + options = options.tls_client_config(config); + } + + options.connect(&self.host).unwrap() + } +} diff --git a/common/rust/src/payloads.rs b/common/rust/src/payloads.rs new file mode 100644 index 0000000..6eb35c8 --- /dev/null +++ b/common/rust/src/payloads.rs @@ -0,0 +1,15 @@ +use serde::{Deserialize, Serialize}; + +/// Payload send to the nova cache queues +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(bound(deserialize = "T: Deserialize<'de> + std::default::Default + Clone"))] +pub struct CachePayload<T> { + pub tracing: Tracing, + pub data: T +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Tracing { + pub node_id: String, + pub span: Option<String> +}
\ No newline at end of file |
