summaryrefslogtreecommitdiff
path: root/webhook/src/handler/handler.rs
diff options
context:
space:
mode:
Diffstat (limited to 'webhook/src/handler/handler.rs')
-rw-r--r--webhook/src/handler/handler.rs147
1 files changed, 82 insertions, 65 deletions
diff --git a/webhook/src/handler/handler.rs b/webhook/src/handler/handler.rs
index b3dc8a6..bcce81d 100644
--- a/webhook/src/handler/handler.rs
+++ b/webhook/src/handler/handler.rs
@@ -1,14 +1,22 @@
+use super::error::WebhookError;
use super::{signature::validate_signature, types::Interaction};
use crate::config::Config;
+use common::log::{debug, error, info};
+use common::nats_crate::Connection;
use hyper::{
body::{to_bytes, Bytes},
service::Service,
Body, Method, Request, Response, StatusCode,
};
-use common::log::{error, info, trace};
-use common::nats_crate::Connection;
use serde::{Deserialize, Serialize};
-use std::{future::Future, io::{Error, ErrorKind}, pin::Pin, str::from_utf8, sync::Arc, task::{Context, Poll}, time::Duration};
+use std::{
+ future::Future,
+ pin::Pin,
+ str::from_utf8,
+ sync::Arc,
+ task::{Context, Poll},
+ time::Duration,
+};
/// Hyper service used to handle the discord webhooks
#[derive(Clone)]
@@ -18,7 +26,7 @@ pub struct HandlerService {
}
impl HandlerService {
- async fn check_request(&self, req: Request<Body>) -> Result<Bytes, Error> {
+ async fn check_request(&self, req: Request<Body>) -> Result<Bytes, WebhookError> {
if req.method() == Method::POST {
let headers = req.headers().clone();
let signature = headers.get("X-Signature-Ed25519");
@@ -34,25 +42,79 @@ impl HandlerService {
) {
Ok(data)
} else {
- Err(Error::new(
- ErrorKind::InvalidData,
- "invalid signature specified",
- ))
+ Err(WebhookError::new(StatusCode::UNAUTHORIZED, "invalid signature"))
}
} else {
- Err(Error::new(
- ErrorKind::BrokenPipe,
- "failed to read signature",
- ))
+ Err(WebhookError::new(StatusCode::BAD_REQUEST, "failed to read signature"))
}
} else {
- Err(Error::new(ErrorKind::BrokenPipe, "unable to read body"))
+ Err(WebhookError::new(StatusCode::BAD_REQUEST, "unable to read body"))
}
} else {
- Err(Error::new(ErrorKind::InvalidData, "missing headers"))
+ Err(WebhookError::new(StatusCode::UNAUTHORIZED, "missing signature headers"))
}
} else {
- Err(Error::new(ErrorKind::InvalidData, "invalid method"))
+ Err(WebhookError::new(StatusCode::NOT_FOUND, "not found"))
+ }
+ }
+
+
+ async fn process_request(&mut self, req: Request<Body>) -> Result<Response<Body>, WebhookError> {
+ match self.check_request(req).await {
+ Ok(data) => {
+ let utf8 = from_utf8(&data);
+ match utf8 {
+ Ok(data) => match serde_json::from_str::<Interaction>(data) {
+ Ok(value) => match value.t {
+ 1 => {
+ info!("sending pong");
+ // a ping must be responded with another ping
+ return Ok(Response::builder()
+ .header("Content-Type", "application/json")
+ .body(serde_json::to_string(&Ping { t: 1 }).unwrap().into())
+ .unwrap());
+ }
+ _ => {
+ debug!("calling nats");
+ // this should hopefully not fail ?
+ let payload =
+ serde_json::to_string(&common::payloads::CachePayload {
+ tracing: common::payloads::Tracing {
+ node_id: "".to_string(),
+ span: None,
+ },
+ operation: "".to_string(),
+ data: value,
+ })
+ .unwrap();
+
+ match self.nats.request_timeout(
+ "nova.cache.dispatch.interaction",
+ payload,
+ Duration::from_secs(2),
+ ) {
+ Ok(response) => Ok(Response::builder()
+ .header("Content-Type", "application/json")
+ .body(
+ Body::from(response.data)
+ )
+ .unwrap()),
+
+ Err(error) => {
+ error!("failed to request nats: {}", error);
+ Err(WebhookError::new(StatusCode::INTERNAL_SERVER_ERROR, "failed to request nats"))
+ }
+ }
+ }
+ },
+
+ Err(_) => Err(WebhookError::new(StatusCode::BAD_REQUEST, "invalid json body")),
+ },
+
+ Err(_) => Err(WebhookError::new(StatusCode::BAD_REQUEST, "not utf-8 body")),
+ }
+ }
+ Err(error) => Err(error),
}
}
}
@@ -74,58 +136,13 @@ impl Service<Request<Body>> for HandlerService {
}
fn call(&mut self, req: Request<Body>) -> Self::Future {
- let self_clone = self.clone();
-
+ let mut clone = self.clone();
Box::pin(async move {
- match self_clone.check_request(req).await {
- Ok(data) => {
- let value: Interaction =
- serde_json::from_str(from_utf8(&data).unwrap()).unwrap();
- trace!("received value: {:?}", value);
-
- match value.t {
- 1 => {
- info!("sending pong");
- // a ping must be responded with another ping
- return Ok(Response::builder()
- .header("Content-Type", "application/json")
- .body(serde_json::to_string(&Ping { t: 1 }).unwrap().into())
- .unwrap());
- }
- _ => {
- let payload = serde_json::to_string(&common::payloads::CachePayload {
- tracing: common::payloads::Tracing {
- node_id: "".to_string(),
- span: None,
- },
- operation: "".to_string(),
- data: value,
- })
- .unwrap();
+ let response = clone.process_request(req).await;
- match self_clone
- .nats
- .request_timeout("nova.cache.dispatch.interaction", payload, Duration::from_secs(2))
- {
- Ok(response) => Ok(Response::builder()
- .header("Content-Type", "application/json")
- .body(from_utf8(&response.data).unwrap().to_string().into())
- .unwrap()),
- Err(error) => {
- error!("failed to request nats: {}", error);
- Ok(Response::builder()
- .status(500)
- .body("an internal server error occured".to_string().into())
- .unwrap())
- }
- }
- }
- }
- }
- Err(error) => Ok(Response::builder()
- .status(StatusCode::UNAUTHORIZED)
- .body(error.to_string().into())
- .unwrap()),
+ match response {
+ Ok(r) => Ok(r),
+ Err(e) => Ok(e.into())
}
})
}