]> git.puffer.fish Git - matthieu/nova.git/commitdiff
add tests
authorMatthieuCoder <matthieu@matthieu-dev.xyz>
Sat, 14 Jan 2023 15:06:14 +0000 (19:06 +0400)
committerMatthieuCoder <matthieu@matthieu-dev.xyz>
Sat, 14 Jan 2023 15:06:14 +0000 (19:06 +0400)
exes/cache/src/main.rs
exes/gateway/src/lib.rs
exes/ratelimit/src/buckets/async_queue.rs
exes/ratelimit/src/buckets/atomic_instant.rs
exes/ratelimit/src/buckets/bucket.rs
exes/webhook/src/handler/mod.rs
libs/leash/src/lib.rs
libs/shared/src/payloads.rs

index bc13cd56a200d37e7829a716fc9dd32820c4f94d..c0dff6d3d774f400fa4e0fba5248d24c2467422e 100644 (file)
@@ -56,7 +56,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
 async fn listen(sub: &mut Subscriber, cache: &mut Cache, features: Vec<String>) {
     while let Some(data) = sub.next().await {
         let cp: CachePayload = serde_json::from_slice(&data.payload).unwrap();
-        let event = cp.data.data;
+        let event = cp.data.0;
         match event {
             // Channel events
             DispatchEvent::ChannelCreate(_)
index e54bb5cda683ac857fd7b8d12ff225644de3a486..c4da8f44ef4487af4fa608c887c087e05788c79c 100644 (file)
@@ -93,9 +93,7 @@ async fn handle_event(event: Event, nats: &Client) -> anyhow::Result<()> {
             debug!("handling event {}", name.unwrap());
 
             let data = CachePayload {
-                data: DispatchEventTagged {
-                    data: dispatch_event,
-                },
+                data: DispatchEventTagged(dispatch_event),
             };
             let value = serde_json::to_string(&data)?;
             let bytes = bytes::Bytes::from(value);
index 70b4ebdb5b23f6b4b5aad93af4cb5cf7703f4ef6..6ae51c88ca53939b60b2588e4aeb966b24fea7d3 100644 (file)
@@ -1,33 +1,46 @@
 use tokio::sync::{
     mpsc::{self, UnboundedReceiver, UnboundedSender},
-    oneshot::Sender,
     Mutex,
 };
 
-/// Queue of ratelimit requests for a bucket.
+/// Simple async fifo (first in fist out) queue based on unbounded channels
+/// 
+/// # Usage
+/// ```
+/// # use ratelimit::buckets::async_queue::AsyncQueue;
+/// # tokio_test::block_on(async {
+/// let queue = AsyncQueue::<i64>::default();
+/// // Pushing into the queue is syncronous
+/// queue.push(123);
+///     
+/// // Popping from the queue is asyncronous
+/// let value = queue.pop().await;
+///     
+/// // Our value should be the same!
+/// assert_eq!(value, Some(123));
+/// # });
+/// ```
 #[derive(Debug)]
-pub struct AsyncQueue {
-    /// Receiver for the ratelimit requests.
-    rx: Mutex<UnboundedReceiver<Sender<()>>>,
-    /// Sender for the ratelimit requests.
-    tx: UnboundedSender<Sender<()>>,
+pub struct AsyncQueue<T: Send> {
+    rx: Mutex<UnboundedReceiver<T>>,
+    tx: UnboundedSender<T>,
 }
 
-impl AsyncQueue {
-    /// Add a new ratelimit request to the queue.
-    pub fn push(&self, tx: Sender<()>) {
+impl<T: Send> AsyncQueue<T> {
+    /// Add a new item to the queue
+    pub fn push(&self, tx: T) {
         let _sent = self.tx.send(tx);
     }
 
     /// Receive the first incoming ratelimit request.
-    pub async fn pop(&self) -> Option<Sender<()>> {
+    pub async fn pop(&self) -> Option<T> {
         let mut rx = self.rx.lock().await;
 
         rx.recv().await
     }
 }
 
-impl Default for AsyncQueue {
+impl<T: Send> Default for AsyncQueue<T> {
     fn default() -> Self {
         let (tx, rx) = mpsc::unbounded_channel();
 
@@ -37,3 +50,23 @@ impl Default for AsyncQueue {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::buckets::async_queue::AsyncQueue;
+
+    #[test_log::test(tokio::test)]
+    async fn should_queue_dequeue_fifo() {
+        let queue = AsyncQueue::<i64>::default();
+
+        // queue data
+        for i in 0..2_000_000 {
+            queue.push(i);
+        }
+
+        for i in 0..2_000_000 {
+            let result = queue.pop().await.unwrap();
+            assert_eq!(i, result);
+        }
+    }
+}
index 992adb927708db5f9db4e6a96010187bb8c40370..fe26aa8be579da75c1702f4388ffc80c89f3b899 100644 (file)
@@ -1,19 +1,36 @@
 use std::{
+    hash::Hash,
+    ops::{Add, AddAssign, Sub},
     sync::atomic::{AtomicU64, Ordering},
     time::{Duration, SystemTime, UNIX_EPOCH},
 };
 
-use tracing::debug;
-
+/// Instant implementation based on an atomic number
+/// # Example
+/// ```
+/// # use ratelimit::buckets::atomic_instant::AtomicInstant;
+/// # use std::time::Duration;
+///
+/// let now = AtomicInstant::now();
+/// let max_seconds = u64::MAX / 1_000_000_000;
+/// let duration = Duration::new(max_seconds, 0);
+/// println!("{:?}", now + duration);
+/// ```
 #[derive(Default, Debug)]
+#[cfg(not(target_feature = "atomic128"))]
 pub struct AtomicInstant(AtomicU64);
 
 impl AtomicInstant {
-    #[must_use]
-    pub const fn empty() -> Self {
-        Self(AtomicU64::new(0))
-    }
-
+    /// Calculates the duration since the instant.
+    /// # Example
+    /// ```
+    /// # use ratelimit::buckets::atomic_instant::AtomicInstant;
+    /// # use std::time::Duration;
+    /// let mut instant = AtomicInstant::now();
+    /// std::thread::sleep(Duration::from_secs(1));
+    /// 
+    /// assert_eq!(instant.elapsed().as_secs(), 1);
+    /// ```
     pub fn elapsed(&self) -> Duration {
         // Truncation is expected
         #[allow(clippy::cast_possible_truncation)]
@@ -25,24 +42,228 @@ impl AtomicInstant {
                 - self.0.load(Ordering::Relaxed),
         )
     }
-
+    /// Gets the current time in millis
+    /// # Example
+    /// ```
+    /// # use ratelimit::buckets::atomic_instant::AtomicInstant;
+    /// # use std::time::Duration;
+    /// let mut instant = AtomicInstant::default();
+    /// instant.set_millis(1000);
+    /// 
+    /// assert_eq!(instant.as_millis(), 1000);
+    /// ```
     pub fn as_millis(&self) -> u64 {
         self.0.load(Ordering::Relaxed)
     }
 
+    /// Creates an instant at the current time
+    /// # Safety
+    /// Truncates if the current unix time is greater than `u64::MAX`
+    #[allow(clippy::cast_possible_truncation)]
+    #[must_use]
+    pub fn now() -> Self {
+        Self(AtomicU64::new(
+            SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .expect("time went backwards")
+                .as_millis() as u64,
+        ))
+    }
+
+    /// Sets the unix time of the instant
+    /// # Example
+    /// ```
+    /// # use ratelimit::buckets::atomic_instant::AtomicInstant;
+    /// # use std::time::Duration;
+    /// let mut instant = AtomicInstant::default();
+    /// instant.set_millis(1000);
+    /// 
+    /// assert_eq!(instant.as_millis(), 1000);
+    /// ```
     pub fn set_millis(&self, millis: u64) {
-        // get address of struct
-        let b = self as *const _ as usize;
-        debug!(millis, this = ?b, "settings instant millis");
         self.0.store(millis, Ordering::Relaxed);
     }
 
+    /// Determines if the current instant is at the default value
+    /// # Example
+    /// ```
+    /// # use ratelimit::buckets::atomic_instant::AtomicInstant;
+    /// # use std::time::Duration;
+    /// let mut instant = AtomicInstant::default();
+    /// 
+    /// assert!(instant.is_empty());
+    /// ```
     pub fn is_empty(&self) -> bool {
-        let millis = self.as_millis();
-        // get address of struct
-        let b = self as *const _ as usize;
-        debug!(millis, this = ?b, "settings instant millis");
-        debug!(empty = (millis == 0), millis, this = ?b, "instant empty check");
-        millis == 0
+        self.as_millis() == 0
+    }
+}
+
+impl Add<Duration> for AtomicInstant {
+    type Output = Self;
+    /// # Safety
+    /// This panics if the right hand side is greater than `i64::MAX`
+    /// You can remedy to this using the 128bits feature with changes the
+    /// underlying atomic.
+    /// # Example
+    /// ```
+    /// # use ratelimit::buckets::atomic_instant::AtomicInstant;
+    /// # use std::time::Duration;
+    /// let mut instant = AtomicInstant::default();
+    ///
+    /// // we add one second to our instant
+    /// instant = instant + Duration::from_secs(1);
+    ///
+    /// // should be equal to a second
+    /// assert_eq!(instant.as_millis(), 1000);
+    /// ```
+    fn add(self, rhs: Duration) -> Self::Output {
+        self.0
+            .fetch_add(rhs.as_millis().try_into().unwrap(), Ordering::Relaxed);
+        self
+    }
+}
+
+impl AddAssign<Duration> for AtomicInstant {
+    /// # Safety
+    /// This panics if the right hand side is greater than `i64::MAX`
+    /// You can remedy to this using the 128bits feature with changes the
+    /// underlying atomic.
+    /// # Example
+    /// ```
+    /// # use ratelimit::buckets::atomic_instant::AtomicInstant;
+    /// # use std::time::Duration;
+    /// let mut instant = AtomicInstant::default();
+    ///
+    /// // we add one second to our instant
+    /// instant += Duration::from_secs(1);
+    ///
+    /// // should be equal to a second
+    /// assert_eq!(instant.as_millis(), 1000);
+    /// ```
+    fn add_assign(&mut self, rhs: Duration) {
+        self.0
+            .fetch_add(rhs.as_millis().try_into().unwrap(), Ordering::Relaxed);
+    }
+}
+
+impl Hash for AtomicInstant {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.0.load(Ordering::Relaxed).hash(state);
+    }
+}
+
+impl PartialEq for AtomicInstant {
+    /// # Example
+    /// ```
+    /// # use ratelimit::buckets::atomic_instant::AtomicInstant;
+    /// # use std::time::Duration;
+    /// let mut instant = AtomicInstant::default();
+    /// let mut instant2 = AtomicInstant::default();
+    ///
+    /// assert_eq!(instant, instant2);
+    /// ```
+    fn eq(&self, other: &Self) -> bool {
+        self.0.load(Ordering::Relaxed) == other.0.load(Ordering::Relaxed)
+    }
+}
+impl Eq for AtomicInstant {}
+
+impl PartialOrd for AtomicInstant {
+    /// # Example
+    /// ```
+    /// # use ratelimit::buckets::atomic_instant::AtomicInstant;
+    /// # use std::time::Duration;
+    /// let mut instant = AtomicInstant::default();
+    /// let mut instant2 = AtomicInstant::default();
+    ///
+    /// assert!(instant == instant2);
+    /// instant.set_millis(1000);
+    /// assert!(instant > instant2);
+    /// instant.set_millis(0);
+    /// instant2.set_millis(1000);
+    /// assert!(instant < instant2);
+    /// ```
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        self.0
+            .load(Ordering::Relaxed)
+            .partial_cmp(&other.0.load(Ordering::Relaxed))
+    }
+}
+
+impl Ord for AtomicInstant {
+    /// # Example
+    /// ```
+    /// # use ratelimit::buckets::atomic_instant::AtomicInstant;
+    /// # use std::time::Duration;
+    /// let mut instant = AtomicInstant::default();
+    /// let mut instant2 = AtomicInstant::default();
+    ///
+    /// assert!(instant == instant2);
+    /// instant.set_millis(1000);
+    /// assert!(instant > instant2);
+    /// instant.set_millis(0);
+    /// instant2.set_millis(1000);
+    /// assert!(instant < instant2);
+    /// ```
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.0
+            .load(Ordering::Relaxed)
+            .cmp(&other.0.load(Ordering::Relaxed))
+    }
+}
+
+impl Sub<Duration> for AtomicInstant {
+    type Output = Self;
+    /// # Example
+    /// ```
+    /// # use ratelimit::buckets::atomic_instant::AtomicInstant;
+    /// # use std::time::Duration;
+    /// let mut instant = AtomicInstant::default();
+    /// instant.set_millis(1000);
+    ///
+    /// instant = instant - Duration::from_secs(1);
+    ///
+    /// assert!(instant.is_empty());
+    /// ```
+    fn sub(self, rhs: Duration) -> Self::Output {
+        self.0
+            .fetch_sub(rhs.as_millis().try_into().unwrap(), Ordering::Relaxed);
+        self
+    }
+}
+
+impl Sub<Self> for AtomicInstant {
+    type Output = Self;
+    /// # Example
+    /// ```
+    /// # use ratelimit::buckets::atomic_instant::AtomicInstant;
+    /// # use std::time::Duration;
+    /// let mut instant = AtomicInstant::default();
+    /// let mut instant2 = AtomicInstant::default();
+    /// instant.set_millis(1000);
+    /// instant2.set_millis(2000);
+    ///
+    /// instant = instant2 - instant;
+    ///
+    /// assert_eq!(instant.as_millis(), 1000);
+    /// ```
+    fn sub(self, rhs: Self) -> Self::Output {
+        self.0
+            .fetch_sub(rhs.0.load(Ordering::Relaxed), Ordering::Relaxed);
+        self
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::AtomicInstant;
+
+    #[test]
+    fn should_detect_default() {
+        let instant = AtomicInstant::default();
+        assert!(instant.is_empty());
+
+        instant.set_millis(1000);
+        assert!(!instant.is_empty());
     }
 }
index c40f059739191b1269966d6a817ef3b688487ea6..2b0cabccc74d443fdd180c535b5a75985b4652ec 100644 (file)
@@ -6,7 +6,10 @@ use std::{
     },
     time::Duration,
 };
-use tokio::{sync::oneshot, task::JoinHandle};
+use tokio::{
+    sync::oneshot::{self, Sender},
+    task::JoinHandle,
+};
 use tracing::{debug, trace};
 use twilight_http_ratelimiting::headers::Present;
 
@@ -22,29 +25,36 @@ pub enum TimeRemaining {
 ///
 /// # Usage
 /// ```
-/// use ratelimit::buckets::bucket::Bucket;
-/// use twilight_http_ratelimiting::RatelimitHeaders;
-/// use std::time::SystemTime;
-///
-/// let bucket = Bucket::new();
+/// # use ratelimit::buckets::bucket::Bucket;
+/// # use twilight_http_ratelimiting::RatelimitHeaders;
+/// # use std::time::SystemTime;
+/// # tokio_test::block_on(async {
 ///
-/// // Feed the headers informations into the bucket to update it
-/// let headers = [
-///     ( "x-ratelimit-bucket", "bucket id".as_bytes()),
-///     ("x-ratelimit-limit", "100".as_bytes()),
-///     ("x-ratelimit-remaining", "0".as_bytes()),
-///     ("x-ratelimit-reset", "".as_bytes()),
-///     ("x-ratelimit-reset-after", "10.000".as_bytes()),
-/// ];
+///     let bucket = Bucket::new();
 ///
-/// // Parse the headers
-/// let present = if let Ok(RatelimitHeaders::Present(present)) = RatelimitHeaders::from_pairs(headers.into_iter()) { present } else { todo!() };
+///     // Feed the headers informations into the bucket to update it
+///     let headers = [
+///         ( "x-ratelimit-bucket", "bucket id".as_bytes()),
+///         ("x-ratelimit-limit", "100".as_bytes()),
+///         ("x-ratelimit-remaining", "0".as_bytes()),
+///         ("x-ratelimit-reset", "99999999999999".as_bytes()),
+///         ("x-ratelimit-reset-after", "10.000".as_bytes()),
+///     ];
 ///
-/// // this should idealy the time of the request
-/// let current_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis() as u64;
+///     // Parse the headers
+///     let present = if let Ok(RatelimitHeaders::Present(present))
+///         = RatelimitHeaders::from_pairs(headers.into_iter()) {
+///         present
+///     } else { todo!() };
 ///
-/// bucket.update(present, current_time).await;
+///     // this should idealy the time of the request
+///     let current_time = SystemTime::now()
+///         .duration_since(SystemTime::UNIX_EPOCH)
+///         .unwrap()
+///         .as_millis() as u64;
 ///
+///     bucket.update(&present, current_time);
+/// # })
 /// ```
 ///
 /// # Async
@@ -63,7 +73,7 @@ pub struct Bucket {
     /// List of tasks that dequeue tasks from [`Self::queue`]
     tasks: Vec<JoinHandle<()>>,
     /// Queue of tickets to be processed.
-    queue: AsyncQueue,
+    queue: AsyncQueue<Sender<()>>,
 }
 
 impl Drop for Bucket {
@@ -88,7 +98,7 @@ impl Bucket {
             queue: AsyncQueue::default(),
             remaining: AtomicU64::new(u64::max_value()),
             reset_after: AtomicU64::new(u64::max_value()),
-            last_update: AtomicInstant::empty(),
+            last_update: AtomicInstant::default(),
             tasks,
         });
 
@@ -292,7 +302,7 @@ mod tests {
             ("x-ratelimit-limit", b"100"),
             ("x-ratelimit-remaining", b"0"),
             ("x-ratelimit-reset", mreset.as_bytes()),
-            ("x-ratelimit-reset-after", b"100.000"),
+            ("x-ratelimit-reset-after", b"10.000"),
         ];
 
         if let RatelimitHeaders::Present(present) =
index ea7eccacba358116b98800a8999fa2558a169dc0..46a4d9e7e04730c104a68bf3a2448e4b676e57f7 100644 (file)
@@ -82,11 +82,9 @@ impl WebhookService {
             // this should hopefully not fail ?
 
             let data = CachePayload {
-                data: DispatchEventTagged {
-                    data: DispatchEvent::InteractionCreate(Box::new(InteractionCreate(
-                        interaction,
-                    ))),
-                },
+                data: DispatchEventTagged(DispatchEvent::InteractionCreate(Box::new(
+                    InteractionCreate(interaction),
+                ))),
             };
 
             let payload = serde_json::to_string(&data).unwrap();
index 37e2b7c60b2d844130a1a47dab2debd8f7195bae..91bfaf59257e8db14704e9a3b1cf6a9f2312b278 100644 (file)
@@ -128,5 +128,7 @@ mod test {
         }
     }
 
+    
+
     ignite!(TestComponent);
 }
index 3f183a94f256153c2c61ba3e33cfa8e33cb412ff..a4818e6265bc036088cc0f96818a43ab993769a5 100644 (file)
@@ -1,4 +1,5 @@
 use std::fmt::Debug;
+use std::ops::{Deref, DerefMut};
 
 use serde::de::DeserializeSeed;
 use serde::Deserializer;
@@ -7,10 +8,20 @@ use serde_json::Value;
 use tracing::trace_span;
 use twilight_model::gateway::event::{DispatchEvent, DispatchEventWithTypeDeserializer};
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq)]
 #[repr(transparent)]
-pub struct DispatchEventTagged {
-    pub data: DispatchEvent,
+pub struct DispatchEventTagged(pub DispatchEvent);
+
+impl Deref for DispatchEventTagged {
+    type Target = DispatchEvent;
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+impl DerefMut for DispatchEventTagged {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.0
+    }
 }
 
 #[derive(Serialize, Deserialize)]
@@ -31,9 +42,7 @@ impl<'de> Deserialize<'de> for DispatchEventTagged {
         let tagged = DispatchEventTaggedSerialized::deserialize(deserializer)?;
         let deserializer_seed = DispatchEventWithTypeDeserializer::new(&tagged.kind);
         let dispatch_event = deserializer_seed.deserialize(tagged.data).unwrap();
-        Ok(Self {
-            data: dispatch_event,
-        })
+        Ok(Self(dispatch_event))
     }
 }
 
@@ -43,9 +52,9 @@ impl Serialize for DispatchEventTagged {
         S: serde::Serializer,
     {
         let _s = trace_span!("serializing DispatchEventTagged");
-        let kind = self.data.kind().name().unwrap();
+        let kind = self.0.kind().name().unwrap();
         DispatchEventTaggedSerialized {
-            data: serde_json::to_value(&self.data).unwrap(),
+            data: serde_json::to_value(&self.0).unwrap(),
             kind: kind.to_string(),
         }
         .serialize(serializer)
@@ -58,3 +67,40 @@ pub struct CachePayload {
     #[serde(flatten)]
     pub data: DispatchEventTagged,
 }
+#[cfg(test)]
+mod tests {
+    use serde_json::json;
+    use twilight_model::gateway::event::DispatchEvent;
+
+    use super::DispatchEventTagged;
+
+    #[test]
+    fn serialize_event_tagged() {
+        let dispatch_event = DispatchEvent::GiftCodeUpdate;
+
+        let value = serde_json::to_value(&dispatch_event);
+        assert!(value.is_ok());
+        let value = value.unwrap();
+
+        let kind = value.get("t").and_then(serde_json::Value::as_str);
+        assert_eq!(kind, Some("GIFT_CODE_UPDATE"));
+    }
+
+    #[test]
+    fn deserialize_event_tagged() {
+        let json = json!({
+            "t": "GIFT_CODE_UPDATE",
+            "d": {}
+        });
+
+        let dispatch_event = serde_json::from_value::<DispatchEventTagged>(json);
+        assert!(dispatch_event.is_ok());
+
+        let dispatch_event_tagged = dispatch_event.unwrap();
+
+        assert_eq!(
+            DispatchEventTagged(DispatchEvent::GiftCodeUpdate),
+            dispatch_event_tagged
+        );
+    }
+}