diff options
| author | MatthieuCoder <matthieu@matthieu-dev.xyz> | 2023-01-03 00:14:27 +0400 | 
|---|---|---|
| committer | MatthieuCoder <matthieu@matthieu-dev.xyz> | 2023-01-03 00:14:27 +0400 | 
| commit | 91a27342dbee8ca0478f862df93bf502337f4c6e (patch) | |
| tree | af5f4818d300934d8c9ea14896fe756fea13d578 /exes | |
| parent | c3e47ff0b1b06ff26830e78b7e37e212d1e47200 (diff) | |
add all in one binary
Diffstat (limited to 'exes')
| -rw-r--r-- | exes/all/.gitignore | 3 | ||||
| -rw-r--r-- | exes/all/Cargo.toml | 31 | ||||
| -rw-r--r-- | exes/all/Makefile | 14 | ||||
| -rw-r--r-- | exes/all/build.rs | 26 | ||||
| -rw-r--r-- | exes/all/build/.gitkeep | 0 | ||||
| -rw-r--r-- | exes/all/main.go | 72 | ||||
| -rw-r--r-- | exes/all/src/lib.rs | 137 | ||||
| -rw-r--r-- | exes/cache/src/main.rs | 2 | ||||
| -rw-r--r-- | exes/gateway/src/lib.rs | 87 | ||||
| -rw-r--r-- | exes/gateway/src/main.rs | 89 | ||||
| -rw-r--r-- | exes/ratelimit/Cargo.toml | 2 | ||||
| -rw-r--r-- | exes/ratelimit/src/lib.rs | 45 | ||||
| -rw-r--r-- | exes/ratelimit/src/main.rs | 47 | ||||
| -rw-r--r-- | exes/rest/src/config.rs | 2 | ||||
| -rw-r--r-- | exes/rest/src/lib.rs | 67 | ||||
| -rw-r--r-- | exes/rest/src/main.rs | 69 | ||||
| -rw-r--r-- | exes/rest/src/ratelimit_client/mod.rs | 6 | ||||
| -rw-r--r-- | exes/rest/src/ratelimit_client/remote_hashring.rs | 2 | ||||
| -rw-r--r-- | exes/webhook/src/config.rs | 2 | ||||
| -rw-r--r-- | exes/webhook/src/lib.rs | 54 | ||||
| -rw-r--r-- | exes/webhook/src/main.rs | 54 | 
21 files changed, 552 insertions, 259 deletions
diff --git a/exes/all/.gitignore b/exes/all/.gitignore new file mode 100644 index 0000000..27531c7 --- /dev/null +++ b/exes/all/.gitignore @@ -0,0 +1,3 @@ +build/* +!build/.gitkeep +config/
\ No newline at end of file diff --git a/exes/all/Cargo.toml b/exes/all/Cargo.toml new file mode 100644 index 0000000..d6976ef --- /dev/null +++ b/exes/all/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "all" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +libc = "0.2.139" +leash = { path = "../../libs/leash" } +shared = { path = "../../libs/shared" } + +cache = { path = "../cache" } +gateway = { path = "../gateway" } +ratelimit = { path = "../ratelimit" } +rest = { path = "../rest" } +webhook = { path = "../webhook" } + +tokio = { version = "1.23.0", features = ["full"] } +serde = "1.0.152" +serde_json = "1.0.91" +anyhow = "1.0.68" + +config = "0.13.3" +pretty_env_logger = "0.4.0" + +[lib] +crate-type = ["cdylib"] + +[build-dependencies] +cbindgen = "0.24.3"
\ No newline at end of file diff --git a/exes/all/Makefile b/exes/all/Makefile new file mode 100644 index 0000000..07c02d9 --- /dev/null +++ b/exes/all/Makefile @@ -0,0 +1,14 @@ +clean: +	rm ./build/libhello.so + +library: +	cargo build --release + +build: library +	cp ../../target/release/liball.so ./build +	go build -ldflags="-r build" -o build/all + +all: library build + +run: all +	./build/all diff --git a/exes/all/build.rs b/exes/all/build.rs new file mode 100644 index 0000000..e779d53 --- /dev/null +++ b/exes/all/build.rs @@ -0,0 +1,26 @@ +extern crate cbindgen; + +use std::env; +use std::path::PathBuf; +use cbindgen::{Config, Language}; + + +fn main() { +    let crate_dir = env::var("CARGO_MANIFEST_DIR").unwrap(); + +    let package_name = env::var("CARGO_PKG_NAME").unwrap(); +    let output_file = PathBuf::from("./build") +        .join(format!("{}.h", package_name)) +        .display() +        .to_string(); + +    let config = Config { +        namespace: Some(String::from("ffi")), +        language: Language::C, +        ..Default::default() +    }; + +    cbindgen::generate_with_config(&crate_dir, config) +      .unwrap() +      .write_to_file(&output_file); +} diff --git a/exes/all/build/.gitkeep b/exes/all/build/.gitkeep new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/exes/all/build/.gitkeep diff --git a/exes/all/main.go b/exes/all/main.go new file mode 100644 index 0000000..0d15450 --- /dev/null +++ b/exes/all/main.go @@ -0,0 +1,72 @@ +package main + +/* +#cgo LDFLAGS: -Lbuild -lall +#include "./build/all.h" +*/ +import "C" + +import ( +	"fmt" +	"os" +	"os/signal" +	"syscall" +	"time" + +	"github.com/Jeffail/gabs" +	"github.com/alicebob/miniredis/v2" + +	server "github.com/nats-io/nats-server/v2/server" +) + +func main() { +	// Intialise les logs de la librarie Rust +	C.init_logs() +	// Charge la configuration +	str := C.GoString(C.load_config()) + +	// Démarre une instance MiniRedis +	mr := miniredis.NewMiniRedis() +	err := mr.Start() + +	if err != nil { +		panic(err) +	} + +	// Démarre un serveur Nats +	opts := &server.Options{} +	opts.Host = "0.0.0.0" +	ns, err := server.NewServer(opts) + +	if err != nil { +		panic(err) +	} + +	go ns.Start() + +	if !ns.ReadyForConnections(4 * time.Second) { +		panic("not ready for connection") +	} + +	// Edite le json de configuration donné +	// Et injecte la configuration des servers Nats et MiniRedis +	json, _ := gabs.ParseJSON([]byte(str)) +	json.Set(fmt.Sprintf("redis://%s", mr.Addr()), "redis", "url") +	json.Set("localhost", "nats", "host") +	json.Set(1, "webhook", "discord", "client_id") + +	// Démarre une instance de nova +	instance := C.start_instance(C.CString(json.String())) + +	// Wait for a SIGINT +	c := make(chan os.Signal, 1) +	signal.Notify(c, +		syscall.SIGHUP, +		syscall.SIGINT, +		syscall.SIGTERM, +		syscall.SIGQUIT) +	<-c + +	println("Arret de nova all in one") +	C.stop_instance(instance) +} diff --git a/exes/all/src/lib.rs b/exes/all/src/lib.rs new file mode 100644 index 0000000..f6b527d --- /dev/null +++ b/exes/all/src/lib.rs @@ -0,0 +1,137 @@ +extern crate libc; +use anyhow::Result; +use config::{Config, Environment, File}; +use gateway::GatewayServer; +use leash::Component; +use ratelimit::RatelimiterServerComponent; +use rest::ReverseProxyServer; +use serde::de::DeserializeOwned; +use serde_json::Value; +use shared::{config::Settings, log::info}; +use std::{ +    env, +    ffi::{CStr, CString}, +    time::Duration, +}; +use tokio::{ +    runtime::Runtime, +    sync::oneshot::{self, Sender}, +    task::JoinHandle, +}; +use webhook::WebhookServer; + +pub struct AllInOneInstance { +    pub stop: Sender<Sender<()>>, +    pub runtime: Runtime, +} + +fn load_settings_for<T: Default + DeserializeOwned + Clone>( +    settings: &str, +    name: &str, +) -> Result<Settings<T>> { +    let value: Value = serde_json::from_str(settings)?; +    let section: T = serde_json::from_value(value.get(name).unwrap().to_owned())?; +    let mut settings: Settings<T> = serde_json::from_value(value)?; +    settings.config = section; + +    Ok(settings) +} + +// Start a component +async fn start_component<T: Component>( +    settings: String, +    aio: &mut Vec<Sender<()>>, +) -> JoinHandle<()> { +    let name = T::SERVICE_NAME; +    let instance = T::new(); + +    let (stop, signal) = oneshot::channel(); + +    aio.push(stop); + +    tokio::spawn(async move { +        let config = load_settings_for::<<T as Component>::Config>(&settings, name).unwrap(); +        instance.start(config, signal).await.unwrap(); +    }) +} + +#[no_mangle] +/// Loads the config json using the nova shared config loader +pub extern "C" fn load_config() -> *const libc::c_char { +    let mut builder = Config::builder(); + +    builder = builder.add_source(File::with_name("config/default")); +    let mode = env::var("ENV").unwrap_or_else(|_| "development".into()); +    info!("Configuration Environment: {}", mode); + +    builder = builder.add_source(File::with_name(&format!("config/{}", mode)).required(false)); +    builder = builder.add_source(File::with_name("config/local").required(false)); + +    let env = Environment::with_prefix("NOVA").separator("__"); +    // we can configure each component using environment variables +    builder = builder.add_source(env); + +    let config: Value = builder.build().unwrap().try_deserialize().unwrap(); +    let s = serde_json::to_string(&config).unwrap(); + +    let c_str_song = CString::new(s).unwrap(); +    c_str_song.into_raw() +} + +#[no_mangle] +/// Initialise les logs des composants de nova +/// Utilise la crate `pretty_log_env` +pub extern "C" fn init_logs() { +    pretty_env_logger::init(); +} + +#[no_mangle] +/// Stops a nova instance +pub unsafe extern "C" fn stop_instance(instance: *mut AllInOneInstance) { +    let instance = Box::from_raw(instance); +    let (tell_ready, ready) = oneshot::channel(); +    instance.stop.send(tell_ready).unwrap(); +    ready.blocking_recv().unwrap(); +    instance.runtime.shutdown_timeout(Duration::from_secs(5)); +} + +#[no_mangle] +/// Initialized a new nova instance and an async runtime (tokio reactor) +/// Dont forget to stop this instance using `stop_instance` +pub extern "C" fn start_instance(config: *const libc::c_char) -> *mut AllInOneInstance { +    let buf_name = unsafe { CStr::from_ptr(config).to_bytes() }; +    let settings = String::from_utf8(buf_name.to_vec()).unwrap(); +    let (stop, trigger_stop) = oneshot::channel(); + +    // Initialize a tokio runtime +    let rt = Runtime::new().unwrap(); +    rt.block_on(async move { +        // Start the gateway server + +        let mut aio = vec![]; +        let mut handles = vec![]; + +        // Start components +        handles.push(start_component::<GatewayServer>(settings.clone(), &mut aio).await); +        handles +            .push(start_component::<RatelimiterServerComponent>(settings.clone(), &mut aio).await); +        handles.push(start_component::<ReverseProxyServer>(settings.clone(), &mut aio).await); +        handles.push(start_component::<WebhookServer>(settings.clone(), &mut aio).await); + +        // wait for exit +        let done: Sender<()> = trigger_stop.await.unwrap(); + +        // Tell all the threads to stop. +        while let Some(stop_signal) = aio.pop() { +            stop_signal.send(()).unwrap(); +        } + +        // Wait for all the threads to finish. +        while let Some(handle) = handles.pop() { +            handle.await.unwrap(); +        } + +        done.send(()).unwrap(); +    }); +    Box::into_raw(Box::new(AllInOneInstance { stop, runtime: rt })) +} diff --git a/exes/cache/src/main.rs b/exes/cache/src/main.rs index 312f960..5240a6a 100644 --- a/exes/cache/src/main.rs +++ b/exes/cache/src/main.rs @@ -43,7 +43,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {      let settings: Settings<CacheConfiguration> = Settings::new("cache").unwrap();      info!("loaded configuration: {:?}", settings);      let nats = -        Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>>>>>::into(settings.nats).await?; +        Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>> + Send>>>::into(settings.nats).await?;      // let redis: redis::Client = settings.redis.into();      let mut cache = Cache::default(); diff --git a/exes/gateway/src/lib.rs b/exes/gateway/src/lib.rs new file mode 100644 index 0000000..d7a4cee --- /dev/null +++ b/exes/gateway/src/lib.rs @@ -0,0 +1,87 @@ +use config::GatewayConfig; +use leash::{AnyhowResultFuture, Component}; +use shared::{ +    config::Settings, +    log::{debug, info}, +    nats_crate::Client, +    payloads::{CachePayload, DispatchEventTagged, Tracing}, +}; +use std::{convert::TryFrom, pin::Pin}; +use tokio::sync::oneshot; +use twilight_gateway::{Event, Shard}; +pub mod config; +use futures::FutureExt; +use futures::{select, Future, StreamExt}; +use twilight_model::gateway::event::DispatchEvent; + +pub struct GatewayServer {} +impl Component for GatewayServer { +    type Config = GatewayConfig; +    const SERVICE_NAME: &'static str = "gateway"; + +    fn start( +        &self, +        settings: Settings<Self::Config>, +        stop: oneshot::Receiver<()>, +    ) -> AnyhowResultFuture<()> { +        Box::pin(async move { +            let (shard, mut events) = Shard::builder(settings.token.to_owned(), settings.intents) +                .shard(settings.shard, settings.shard_total)? +                .build(); + +            let nats = Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>> + Send>>>::into( +                settings.nats, +            ) +            .await?; + +            shard.start().await?; + +            let mut stop = stop.fuse(); +            loop { +                select! { +                    event = events.next().fuse() => { +                        if let Some(event) = event { +                            match event { +                                Event::Ready(ready) => { +                                    info!("Logged in as {}", ready.user.name); +                                } + +                                _ => { +                                    let name = event.kind().name(); +                                    if let Ok(dispatch_event) = DispatchEvent::try_from(event) { +                                        let data = CachePayload { +                                            tracing: Tracing { +                                                node_id: "".to_string(), +                                                span: None, +                                            }, +                                            data: DispatchEventTagged { +                                                data: dispatch_event, +                                            }, +                                        }; +                                        let value = serde_json::to_string(&data)?; +                                        debug!("nats send: {}", value); +                                        let bytes = bytes::Bytes::from(value); +                                        nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes) +                                            .await?; +                                    } +                                } +                            } +                        } else { +                            break +                        } +                    }, +                    _ = stop => break +                }; +            } + +            info!("stopping shard..."); +            shard.shutdown(); + +            Ok(()) +        }) +    } + +    fn new() -> Self { +        Self {} +    } +} diff --git a/exes/gateway/src/main.rs b/exes/gateway/src/main.rs index f2a4f93..2e18f9c 100644 --- a/exes/gateway/src/main.rs +++ b/exes/gateway/src/main.rs @@ -1,89 +1,4 @@ -use config::GatewayConfig; -use leash::{ignite, AnyhowResultFuture, Component}; -use shared::{ -    config::Settings, -    log::{debug, info}, -    nats_crate::Client, -    payloads::{CachePayload, DispatchEventTagged, Tracing}, -}; -use tokio::sync::oneshot; -use std::{convert::TryFrom, pin::Pin}; -use twilight_gateway::{Event, Shard}; -mod config; -use futures::{Future, StreamExt, select}; -use twilight_model::gateway::event::DispatchEvent; -use futures::FutureExt; - -struct GatewayServer {} -impl Component for GatewayServer { -    type Config = GatewayConfig; -    const SERVICE_NAME: &'static str = "gateway"; - -    fn start( -        &self, -        settings: Settings<Self::Config>, -        stop: oneshot::Receiver<()>, -    ) -> AnyhowResultFuture<()> { -        Box::pin(async move { -            let (shard, mut events) = Shard::builder(settings.token.to_owned(), settings.intents) -                .shard(settings.shard, settings.shard_total)? -                .build(); - -            let nats = -                Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>>>>>::into(settings.nats) -                    .await?; - -            shard.start().await?; - -            let mut stop = stop.fuse(); -            loop { - -                select! { -                    event = events.next().fuse() => { -                        if let Some(event) = event { -                            match event { -                                Event::Ready(ready) => { -                                    info!("Logged in as {}", ready.user.name); -                                } -             -                                _ => { -                                    let name = event.kind().name(); -                                    if let Ok(dispatch_event) = DispatchEvent::try_from(event) { -                                        let data = CachePayload { -                                            tracing: Tracing { -                                                node_id: "".to_string(), -                                                span: None, -                                            }, -                                            data: DispatchEventTagged { -                                                data: dispatch_event, -                                            }, -                                        }; -                                        let value = serde_json::to_string(&data)?; -                                        debug!("nats send: {}", value); -                                        let bytes = bytes::Bytes::from(value); -                                        nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes) -                                            .await?; -                                    } -                                } -                            } -                        } else { -                            break -                        } -                    }, -                    _ = stop => break -                }; -            } - -            info!("stopping shard..."); -            shard.shutdown(); - -            Ok(()) -        }) -    } - -    fn new() -> Self { -        Self {} -    } -} +use leash::ignite; +use gateway::GatewayServer;  ignite!(GatewayServer); diff --git a/exes/ratelimit/Cargo.toml b/exes/ratelimit/Cargo.toml index a28d2d0..82ca9f6 100644 --- a/exes/ratelimit/Cargo.toml +++ b/exes/ratelimit/Cargo.toml @@ -18,4 +18,4 @@ futures-util = "0.3.17"  tracing = "*"  serde_json = { version = "1.0" }  tonic = "0.8.3" -tokio-stream = "0.1.11"
\ No newline at end of file +tokio-stream = "0.1.11" diff --git a/exes/ratelimit/src/lib.rs b/exes/ratelimit/src/lib.rs new file mode 100644 index 0000000..6d6d608 --- /dev/null +++ b/exes/ratelimit/src/lib.rs @@ -0,0 +1,45 @@ +use std::net::ToSocketAddrs; + +use futures_util::FutureExt; +use grpc::RLServer; +use leash::{AnyhowResultFuture, Component}; +use proto::nova::ratelimit::ratelimiter::ratelimiter_server::RatelimiterServer; +use redis_global_local_bucket_ratelimiter::RedisGlobalLocalBucketRatelimiter; +use shared::{config::Settings, redis_crate::Client}; +use tokio::sync::oneshot; +use tonic::transport::Server; + +mod grpc; +mod redis_global_local_bucket_ratelimiter; + +pub struct RatelimiterServerComponent {} +impl Component for RatelimiterServerComponent { +    type Config = (); +    const SERVICE_NAME: &'static str = "ratelimiter"; + +    fn start( +        &self, +        settings: Settings<Self::Config>, +        stop: oneshot::Receiver<()>, +    ) -> AnyhowResultFuture<()> { +        Box::pin(async move { +            // let config = Arc::new(settings.config); +            let redis: Client = settings.redis.into(); +            let server = RLServer::new(RedisGlobalLocalBucketRatelimiter::new(redis.into())); + +            Server::builder() +                .add_service(RatelimiterServer::new(server)) +                .serve_with_shutdown( +                    "0.0.0.0:8093".to_socket_addrs().unwrap().next().unwrap(), +                    stop.map(|_| ()), +                ) +                .await?; + +            Ok(()) +        }) +    } + +    fn new() -> Self { +        Self {} +    } +} diff --git a/exes/ratelimit/src/main.rs b/exes/ratelimit/src/main.rs index 0ceded2..2de812b 100644 --- a/exes/ratelimit/src/main.rs +++ b/exes/ratelimit/src/main.rs @@ -1,47 +1,4 @@ -use std::net::ToSocketAddrs; - -use futures_util::FutureExt; -use grpc::RLServer; -use leash::{ignite, AnyhowResultFuture, Component}; -use proto::nova::ratelimit::ratelimiter::ratelimiter_server::RatelimiterServer; -use redis_global_local_bucket_ratelimiter::RedisGlobalLocalBucketRatelimiter; -use shared::{config::Settings, redis_crate::Client}; -use tokio::sync::oneshot; -use tonic::transport::Server; - -mod grpc; -mod redis_global_local_bucket_ratelimiter; - -struct RatelimiterServerComponent {} -impl Component for RatelimiterServerComponent { -    type Config = (); -    const SERVICE_NAME: &'static str = "ratelimiter"; - -    fn start( -        &self, -        settings: Settings<Self::Config>, -        stop: oneshot::Receiver<()>, -    ) -> AnyhowResultFuture<()> { -        Box::pin(async move { -            // let config = Arc::new(settings.config); -            let redis: Client = settings.redis.into(); -            let server = RLServer::new(RedisGlobalLocalBucketRatelimiter::new(redis.into())); - -            Server::builder() -                .add_service(RatelimiterServer::new(server)) -                .serve_with_shutdown( -                    "0.0.0.0:8080".to_socket_addrs().unwrap().next().unwrap(), -                    stop.map(|_| ()), -                ) -                .await?; - -            Ok(()) -        }) -    } - -    fn new() -> Self { -        Self {} -    } -} +use leash::ignite; +use ratelimit::RatelimiterServerComponent;  ignite!(RatelimiterServerComponent); diff --git a/exes/rest/src/config.rs b/exes/rest/src/config.rs index 5c2698b..4e27a30 100644 --- a/exes/rest/src/config.rs +++ b/exes/rest/src/config.rs @@ -2,7 +2,7 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};  use serde::Deserialize;  fn default_listening_address() -> SocketAddr { -    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8080)) +    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8090))  }  #[derive(Debug, Deserialize, Clone)] diff --git a/exes/rest/src/lib.rs b/exes/rest/src/lib.rs new file mode 100644 index 0000000..02721cc --- /dev/null +++ b/exes/rest/src/lib.rs @@ -0,0 +1,67 @@ +use config::ReverseProxyConfig; + +use handler::handle_request; +use hyper::{ +    server::conn::AddrStream, +    service::{make_service_fn, service_fn}, +    Body, Client, Request, Server, +}; +use hyper_tls::HttpsConnector; +use leash::{AnyhowResultFuture, Component}; +use shared::config::Settings; +use std::{convert::Infallible, sync::Arc}; +use tokio::sync::oneshot; + +mod config; +mod handler; +mod ratelimit_client; + +pub struct ReverseProxyServer {} +impl Component for ReverseProxyServer { +    type Config = ReverseProxyConfig; +    const SERVICE_NAME: &'static str = "rest"; + +    fn start( +        &self, +        settings: Settings<Self::Config>, +        stop: oneshot::Receiver<()>, +    ) -> AnyhowResultFuture<()> { +        Box::pin(async move { +            // Client to the remote ratelimiters +            let ratelimiter = ratelimit_client::RemoteRatelimiter::new(); +            let client = Client::builder().build(HttpsConnector::new()); + +            let token = Arc::new(settings.discord.token.clone()); +            let service_fn = make_service_fn(move |_: &AddrStream| { +                let client = client.clone(); +                let ratelimiter = ratelimiter.clone(); +                let token = token.clone(); +                async move { +                    Ok::<_, Infallible>(service_fn(move |request: Request<Body>| { +                        let client = client.clone(); +                        let ratelimiter = ratelimiter.clone(); +                        let token = token.clone(); +                        async move { +                            let token = token.as_str(); +                            handle_request(client, ratelimiter, token, request).await +                        } +                    })) +                } +            }); + +            let server = Server::bind(&settings.config.server.listening_adress).serve(service_fn); + +            server +                .with_graceful_shutdown(async { +                    stop.await.expect("should not fail"); +                }) +                .await?; + +            Ok(()) +        }) +    } + +    fn new() -> Self { +        Self {} +    } +}
\ No newline at end of file diff --git a/exes/rest/src/main.rs b/exes/rest/src/main.rs index 07d835c..fe8ada7 100644 --- a/exes/rest/src/main.rs +++ b/exes/rest/src/main.rs @@ -1,69 +1,4 @@ -use config::ReverseProxyConfig; - -use handler::handle_request; -use hyper::{ -    server::conn::AddrStream, -    service::{make_service_fn, service_fn}, -    Body, Client, Request, Server, -}; -use hyper_tls::HttpsConnector; -use leash::{ignite, AnyhowResultFuture, Component}; -use shared::config::Settings; -use std::{convert::Infallible, sync::Arc}; -use tokio::sync::oneshot; - -mod config; -mod handler; -mod ratelimit_client; - -struct ReverseProxyServer {} -impl Component for ReverseProxyServer { -    type Config = ReverseProxyConfig; -    const SERVICE_NAME: &'static str = "rest"; - -    fn start( -        &self, -        settings: Settings<Self::Config>, -        stop: oneshot::Receiver<()>, -    ) -> AnyhowResultFuture<()> { -        Box::pin(async move { -            // Client to the remote ratelimiters -            let ratelimiter = ratelimit_client::RemoteRatelimiter::new(); -            let client = Client::builder().build(HttpsConnector::new()); - -            let token = Arc::new(settings.discord.token.clone()); -            let service_fn = make_service_fn(move |_: &AddrStream| { -                let client = client.clone(); -                let ratelimiter = ratelimiter.clone(); -                let token = token.clone(); -                async move { -                    Ok::<_, Infallible>(service_fn(move |request: Request<Body>| { -                        let client = client.clone(); -                        let ratelimiter = ratelimiter.clone(); -                        let token = token.clone(); -                        async move { -                            let token = token.as_str(); -                            handle_request(client, ratelimiter, token, request).await -                        } -                    })) -                } -            }); - -            let server = Server::bind(&settings.config.server.listening_adress).serve(service_fn); - -            server -                .with_graceful_shutdown(async { -                    stop.await.expect("should not fail"); -                }) -                .await?; - -            Ok(()) -        }) -    } - -    fn new() -> Self { -        Self {} -    } -} +use leash::ignite; +use rest::ReverseProxyServer;  ignite!(ReverseProxyServer); diff --git a/exes/rest/src/ratelimit_client/mod.rs b/exes/rest/src/ratelimit_client/mod.rs index 87737dd..afaf2b7 100644 --- a/exes/rest/src/ratelimit_client/mod.rs +++ b/exes/rest/src/ratelimit_client/mod.rs @@ -30,14 +30,14 @@ impl Drop for RemoteRatelimiter {  impl RemoteRatelimiter {      async fn get_ratelimiters(&self) -> Result<(), anyhow::Error> {          // get list of dns responses -        let responses = dns_lookup::lookup_host("ratelimit") +        /*let responses = dns_lookup::lookup_host("localhost")              .unwrap()              .into_iter() -            .map(|f| f.to_string()); +            .map(|f| f.to_string());*/          let mut write = self.remotes.write().await; -        for ip in responses { +        for ip in ["localhost"] {              let a = VNode::new(ip.into()).await?;              write.add(a.clone());          } diff --git a/exes/rest/src/ratelimit_client/remote_hashring.rs b/exes/rest/src/ratelimit_client/remote_hashring.rs index b9f7800..4e3fa06 100644 --- a/exes/rest/src/ratelimit_client/remote_hashring.rs +++ b/exes/rest/src/ratelimit_client/remote_hashring.rs @@ -34,7 +34,7 @@ impl Hash for VNode {  impl VNode {      pub async fn new(address: String) -> Result<Self, tonic::transport::Error> { -        let client = RatelimiterClient::connect(format!("http://{}:8080", address.clone())).await?; +        let client = RatelimiterClient::connect(format!("http://{}:8093", address.clone())).await?;          Ok(VNode { client, address })      } diff --git a/exes/webhook/src/config.rs b/exes/webhook/src/config.rs index e98de13..56a9b78 100644 --- a/exes/webhook/src/config.rs +++ b/exes/webhook/src/config.rs @@ -4,7 +4,7 @@ use ed25519_dalek::PublicKey;  use serde::{Deserialize, Deserializer};  fn default_listening_address() -> SocketAddr { -    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8080)) +    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8091))  }  #[derive(Debug, Deserialize, Clone, Copy)] diff --git a/exes/webhook/src/lib.rs b/exes/webhook/src/lib.rs new file mode 100644 index 0000000..13a4e60 --- /dev/null +++ b/exes/webhook/src/lib.rs @@ -0,0 +1,54 @@ +mod config; +mod handler; +use std::{future::Future, pin::Pin}; + +use crate::{ +    config::WebhookConfig, +    handler::{handler::WebhookService, make_service::MakeSvc}, +}; +use hyper::Server; +use leash::{AnyhowResultFuture, Component}; +use shared::{config::Settings, log::info, nats_crate::Client}; +use tokio::sync::oneshot; + +#[derive(Clone, Copy)] +pub struct WebhookServer {} + +impl Component for WebhookServer { +    type Config = WebhookConfig; +    const SERVICE_NAME: &'static str = "webhook"; + +    fn start( +        &self, +        settings: Settings<Self::Config>, +        stop: oneshot::Receiver<()>, +    ) -> AnyhowResultFuture<()> { +        Box::pin(async move { +            info!("Starting server on {}", settings.server.listening_adress); + +            let bind = settings.server.listening_adress; +            info!("NAts connected!"); +            let nats = +                Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>> + Send>>>::into(settings.nats) + +                    .await?; + +            let make_service = MakeSvc::new(WebhookService { +                config: settings.config, +                nats: nats.clone(), +            }); + +            let server = Server::bind(&bind).serve(make_service); + +            server.with_graceful_shutdown(async { +                stop.await.expect("should not fail"); +            }).await?; + +            Ok(()) +        }) +    } + +    fn new() -> Self { +        Self {} +    } +}
\ No newline at end of file diff --git a/exes/webhook/src/main.rs b/exes/webhook/src/main.rs index 0215e51..f531725 100644 --- a/exes/webhook/src/main.rs +++ b/exes/webhook/src/main.rs @@ -1,54 +1,4 @@ -mod config; -mod handler; -use std::{future::Future, pin::Pin}; - -use crate::{ -    config::WebhookConfig, -    handler::{handler::WebhookService, make_service::MakeSvc}, -}; -use hyper::Server; -use leash::{ignite, AnyhowResultFuture, Component}; -use shared::{config::Settings, log::info, nats_crate::Client}; -use tokio::sync::oneshot; - -#[derive(Clone, Copy)] -struct WebhookServer {} - -impl Component for WebhookServer { -    type Config = WebhookConfig; -    const SERVICE_NAME: &'static str = "webhook"; - -    fn start( -        &self, -        settings: Settings<Self::Config>, -        stop: oneshot::Receiver<()>, -    ) -> AnyhowResultFuture<()> { -        Box::pin(async move { -            info!("Starting server on {}", settings.server.listening_adress); - -            let bind = settings.server.listening_adress; -            let nats = -                Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>>>>>::into(settings.nats) -                    .await?; - -            let make_service = MakeSvc::new(WebhookService { -                config: settings.config, -                nats: nats.clone(), -            }); - -            let server = Server::bind(&bind).serve(make_service); - -            server.with_graceful_shutdown(async { -                stop.await.expect("should not fail"); -            }).await?; - -            Ok(()) -        }) -    } - -    fn new() -> Self { -        Self {} -    } -} +use leash::ignite; +use webhook::WebhookServer;  ignite!(WebhookServer);  | 
