summaryrefslogtreecommitdiff
path: root/webhook/src/handler/handler.rs
diff options
context:
space:
mode:
Diffstat (limited to 'webhook/src/handler/handler.rs')
-rw-r--r--webhook/src/handler/handler.rs166
1 files changed, 103 insertions, 63 deletions
diff --git a/webhook/src/handler/handler.rs b/webhook/src/handler/handler.rs
index b993aaa..4af2ba6 100644
--- a/webhook/src/handler/handler.rs
+++ b/webhook/src/handler/handler.rs
@@ -1,20 +1,32 @@
+use super::error::WebhookError;
use super::{signature::validate_signature, types::Interaction};
use crate::config::Config;
-use hyper::{Body, Method, Request, Response, StatusCode, body::{to_bytes, Bytes}, service::Service};
-use log::{error, info, trace};
-use nats::Connection;
+use common::log::{debug, error, info};
+use common::nats_crate::Connection;
+use hyper::{
+ body::{to_bytes, Bytes},
+ service::Service,
+ Body, Method, Request, Response, StatusCode,
+};
use serde::{Deserialize, Serialize};
-use std::{future::Future, io::{Error, ErrorKind}, pin::Pin, str::from_utf8, sync::Arc, task::{Context, Poll}};
+use std::{
+ future::Future,
+ pin::Pin,
+ str::from_utf8,
+ sync::Arc,
+ task::{Context, Poll},
+ time::Duration,
+};
/// Hyper service used to handle the discord webhooks
#[derive(Clone)]
pub struct HandlerService {
- pub config: Config,
+ pub config: Arc<Config>,
pub nats: Arc<Connection>,
}
impl HandlerService {
- async fn check_request(&self, req: Request<Body>) -> Result<Bytes, Error> {
+ async fn check_request(&self, req: Request<Body>) -> Result<Bytes, WebhookError> {
if req.method() == Method::POST {
let headers = req.headers().clone();
let signature = headers.get("X-Signature-Ed25519");
@@ -30,25 +42,96 @@ impl HandlerService {
) {
Ok(data)
} else {
- Err(Error::new(
- ErrorKind::InvalidData,
- "invalid signature specified",
+ Err(WebhookError::new(
+ StatusCode::UNAUTHORIZED,
+ "invalid signature",
))
}
} else {
- Err(Error::new(
- ErrorKind::BrokenPipe,
+ Err(WebhookError::new(
+ StatusCode::BAD_REQUEST,
"failed to read signature",
))
}
} else {
- Err(Error::new(ErrorKind::BrokenPipe, "unable to read body"))
+ Err(WebhookError::new(
+ StatusCode::BAD_REQUEST,
+ "unable to read body",
+ ))
}
} else {
- Err(Error::new(ErrorKind::InvalidData, "missing headers"))
+ Err(WebhookError::new(
+ StatusCode::UNAUTHORIZED,
+ "missing signature headers",
+ ))
}
} else {
- Err(Error::new(ErrorKind::InvalidData, "invalid method"))
+ Err(WebhookError::new(StatusCode::NOT_FOUND, "not found"))
+ }
+ }
+
+ async fn process_request(
+ &mut self,
+ req: Request<Body>,
+ ) -> Result<Response<Body>, WebhookError> {
+ match self.check_request(req).await {
+ Ok(data) => {
+ let utf8 = from_utf8(&data);
+ match utf8 {
+ Ok(data) => match serde_json::from_str::<Interaction>(data) {
+ Ok(value) => {
+ if value.t == 1 {
+ info!("sending pong");
+ // a ping must be responded with another ping
+ return Ok(Response::builder()
+ .header("Content-Type", "application/json")
+ .body(serde_json::to_string(&Ping { t: 1 }).unwrap().into())
+ .unwrap());
+ } else {
+ debug!("calling nats");
+ // this should hopefully not fail ?
+ let payload =
+ serde_json::to_string(&common::payloads::CachePayload {
+ tracing: common::payloads::Tracing {
+ node_id: "".to_string(),
+ span: None,
+ },
+ operation: "".to_string(),
+ data: value,
+ })
+ .unwrap();
+
+ match self.nats.request_timeout(
+ "nova.cache.dispatch.interaction",
+ payload,
+ Duration::from_secs(2),
+ ) {
+ Ok(response) => Ok(Response::builder()
+ .header("Content-Type", "application/json")
+ .body(Body::from(response.data))
+ .unwrap()),
+
+ Err(error) => {
+ error!("failed to request nats: {}", error);
+ Err(WebhookError::new(
+ StatusCode::INTERNAL_SERVER_ERROR,
+ "failed to request nats",
+ ))
+ }
+ }
+ }
+ }
+
+ Err(_) => Err(WebhookError::new(
+ StatusCode::BAD_REQUEST,
+ "invalid json body",
+ )),
+ },
+
+ Err(_) => Err(WebhookError::new(StatusCode::BAD_REQUEST, "not utf-8 body")),
+ }
+ }
+ Err(error) => Err(error),
}
}
}
@@ -56,7 +139,7 @@ impl HandlerService {
#[derive(Debug, Serialize, Deserialize)]
pub struct Ping {
#[serde(rename = "type")]
- t: i32
+ t: i32,
}
/// Implementation of the service
@@ -70,56 +153,13 @@ impl Service<Request<Body>> for HandlerService {
}
fn call(&mut self, req: Request<Body>) -> Self::Future {
- let self_clone = self.clone();
-
+ let mut clone = self.clone();
Box::pin(async move {
- match self_clone.check_request(req).await {
- Ok(data) => {
- let value: Interaction = serde_json::from_str(from_utf8(&data).unwrap()).unwrap();
- trace!("received value: {:?}", value);
-
- match value.t {
- 1 => {
- info!("sending pong");
- // a ping must be responded with another ping
- return Ok(Response::builder().header("Content-Type", "application/json").body(serde_json::to_string(&Ping {
- t: 1
- }).unwrap().into()).unwrap());
- },
- _ => {
- let payload = serde_json::to_string(&common::payloads::CachePayload {
- tracing: common::payloads::Tracing {
- node_id: "".to_string(),
- span: None,
- },
- data: value,
- }).unwrap();
+ let response = clone.process_request(req).await;
- match self_clone.nats.request("nova.cache.dispatch.interaction", payload) {
- Ok(response) => {
- Ok(
- Response::builder()
- .header("Content-Type", "application/json")
- .body(from_utf8(&response.data).unwrap().to_string().into())
- .unwrap()
- )
- },
- Err(error) => {
- error!("failed to request nats: {}", error);
- Ok(
- Response::builder()
- .status(500)
- .body("an internal server error occured".to_string().into())
- .unwrap()
- )
- }
- }
- },
- }
- },
- Err(error) => {
- Ok(Response::builder().status(StatusCode::UNAUTHORIZED).body(error.to_string().into()).unwrap())
- }
+ match response {
+ Ok(r) => Ok(r),
+ Err(e) => Ok(e.into()),
}
})
}