From: MatthieuCoder Date: Wed, 4 Jan 2023 15:36:27 +0000 (+0400) Subject: fix clippy warnings X-Git-Tag: v0.1~3 X-Git-Url: https://git.puffer.fish/?a=commitdiff_plain;h=408b524ba7d7c2b8b34131054d9f2498cc0d62ff;p=matthieu%2Fnova.git fix clippy warnings --- diff --git a/exes/all/build.rs b/exes/all/build.rs index c52ad11..192dffd 100644 --- a/exes/all/build.rs +++ b/exes/all/build.rs @@ -19,7 +19,7 @@ fn main() { ..Default::default() }; - cbindgen::generate_with_config(&crate_dir, config) + cbindgen::generate_with_config(crate_dir, config) .unwrap() - .write_to_file(&output_file); + .write_to_file(output_file); } diff --git a/exes/all/src/lib.rs b/exes/all/src/lib.rs index 29dd6a0..de83243 100644 --- a/exes/all/src/lib.rs +++ b/exes/all/src/lib.rs @@ -1,3 +1,5 @@ +#![allow(clippy::missing_safety_doc)] + extern crate libc; use anyhow::Result; use config::{Config, Environment, File}; diff --git a/exes/ratelimit/src/lib.rs b/exes/ratelimit/src/lib.rs index 6d6d608..345c37a 100644 --- a/exes/ratelimit/src/lib.rs +++ b/exes/ratelimit/src/lib.rs @@ -1,11 +1,11 @@ -use std::net::ToSocketAddrs; - use futures_util::FutureExt; use grpc::RLServer; use leash::{AnyhowResultFuture, Component}; use proto::nova::ratelimit::ratelimiter::ratelimiter_server::RatelimiterServer; use redis_global_local_bucket_ratelimiter::RedisGlobalLocalBucketRatelimiter; -use shared::{config::Settings, redis_crate::Client}; +use shared::{config::Settings, redis_crate::aio::MultiplexedConnection}; +use std::future::Future; +use std::{net::ToSocketAddrs, pin::Pin}; use tokio::sync::oneshot; use tonic::transport::Server; @@ -23,9 +23,12 @@ impl Component for RatelimiterServerComponent { stop: oneshot::Receiver<()>, ) -> AnyhowResultFuture<()> { Box::pin(async move { - // let config = Arc::new(settings.config); - let redis: Client = settings.redis.into(); - let server = RLServer::new(RedisGlobalLocalBucketRatelimiter::new(redis.into())); + let redis = Into::< + Pin> + Send>>, + >::into(settings.redis) + .await?; + + let server = RLServer::new(RedisGlobalLocalBucketRatelimiter::new(redis)); Server::builder() .add_service(RatelimiterServer::new(server)) diff --git a/exes/ratelimit/src/redis_global_local_bucket_ratelimiter/mod.rs b/exes/ratelimit/src/redis_global_local_bucket_ratelimiter/mod.rs index c759db9..a97d5a3 100644 --- a/exes/ratelimit/src/redis_global_local_bucket_ratelimiter/mod.rs +++ b/exes/ratelimit/src/redis_global_local_bucket_ratelimiter/mod.rs @@ -1,5 +1,7 @@ use self::bucket::{Bucket, BucketQueueTask}; -use shared::redis_crate::{Client, Commands}; +use shared::redis_crate::aio::MultiplexedConnection; +use shared::redis_crate::{AsyncCommands}; +use tokio::sync::Mutex; use twilight_http_ratelimiting::ticket::{self, TicketNotifier}; use twilight_http_ratelimiting::GetTicketFuture; mod bucket; @@ -7,12 +9,12 @@ mod bucket; use futures_util::future; use std::{ collections::hash_map::{Entry, HashMap}, - sync::{Arc, Mutex}, + sync::Arc, time::Duration, }; #[derive(Debug)] -struct RedisLockPair(tokio::sync::Mutex); +struct RedisLockPair(Mutex); impl RedisLockPair { /// Set the global ratelimit as exhausted. @@ -26,27 +28,28 @@ impl RedisLockPair { 1, (duration.as_secs() + 1).try_into().unwrap(), ) + .await .unwrap(); } pub async fn is_locked(&self) -> bool { - self.0.lock().await.exists("nova:rls:lock").unwrap() + self.0.lock().await.exists("nova:rls:lock").await.unwrap() } } #[derive(Clone, Debug)] pub struct RedisGlobalLocalBucketRatelimiter { - buckets: Arc>>>, + buckets: Arc>>>, global: Arc, } impl RedisGlobalLocalBucketRatelimiter { #[must_use] - pub fn new(redis: tokio::sync::Mutex) -> Self { + pub fn new(redis: MultiplexedConnection) -> Self { Self { buckets: Arc::default(), - global: Arc::new(RedisLockPair(redis)), + global: Arc::new(RedisLockPair(Mutex::new(redis))), } } diff --git a/exes/rest/src/handler.rs b/exes/rest/src/handler.rs index ea81ade..3828154 100644 --- a/exes/rest/src/handler.rs +++ b/exes/rest/src/handler.rs @@ -56,7 +56,7 @@ pub async fn handle_request( }; let request_path = request.uri().path(); - let (api_path, trimmed_path) = normalize_path(&request_path); + let (api_path, trimmed_path) = normalize_path(request_path); let mut uri_string = format!("https://discord.com{}{}", api_path, trimmed_path); if let Some(query) = request.uri().query() { diff --git a/exes/rest/src/ratelimit_client/mod.rs b/exes/rest/src/ratelimit_client/mod.rs index afaf2b7..ea34ad9 100644 --- a/exes/rest/src/ratelimit_client/mod.rs +++ b/exes/rest/src/ratelimit_client/mod.rs @@ -27,13 +27,21 @@ impl Drop for RemoteRatelimiter { } } +type IssueTicket = Pin< + Box< + dyn Future>>> + + Send + + 'static, + >, +>; + impl RemoteRatelimiter { async fn get_ratelimiters(&self) -> Result<(), anyhow::Error> { // get list of dns responses /*let responses = dns_lookup::lookup_host("localhost") - .unwrap() - .into_iter() - .map(|f| f.to_string());*/ + .unwrap() + .into_iter() + .map(|f| f.to_string());*/ let mut write = self.remotes.write().await; @@ -42,7 +50,7 @@ impl RemoteRatelimiter { write.add(a.clone()); } - return Ok(()); + Ok(()) } #[must_use] @@ -74,16 +82,7 @@ impl RemoteRatelimiter { obj } - pub fn ticket( - &self, - path: String, - ) -> Pin< - Box< - dyn Future>>> - + Send - + 'static, - >, - > { + pub fn ticket(&self, path: String) -> IssueTicket { let remotes = self.remotes.clone(); let (tx, rx) = oneshot::channel::>(); diff --git a/exes/webhook/src/config.rs b/exes/webhook/src/config.rs index 56a9b78..d1b3fb6 100644 --- a/exes/webhook/src/config.rs +++ b/exes/webhook/src/config.rs @@ -25,7 +25,7 @@ where D: Deserializer<'de>, { let str = String::deserialize(deserializer)?; - let public_key = PublicKey::from_bytes(&hex::decode(&str).unwrap()).unwrap(); + let public_key = PublicKey::from_bytes(&hex::decode(str).unwrap()).unwrap(); Ok(public_key) } diff --git a/exes/webhook/src/handler/error.rs b/exes/webhook/src/handler/error.rs index d4fee07..ffa4cca 100644 --- a/exes/webhook/src/handler/error.rs +++ b/exes/webhook/src/handler/error.rs @@ -14,11 +14,11 @@ impl WebhookError { } } -impl Into> for WebhookError { - fn into(self) -> Response { +impl From for Response { + fn from(value: WebhookError) -> Self { Response::builder() - .status(self.code) - .body(self.message.into()) + .status(value.code) + .body(value.message.into()) .unwrap() } } diff --git a/exes/webhook/src/handler/handler.rs b/exes/webhook/src/handler/handler.rs deleted file mode 100644 index 896e43f..0000000 --- a/exes/webhook/src/handler/handler.rs +++ /dev/null @@ -1,169 +0,0 @@ -use super::error::WebhookError; -use super::signature::validate_signature; -use crate::config::WebhookConfig; -use ed25519_dalek::PublicKey; -use hyper::{ - body::{to_bytes, Bytes}, - service::Service, - Body, Method, Request, Response, StatusCode, -}; -use shared::nats_crate::Client; -use shared::{ - log::{debug, error}, - payloads::{CachePayload, DispatchEventTagged, Tracing}, -}; -use std::{ - future::Future, - pin::Pin, - str::from_utf8, - task::{Context, Poll}, -}; -use twilight_model::gateway::event::DispatchEvent; -use twilight_model::{ - application::interaction::{Interaction, InteractionType}, - gateway::payload::incoming::InteractionCreate, -}; - -/// Hyper service used to handle the discord webhooks -#[derive(Clone)] -pub struct WebhookService { - pub config: WebhookConfig, - pub nats: Client, -} - -impl WebhookService { - async fn check_request(req: Request, pk: PublicKey) -> Result { - if req.method() == Method::POST { - let signature = if let Some(sig) = req.headers().get("X-Signature-Ed25519") { - sig.to_owned() - } else { - return Err(WebhookError::new( - StatusCode::BAD_REQUEST, - "missing signature header", - )); - }; - - let timestamp = if let Some(timestamp) = req.headers().get("X-Signature-Timestamp") { - timestamp.to_owned() - } else { - return Err(WebhookError::new( - StatusCode::BAD_REQUEST, - "missing timestamp header", - )); - }; - let data = to_bytes(req.into_body()).await?; - - if validate_signature( - &pk, - &[timestamp.as_bytes().to_vec(), data.to_vec()].concat(), - signature.to_str()?, - ) { - Ok(data) - } else { - Err(WebhookError::new( - StatusCode::UNAUTHORIZED, - "invalid signature", - )) - } - } else { - Err(WebhookError::new(StatusCode::NOT_FOUND, "not found")) - } - } - - async fn process_request( - req: Request, - nats: Client, - pk: PublicKey, - ) -> Result, WebhookError> { - match Self::check_request(req, pk).await { - Ok(data) => { - let utf8 = from_utf8(&data); - match utf8 { - Ok(data) => match serde_json::from_str::(data) { - Ok(value) => { - match value.kind { - InteractionType::Ping => Ok(Response::builder() - .header("Content-Type", "application/json") - .body(r#"{"type":1}"#.into()) - .unwrap()), - _ => { - debug!("calling nats"); - // this should hopefully not fail ? - - let data = CachePayload { - tracing: Tracing { - node_id: "".to_string(), - span: None, - }, - data: DispatchEventTagged { - data: DispatchEvent::InteractionCreate(Box::new( - InteractionCreate(value), - )), - }, - }; - - let payload = serde_json::to_string(&data).unwrap(); - - match nats - .request( - "nova.cache.dispatch.INTERACTION_CREATE".to_string(), - Bytes::from(payload), - ) - .await - { - Ok(response) => Ok(Response::builder() - .header("Content-Type", "application/json") - .body(Body::from(response.payload)) - .unwrap()), - - Err(error) => { - error!("failed to request nats: {}", error); - Err(WebhookError::new( - StatusCode::INTERNAL_SERVER_ERROR, - "failed to request nats", - )) - } - } - } - } - } - - Err(error) => { - error!("invalid json body: {}", error); - Err(WebhookError::new( - StatusCode::BAD_REQUEST, - "invalid json body", - )) - } - }, - - Err(_) => Err(WebhookError::new(StatusCode::BAD_REQUEST, "not utf-8 body")), - } - } - Err(error) => Err(error), - } - } -} - -/// Implementation of the service -impl Service> for WebhookService { - type Response = hyper::Response; - type Error = hyper::Error; - type Future = Pin> + Send>>; - - fn poll_ready(&mut self, _: &mut Context) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: Request) -> Self::Future { - let future = Self::process_request(req, self.nats.clone(), self.config.discord.public_key); - Box::pin(async move { - let response = future.await; - - match response { - Ok(r) => Ok(r), - Err(e) => Ok(e.into()), - } - }) - } -} diff --git a/exes/webhook/src/handler/mod.rs b/exes/webhook/src/handler/mod.rs index e4cf35a..3ef859e 100644 --- a/exes/webhook/src/handler/mod.rs +++ b/exes/webhook/src/handler/mod.rs @@ -1,7 +1,176 @@ +use crate::config::WebhookConfig; +use ed25519_dalek::PublicKey; +use error::WebhookError; +use hyper::{ + body::{to_bytes, Bytes}, + service::Service, + Body, Method, Request, Response, StatusCode, +}; +use shared::nats_crate::Client; +use shared::{ + log::{debug, error}, + payloads::{CachePayload, DispatchEventTagged, Tracing}, +}; +use signature::validate_signature; +use std::{ + future::Future, + pin::Pin, + str::from_utf8, + task::{Context, Poll}, +}; +use twilight_model::gateway::event::DispatchEvent; +use twilight_model::{ + application::interaction::{Interaction, InteractionType}, + gateway::payload::incoming::InteractionCreate, +}; + mod error; -pub mod handler; pub mod make_service; mod signature; #[cfg(test)] pub mod tests; + +/// Hyper service used to handle the discord webhooks +#[derive(Clone)] +pub struct WebhookService { + pub config: WebhookConfig, + pub nats: Client, +} + +impl WebhookService { + async fn check_request(req: Request, pk: PublicKey) -> Result { + if req.method() == Method::POST { + let signature = if let Some(sig) = req.headers().get("X-Signature-Ed25519") { + sig.to_owned() + } else { + return Err(WebhookError::new( + StatusCode::BAD_REQUEST, + "missing signature header", + )); + }; + + let timestamp = if let Some(timestamp) = req.headers().get("X-Signature-Timestamp") { + timestamp.to_owned() + } else { + return Err(WebhookError::new( + StatusCode::BAD_REQUEST, + "missing timestamp header", + )); + }; + let data = to_bytes(req.into_body()).await?; + + if validate_signature( + &pk, + &[timestamp.as_bytes().to_vec(), data.to_vec()].concat(), + signature.to_str()?, + ) { + Ok(data) + } else { + Err(WebhookError::new( + StatusCode::UNAUTHORIZED, + "invalid signature", + )) + } + } else { + Err(WebhookError::new(StatusCode::NOT_FOUND, "not found")) + } + } + + async fn process_request( + req: Request, + nats: Client, + pk: PublicKey, + ) -> Result, WebhookError> { + match Self::check_request(req, pk).await { + Ok(data) => { + let utf8 = from_utf8(&data); + match utf8 { + Ok(data) => match serde_json::from_str::(data) { + Ok(value) => { + match value.kind { + InteractionType::Ping => Ok(Response::builder() + .header("Content-Type", "application/json") + .body(r#"{"type":1}"#.into()) + .unwrap()), + _ => { + debug!("calling nats"); + // this should hopefully not fail ? + + let data = CachePayload { + tracing: Tracing { + node_id: "".to_string(), + span: None, + }, + data: DispatchEventTagged { + data: DispatchEvent::InteractionCreate(Box::new( + InteractionCreate(value), + )), + }, + }; + + let payload = serde_json::to_string(&data).unwrap(); + + match nats + .request( + "nova.cache.dispatch.INTERACTION_CREATE".to_string(), + Bytes::from(payload), + ) + .await + { + Ok(response) => Ok(Response::builder() + .header("Content-Type", "application/json") + .body(Body::from(response.payload)) + .unwrap()), + + Err(error) => { + error!("failed to request nats: {}", error); + Err(WebhookError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "failed to request nats", + )) + } + } + } + } + } + + Err(error) => { + error!("invalid json body: {}", error); + Err(WebhookError::new( + StatusCode::BAD_REQUEST, + "invalid json body", + )) + } + }, + + Err(_) => Err(WebhookError::new(StatusCode::BAD_REQUEST, "not utf-8 body")), + } + } + Err(error) => Err(error), + } + } +} + +/// Implementation of the service +impl Service> for WebhookService { + type Response = hyper::Response; + type Error = hyper::Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Request) -> Self::Future { + let future = Self::process_request(req, self.nats.clone(), self.config.discord.public_key); + Box::pin(async move { + let response = future.await; + + match response { + Ok(r) => Ok(r), + Err(e) => Ok(e.into()), + } + }) + } +} diff --git a/exes/webhook/src/handler/signature.rs b/exes/webhook/src/handler/signature.rs index 3dc4373..fc5555f 100644 --- a/exes/webhook/src/handler/signature.rs +++ b/exes/webhook/src/handler/signature.rs @@ -23,7 +23,7 @@ fn demo(v: Vec) -> [T; N] { .unwrap_or_else(|v: Vec| panic!("Expected a Vec of length {} but it was {}", N, v.len())) } -pub fn validate_signature(public_key: &PublicKey, data: &Vec, hex_signature: &str) -> bool { +pub fn validate_signature(public_key: &PublicKey, data: &[u8], hex_signature: &str) -> bool { SIGNATURE_COUNTER.inc(); let timer = SIGNATURE_TIME_HISTOGRAM.with_label_values(&["webhook_main"]).start_timer(); diff --git a/exes/webhook/src/lib.rs b/exes/webhook/src/lib.rs index 13a4e60..43ab9c4 100644 --- a/exes/webhook/src/lib.rs +++ b/exes/webhook/src/lib.rs @@ -4,7 +4,7 @@ use std::{future::Future, pin::Pin}; use crate::{ config::WebhookConfig, - handler::{handler::WebhookService, make_service::MakeSvc}, + handler::{make_service::MakeSvc, WebhookService}, }; use hyper::Server; use leash::{AnyhowResultFuture, Component}; @@ -28,10 +28,10 @@ impl Component for WebhookServer { let bind = settings.server.listening_adress; info!("NAts connected!"); - let nats = - Into::> + Send>>>::into(settings.nats) - - .await?; + let nats = Into::> + Send>>>::into( + settings.nats, + ) + .await?; let make_service = MakeSvc::new(WebhookService { config: settings.config, @@ -40,9 +40,11 @@ impl Component for WebhookServer { let server = Server::bind(&bind).serve(make_service); - server.with_graceful_shutdown(async { - stop.await.expect("should not fail"); - }).await?; + server + .with_graceful_shutdown(async { + stop.await.expect("should not fail"); + }) + .await?; Ok(()) }) @@ -51,4 +53,4 @@ impl Component for WebhookServer { fn new() -> Self { Self {} } -} \ No newline at end of file +} diff --git a/libs/shared/src/config.rs b/libs/shared/src/config.rs index 4387dfb..ab584a2 100644 --- a/libs/shared/src/config.rs +++ b/libs/shared/src/config.rs @@ -13,7 +13,7 @@ pub struct Settings { pub redis: crate::redis::RedisConfiguration, } -impl<'de, T: Clone + DeserializeOwned + Default> Settings +impl Settings { pub fn new(service_name: &str) -> Result, GenericError> { let mut builder = Config::builder(); diff --git a/libs/shared/src/redis.rs b/libs/shared/src/redis.rs index 5753fb6..a623c2f 100644 --- a/libs/shared/src/redis.rs +++ b/libs/shared/src/redis.rs @@ -7,15 +7,8 @@ pub struct RedisConfiguration { pub url: String, } -// Allows the configuration to directly create a nats connection -impl Into for RedisConfiguration { - fn into(self) -> Client { - redis::Client::open(self.url).unwrap() - } -} - impl From - for Pin>>> + for Pin> + Send>> { fn from(value: RedisConfiguration) -> Self { Box::pin(async move {