summaryrefslogtreecommitdiff
path: root/common/rust/src
diff options
context:
space:
mode:
Diffstat (limited to 'common/rust/src')
-rw-r--r--common/rust/src/config.rs7
-rw-r--r--common/rust/src/lib.rs4
-rw-r--r--common/rust/src/nats.rs62
-rw-r--r--common/rust/src/payloads.rs9
4 files changed, 77 insertions, 5 deletions
diff --git a/common/rust/src/config.rs b/common/rust/src/config.rs
index 6d8fb33..c158a21 100644
--- a/common/rust/src/config.rs
+++ b/common/rust/src/config.rs
@@ -11,6 +11,7 @@ pub struct Settings<T> {
#[serde(skip_deserializing)]
pub config: T,
pub monitoring: crate::monitoring::MonitoringConfiguration,
+ pub nats: crate::nats::NatsConfiguration,
}
impl<T> Settings<T> where T: Deserialize<'static> + std::default::Default + Clone {
@@ -25,14 +26,12 @@ impl<T> Settings<T> where T: Deserialize<'static> + std::default::Default + Clon
default.merge(File::with_name("config/local").required(false))?;
// we can configure each component using environment variables
- default.merge(Environment::with_prefix(&format!("NOVA_{}", service_name)))?;
+ default.merge(Environment::with_prefix("NOVA").separator("_"))?;
let mut config: Settings<T> = default.clone().try_into().unwrap();
// try to load the config
config.config = default.get::<T>(&service_name).unwrap();
-
- // setup the logger
- pretty_env_logger::init_custom_env(&format!("NOVA_{}_LOG", service_name));
+ pretty_env_logger::init();
// start the monitoring system if needed
crate::monitoring::start_monitoring(&config.monitoring);
diff --git a/common/rust/src/lib.rs b/common/rust/src/lib.rs
index 5122334..24e16ec 100644
--- a/common/rust/src/lib.rs
+++ b/common/rust/src/lib.rs
@@ -1,4 +1,6 @@
/// This crate contains shared code in all the rust projects
/// It contains utilities such as monitoring, logging and more.
pub mod config;
-pub mod monitoring; \ No newline at end of file
+pub mod monitoring;
+pub mod nats;
+pub mod payloads; \ No newline at end of file
diff --git a/common/rust/src/nats.rs b/common/rust/src/nats.rs
new file mode 100644
index 0000000..59b480c
--- /dev/null
+++ b/common/rust/src/nats.rs
@@ -0,0 +1,62 @@
+use nats::{Options, Connection};
+use serde::Deserialize;
+
+#[derive(Clone, Debug, Deserialize)]
+struct NatsConfigurationClientCert {
+ cert: String,
+ key: String
+}
+#[derive(Clone, Debug, Deserialize)]
+struct NatsConfigurationTls {
+ mtu: Option<usize>,
+}
+
+#[derive(Clone, Debug, Deserialize)]
+pub struct NatsConfiguration {
+ client_cert: Option<NatsConfigurationClientCert>,
+ root_cert: Option<Vec<String>>,
+ jetstream_api_prefix: Option<String>,
+ max_reconnects: Option<usize>,
+ reconnect_buffer_size: Option<usize>,
+ tls: Option<NatsConfigurationTls>,
+ client_name: Option<String>,
+ tls_required: Option<bool>,
+ host: String,
+}
+
+///
+impl Into<Connection> for NatsConfiguration {
+ fn into(self) -> Connection {
+ let mut options = Options::new();
+
+ if let Some(client_cert) = self.client_cert {
+ options = options.client_cert(client_cert.cert, client_cert.key);
+ }
+
+ if let Some(root_certs) = self.root_cert {
+ for root_cert in root_certs {
+ options = options.add_root_certificate(root_cert);
+ }
+ }
+
+ if let Some(jetstream_api_prefix) = self.jetstream_api_prefix {
+ options = options.jetstream_api_prefix(jetstream_api_prefix)
+ }
+
+ options = options.max_reconnects(self.max_reconnects);
+ options = options.no_echo();
+ options = options.reconnect_buffer_size(self.reconnect_buffer_size.unwrap_or(64 * 1024));
+ options = options.tls_required(self.tls_required.unwrap_or(false));
+ options = options.with_name(&self.client_name.unwrap_or("Nova".to_string()));
+
+
+ if let Some(tls) = self.tls {
+ let mut config = nats::rustls::ClientConfig::new();
+ config.set_mtu(&tls.mtu);
+ // todo: more options?
+ options = options.tls_client_config(config);
+ }
+
+ options.connect(&self.host).unwrap()
+ }
+}
diff --git a/common/rust/src/payloads.rs b/common/rust/src/payloads.rs
new file mode 100644
index 0000000..1957077
--- /dev/null
+++ b/common/rust/src/payloads.rs
@@ -0,0 +1,9 @@
+use serde::{Deserialize, Serialize};
+
+/// Payload send to the nova cache queues
+#[derive(Serialize, Deserialize, Debug, Clone)]
+#[serde(bound(deserialize = "T: Deserialize<'de> + std::default::Default + Clone"))]
+pub struct CachePayload<T> {
+ pub tracing: (),
+ pub data: T
+}