diff options
| author | MatthieuCoder <matthieu@matthieu-dev.xyz> | 2023-01-03 00:14:27 +0400 |
|---|---|---|
| committer | MatthieuCoder <matthieu@matthieu-dev.xyz> | 2023-01-03 00:14:27 +0400 |
| commit | 91a27342dbee8ca0478f862df93bf502337f4c6e (patch) | |
| tree | af5f4818d300934d8c9ea14896fe756fea13d578 /exes | |
| parent | c3e47ff0b1b06ff26830e78b7e37e212d1e47200 (diff) | |
add all in one binary
Diffstat (limited to 'exes')
| -rw-r--r-- | exes/all/.gitignore | 3 | ||||
| -rw-r--r-- | exes/all/Cargo.toml | 31 | ||||
| -rw-r--r-- | exes/all/Makefile | 14 | ||||
| -rw-r--r-- | exes/all/build.rs | 26 | ||||
| -rw-r--r-- | exes/all/build/.gitkeep | 0 | ||||
| -rw-r--r-- | exes/all/main.go | 72 | ||||
| -rw-r--r-- | exes/all/src/lib.rs | 137 | ||||
| -rw-r--r-- | exes/cache/src/main.rs | 2 | ||||
| -rw-r--r-- | exes/gateway/src/lib.rs | 87 | ||||
| -rw-r--r-- | exes/gateway/src/main.rs | 89 | ||||
| -rw-r--r-- | exes/ratelimit/Cargo.toml | 2 | ||||
| -rw-r--r-- | exes/ratelimit/src/lib.rs | 45 | ||||
| -rw-r--r-- | exes/ratelimit/src/main.rs | 47 | ||||
| -rw-r--r-- | exes/rest/src/config.rs | 2 | ||||
| -rw-r--r-- | exes/rest/src/lib.rs | 67 | ||||
| -rw-r--r-- | exes/rest/src/main.rs | 69 | ||||
| -rw-r--r-- | exes/rest/src/ratelimit_client/mod.rs | 6 | ||||
| -rw-r--r-- | exes/rest/src/ratelimit_client/remote_hashring.rs | 2 | ||||
| -rw-r--r-- | exes/webhook/src/config.rs | 2 | ||||
| -rw-r--r-- | exes/webhook/src/lib.rs | 54 | ||||
| -rw-r--r-- | exes/webhook/src/main.rs | 54 |
21 files changed, 552 insertions, 259 deletions
diff --git a/exes/all/.gitignore b/exes/all/.gitignore new file mode 100644 index 0000000..27531c7 --- /dev/null +++ b/exes/all/.gitignore @@ -0,0 +1,3 @@ +build/* +!build/.gitkeep +config/
\ No newline at end of file diff --git a/exes/all/Cargo.toml b/exes/all/Cargo.toml new file mode 100644 index 0000000..d6976ef --- /dev/null +++ b/exes/all/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "all" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +libc = "0.2.139" +leash = { path = "../../libs/leash" } +shared = { path = "../../libs/shared" } + +cache = { path = "../cache" } +gateway = { path = "../gateway" } +ratelimit = { path = "../ratelimit" } +rest = { path = "../rest" } +webhook = { path = "../webhook" } + +tokio = { version = "1.23.0", features = ["full"] } +serde = "1.0.152" +serde_json = "1.0.91" +anyhow = "1.0.68" + +config = "0.13.3" +pretty_env_logger = "0.4.0" + +[lib] +crate-type = ["cdylib"] + +[build-dependencies] +cbindgen = "0.24.3"
\ No newline at end of file diff --git a/exes/all/Makefile b/exes/all/Makefile new file mode 100644 index 0000000..07c02d9 --- /dev/null +++ b/exes/all/Makefile @@ -0,0 +1,14 @@ +clean: + rm ./build/libhello.so + +library: + cargo build --release + +build: library + cp ../../target/release/liball.so ./build + go build -ldflags="-r build" -o build/all + +all: library build + +run: all + ./build/all diff --git a/exes/all/build.rs b/exes/all/build.rs new file mode 100644 index 0000000..e779d53 --- /dev/null +++ b/exes/all/build.rs @@ -0,0 +1,26 @@ +extern crate cbindgen; + +use std::env; +use std::path::PathBuf; +use cbindgen::{Config, Language}; + + +fn main() { + let crate_dir = env::var("CARGO_MANIFEST_DIR").unwrap(); + + let package_name = env::var("CARGO_PKG_NAME").unwrap(); + let output_file = PathBuf::from("./build") + .join(format!("{}.h", package_name)) + .display() + .to_string(); + + let config = Config { + namespace: Some(String::from("ffi")), + language: Language::C, + ..Default::default() + }; + + cbindgen::generate_with_config(&crate_dir, config) + .unwrap() + .write_to_file(&output_file); +} diff --git a/exes/all/build/.gitkeep b/exes/all/build/.gitkeep new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/exes/all/build/.gitkeep diff --git a/exes/all/main.go b/exes/all/main.go new file mode 100644 index 0000000..0d15450 --- /dev/null +++ b/exes/all/main.go @@ -0,0 +1,72 @@ +package main + +/* +#cgo LDFLAGS: -Lbuild -lall +#include "./build/all.h" +*/ +import "C" + +import ( + "fmt" + "os" + "os/signal" + "syscall" + "time" + + "github.com/Jeffail/gabs" + "github.com/alicebob/miniredis/v2" + + server "github.com/nats-io/nats-server/v2/server" +) + +func main() { + // Intialise les logs de la librarie Rust + C.init_logs() + // Charge la configuration + str := C.GoString(C.load_config()) + + // Démarre une instance MiniRedis + mr := miniredis.NewMiniRedis() + err := mr.Start() + + if err != nil { + panic(err) + } + + // Démarre un serveur Nats + opts := &server.Options{} + opts.Host = "0.0.0.0" + ns, err := server.NewServer(opts) + + if err != nil { + panic(err) + } + + go ns.Start() + + if !ns.ReadyForConnections(4 * time.Second) { + panic("not ready for connection") + } + + // Edite le json de configuration donné + // Et injecte la configuration des servers Nats et MiniRedis + json, _ := gabs.ParseJSON([]byte(str)) + json.Set(fmt.Sprintf("redis://%s", mr.Addr()), "redis", "url") + json.Set("localhost", "nats", "host") + json.Set(1, "webhook", "discord", "client_id") + + // Démarre une instance de nova + instance := C.start_instance(C.CString(json.String())) + + // Wait for a SIGINT + c := make(chan os.Signal, 1) + signal.Notify(c, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + <-c + + println("Arret de nova all in one") + C.stop_instance(instance) +} diff --git a/exes/all/src/lib.rs b/exes/all/src/lib.rs new file mode 100644 index 0000000..f6b527d --- /dev/null +++ b/exes/all/src/lib.rs @@ -0,0 +1,137 @@ +extern crate libc; +use anyhow::Result; +use config::{Config, Environment, File}; +use gateway::GatewayServer; +use leash::Component; +use ratelimit::RatelimiterServerComponent; +use rest::ReverseProxyServer; +use serde::de::DeserializeOwned; +use serde_json::Value; +use shared::{config::Settings, log::info}; +use std::{ + env, + ffi::{CStr, CString}, + time::Duration, +}; +use tokio::{ + runtime::Runtime, + sync::oneshot::{self, Sender}, + task::JoinHandle, +}; +use webhook::WebhookServer; + +pub struct AllInOneInstance { + pub stop: Sender<Sender<()>>, + pub runtime: Runtime, +} + +fn load_settings_for<T: Default + DeserializeOwned + Clone>( + settings: &str, + name: &str, +) -> Result<Settings<T>> { + let value: Value = serde_json::from_str(settings)?; + let section: T = serde_json::from_value(value.get(name).unwrap().to_owned())?; + let mut settings: Settings<T> = serde_json::from_value(value)?; + settings.config = section; + + Ok(settings) +} + +// Start a component +async fn start_component<T: Component>( + settings: String, + aio: &mut Vec<Sender<()>>, +) -> JoinHandle<()> { + let name = T::SERVICE_NAME; + let instance = T::new(); + + let (stop, signal) = oneshot::channel(); + + aio.push(stop); + + tokio::spawn(async move { + let config = load_settings_for::<<T as Component>::Config>(&settings, name).unwrap(); + instance.start(config, signal).await.unwrap(); + }) +} + +#[no_mangle] +/// Loads the config json using the nova shared config loader +pub extern "C" fn load_config() -> *const libc::c_char { + let mut builder = Config::builder(); + + builder = builder.add_source(File::with_name("config/default")); + let mode = env::var("ENV").unwrap_or_else(|_| "development".into()); + info!("Configuration Environment: {}", mode); + + builder = builder.add_source(File::with_name(&format!("config/{}", mode)).required(false)); + builder = builder.add_source(File::with_name("config/local").required(false)); + + let env = Environment::with_prefix("NOVA").separator("__"); + // we can configure each component using environment variables + builder = builder.add_source(env); + + let config: Value = builder.build().unwrap().try_deserialize().unwrap(); + let s = serde_json::to_string(&config).unwrap(); + + let c_str_song = CString::new(s).unwrap(); + c_str_song.into_raw() +} + +#[no_mangle] +/// Initialise les logs des composants de nova +/// Utilise la crate `pretty_log_env` +pub extern "C" fn init_logs() { + pretty_env_logger::init(); +} + +#[no_mangle] +/// Stops a nova instance +pub unsafe extern "C" fn stop_instance(instance: *mut AllInOneInstance) { + let instance = Box::from_raw(instance); + let (tell_ready, ready) = oneshot::channel(); + instance.stop.send(tell_ready).unwrap(); + ready.blocking_recv().unwrap(); + instance.runtime.shutdown_timeout(Duration::from_secs(5)); +} + +#[no_mangle] +/// Initialized a new nova instance and an async runtime (tokio reactor) +/// Dont forget to stop this instance using `stop_instance` +pub extern "C" fn start_instance(config: *const libc::c_char) -> *mut AllInOneInstance { + let buf_name = unsafe { CStr::from_ptr(config).to_bytes() }; + let settings = String::from_utf8(buf_name.to_vec()).unwrap(); + let (stop, trigger_stop) = oneshot::channel(); + + // Initialize a tokio runtime + let rt = Runtime::new().unwrap(); + rt.block_on(async move { + // Start the gateway server + + let mut aio = vec![]; + let mut handles = vec![]; + + // Start components + handles.push(start_component::<GatewayServer>(settings.clone(), &mut aio).await); + handles + .push(start_component::<RatelimiterServerComponent>(settings.clone(), &mut aio).await); + handles.push(start_component::<ReverseProxyServer>(settings.clone(), &mut aio).await); + handles.push(start_component::<WebhookServer>(settings.clone(), &mut aio).await); + + // wait for exit + let done: Sender<()> = trigger_stop.await.unwrap(); + + // Tell all the threads to stop. + while let Some(stop_signal) = aio.pop() { + stop_signal.send(()).unwrap(); + } + + // Wait for all the threads to finish. + while let Some(handle) = handles.pop() { + handle.await.unwrap(); + } + + done.send(()).unwrap(); + }); + Box::into_raw(Box::new(AllInOneInstance { stop, runtime: rt })) +} diff --git a/exes/cache/src/main.rs b/exes/cache/src/main.rs index 312f960..5240a6a 100644 --- a/exes/cache/src/main.rs +++ b/exes/cache/src/main.rs @@ -43,7 +43,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> { let settings: Settings<CacheConfiguration> = Settings::new("cache").unwrap(); info!("loaded configuration: {:?}", settings); let nats = - Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>>>>>::into(settings.nats).await?; + Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>> + Send>>>::into(settings.nats).await?; // let redis: redis::Client = settings.redis.into(); let mut cache = Cache::default(); diff --git a/exes/gateway/src/lib.rs b/exes/gateway/src/lib.rs new file mode 100644 index 0000000..d7a4cee --- /dev/null +++ b/exes/gateway/src/lib.rs @@ -0,0 +1,87 @@ +use config::GatewayConfig; +use leash::{AnyhowResultFuture, Component}; +use shared::{ + config::Settings, + log::{debug, info}, + nats_crate::Client, + payloads::{CachePayload, DispatchEventTagged, Tracing}, +}; +use std::{convert::TryFrom, pin::Pin}; +use tokio::sync::oneshot; +use twilight_gateway::{Event, Shard}; +pub mod config; +use futures::FutureExt; +use futures::{select, Future, StreamExt}; +use twilight_model::gateway::event::DispatchEvent; + +pub struct GatewayServer {} +impl Component for GatewayServer { + type Config = GatewayConfig; + const SERVICE_NAME: &'static str = "gateway"; + + fn start( + &self, + settings: Settings<Self::Config>, + stop: oneshot::Receiver<()>, + ) -> AnyhowResultFuture<()> { + Box::pin(async move { + let (shard, mut events) = Shard::builder(settings.token.to_owned(), settings.intents) + .shard(settings.shard, settings.shard_total)? + .build(); + + let nats = Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>> + Send>>>::into( + settings.nats, + ) + .await?; + + shard.start().await?; + + let mut stop = stop.fuse(); + loop { + select! { + event = events.next().fuse() => { + if let Some(event) = event { + match event { + Event::Ready(ready) => { + info!("Logged in as {}", ready.user.name); + } + + _ => { + let name = event.kind().name(); + if let Ok(dispatch_event) = DispatchEvent::try_from(event) { + let data = CachePayload { + tracing: Tracing { + node_id: "".to_string(), + span: None, + }, + data: DispatchEventTagged { + data: dispatch_event, + }, + }; + let value = serde_json::to_string(&data)?; + debug!("nats send: {}", value); + let bytes = bytes::Bytes::from(value); + nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes) + .await?; + } + } + } + } else { + break + } + }, + _ = stop => break + }; + } + + info!("stopping shard..."); + shard.shutdown(); + + Ok(()) + }) + } + + fn new() -> Self { + Self {} + } +} diff --git a/exes/gateway/src/main.rs b/exes/gateway/src/main.rs index f2a4f93..2e18f9c 100644 --- a/exes/gateway/src/main.rs +++ b/exes/gateway/src/main.rs @@ -1,89 +1,4 @@ -use config::GatewayConfig; -use leash::{ignite, AnyhowResultFuture, Component}; -use shared::{ - config::Settings, - log::{debug, info}, - nats_crate::Client, - payloads::{CachePayload, DispatchEventTagged, Tracing}, -}; -use tokio::sync::oneshot; -use std::{convert::TryFrom, pin::Pin}; -use twilight_gateway::{Event, Shard}; -mod config; -use futures::{Future, StreamExt, select}; -use twilight_model::gateway::event::DispatchEvent; -use futures::FutureExt; - -struct GatewayServer {} -impl Component for GatewayServer { - type Config = GatewayConfig; - const SERVICE_NAME: &'static str = "gateway"; - - fn start( - &self, - settings: Settings<Self::Config>, - stop: oneshot::Receiver<()>, - ) -> AnyhowResultFuture<()> { - Box::pin(async move { - let (shard, mut events) = Shard::builder(settings.token.to_owned(), settings.intents) - .shard(settings.shard, settings.shard_total)? - .build(); - - let nats = - Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>>>>>::into(settings.nats) - .await?; - - shard.start().await?; - - let mut stop = stop.fuse(); - loop { - - select! { - event = events.next().fuse() => { - if let Some(event) = event { - match event { - Event::Ready(ready) => { - info!("Logged in as {}", ready.user.name); - } - - _ => { - let name = event.kind().name(); - if let Ok(dispatch_event) = DispatchEvent::try_from(event) { - let data = CachePayload { - tracing: Tracing { - node_id: "".to_string(), - span: None, - }, - data: DispatchEventTagged { - data: dispatch_event, - }, - }; - let value = serde_json::to_string(&data)?; - debug!("nats send: {}", value); - let bytes = bytes::Bytes::from(value); - nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes) - .await?; - } - } - } - } else { - break - } - }, - _ = stop => break - }; - } - - info!("stopping shard..."); - shard.shutdown(); - - Ok(()) - }) - } - - fn new() -> Self { - Self {} - } -} +use leash::ignite; +use gateway::GatewayServer; ignite!(GatewayServer); diff --git a/exes/ratelimit/Cargo.toml b/exes/ratelimit/Cargo.toml index a28d2d0..82ca9f6 100644 --- a/exes/ratelimit/Cargo.toml +++ b/exes/ratelimit/Cargo.toml @@ -18,4 +18,4 @@ futures-util = "0.3.17" tracing = "*" serde_json = { version = "1.0" } tonic = "0.8.3" -tokio-stream = "0.1.11"
\ No newline at end of file +tokio-stream = "0.1.11" diff --git a/exes/ratelimit/src/lib.rs b/exes/ratelimit/src/lib.rs new file mode 100644 index 0000000..6d6d608 --- /dev/null +++ b/exes/ratelimit/src/lib.rs @@ -0,0 +1,45 @@ +use std::net::ToSocketAddrs; + +use futures_util::FutureExt; +use grpc::RLServer; +use leash::{AnyhowResultFuture, Component}; +use proto::nova::ratelimit::ratelimiter::ratelimiter_server::RatelimiterServer; +use redis_global_local_bucket_ratelimiter::RedisGlobalLocalBucketRatelimiter; +use shared::{config::Settings, redis_crate::Client}; +use tokio::sync::oneshot; +use tonic::transport::Server; + +mod grpc; +mod redis_global_local_bucket_ratelimiter; + +pub struct RatelimiterServerComponent {} +impl Component for RatelimiterServerComponent { + type Config = (); + const SERVICE_NAME: &'static str = "ratelimiter"; + + fn start( + &self, + settings: Settings<Self::Config>, + stop: oneshot::Receiver<()>, + ) -> AnyhowResultFuture<()> { + Box::pin(async move { + // let config = Arc::new(settings.config); + let redis: Client = settings.redis.into(); + let server = RLServer::new(RedisGlobalLocalBucketRatelimiter::new(redis.into())); + + Server::builder() + .add_service(RatelimiterServer::new(server)) + .serve_with_shutdown( + "0.0.0.0:8093".to_socket_addrs().unwrap().next().unwrap(), + stop.map(|_| ()), + ) + .await?; + + Ok(()) + }) + } + + fn new() -> Self { + Self {} + } +} diff --git a/exes/ratelimit/src/main.rs b/exes/ratelimit/src/main.rs index 0ceded2..2de812b 100644 --- a/exes/ratelimit/src/main.rs +++ b/exes/ratelimit/src/main.rs @@ -1,47 +1,4 @@ -use std::net::ToSocketAddrs; - -use futures_util::FutureExt; -use grpc::RLServer; -use leash::{ignite, AnyhowResultFuture, Component}; -use proto::nova::ratelimit::ratelimiter::ratelimiter_server::RatelimiterServer; -use redis_global_local_bucket_ratelimiter::RedisGlobalLocalBucketRatelimiter; -use shared::{config::Settings, redis_crate::Client}; -use tokio::sync::oneshot; -use tonic::transport::Server; - -mod grpc; -mod redis_global_local_bucket_ratelimiter; - -struct RatelimiterServerComponent {} -impl Component for RatelimiterServerComponent { - type Config = (); - const SERVICE_NAME: &'static str = "ratelimiter"; - - fn start( - &self, - settings: Settings<Self::Config>, - stop: oneshot::Receiver<()>, - ) -> AnyhowResultFuture<()> { - Box::pin(async move { - // let config = Arc::new(settings.config); - let redis: Client = settings.redis.into(); - let server = RLServer::new(RedisGlobalLocalBucketRatelimiter::new(redis.into())); - - Server::builder() - .add_service(RatelimiterServer::new(server)) - .serve_with_shutdown( - "0.0.0.0:8080".to_socket_addrs().unwrap().next().unwrap(), - stop.map(|_| ()), - ) - .await?; - - Ok(()) - }) - } - - fn new() -> Self { - Self {} - } -} +use leash::ignite; +use ratelimit::RatelimiterServerComponent; ignite!(RatelimiterServerComponent); diff --git a/exes/rest/src/config.rs b/exes/rest/src/config.rs index 5c2698b..4e27a30 100644 --- a/exes/rest/src/config.rs +++ b/exes/rest/src/config.rs @@ -2,7 +2,7 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use serde::Deserialize; fn default_listening_address() -> SocketAddr { - SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8080)) + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8090)) } #[derive(Debug, Deserialize, Clone)] diff --git a/exes/rest/src/lib.rs b/exes/rest/src/lib.rs new file mode 100644 index 0000000..02721cc --- /dev/null +++ b/exes/rest/src/lib.rs @@ -0,0 +1,67 @@ +use config::ReverseProxyConfig; + +use handler::handle_request; +use hyper::{ + server::conn::AddrStream, + service::{make_service_fn, service_fn}, + Body, Client, Request, Server, +}; +use hyper_tls::HttpsConnector; +use leash::{AnyhowResultFuture, Component}; +use shared::config::Settings; +use std::{convert::Infallible, sync::Arc}; +use tokio::sync::oneshot; + +mod config; +mod handler; +mod ratelimit_client; + +pub struct ReverseProxyServer {} +impl Component for ReverseProxyServer { + type Config = ReverseProxyConfig; + const SERVICE_NAME: &'static str = "rest"; + + fn start( + &self, + settings: Settings<Self::Config>, + stop: oneshot::Receiver<()>, + ) -> AnyhowResultFuture<()> { + Box::pin(async move { + // Client to the remote ratelimiters + let ratelimiter = ratelimit_client::RemoteRatelimiter::new(); + let client = Client::builder().build(HttpsConnector::new()); + + let token = Arc::new(settings.discord.token.clone()); + let service_fn = make_service_fn(move |_: &AddrStream| { + let client = client.clone(); + let ratelimiter = ratelimiter.clone(); + let token = token.clone(); + async move { + Ok::<_, Infallible>(service_fn(move |request: Request<Body>| { + let client = client.clone(); + let ratelimiter = ratelimiter.clone(); + let token = token.clone(); + async move { + let token = token.as_str(); + handle_request(client, ratelimiter, token, request).await + } + })) + } + }); + + let server = Server::bind(&settings.config.server.listening_adress).serve(service_fn); + + server + .with_graceful_shutdown(async { + stop.await.expect("should not fail"); + }) + .await?; + + Ok(()) + }) + } + + fn new() -> Self { + Self {} + } +}
\ No newline at end of file diff --git a/exes/rest/src/main.rs b/exes/rest/src/main.rs index 07d835c..fe8ada7 100644 --- a/exes/rest/src/main.rs +++ b/exes/rest/src/main.rs @@ -1,69 +1,4 @@ -use config::ReverseProxyConfig; - -use handler::handle_request; -use hyper::{ - server::conn::AddrStream, - service::{make_service_fn, service_fn}, - Body, Client, Request, Server, -}; -use hyper_tls::HttpsConnector; -use leash::{ignite, AnyhowResultFuture, Component}; -use shared::config::Settings; -use std::{convert::Infallible, sync::Arc}; -use tokio::sync::oneshot; - -mod config; -mod handler; -mod ratelimit_client; - -struct ReverseProxyServer {} -impl Component for ReverseProxyServer { - type Config = ReverseProxyConfig; - const SERVICE_NAME: &'static str = "rest"; - - fn start( - &self, - settings: Settings<Self::Config>, - stop: oneshot::Receiver<()>, - ) -> AnyhowResultFuture<()> { - Box::pin(async move { - // Client to the remote ratelimiters - let ratelimiter = ratelimit_client::RemoteRatelimiter::new(); - let client = Client::builder().build(HttpsConnector::new()); - - let token = Arc::new(settings.discord.token.clone()); - let service_fn = make_service_fn(move |_: &AddrStream| { - let client = client.clone(); - let ratelimiter = ratelimiter.clone(); - let token = token.clone(); - async move { - Ok::<_, Infallible>(service_fn(move |request: Request<Body>| { - let client = client.clone(); - let ratelimiter = ratelimiter.clone(); - let token = token.clone(); - async move { - let token = token.as_str(); - handle_request(client, ratelimiter, token, request).await - } - })) - } - }); - - let server = Server::bind(&settings.config.server.listening_adress).serve(service_fn); - - server - .with_graceful_shutdown(async { - stop.await.expect("should not fail"); - }) - .await?; - - Ok(()) - }) - } - - fn new() -> Self { - Self {} - } -} +use leash::ignite; +use rest::ReverseProxyServer; ignite!(ReverseProxyServer); diff --git a/exes/rest/src/ratelimit_client/mod.rs b/exes/rest/src/ratelimit_client/mod.rs index 87737dd..afaf2b7 100644 --- a/exes/rest/src/ratelimit_client/mod.rs +++ b/exes/rest/src/ratelimit_client/mod.rs @@ -30,14 +30,14 @@ impl Drop for RemoteRatelimiter { impl RemoteRatelimiter { async fn get_ratelimiters(&self) -> Result<(), anyhow::Error> { // get list of dns responses - let responses = dns_lookup::lookup_host("ratelimit") + /*let responses = dns_lookup::lookup_host("localhost") .unwrap() .into_iter() - .map(|f| f.to_string()); + .map(|f| f.to_string());*/ let mut write = self.remotes.write().await; - for ip in responses { + for ip in ["localhost"] { let a = VNode::new(ip.into()).await?; write.add(a.clone()); } diff --git a/exes/rest/src/ratelimit_client/remote_hashring.rs b/exes/rest/src/ratelimit_client/remote_hashring.rs index b9f7800..4e3fa06 100644 --- a/exes/rest/src/ratelimit_client/remote_hashring.rs +++ b/exes/rest/src/ratelimit_client/remote_hashring.rs @@ -34,7 +34,7 @@ impl Hash for VNode { impl VNode { pub async fn new(address: String) -> Result<Self, tonic::transport::Error> { - let client = RatelimiterClient::connect(format!("http://{}:8080", address.clone())).await?; + let client = RatelimiterClient::connect(format!("http://{}:8093", address.clone())).await?; Ok(VNode { client, address }) } diff --git a/exes/webhook/src/config.rs b/exes/webhook/src/config.rs index e98de13..56a9b78 100644 --- a/exes/webhook/src/config.rs +++ b/exes/webhook/src/config.rs @@ -4,7 +4,7 @@ use ed25519_dalek::PublicKey; use serde::{Deserialize, Deserializer}; fn default_listening_address() -> SocketAddr { - SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8080)) + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8091)) } #[derive(Debug, Deserialize, Clone, Copy)] diff --git a/exes/webhook/src/lib.rs b/exes/webhook/src/lib.rs new file mode 100644 index 0000000..13a4e60 --- /dev/null +++ b/exes/webhook/src/lib.rs @@ -0,0 +1,54 @@ +mod config; +mod handler; +use std::{future::Future, pin::Pin}; + +use crate::{ + config::WebhookConfig, + handler::{handler::WebhookService, make_service::MakeSvc}, +}; +use hyper::Server; +use leash::{AnyhowResultFuture, Component}; +use shared::{config::Settings, log::info, nats_crate::Client}; +use tokio::sync::oneshot; + +#[derive(Clone, Copy)] +pub struct WebhookServer {} + +impl Component for WebhookServer { + type Config = WebhookConfig; + const SERVICE_NAME: &'static str = "webhook"; + + fn start( + &self, + settings: Settings<Self::Config>, + stop: oneshot::Receiver<()>, + ) -> AnyhowResultFuture<()> { + Box::pin(async move { + info!("Starting server on {}", settings.server.listening_adress); + + let bind = settings.server.listening_adress; + info!("NAts connected!"); + let nats = + Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>> + Send>>>::into(settings.nats) + + .await?; + + let make_service = MakeSvc::new(WebhookService { + config: settings.config, + nats: nats.clone(), + }); + + let server = Server::bind(&bind).serve(make_service); + + server.with_graceful_shutdown(async { + stop.await.expect("should not fail"); + }).await?; + + Ok(()) + }) + } + + fn new() -> Self { + Self {} + } +}
\ No newline at end of file diff --git a/exes/webhook/src/main.rs b/exes/webhook/src/main.rs index 0215e51..f531725 100644 --- a/exes/webhook/src/main.rs +++ b/exes/webhook/src/main.rs @@ -1,54 +1,4 @@ -mod config; -mod handler; -use std::{future::Future, pin::Pin}; - -use crate::{ - config::WebhookConfig, - handler::{handler::WebhookService, make_service::MakeSvc}, -}; -use hyper::Server; -use leash::{ignite, AnyhowResultFuture, Component}; -use shared::{config::Settings, log::info, nats_crate::Client}; -use tokio::sync::oneshot; - -#[derive(Clone, Copy)] -struct WebhookServer {} - -impl Component for WebhookServer { - type Config = WebhookConfig; - const SERVICE_NAME: &'static str = "webhook"; - - fn start( - &self, - settings: Settings<Self::Config>, - stop: oneshot::Receiver<()>, - ) -> AnyhowResultFuture<()> { - Box::pin(async move { - info!("Starting server on {}", settings.server.listening_adress); - - let bind = settings.server.listening_adress; - let nats = - Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>>>>>::into(settings.nats) - .await?; - - let make_service = MakeSvc::new(WebhookService { - config: settings.config, - nats: nats.clone(), - }); - - let server = Server::bind(&bind).serve(make_service); - - server.with_graceful_shutdown(async { - stop.await.expect("should not fail"); - }).await?; - - Ok(()) - }) - } - - fn new() -> Self { - Self {} - } -} +use leash::ignite; +use webhook::WebhookServer; ignite!(WebhookServer); |
