diff options
Diffstat (limited to 'exes')
| -rw-r--r-- | exes/gateway/src/main.rs | 72 | ||||
| -rw-r--r-- | exes/rest/src/config.rs | 2 | ||||
| -rw-r--r-- | exes/rest/src/handler.rs | 25 | ||||
| -rw-r--r-- | exes/rest/src/main.rs | 21 | ||||
| -rw-r--r-- | exes/rest/src/ratelimit_client/mod.rs | 2 | ||||
| -rw-r--r-- | exes/webhook/Cargo.toml | 1 | ||||
| -rw-r--r-- | exes/webhook/src/config.rs | 2 | ||||
| -rw-r--r-- | exes/webhook/src/main.rs | 11 |
8 files changed, 95 insertions, 41 deletions
diff --git a/exes/gateway/src/main.rs b/exes/gateway/src/main.rs index 7957b08..f2a4f93 100644 --- a/exes/gateway/src/main.rs +++ b/exes/gateway/src/main.rs @@ -6,18 +6,24 @@ use shared::{ nats_crate::Client, payloads::{CachePayload, DispatchEventTagged, Tracing}, }; +use tokio::sync::oneshot; use std::{convert::TryFrom, pin::Pin}; use twilight_gateway::{Event, Shard}; mod config; -use futures::{Future, StreamExt}; +use futures::{Future, StreamExt, select}; use twilight_model::gateway::event::DispatchEvent; +use futures::FutureExt; struct GatewayServer {} impl Component for GatewayServer { type Config = GatewayConfig; const SERVICE_NAME: &'static str = "gateway"; - fn start(&self, settings: Settings<Self::Config>) -> AnyhowResultFuture<()> { + fn start( + &self, + settings: Settings<Self::Config>, + stop: oneshot::Receiver<()>, + ) -> AnyhowResultFuture<()> { Box::pin(async move { let (shard, mut events) = Shard::builder(settings.token.to_owned(), settings.intents) .shard(settings.shard, settings.shard_total)? @@ -29,34 +35,48 @@ impl Component for GatewayServer { shard.start().await?; - while let Some(event) = events.next().await { - match event { - Event::Ready(ready) => { - info!("Logged in as {}", ready.user.name); - } + let mut stop = stop.fuse(); + loop { - _ => { - let name = event.kind().name(); - if let Ok(dispatch_event) = DispatchEvent::try_from(event) { - let data = CachePayload { - tracing: Tracing { - node_id: "".to_string(), - span: None, - }, - data: DispatchEventTagged { - data: dispatch_event, - }, - }; - let value = serde_json::to_string(&data)?; - debug!("nats send: {}", value); - let bytes = bytes::Bytes::from(value); - nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes) - .await?; + select! { + event = events.next().fuse() => { + if let Some(event) = event { + match event { + Event::Ready(ready) => { + info!("Logged in as {}", ready.user.name); + } + + _ => { + let name = event.kind().name(); + if let Ok(dispatch_event) = DispatchEvent::try_from(event) { + let data = CachePayload { + tracing: Tracing { + node_id: "".to_string(), + span: None, + }, + data: DispatchEventTagged { + data: dispatch_event, + }, + }; + let value = serde_json::to_string(&data)?; + debug!("nats send: {}", value); + let bytes = bytes::Bytes::from(value); + nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes) + .await?; + } + } + } + } else { + break } - } - } + }, + _ = stop => break + }; } + info!("stopping shard..."); + shard.shutdown(); + Ok(()) }) } diff --git a/exes/rest/src/config.rs b/exes/rest/src/config.rs index 9261de2..5c2698b 100644 --- a/exes/rest/src/config.rs +++ b/exes/rest/src/config.rs @@ -2,7 +2,7 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use serde::Deserialize; fn default_listening_address() -> SocketAddr { - SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)) + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8080)) } #[derive(Debug, Deserialize, Clone)] diff --git a/exes/rest/src/handler.rs b/exes/rest/src/handler.rs index 8b0dd52..ea81ade 100644 --- a/exes/rest/src/handler.rs +++ b/exes/rest/src/handler.rs @@ -3,6 +3,7 @@ use std::{ convert::TryFrom, hash::{Hash, Hasher}, str::FromStr, + time::Instant, }; use anyhow::bail; @@ -38,7 +39,7 @@ fn normalize_path(request_path: &str) -> (&str, &str) { pub async fn handle_request( client: Client<HttpsConnector<HttpConnector>, Body>, ratelimiter: RemoteRatelimiter, - token: String, + token: &str, mut request: Request<Body>, ) -> Result<Response<Body>, anyhow::Error> { let (hash, uri_string) = { @@ -57,7 +58,7 @@ pub async fn handle_request( let request_path = request.uri().path(); let (api_path, trimmed_path) = normalize_path(&request_path); - let mut uri_string = format!("http://192.168.0.27:8000{}{}", api_path, trimmed_path); + let mut uri_string = format!("https://discord.com{}{}", api_path, trimmed_path); if let Some(query) = request.uri().query() { uri_string.push('?'); uri_string.push_str(query); @@ -79,6 +80,7 @@ pub async fn handle_request( (hash.finish().to_string(), uri_string) }; + let start_ticket_request = Instant::now(); let header_sender = match ratelimiter.ticket(hash).await { Ok(sender) => sender, Err(e) => { @@ -86,6 +88,7 @@ pub async fn handle_request( bail!("failed to reteive ticket"); } }; + let time_took_ticket = Instant::now() - start_ticket_request; request.headers_mut().insert( AUTHORIZATION, @@ -106,9 +109,7 @@ pub async fn handle_request( request.headers_mut().remove(AUTHORIZATION); request.headers_mut().append( AUTHORIZATION, - HeaderValue::from_static( - "Bot ODA3MTg4MzM1NzE3Mzg0MjEy.G3sXFM.8gY2sVYDAq2WuPWwDskAAEFLfTg8htooxME-LE", - ), + HeaderValue::from_str(&format!("Bot {}", token))?, ); let uri = match Uri::from_str(&uri_string) { @@ -119,14 +120,26 @@ pub async fn handle_request( } }; *request.uri_mut() = uri; - let resp = match client.request(request).await { + + let start_upstream_req = Instant::now(); + let mut resp = match client.request(request).await { Ok(response) => response, Err(e) => { error!("Error when requesting the Discord API: {:?}", e); bail!("failed to request the discord api"); } }; + let upstream_time_took = Instant::now() - start_upstream_req; + resp.headers_mut().append( + "X-TicketRequest-Ms", + HeaderValue::from_str(&time_took_ticket.as_millis().to_string()).unwrap(), + ); + resp.headers_mut().append( + "X-Upstream-Ms", + HeaderValue::from_str(&upstream_time_took.as_millis().to_string()).unwrap(), + ); + let ratelimit_headers = resp .headers() .into_iter() diff --git a/exes/rest/src/main.rs b/exes/rest/src/main.rs index 8d014ab..07d835c 100644 --- a/exes/rest/src/main.rs +++ b/exes/rest/src/main.rs @@ -9,7 +9,8 @@ use hyper::{ use hyper_tls::HttpsConnector; use leash::{ignite, AnyhowResultFuture, Component}; use shared::config::Settings; -use std::convert::Infallible; +use std::{convert::Infallible, sync::Arc}; +use tokio::sync::oneshot; mod config; mod handler; @@ -20,21 +21,29 @@ impl Component for ReverseProxyServer { type Config = ReverseProxyConfig; const SERVICE_NAME: &'static str = "rest"; - fn start(&self, settings: Settings<Self::Config>) -> AnyhowResultFuture<()> { + fn start( + &self, + settings: Settings<Self::Config>, + stop: oneshot::Receiver<()>, + ) -> AnyhowResultFuture<()> { Box::pin(async move { // Client to the remote ratelimiters let ratelimiter = ratelimit_client::RemoteRatelimiter::new(); let client = Client::builder().build(HttpsConnector::new()); + let token = Arc::new(settings.discord.token.clone()); let service_fn = make_service_fn(move |_: &AddrStream| { let client = client.clone(); let ratelimiter = ratelimiter.clone(); + let token = token.clone(); async move { Ok::<_, Infallible>(service_fn(move |request: Request<Body>| { let client = client.clone(); let ratelimiter = ratelimiter.clone(); + let token = token.clone(); async move { - handle_request(client, ratelimiter, "token".to_string(), request).await + let token = token.as_str(); + handle_request(client, ratelimiter, token, request).await } })) } @@ -42,7 +51,11 @@ impl Component for ReverseProxyServer { let server = Server::bind(&settings.config.server.listening_adress).serve(service_fn); - server.await?; + server + .with_graceful_shutdown(async { + stop.await.expect("should not fail"); + }) + .await?; Ok(()) }) diff --git a/exes/rest/src/ratelimit_client/mod.rs b/exes/rest/src/ratelimit_client/mod.rs index 8263d15..87737dd 100644 --- a/exes/rest/src/ratelimit_client/mod.rs +++ b/exes/rest/src/ratelimit_client/mod.rs @@ -64,7 +64,7 @@ impl RemoteRatelimiter { obj_clone.get_ratelimiters().await.unwrap(); tokio::select! { () = &mut sleep => { - println!("timer elapsed"); + debug!("timer elapsed"); }, _ = tx.recv() => {} } diff --git a/exes/webhook/Cargo.toml b/exes/webhook/Cargo.toml index 12a6608..589b5bd 100644 --- a/exes/webhook/Cargo.toml +++ b/exes/webhook/Cargo.toml @@ -17,6 +17,7 @@ lazy_static = "1.4.0" ed25519-dalek = "1" twilight-model = { version = "0.14" } anyhow = "1.0.68" +futures-util = "0.3.25" [[bin]] name = "webhook" diff --git a/exes/webhook/src/config.rs b/exes/webhook/src/config.rs index 68f6a5f..e98de13 100644 --- a/exes/webhook/src/config.rs +++ b/exes/webhook/src/config.rs @@ -4,7 +4,7 @@ use ed25519_dalek::PublicKey; use serde::{Deserialize, Deserializer}; fn default_listening_address() -> SocketAddr { - SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080)) + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8080)) } #[derive(Debug, Deserialize, Clone, Copy)] diff --git a/exes/webhook/src/main.rs b/exes/webhook/src/main.rs index efd4147..0215e51 100644 --- a/exes/webhook/src/main.rs +++ b/exes/webhook/src/main.rs @@ -9,6 +9,7 @@ use crate::{ use hyper::Server; use leash::{ignite, AnyhowResultFuture, Component}; use shared::{config::Settings, log::info, nats_crate::Client}; +use tokio::sync::oneshot; #[derive(Clone, Copy)] struct WebhookServer {} @@ -17,7 +18,11 @@ impl Component for WebhookServer { type Config = WebhookConfig; const SERVICE_NAME: &'static str = "webhook"; - fn start(&self, settings: Settings<Self::Config>) -> AnyhowResultFuture<()> { + fn start( + &self, + settings: Settings<Self::Config>, + stop: oneshot::Receiver<()>, + ) -> AnyhowResultFuture<()> { Box::pin(async move { info!("Starting server on {}", settings.server.listening_adress); @@ -33,7 +38,9 @@ impl Component for WebhookServer { let server = Server::bind(&bind).serve(make_service); - server.await?; + server.with_graceful_shutdown(async { + stop.await.expect("should not fail"); + }).await?; Ok(()) }) |
