]> git.puffer.fish Git - matthieu/nova.git/commitdiff
fix clippy warnings
authorMatthieuCoder <matthieu@matthieu-dev.xyz>
Wed, 4 Jan 2023 15:36:27 +0000 (19:36 +0400)
committerMatthieuCoder <matthieu@matthieu-dev.xyz>
Wed, 4 Jan 2023 15:36:27 +0000 (19:36 +0400)
14 files changed:
exes/all/build.rs
exes/all/src/lib.rs
exes/ratelimit/src/lib.rs
exes/ratelimit/src/redis_global_local_bucket_ratelimiter/mod.rs
exes/rest/src/handler.rs
exes/rest/src/ratelimit_client/mod.rs
exes/webhook/src/config.rs
exes/webhook/src/handler/error.rs
exes/webhook/src/handler/handler.rs [deleted file]
exes/webhook/src/handler/mod.rs
exes/webhook/src/handler/signature.rs
exes/webhook/src/lib.rs
libs/shared/src/config.rs
libs/shared/src/redis.rs

index c52ad11333d796bd41cda4836790617f81e6cf6d..192dffd960189411d28077e9b3f9327cdc71d588 100644 (file)
@@ -19,7 +19,7 @@ fn main() {
         ..Default::default()
     };
 
-    cbindgen::generate_with_config(&crate_dir, config)
+    cbindgen::generate_with_config(crate_dir, config)
       .unwrap()
-      .write_to_file(&output_file);
+      .write_to_file(output_file);
 }
index 29dd6a0c292e19770fc19bbdd6ac678d162fff68..de83243b5863176670a731d5059abfb2272f62d3 100644 (file)
@@ -1,3 +1,5 @@
+#![allow(clippy::missing_safety_doc)]
+
 extern crate libc;
 use anyhow::Result;
 use config::{Config, Environment, File};
index 6d6d608dc8a79336f747ca149ffac37899168181..345c37a456cd11f5a45ce68ac70554f5b7bd2ab6 100644 (file)
@@ -1,11 +1,11 @@
-use std::net::ToSocketAddrs;
-
 use futures_util::FutureExt;
 use grpc::RLServer;
 use leash::{AnyhowResultFuture, Component};
 use proto::nova::ratelimit::ratelimiter::ratelimiter_server::RatelimiterServer;
 use redis_global_local_bucket_ratelimiter::RedisGlobalLocalBucketRatelimiter;
-use shared::{config::Settings, redis_crate::Client};
+use shared::{config::Settings, redis_crate::aio::MultiplexedConnection};
+use std::future::Future;
+use std::{net::ToSocketAddrs, pin::Pin};
 use tokio::sync::oneshot;
 use tonic::transport::Server;
 
