summaryrefslogtreecommitdiff
path: root/exes
diff options
context:
space:
mode:
authorMatthieuCoder <matthieu@matthieu-dev.xyz>2023-01-02 19:53:53 +0400
committerMatthieuCoder <matthieu@matthieu-dev.xyz>2023-01-02 19:53:53 +0400
commitf152af136f24f309cd95e645cbc2e06b776a01d7 (patch)
tree32e8c97ec897a23fc317f20a5881cc7c5b24e04e /exes
parent867e7d7a0c80e0c8c6855d3d0c3232b171f53d69 (diff)
add token from config and change the signal handler to SIGTERM
Diffstat (limited to 'exes')
-rw-r--r--exes/gateway/src/main.rs72
-rw-r--r--exes/rest/src/config.rs2
-rw-r--r--exes/rest/src/handler.rs25
-rw-r--r--exes/rest/src/main.rs21
-rw-r--r--exes/rest/src/ratelimit_client/mod.rs2
-rw-r--r--exes/webhook/Cargo.toml1
-rw-r--r--exes/webhook/src/config.rs2
-rw-r--r--exes/webhook/src/main.rs11
8 files changed, 95 insertions, 41 deletions
diff --git a/exes/gateway/src/main.rs b/exes/gateway/src/main.rs
index 7957b08..f2a4f93 100644
--- a/exes/gateway/src/main.rs
+++ b/exes/gateway/src/main.rs
@@ -6,18 +6,24 @@ use shared::{
nats_crate::Client,
payloads::{CachePayload, DispatchEventTagged, Tracing},
};
+use tokio::sync::oneshot;
use std::{convert::TryFrom, pin::Pin};
use twilight_gateway::{Event, Shard};
mod config;
-use futures::{Future, StreamExt};
+use futures::{Future, StreamExt, select};
use twilight_model::gateway::event::DispatchEvent;
+use futures::FutureExt;
struct GatewayServer {}
impl Component for GatewayServer {
type Config = GatewayConfig;
const SERVICE_NAME: &'static str = "gateway";
- fn start(&self, settings: Settings<Self::Config>) -> AnyhowResultFuture<()> {
+ fn start(
+ &self,
+ settings: Settings<Self::Config>,
+ stop: oneshot::Receiver<()>,
+ ) -> AnyhowResultFuture<()> {
Box::pin(async move {
let (shard, mut events) = Shard::builder(settings.token.to_owned(), settings.intents)
.shard(settings.shard, settings.shard_total)?
@@ -29,34 +35,48 @@ impl Component for GatewayServer {
shard.start().await?;
- while let Some(event) = events.next().await {
- match event {
- Event::Ready(ready) => {
- info!("Logged in as {}", ready.user.name);
- }
+ let mut stop = stop.fuse();
+ loop {
- _ => {
- let name = event.kind().name();
- if let Ok(dispatch_event) = DispatchEvent::try_from(event) {
- let data = CachePayload {
- tracing: Tracing {
- node_id: "".to_string(),
- span: None,
- },
- data: DispatchEventTagged {
- data: dispatch_event,
- },
- };
- let value = serde_json::to_string(&data)?;
- debug!("nats send: {}", value);
- let bytes = bytes::Bytes::from(value);
- nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes)
- .await?;
+ select! {
+ event = events.next().fuse() => {
+ if let Some(event) = event {
+ match event {
+ Event::Ready(ready) => {
+ info!("Logged in as {}", ready.user.name);
+ }
+
+ _ => {
+ let name = event.kind().name();
+ if let Ok(dispatch_event) = DispatchEvent::try_from(event) {
+ let data = CachePayload {
+ tracing: Tracing {
+ node_id: "".to_string(),
+ span: None,
+ },
+ data: DispatchEventTagged {
+ data: dispatch_event,
+ },
+ };
+ let value = serde_json::to_string(&data)?;
+ debug!("nats send: {}", value);
+ let bytes = bytes::Bytes::from(value);
+ nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes)
+ .await?;
+ }
+ }
+ }
+ } else {
+ break
}
- }
- }
+ },
+ _ = stop => break
+ };
}
+ info!("stopping shard...");
+ shard.shutdown();
+
Ok(())
})
}
diff --git a/exes/rest/src/config.rs b/exes/rest/src/config.rs
index 9261de2..5c2698b 100644
--- a/exes/rest/src/config.rs
+++ b/exes/rest/src/config.rs
@@ -2,7 +2,7 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use serde::Deserialize;
fn default_listening_address() -> SocketAddr {
- SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080))
+ SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8080))
}
#[derive(Debug, Deserialize, Clone)]
diff --git a/exes/rest/src/handler.rs b/exes/rest/src/handler.rs
index 8b0dd52..ea81ade 100644
--- a/exes/rest/src/handler.rs
+++ b/exes/rest/src/handler.rs
@@ -3,6 +3,7 @@ use std::{
convert::TryFrom,
hash::{Hash, Hasher},
str::FromStr,
+ time::Instant,
};
use anyhow::bail;
@@ -38,7 +39,7 @@ fn normalize_path(request_path: &str) -> (&str, &str) {
pub async fn handle_request(
client: Client<HttpsConnector<HttpConnector>, Body>,
ratelimiter: RemoteRatelimiter,
- token: String,
+ token: &str,
mut request: Request<Body>,
) -> Result<Response<Body>, anyhow::Error> {
let (hash, uri_string) = {
@@ -57,7 +58,7 @@ pub async fn handle_request(
let request_path = request.uri().path();
let (api_path, trimmed_path) = normalize_path(&request_path);
- let mut uri_string = format!("http://192.168.0.27:8000{}{}", api_path, trimmed_path);
+ let mut uri_string = format!("https://discord.com{}{}", api_path, trimmed_path);
if let Some(query) = request.uri().query() {
uri_string.push('?');
uri_string.push_str(query);
@@ -79,6 +80,7 @@ pub async fn handle_request(
(hash.finish().to_string(), uri_string)
};
+ let start_ticket_request = Instant::now();
let header_sender = match ratelimiter.ticket(hash).await {
Ok(sender) => sender,
Err(e) => {
@@ -86,6 +88,7 @@ pub async fn handle_request(
bail!("failed to reteive ticket");
}
};
+ let time_took_ticket = Instant::now() - start_ticket_request;
request.headers_mut().insert(
AUTHORIZATION,
@@ -106,9 +109,7 @@ pub async fn handle_request(
request.headers_mut().remove(AUTHORIZATION);
request.headers_mut().append(
AUTHORIZATION,
- HeaderValue::from_static(
- "Bot ODA3MTg4MzM1NzE3Mzg0MjEy.G3sXFM.8gY2sVYDAq2WuPWwDskAAEFLfTg8htooxME-LE",
- ),
+ HeaderValue::from_str(&format!("Bot {}", token))?,
);
let uri = match Uri::from_str(&uri_string) {
@@ -119,14 +120,26 @@ pub async fn handle_request(
}
};
*request.uri_mut() = uri;
- let resp = match client.request(request).await {
+
+ let start_upstream_req = Instant::now();
+ let mut resp = match client.request(request).await {
Ok(response) => response,
Err(e) => {
error!("Error when requesting the Discord API: {:?}", e);
bail!("failed to request the discord api");
}
};
+ let upstream_time_took = Instant::now() - start_upstream_req;
+ resp.headers_mut().append(
+ "X-TicketRequest-Ms",
+ HeaderValue::from_str(&time_took_ticket.as_millis().to_string()).unwrap(),
+ );
+ resp.headers_mut().append(
+ "X-Upstream-Ms",
+ HeaderValue::from_str(&upstream_time_took.as_millis().to_string()).unwrap(),
+ );
+
let ratelimit_headers = resp
.headers()
.into_iter()
diff --git a/exes/rest/src/main.rs b/exes/rest/src/main.rs
index 8d014ab..07d835c 100644
--- a/exes/rest/src/main.rs
+++ b/exes/rest/src/main.rs
@@ -9,7 +9,8 @@ use hyper::{
use hyper_tls::HttpsConnector;
use leash::{ignite, AnyhowResultFuture, Component};
use shared::config::Settings;
-use std::convert::Infallible;
+use std::{convert::Infallible, sync::Arc};
+use tokio::sync::oneshot;
mod config;
mod handler;
@@ -20,21 +21,29 @@ impl Component for ReverseProxyServer {
type Config = ReverseProxyConfig;
const SERVICE_NAME: &'static str = "rest";
- fn start(&self, settings: Settings<Self::Config>) -> AnyhowResultFuture<()> {
+ fn start(
+ &self,
+ settings: Settings<Self::Config>,
+ stop: oneshot::Receiver<()>,
+ ) -> AnyhowResultFuture<()> {
Box::pin(async move {
// Client to the remote ratelimiters
let ratelimiter = ratelimit_client::RemoteRatelimiter::new();
let client = Client::builder().build(HttpsConnector::new());
+ let token = Arc::new(settings.discord.token.clone());
let service_fn = make_service_fn(move |_: &AddrStream| {
let client = client.clone();
let ratelimiter = ratelimiter.clone();
+ let token = token.clone();
async move {
Ok::<_, Infallible>(service_fn(move |request: Request<Body>| {
let client = client.clone();
let ratelimiter = ratelimiter.clone();
+ let token = token.clone();
async move {
- handle_request(client, ratelimiter, "token".to_string(), request).await
+ let token = token.as_str();
+ handle_request(client, ratelimiter, token, request).await
}
}))
}
@@ -42,7 +51,11 @@ impl Component for ReverseProxyServer {
let server = Server::bind(&settings.config.server.listening_adress).serve(service_fn);
- server.await?;
+ server
+ .with_graceful_shutdown(async {
+ stop.await.expect("should not fail");
+ })
+ .await?;
Ok(())
})
diff --git a/exes/rest/src/ratelimit_client/mod.rs b/exes/rest/src/ratelimit_client/mod.rs
index 8263d15..87737dd 100644
--- a/exes/rest/src/ratelimit_client/mod.rs
+++ b/exes/rest/src/ratelimit_client/mod.rs
@@ -64,7 +64,7 @@ impl RemoteRatelimiter {
obj_clone.get_ratelimiters().await.unwrap();
tokio::select! {
() = &mut sleep => {
- println!("timer elapsed");
+ debug!("timer elapsed");
},
_ = tx.recv() => {}
}
diff --git a/exes/webhook/Cargo.toml b/exes/webhook/Cargo.toml
index 12a6608..589b5bd 100644
--- a/exes/webhook/Cargo.toml
+++ b/exes/webhook/Cargo.toml
@@ -17,6 +17,7 @@ lazy_static = "1.4.0"
ed25519-dalek = "1"
twilight-model = { version = "0.14" }
anyhow = "1.0.68"
+futures-util = "0.3.25"
[[bin]]
name = "webhook"
diff --git a/exes/webhook/src/config.rs b/exes/webhook/src/config.rs
index 68f6a5f..e98de13 100644
--- a/exes/webhook/src/config.rs
+++ b/exes/webhook/src/config.rs
@@ -4,7 +4,7 @@ use ed25519_dalek::PublicKey;
use serde::{Deserialize, Deserializer};
fn default_listening_address() -> SocketAddr {
- SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080))
+ SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8080))
}
#[derive(Debug, Deserialize, Clone, Copy)]
diff --git a/exes/webhook/src/main.rs b/exes/webhook/src/main.rs
index efd4147..0215e51 100644
--- a/exes/webhook/src/main.rs
+++ b/exes/webhook/src/main.rs
@@ -9,6 +9,7 @@ use crate::{
use hyper::Server;
use leash::{ignite, AnyhowResultFuture, Component};
use shared::{config::Settings, log::info, nats_crate::Client};
+use tokio::sync::oneshot;
#[derive(Clone, Copy)]
struct WebhookServer {}
@@ -17,7 +18,11 @@ impl Component for WebhookServer {
type Config = WebhookConfig;
const SERVICE_NAME: &'static str = "webhook";
- fn start(&self, settings: Settings<Self::Config>) -> AnyhowResultFuture<()> {
+ fn start(
+ &self,
+ settings: Settings<Self::Config>,
+ stop: oneshot::Receiver<()>,
+ ) -> AnyhowResultFuture<()> {
Box::pin(async move {
info!("Starting server on {}", settings.server.listening_adress);
@@ -33,7 +38,9 @@ impl Component for WebhookServer {
let server = Server::bind(&bind).serve(make_service);
- server.await?;
+ server.with_graceful_shutdown(async {
+ stop.await.expect("should not fail");
+ }).await?;
Ok(())
})