diff options
Diffstat (limited to 'webhook/src/handler/handler.rs')
| -rw-r--r-- | webhook/src/handler/handler.rs | 166 |
1 files changed, 103 insertions, 63 deletions
diff --git a/webhook/src/handler/handler.rs b/webhook/src/handler/handler.rs index b993aaa..4af2ba6 100644 --- a/webhook/src/handler/handler.rs +++ b/webhook/src/handler/handler.rs @@ -1,20 +1,32 @@ +use super::error::WebhookError; use super::{signature::validate_signature, types::Interaction}; use crate::config::Config; -use hyper::{Body, Method, Request, Response, StatusCode, body::{to_bytes, Bytes}, service::Service}; -use log::{error, info, trace}; -use nats::Connection; +use common::log::{debug, error, info}; +use common::nats_crate::Connection; +use hyper::{ + body::{to_bytes, Bytes}, + service::Service, + Body, Method, Request, Response, StatusCode, +}; use serde::{Deserialize, Serialize}; -use std::{future::Future, io::{Error, ErrorKind}, pin::Pin, str::from_utf8, sync::Arc, task::{Context, Poll}}; +use std::{ + future::Future, + pin::Pin, + str::from_utf8, + sync::Arc, + task::{Context, Poll}, + time::Duration, +}; /// Hyper service used to handle the discord webhooks #[derive(Clone)] pub struct HandlerService { - pub config: Config, + pub config: Arc<Config>, pub nats: Arc<Connection>, } impl HandlerService { - async fn check_request(&self, req: Request<Body>) -> Result<Bytes, Error> { + async fn check_request(&self, req: Request<Body>) -> Result<Bytes, WebhookError> { if req.method() == Method::POST { let headers = req.headers().clone(); let signature = headers.get("X-Signature-Ed25519"); @@ -30,25 +42,96 @@ impl HandlerService { ) { Ok(data) } else { - Err(Error::new( - ErrorKind::InvalidData, - "invalid signature specified", + Err(WebhookError::new( + StatusCode::UNAUTHORIZED, + "invalid signature", )) } } else { - Err(Error::new( - ErrorKind::BrokenPipe, + Err(WebhookError::new( + StatusCode::BAD_REQUEST, "failed to read signature", )) } } else { - Err(Error::new(ErrorKind::BrokenPipe, "unable to read body")) + Err(WebhookError::new( + StatusCode::BAD_REQUEST, + "unable to read body", + )) } } else { - Err(Error::new(ErrorKind::InvalidData, "missing headers")) + Err(WebhookError::new( + StatusCode::UNAUTHORIZED, + "missing signature headers", + )) } } else { - Err(Error::new(ErrorKind::InvalidData, "invalid method")) + Err(WebhookError::new(StatusCode::NOT_FOUND, "not found")) + } + } + + async fn process_request( + &mut self, + req: Request<Body>, + ) -> Result<Response<Body>, WebhookError> { + match self.check_request(req).await { + Ok(data) => { + let utf8 = from_utf8(&data); + match utf8 { + Ok(data) => match serde_json::from_str::<Interaction>(data) { + Ok(value) => { + if value.t == 1 { + info!("sending pong"); + // a ping must be responded with another ping + return Ok(Response::builder() + .header("Content-Type", "application/json") + .body(serde_json::to_string(&Ping { t: 1 }).unwrap().into()) + .unwrap()); + } else { + debug!("calling nats"); + // this should hopefully not fail ? + let payload = + serde_json::to_string(&common::payloads::CachePayload { + tracing: common::payloads::Tracing { + node_id: "".to_string(), + span: None, + }, + operation: "".to_string(), + data: value, + }) + .unwrap(); + + match self.nats.request_timeout( + "nova.cache.dispatch.interaction", + payload, + Duration::from_secs(2), + ) { + Ok(response) => Ok(Response::builder() + .header("Content-Type", "application/json") + .body(Body::from(response.data)) + .unwrap()), + + Err(error) => { + error!("failed to request nats: {}", error); + Err(WebhookError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "failed to request nats", + )) + } + } + } + } + + Err(_) => Err(WebhookError::new( + StatusCode::BAD_REQUEST, + "invalid json body", + )), + }, + + Err(_) => Err(WebhookError::new(StatusCode::BAD_REQUEST, "not utf-8 body")), + } + } + Err(error) => Err(error), } } } @@ -56,7 +139,7 @@ impl HandlerService { #[derive(Debug, Serialize, Deserialize)] pub struct Ping { #[serde(rename = "type")] - t: i32 + t: i32, } /// Implementation of the service @@ -70,56 +153,13 @@ impl Service<Request<Body>> for HandlerService { } fn call(&mut self, req: Request<Body>) -> Self::Future { - let self_clone = self.clone(); - + let mut clone = self.clone(); Box::pin(async move { - match self_clone.check_request(req).await { - Ok(data) => { - let value: Interaction = serde_json::from_str(from_utf8(&data).unwrap()).unwrap(); - trace!("received value: {:?}", value); - - match value.t { - 1 => { - info!("sending pong"); - // a ping must be responded with another ping - return Ok(Response::builder().header("Content-Type", "application/json").body(serde_json::to_string(&Ping { - t: 1 - }).unwrap().into()).unwrap()); - }, - _ => { - let payload = serde_json::to_string(&common::payloads::CachePayload { - tracing: common::payloads::Tracing { - node_id: "".to_string(), - span: None, - }, - data: value, - }).unwrap(); + let response = clone.process_request(req).await; - match self_clone.nats.request("nova.cache.dispatch.interaction", payload) { - Ok(response) => { - Ok( - Response::builder() - .header("Content-Type", "application/json") - .body(from_utf8(&response.data).unwrap().to_string().into()) - .unwrap() - ) - }, - Err(error) => { - error!("failed to request nats: {}", error); - Ok( - Response::builder() - .status(500) - .body("an internal server error occured".to_string().into()) - .unwrap() - ) - } - } - }, - } - }, - Err(error) => { - Ok(Response::builder().status(StatusCode::UNAUTHORIZED).body(error.to_string().into()).unwrap()) - } + match response { + Ok(r) => Ok(r), + Err(e) => Ok(e.into()), } }) } |