@@ -23,9 +23,12 @@ impl Component for RatelimiterServerComponent {
         stop: oneshot::Receiver<()>,
     ) -> AnyhowResultFuture<()> {
         Box::pin(async move {
-            // let config = Arc::new(settings.config);
-            let redis: Client = settings.redis.into();
-            let server = RLServer::new(RedisGlobalLocalBucketRatelimiter::new(redis.into()));
+            let redis = Into::<
+                Pin<Box<dyn Future<Output = anyhow::Result<MultiplexedConnection>> + Send>>,
+            >::into(settings.redis)
+            .await?;
+
+            let server = RLServer::new(RedisGlobalLocalBucketRatelimiter::new(redis));
 
             Server::builder()
                 .add_service(RatelimiterServer::new(server))
index c759db9fbaeb6c5bd729b63d32dd3b3d30f54256..a97d5a3d2b5d5e1361c8234cf1eb5bb91448ad70 100644 (file)
@@ -1,5 +1,7 @@
 use self::bucket::{Bucket, BucketQueueTask};
-use shared::redis_crate::{Client, Commands};
+use shared::redis_crate::aio::MultiplexedConnection;
+use shared::redis_crate::{AsyncCommands};
+use tokio::sync::Mutex;
 use twilight_http_ratelimiting::ticket::{self, TicketNotifier};
 use twilight_http_ratelimiting::GetTicketFuture;
 mod bucket;
@@ -7,12 +9,12 @@ mod bucket;
 use futures_util::future;
 use std::{
     collections::hash_map::{Entry, HashMap},
-    sync::{Arc, Mutex},
+    sync::Arc,
     time::Duration,
 };
 
 #[derive(Debug)]
-struct RedisLockPair(tokio::sync::Mutex<Client>);
+struct RedisLockPair(Mutex<MultiplexedConnection>);
 
 impl RedisLockPair {
     /// Set the global ratelimit as exhausted.
@@ -26,27 +28,28 @@ impl RedisLockPair {
                 1,
                 (duration.as_secs() + 1).try_into().unwrap(),
             )
+            .await
             .unwrap();
     }
 
     pub async fn is_locked(&self) -> bool {
-        self.0.lock().await.exists("nova:rls:lock").unwrap()
+        self.0.lock().await.exists("nova:rls:lock").await.unwrap()
     }
 }
 
 #[derive(Clone, Debug)]
 pub struct RedisGlobalLocalBucketRatelimiter {
-    buckets: Arc<Mutex<HashMap<String, Arc<Bucket>>>>,
+    buckets: Arc<std::sync::Mutex<HashMap<String, Arc<Bucket>>>>,
 
     global: Arc<RedisLockPair>,
 }
 
 impl RedisGlobalLocalBucketRatelimiter {
     #[must_use]
-    pub fn new(redis: tokio::sync::Mutex<Client>) -> Self {
+    pub fn new(redis: MultiplexedConnection) -> Self {
         Self {
             buckets: Arc::default(),
-            global: Arc::new(RedisLockPair(redis)),
+            global: Arc::new(RedisLockPair(Mutex::new(redis))),
         }
     }
 
index ea81ade96975bb817491b154b6841d4130143b36..3828154130427992af237130770da6431244dcc7 100644 (file)
@@ -56,7 +56,7 @@ pub async fn handle_request(
         };
 
         let request_path = request.uri().path();
-        let (api_path, trimmed_path) = normalize_path(&request_path);
+        let (api_path, trimmed_path) = normalize_path(request_path);
 
         let mut uri_string = format!("https://discord.com{}{}", api_path, trimmed_path);
         if let Some(query) = request.uri().query() {
index afaf2b7446ff673da2e895897188f18f73e018fb..ea34ad96e42f92774c74ca8ffe3f1bc9c86d5e8a 100644 (file)
@@ -27,13 +27,21 @@ impl Drop for RemoteRatelimiter {
     }
 }
 
+type IssueTicket = Pin<
+    Box<
+        dyn Future<Output = anyhow::Result<oneshot::Sender<HashMap<String, String>>>>
+            + Send
+            + 'static,
+    >,
+>;
+
 impl RemoteRatelimiter {
     async fn get_ratelimiters(&self) -> Result<(), anyhow::Error> {
         // get list of dns responses
         /*let responses = dns_lookup::lookup_host("localhost")
-            .unwrap()
-            .into_iter()
-            .map(|f| f.to_string());*/
+        .unwrap()
+        .into_iter()
+        .map(|f| f.to_string());*/
 
         let mut write = self.remotes.write().await;
 
@@ -42,7 +50,7 @@ impl RemoteRatelimiter {
             write.add(a.clone());
         }
 
-        return Ok(());
+        Ok(())
     }
 
     #[must_use]
@@ -74,16 +82,7 @@ impl RemoteRatelimiter {
         obj
     }
 
-    pub fn ticket(
-        &self,
-        path: String,
-    ) -> Pin<
-        Box<
-            dyn Future<Output = anyhow::Result<oneshot::Sender<HashMap<String, String>>>>
-                + Send
-                + 'static,
-        >,
-    > {
+    pub fn ticket(&self, path: String) -> IssueTicket {
         let remotes = self.remotes.clone();
         let (tx, rx) = oneshot::channel::<HashMap<String, String>>();
 
index 56a9b784fffd6cdb2870a8cfd2ef07de7de4b39d..d1b3fb616c7ebf8abe68b5a78262aadef8117bc5 100644 (file)
@@ -25,7 +25,7 @@ where
     D: Deserializer<'de>,
 {
     let str = String::deserialize(deserializer)?;
-    let public_key = PublicKey::from_bytes(&hex::decode(&str).unwrap()).unwrap();
+    let public_key = PublicKey::from_bytes(&hex::decode(str).unwrap()).unwrap();
     Ok(public_key)
 }
 
index d4fee0704527d6d9fe0fce46c2859f934b92adbd..ffa4cca1249c1a41a14cef67282056a41bb086b6 100644 (file)
@@ -14,11 +14,11 @@ impl WebhookError {
     }
 }
 
-impl Into<Response<Body>> for WebhookError {
-    fn into(self) -> Response<Body> {
+impl From<WebhookError> for Response<Body> {
+    fn from(value: WebhookError) -> Self {
         Response::builder()
-            .status(self.code)
-            .body(self.message.into())
+            .status(value.code)
+            .body(value.message.into())
             .unwrap()
     }
 }
diff --git a/exes/webhook/src/handler/handler.rs b/exes/webhook/src/handler/handler.rs
deleted file mode 100644 (file)
index 896e43f..0000000
+++ /dev/null
@@ -1,169 +0,0 @@
-use super::error::WebhookError;
-use super::signature::validate_signature;
-use crate::config::WebhookConfig;
-use ed25519_dalek::PublicKey;
-use hyper::{
-    body::{to_bytes, Bytes},
-    service::Service,
-    Body, Method, Request, Response, StatusCode,
-};
-use shared::nats_crate::Client;
-use shared::{
-    log::{debug, error},
-    payloads::{CachePayload, DispatchEventTagged, Tracing},
-};
-use std::{
-    future::Future,
-    pin::Pin,
-    str::from_utf8,
-    task::{Context, Poll},
-};
-use twilight_model::gateway::event::DispatchEvent;
-use twilight_model::{
-    application::interaction::{Interaction, InteractionType},
-    gateway::payload::incoming::InteractionCreate,
-};
-
-/// Hyper service used to handle the discord webhooks
-#[derive(Clone)]
-pub struct WebhookService {
-    pub config: WebhookConfig,
-    pub nats: Client,
-}
-
-impl WebhookService {
-    async fn check_request(req: Request<Body>, pk: PublicKey) -> Result<Bytes, WebhookError> {
-        if req.method() == Method::POST {
-            let signature = if let Some(sig) = req.headers().get("X-Signature-Ed25519") {
-                sig.to_owned()
-            } else {
-                return Err(WebhookError::new(
-                    StatusCode::BAD_REQUEST,
-                    "missing signature header",
-                ));
-            };
-
-            let timestamp = if let Some(timestamp) = req.headers().get("X-Signature-Timestamp") {
-                timestamp.to_owned()
-            } else {
-                return Err(WebhookError::new(
-                    StatusCode::BAD_REQUEST,
-                    "missing timestamp header",
-                ));
-            };
-            let data = to_bytes(req.into_body()).await?;
-
-            if validate_signature(
-                &pk,
-                &[timestamp.as_bytes().to_vec(), data.to_vec()].concat(),
-                signature.to_str()?,
-            ) {
-                Ok(data)
-            } else {
-                Err(WebhookError::new(
-                    StatusCode::UNAUTHORIZED,
-                    "invalid signature",
-                ))
-            }
-        } else {
-            Err(WebhookError::new(StatusCode::NOT_FOUND, "not found"))
-        }
-    }
-
-    async fn process_request(
-        req: Request<Body>,
-        nats: Client,
-        pk: PublicKey,
-    ) -> Result<Response<Body>, WebhookError> {
-        match Self::check_request(req, pk).await {
-            Ok(data) => {
-                let utf8 = from_utf8(&data);
-                match utf8 {
-                    Ok(data) => match serde_json::from_str::<Interaction>(data) {
-                        Ok(value) => {
-                            match value.kind {
-                                InteractionType::Ping => Ok(Response::builder()
-                                    .header("Content-Type", "application/json")
-                                    .body(r#"{"type":1}"#.into())
-                                    .unwrap()),
-                                _ => {
-                                    debug!("calling nats");
-                                    // this should hopefully not fail ?
-
-                                    let data = CachePayload {
-                                        tracing: Tracing {
-                                            node_id: "".to_string(),
-                                            span: None,
-                                        },
-                                        data: DispatchEventTagged {
-                                            data: DispatchEvent::InteractionCreate(Box::new(
-                                                InteractionCreate(value),
-                                            )),
-                                        },
-                                    };
-
-                                    let payload = serde_json::to_string(&data).unwrap();
-
-                                    match nats
-                                        .request(
-                                            "nova.cache.dispatch.INTERACTION_CREATE".to_string(),
-                                            Bytes::from(payload),
-                                        )
-                                        .await
-                                    {
-                                        Ok(response) => Ok(Response::builder()
-                                            .header("Content-Type", "application/json")
-                                            .body(Body::from(response.payload))
-                                            .unwrap()),
-
-                                        Err(error) => {
-                                            error!("failed to request nats: {}", error);
-                                            Err(WebhookError::new(
-                                                StatusCode::INTERNAL_SERVER_ERROR,
-                                                "failed to request nats",
-                                            ))
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        Err(error) => {
-                            error!("invalid json body: {}", error);
-                            Err(WebhookError::new(
-                                StatusCode::BAD_REQUEST,
-                                "invalid json body",
-                            ))
-                        }
-                    },
-
-                    Err(_) => Err(WebhookError::new(StatusCode::BAD_REQUEST, "not utf-8 body")),
-                }
-            }
-            Err(error) => Err(error),
-        }
-    }
-}
-
-/// Implementation of the service
-impl Service<hyper::Request<Body>> for WebhookService {
-    type Response = hyper::Response<Body>;
-    type Error = hyper::Error;
-    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
-
-    fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
-        Poll::Ready(Ok(()))
-    }
-
-    fn call(&mut self, req: Request<Body>) -> Self::Future {
-        let future = Self::process_request(req, self.nats.clone(), self.config.discord.public_key);
-        Box::pin(async move {
-            let response = future.await;
-
-            match response {
-                Ok(r) => Ok(r),
-                Err(e) => Ok(e.into()),
-            }
-        })
-    }
-}
index e4cf35ae3dfc219a458f881ee5827428fb84c349..3ef859e63fa0d1e751998e9daa7e40c41e289477 100644 (file)
@@ -1,7 +1,176 @@
+use crate::config::WebhookConfig;
+use ed25519_dalek::PublicKey;
+use error::WebhookError;
+use hyper::{
+    body::{to_bytes, Bytes},
+    service::Service,
+    Body, Method, Request, Response, StatusCode,
+};
+use shared::nats_crate::Client;
+use shared::{
+    log::{debug, error},
+    payloads::{CachePayload, DispatchEventTagged, Tracing},
+};
+use signature::validate_signature;
+use std::{
+    future::Future,
+    pin::Pin,
+    str::from_utf8,
+    task::{Context, Poll},
+};
+use twilight_model::gateway::event::DispatchEvent;
+use twilight_model::{
+    application::interaction::{Interaction, InteractionType},
+    gateway::payload::incoming::InteractionCreate,
+};
+
 mod error;
-pub mod handler;
 pub mod make_service;
 mod signature;
 
 #[cfg(test)]
 pub mod tests;
+
+/// Hyper service used to handle the discord webhooks
+#[derive(Clone)]
+pub struct WebhookService {
+    pub config: WebhookConfig,
+    pub nats: Client,
+}
+
+impl WebhookService {
+    async fn check_request(req: Request<Body>, pk: PublicKey) -> Result<Bytes, WebhookError> {
+        if req.method() == Method::POST {
+            let signature = if let Some(sig) = req.headers().get("X-Signature-Ed25519") {
+                sig.to_owned()
+            } else {
+                return Err(WebhookError::new(
+                    StatusCode::BAD_REQUEST,
+                    "missing signature header",
+                ));
+            };
+
+            let timestamp = if let Some(timestamp) = req.headers().get("X-Signature-Timestamp") {
+                timestamp.to_owned()
+            } else {
+                return Err(WebhookError::new(
+                    StatusCode::BAD_REQUEST,
+                    "missing timestamp header",
+                ));
+            };
+            let data = to_bytes(req.into_body()).await?;
+
+            if validate_signature(
+                &pk,
+                &[timestamp.as_bytes().to_vec(), data.to_vec()].concat(),
+                signature.to_str()?,
+            ) {
+                Ok(data)
+            } else {
+                Err(WebhookError::new(
+                    StatusCode::UNAUTHORIZED,
+                    "invalid signature",
+                ))
+            }
+        } else {
+            Err(WebhookError::new(StatusCode::NOT_FOUND, "not found"))
+        }
+    }
+
+    async fn process_request(
+        req: Request<Body>,
+        nats: Client,
+        pk: PublicKey,
+    ) -> Result<Response<Body>, WebhookError> {
+        match Self::check_request(req, pk).await {
+            Ok(data) => {
+                let utf8 = from_utf8(&data);
+                match utf8 {
+                    Ok(data) => match serde_json::from_str::<Interaction>(data) {
+                        Ok(value) => {
+                            match value.kind {
+                                InteractionType::Ping => Ok(Response::builder()
+                                    .header("Content-Type", "application/json")
+                                    .body(r#"{"type":1}"#.into())
+                                    .unwrap()),
+                                _ => {
+                                    debug!("calling nats");
+                                    // this should hopefully not fail ?
+
+                                    let data = CachePayload {
+                                        tracing: Tracing {
+                                            node_id: "".to_string(),
+                                            span: None,
+                                        },
+                                        data: DispatchEventTagged {
+                                            data: DispatchEvent::InteractionCreate(Box::new(
+                                                InteractionCreate(value),
+                                            )),
+                                        },
+                                    };
+
+                                    let payload = serde_json::to_string(&data).unwrap();
+
+                                    match nats
+                                        .request(
+                                            "nova.cache.dispatch.INTERACTION_CREATE".to_string(),
+                                            Bytes::from(payload),
+                                        )
+                                        .await
+                                    {
+                                        Ok(response) => Ok(Response::builder()
+                                            .header("Content-Type", "application/json")
+                                            .body(Body::from(response.payload))
+                                            .unwrap()),
+
+                                        Err(error) => {
+                                            error!("failed to request nats: {}", error);
+                                            Err(WebhookError::new(
+                                                StatusCode::INTERNAL_SERVER_ERROR,
+                                                "failed to request nats",
+                                            ))
+                                        }
+                                    }
+                                }
+                            }
+                        }
+
+                        Err(error) => {
+                            error!("invalid json body: {}", error);
+                            Err(WebhookError::new(
+                                StatusCode::BAD_REQUEST,
+                                "invalid json body",
+                            ))
+                        }
+                    },
+
+                    Err(_) => Err(WebhookError::new(StatusCode::BAD_REQUEST, "not utf-8 body")),
+                }
+            }
+            Err(error) => Err(error),
+        }
+    }
+}
+
+/// Implementation of the service
+impl Service<hyper::Request<Body>> for WebhookService {
+    type Response = hyper::Response<Body>;
+    type Error = hyper::Error;
+    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
+
+    fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn call(&mut self, req: Request<Body>) -> Self::Future {
+        let future = Self::process_request(req, self.nats.clone(), self.config.discord.public_key);
+        Box::pin(async move {
+            let response = future.await;
+
+            match response {
+                Ok(r) => Ok(r),
+                Err(e) => Ok(e.into()),
+            }
+        })
+    }
+}
index 3dc4373537deacddab0d584feeaa9172ed070b94..fc5555fc6ecf48e3bcf301a55967062bdf4c2585 100644 (file)
@@ -23,7 +23,7 @@ fn demo<T, const N: usize>(v: Vec<T>) -> [T; N] {
         .unwrap_or_else(|v: Vec<T>| panic!("Expected a Vec of length {} but it was {}", N, v.len()))
 }
 
-pub fn validate_signature(public_key: &PublicKey, data: &Vec<u8>, hex_signature: &str) -> bool {
+pub fn validate_signature(public_key: &PublicKey, data: &[u8], hex_signature: &str) -> bool {
     SIGNATURE_COUNTER.inc();
     let timer = SIGNATURE_TIME_HISTOGRAM.with_label_values(&["webhook_main"]).start_timer();
 
index 13a4e6004dfeff8d57083e3ec4546f8dc9e47744..43ab9c4d85bb7ba64dfa574e0abbc121dc8585fa 100644 (file)
@@ -4,7 +4,7 @@ use std::{future::Future, pin::Pin};
 
 use crate::{
     config::WebhookConfig,
-    handler::{handler::WebhookService, make_service::MakeSvc},
+    handler::{make_service::MakeSvc, WebhookService},
 };
 use hyper::Server;
 use leash::{AnyhowResultFuture, Component};
@@ -28,10 +28,10 @@ impl Component for WebhookServer {
 
             let bind = settings.server.listening_adress;
             info!("NAts connected!");
-            let nats =
-                Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>> + Send>>>::into(settings.nats)
-
-                    .await?;
+            let nats = Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>> + Send>>>::into(
+                settings.nats,
+            )
+            .await?;
 
             let make_service = MakeSvc::new(WebhookService {
                 config: settings.config,
@@ -40,9 +40,11 @@ impl Component for WebhookServer {
 
             let server = Server::bind(&bind).serve(make_service);
 
-            server.with_graceful_shutdown(async {
-                stop.await.expect("should not fail");
-            }).await?;
+            server
+                .with_graceful_shutdown(async {
+                    stop.await.expect("should not fail");
+                })
+                .await?;
 
             Ok(())
         })
@@ -51,4 +53,4 @@ impl Component for WebhookServer {
     fn new() -> Self {
         Self {}
     }
-}
\ No newline at end of file
+}
index 4387dfb0cc8fcd2321d2e4fca1a18682731b4050..ab584a2b420b00f8990eaea2503cfade957fea48 100644 (file)
@@ -13,7 +13,7 @@ pub struct Settings<T: Clone + DeserializeOwned + Default> {
     pub redis: crate::redis::RedisConfiguration,
 }
 
-impl<'de, T: Clone + DeserializeOwned + Default> Settings<T>
+impl<T: Clone + DeserializeOwned + Default> Settings<T>
 {
     pub fn new(service_name: &str) -> Result<Settings<T>, GenericError> {
         let mut builder = Config::builder();
index 5753fb6649963317d864bd088cfd3dd05b72d0cb..a623c2fac42654b64085f40d88649af65db9a8e2 100644 (file)
@@ -7,15 +7,8 @@ pub struct RedisConfiguration {
     pub url: String,
 }
 
-// Allows the configuration to directly create a nats connection
-impl Into<Client> for RedisConfiguration {
-    fn into(self) -> Client {
-        redis::Client::open(self.url).unwrap()
-    }
-}
-
 impl From<RedisConfiguration>
-    for Pin<Box<dyn Future<Output = anyhow::Result<MultiplexedConnection>>>>
+    for Pin<Box<dyn Future<Output = anyhow::Result<MultiplexedConnection>> + Send>>
 {
     fn from(value: RedisConfiguration) -> Self {
         Box::pin(async move {