diff options
| -rw-r--r-- | exes/cache/src/main.rs | 2 | ||||
| -rw-r--r-- | exes/gateway/src/lib.rs | 4 | ||||
| -rw-r--r-- | exes/ratelimit/src/buckets/async_queue.rs | 57 | ||||
| -rw-r--r-- | exes/ratelimit/src/buckets/atomic_instant.rs | 255 | ||||
| -rw-r--r-- | exes/ratelimit/src/buckets/bucket.rs | 54 | ||||
| -rw-r--r-- | exes/webhook/src/handler/mod.rs | 8 | ||||
| -rw-r--r-- | libs/leash/src/lib.rs | 2 | ||||
| -rw-r--r-- | libs/shared/src/payloads.rs | 62 | 
8 files changed, 376 insertions, 68 deletions
diff --git a/exes/cache/src/main.rs b/exes/cache/src/main.rs index bc13cd5..c0dff6d 100644 --- a/exes/cache/src/main.rs +++ b/exes/cache/src/main.rs @@ -56,7 +56,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {  async fn listen(sub: &mut Subscriber, cache: &mut Cache, features: Vec<String>) {      while let Some(data) = sub.next().await {          let cp: CachePayload = serde_json::from_slice(&data.payload).unwrap(); -        let event = cp.data.data; +        let event = cp.data.0;          match event {              // Channel events              DispatchEvent::ChannelCreate(_) diff --git a/exes/gateway/src/lib.rs b/exes/gateway/src/lib.rs index e54bb5c..c4da8f4 100644 --- a/exes/gateway/src/lib.rs +++ b/exes/gateway/src/lib.rs @@ -93,9 +93,7 @@ async fn handle_event(event: Event, nats: &Client) -> anyhow::Result<()> {              debug!("handling event {}", name.unwrap());              let data = CachePayload { -                data: DispatchEventTagged { -                    data: dispatch_event, -                }, +                data: DispatchEventTagged(dispatch_event),              };              let value = serde_json::to_string(&data)?;              let bytes = bytes::Bytes::from(value); diff --git a/exes/ratelimit/src/buckets/async_queue.rs b/exes/ratelimit/src/buckets/async_queue.rs index 70b4ebd..6ae51c8 100644 --- a/exes/ratelimit/src/buckets/async_queue.rs +++ b/exes/ratelimit/src/buckets/async_queue.rs @@ -1,33 +1,46 @@  use tokio::sync::{      mpsc::{self, UnboundedReceiver, UnboundedSender}, -    oneshot::Sender,      Mutex,  }; -/// Queue of ratelimit requests for a bucket. +/// Simple async fifo (first in fist out) queue based on unbounded channels +///  +/// # Usage +/// ``` +/// # use ratelimit::buckets::async_queue::AsyncQueue; +/// # tokio_test::block_on(async { +/// let queue = AsyncQueue::<i64>::default(); +/// // Pushing into the queue is syncronous +/// queue.push(123); +///      +/// // Popping from the queue is asyncronous +/// let value = queue.pop().await; +///      +/// // Our value should be the same! +/// assert_eq!(value, Some(123)); +/// # }); +/// ```  #[derive(Debug)] -pub struct AsyncQueue { -    /// Receiver for the ratelimit requests. -    rx: Mutex<UnboundedReceiver<Sender<()>>>, -    /// Sender for the ratelimit requests. -    tx: UnboundedSender<Sender<()>>, +pub struct AsyncQueue<T: Send> { +    rx: Mutex<UnboundedReceiver<T>>, +    tx: UnboundedSender<T>,  } -impl AsyncQueue { -    /// Add a new ratelimit request to the queue. -    pub fn push(&self, tx: Sender<()>) { +impl<T: Send> AsyncQueue<T> { +    /// Add a new item to the queue +    pub fn push(&self, tx: T) {          let _sent = self.tx.send(tx);      }      /// Receive the first incoming ratelimit request. -    pub async fn pop(&self) -> Option<Sender<()>> { +    pub async fn pop(&self) -> Option<T> {          let mut rx = self.rx.lock().await;          rx.recv().await      }  } -impl Default for AsyncQueue { +impl<T: Send> Default for AsyncQueue<T> {      fn default() -> Self {          let (tx, rx) = mpsc::unbounded_channel(); @@ -37,3 +50,23 @@ impl Default for AsyncQueue {          }      }  } + +#[cfg(test)] +mod tests { +    use crate::buckets::async_queue::AsyncQueue; + +    #[test_log::test(tokio::test)] +    async fn should_queue_dequeue_fifo() { +        let queue = AsyncQueue::<i64>::default(); + +        // queue data +        for i in 0..2_000_000 { +            queue.push(i); +        } + +        for i in 0..2_000_000 { +            let result = queue.pop().await.unwrap(); +            assert_eq!(i, result); +        } +    } +} diff --git a/exes/ratelimit/src/buckets/atomic_instant.rs b/exes/ratelimit/src/buckets/atomic_instant.rs index 992adb9..fe26aa8 100644 --- a/exes/ratelimit/src/buckets/atomic_instant.rs +++ b/exes/ratelimit/src/buckets/atomic_instant.rs @@ -1,19 +1,36 @@  use std::{ +    hash::Hash, +    ops::{Add, AddAssign, Sub},      sync::atomic::{AtomicU64, Ordering},      time::{Duration, SystemTime, UNIX_EPOCH},  }; -use tracing::debug; - +/// Instant implementation based on an atomic number +/// # Example +/// ``` +/// # use ratelimit::buckets::atomic_instant::AtomicInstant; +/// # use std::time::Duration; +/// +/// let now = AtomicInstant::now(); +/// let max_seconds = u64::MAX / 1_000_000_000; +/// let duration = Duration::new(max_seconds, 0); +/// println!("{:?}", now + duration); +/// ```  #[derive(Default, Debug)] +#[cfg(not(target_feature = "atomic128"))]  pub struct AtomicInstant(AtomicU64);  impl AtomicInstant { -    #[must_use] -    pub const fn empty() -> Self { -        Self(AtomicU64::new(0)) -    } - +    /// Calculates the duration since the instant. +    /// # Example +    /// ``` +    /// # use ratelimit::buckets::atomic_instant::AtomicInstant; +    /// # use std::time::Duration; +    /// let mut instant = AtomicInstant::now(); +    /// std::thread::sleep(Duration::from_secs(1)); +    ///  +    /// assert_eq!(instant.elapsed().as_secs(), 1); +    /// ```      pub fn elapsed(&self) -> Duration {          // Truncation is expected          #[allow(clippy::cast_possible_truncation)] @@ -25,24 +42,228 @@ impl AtomicInstant {                  - self.0.load(Ordering::Relaxed),          )      } - +    /// Gets the current time in millis +    /// # Example +    /// ``` +    /// # use ratelimit::buckets::atomic_instant::AtomicInstant; +    /// # use std::time::Duration; +    /// let mut instant = AtomicInstant::default(); +    /// instant.set_millis(1000); +    ///  +    /// assert_eq!(instant.as_millis(), 1000); +    /// ```      pub fn as_millis(&self) -> u64 {          self.0.load(Ordering::Relaxed)      } +    /// Creates an instant at the current time +    /// # Safety +    /// Truncates if the current unix time is greater than `u64::MAX` +    #[allow(clippy::cast_possible_truncation)] +    #[must_use] +    pub fn now() -> Self { +        Self(AtomicU64::new( +            SystemTime::now() +                .duration_since(UNIX_EPOCH) +                .expect("time went backwards") +                .as_millis() as u64, +        )) +    } + +    /// Sets the unix time of the instant +    /// # Example +    /// ``` +    /// # use ratelimit::buckets::atomic_instant::AtomicInstant; +    /// # use std::time::Duration; +    /// let mut instant = AtomicInstant::default(); +    /// instant.set_millis(1000); +    ///  +    /// assert_eq!(instant.as_millis(), 1000); +    /// ```      pub fn set_millis(&self, millis: u64) { -        // get address of struct -        let b = self as *const _ as usize; -        debug!(millis, this = ?b, "settings instant millis");          self.0.store(millis, Ordering::Relaxed);      } +    /// Determines if the current instant is at the default value +    /// # Example +    /// ``` +    /// # use ratelimit::buckets::atomic_instant::AtomicInstant; +    /// # use std::time::Duration; +    /// let mut instant = AtomicInstant::default(); +    ///  +    /// assert!(instant.is_empty()); +    /// ```      pub fn is_empty(&self) -> bool { -        let millis = self.as_millis(); -        // get address of struct -        let b = self as *const _ as usize; -        debug!(millis, this = ?b, "settings instant millis"); -        debug!(empty = (millis == 0), millis, this = ?b, "instant empty check"); -        millis == 0 +        self.as_millis() == 0 +    } +} + +impl Add<Duration> for AtomicInstant { +    type Output = Self; +    /// # Safety +    /// This panics if the right hand side is greater than `i64::MAX` +    /// You can remedy to this using the 128bits feature with changes the +    /// underlying atomic. +    /// # Example +    /// ``` +    /// # use ratelimit::buckets::atomic_instant::AtomicInstant; +    /// # use std::time::Duration; +    /// let mut instant = AtomicInstant::default(); +    /// +    /// // we add one second to our instant +    /// instant = instant + Duration::from_secs(1); +    /// +    /// // should be equal to a second +    /// assert_eq!(instant.as_millis(), 1000); +    /// ``` +    fn add(self, rhs: Duration) -> Self::Output { +        self.0 +            .fetch_add(rhs.as_millis().try_into().unwrap(), Ordering::Relaxed); +        self +    } +} + +impl AddAssign<Duration> for AtomicInstant { +    /// # Safety +    /// This panics if the right hand side is greater than `i64::MAX` +    /// You can remedy to this using the 128bits feature with changes the +    /// underlying atomic. +    /// # Example +    /// ``` +    /// # use ratelimit::buckets::atomic_instant::AtomicInstant; +    /// # use std::time::Duration; +    /// let mut instant = AtomicInstant::default(); +    /// +    /// // we add one second to our instant +    /// instant += Duration::from_secs(1); +    /// +    /// // should be equal to a second +    /// assert_eq!(instant.as_millis(), 1000); +    /// ``` +    fn add_assign(&mut self, rhs: Duration) { +        self.0 +            .fetch_add(rhs.as_millis().try_into().unwrap(), Ordering::Relaxed); +    } +} + +impl Hash for AtomicInstant { +    fn hash<H: std::hash::Hasher>(&self, state: &mut H) { +        self.0.load(Ordering::Relaxed).hash(state); +    } +} + +impl PartialEq for AtomicInstant { +    /// # Example +    /// ``` +    /// # use ratelimit::buckets::atomic_instant::AtomicInstant; +    /// # use std::time::Duration; +    /// let mut instant = AtomicInstant::default(); +    /// let mut instant2 = AtomicInstant::default(); +    /// +    /// assert_eq!(instant, instant2); +    /// ``` +    fn eq(&self, other: &Self) -> bool { +        self.0.load(Ordering::Relaxed) == other.0.load(Ordering::Relaxed) +    } +} +impl Eq for AtomicInstant {} + +impl PartialOrd for AtomicInstant { +    /// # Example +    /// ``` +    /// # use ratelimit::buckets::atomic_instant::AtomicInstant; +    /// # use std::time::Duration; +    /// let mut instant = AtomicInstant::default(); +    /// let mut instant2 = AtomicInstant::default(); +    /// +    /// assert!(instant == instant2); +    /// instant.set_millis(1000); +    /// assert!(instant > instant2); +    /// instant.set_millis(0); +    /// instant2.set_millis(1000); +    /// assert!(instant < instant2); +    /// ``` +    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { +        self.0 +            .load(Ordering::Relaxed) +            .partial_cmp(&other.0.load(Ordering::Relaxed)) +    } +} + +impl Ord for AtomicInstant { +    /// # Example +    /// ``` +    /// # use ratelimit::buckets::atomic_instant::AtomicInstant; +    /// # use std::time::Duration; +    /// let mut instant = AtomicInstant::default(); +    /// let mut instant2 = AtomicInstant::default(); +    /// +    /// assert!(instant == instant2); +    /// instant.set_millis(1000); +    /// assert!(instant > instant2); +    /// instant.set_millis(0); +    /// instant2.set_millis(1000); +    /// assert!(instant < instant2); +    /// ``` +    fn cmp(&self, other: &Self) -> std::cmp::Ordering { +        self.0 +            .load(Ordering::Relaxed) +            .cmp(&other.0.load(Ordering::Relaxed)) +    } +} + +impl Sub<Duration> for AtomicInstant { +    type Output = Self; +    /// # Example +    /// ``` +    /// # use ratelimit::buckets::atomic_instant::AtomicInstant; +    /// # use std::time::Duration; +    /// let mut instant = AtomicInstant::default(); +    /// instant.set_millis(1000); +    /// +    /// instant = instant - Duration::from_secs(1); +    /// +    /// assert!(instant.is_empty()); +    /// ``` +    fn sub(self, rhs: Duration) -> Self::Output { +        self.0 +            .fetch_sub(rhs.as_millis().try_into().unwrap(), Ordering::Relaxed); +        self +    } +} + +impl Sub<Self> for AtomicInstant { +    type Output = Self; +    /// # Example +    /// ``` +    /// # use ratelimit::buckets::atomic_instant::AtomicInstant; +    /// # use std::time::Duration; +    /// let mut instant = AtomicInstant::default(); +    /// let mut instant2 = AtomicInstant::default(); +    /// instant.set_millis(1000); +    /// instant2.set_millis(2000); +    /// +    /// instant = instant2 - instant; +    /// +    /// assert_eq!(instant.as_millis(), 1000); +    /// ``` +    fn sub(self, rhs: Self) -> Self::Output { +        self.0 +            .fetch_sub(rhs.0.load(Ordering::Relaxed), Ordering::Relaxed); +        self +    } +} + +#[cfg(test)] +mod tests { +    use super::AtomicInstant; + +    #[test] +    fn should_detect_default() { +        let instant = AtomicInstant::default(); +        assert!(instant.is_empty()); + +        instant.set_millis(1000); +        assert!(!instant.is_empty());      }  } diff --git a/exes/ratelimit/src/buckets/bucket.rs b/exes/ratelimit/src/buckets/bucket.rs index c40f059..2b0cabc 100644 --- a/exes/ratelimit/src/buckets/bucket.rs +++ b/exes/ratelimit/src/buckets/bucket.rs @@ -6,7 +6,10 @@ use std::{      },      time::Duration,  }; -use tokio::{sync::oneshot, task::JoinHandle}; +use tokio::{ +    sync::oneshot::{self, Sender}, +    task::JoinHandle, +};  use tracing::{debug, trace};  use twilight_http_ratelimiting::headers::Present; @@ -22,29 +25,36 @@ pub enum TimeRemaining {  ///  /// # Usage  /// ``` -/// use ratelimit::buckets::bucket::Bucket; -/// use twilight_http_ratelimiting::RatelimitHeaders; -/// use std::time::SystemTime; -/// -/// let bucket = Bucket::new(); +/// # use ratelimit::buckets::bucket::Bucket; +/// # use twilight_http_ratelimiting::RatelimitHeaders; +/// # use std::time::SystemTime; +/// # tokio_test::block_on(async {  /// -/// // Feed the headers informations into the bucket to update it -/// let headers = [ -///     ( "x-ratelimit-bucket", "bucket id".as_bytes()), -///     ("x-ratelimit-limit", "100".as_bytes()), -///     ("x-ratelimit-remaining", "0".as_bytes()), -///     ("x-ratelimit-reset", "".as_bytes()), -///     ("x-ratelimit-reset-after", "10.000".as_bytes()), -/// ]; +///     let bucket = Bucket::new();  /// -/// // Parse the headers -/// let present = if let Ok(RatelimitHeaders::Present(present)) = RatelimitHeaders::from_pairs(headers.into_iter()) { present } else { todo!() }; +///     // Feed the headers informations into the bucket to update it +///     let headers = [ +///         ( "x-ratelimit-bucket", "bucket id".as_bytes()), +///         ("x-ratelimit-limit", "100".as_bytes()), +///         ("x-ratelimit-remaining", "0".as_bytes()), +///         ("x-ratelimit-reset", "99999999999999".as_bytes()), +///         ("x-ratelimit-reset-after", "10.000".as_bytes()), +///     ];  /// -/// // this should idealy the time of the request -/// let current_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis() as u64; +///     // Parse the headers +///     let present = if let Ok(RatelimitHeaders::Present(present)) +///         = RatelimitHeaders::from_pairs(headers.into_iter()) { +///         present +///     } else { todo!() };  /// -/// bucket.update(present, current_time).await; +///     // this should idealy the time of the request +///     let current_time = SystemTime::now() +///         .duration_since(SystemTime::UNIX_EPOCH) +///         .unwrap() +///         .as_millis() as u64;  /// +///     bucket.update(&present, current_time); +/// # })  /// ```  ///  /// # Async @@ -63,7 +73,7 @@ pub struct Bucket {      /// List of tasks that dequeue tasks from [`Self::queue`]      tasks: Vec<JoinHandle<()>>,      /// Queue of tickets to be processed. -    queue: AsyncQueue, +    queue: AsyncQueue<Sender<()>>,  }  impl Drop for Bucket { @@ -88,7 +98,7 @@ impl Bucket {              queue: AsyncQueue::default(),              remaining: AtomicU64::new(u64::max_value()),              reset_after: AtomicU64::new(u64::max_value()), -            last_update: AtomicInstant::empty(), +            last_update: AtomicInstant::default(),              tasks,          }); @@ -292,7 +302,7 @@ mod tests {              ("x-ratelimit-limit", b"100"),              ("x-ratelimit-remaining", b"0"),              ("x-ratelimit-reset", mreset.as_bytes()), -            ("x-ratelimit-reset-after", b"100.000"), +            ("x-ratelimit-reset-after", b"10.000"),          ];          if let RatelimitHeaders::Present(present) = diff --git a/exes/webhook/src/handler/mod.rs b/exes/webhook/src/handler/mod.rs index ea7ecca..46a4d9e 100644 --- a/exes/webhook/src/handler/mod.rs +++ b/exes/webhook/src/handler/mod.rs @@ -82,11 +82,9 @@ impl WebhookService {              // this should hopefully not fail ?              let data = CachePayload { -                data: DispatchEventTagged { -                    data: DispatchEvent::InteractionCreate(Box::new(InteractionCreate( -                        interaction, -                    ))), -                }, +                data: DispatchEventTagged(DispatchEvent::InteractionCreate(Box::new( +                    InteractionCreate(interaction), +                ))),              };              let payload = serde_json::to_string(&data).unwrap(); diff --git a/libs/leash/src/lib.rs b/libs/leash/src/lib.rs index 37e2b7c..91bfaf5 100644 --- a/libs/leash/src/lib.rs +++ b/libs/leash/src/lib.rs @@ -128,5 +128,7 @@ mod test {          }      } +     +      ignite!(TestComponent);  } diff --git a/libs/shared/src/payloads.rs b/libs/shared/src/payloads.rs index 3f183a9..a4818e6 100644 --- a/libs/shared/src/payloads.rs +++ b/libs/shared/src/payloads.rs @@ -1,4 +1,5 @@  use std::fmt::Debug; +use std::ops::{Deref, DerefMut};  use serde::de::DeserializeSeed;  use serde::Deserializer; @@ -7,10 +8,20 @@ use serde_json::Value;  use tracing::trace_span;  use twilight_model::gateway::event::{DispatchEvent, DispatchEventWithTypeDeserializer}; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)]  #[repr(transparent)] -pub struct DispatchEventTagged { -    pub data: DispatchEvent, +pub struct DispatchEventTagged(pub DispatchEvent); + +impl Deref for DispatchEventTagged { +    type Target = DispatchEvent; +    fn deref(&self) -> &Self::Target { +        &self.0 +    } +} +impl DerefMut for DispatchEventTagged { +    fn deref_mut(&mut self) -> &mut Self::Target { +        &mut self.0 +    }  }  #[derive(Serialize, Deserialize)] @@ -31,9 +42,7 @@ impl<'de> Deserialize<'de> for DispatchEventTagged {          let tagged = DispatchEventTaggedSerialized::deserialize(deserializer)?;          let deserializer_seed = DispatchEventWithTypeDeserializer::new(&tagged.kind);          let dispatch_event = deserializer_seed.deserialize(tagged.data).unwrap(); -        Ok(Self { -            data: dispatch_event, -        }) +        Ok(Self(dispatch_event))      }  } @@ -43,9 +52,9 @@ impl Serialize for DispatchEventTagged {          S: serde::Serializer,      {          let _s = trace_span!("serializing DispatchEventTagged"); -        let kind = self.data.kind().name().unwrap(); +        let kind = self.0.kind().name().unwrap();          DispatchEventTaggedSerialized { -            data: serde_json::to_value(&self.data).unwrap(), +            data: serde_json::to_value(&self.0).unwrap(),              kind: kind.to_string(),          }          .serialize(serializer) @@ -58,3 +67,40 @@ pub struct CachePayload {      #[serde(flatten)]      pub data: DispatchEventTagged,  } +#[cfg(test)] +mod tests { +    use serde_json::json; +    use twilight_model::gateway::event::DispatchEvent; + +    use super::DispatchEventTagged; + +    #[test] +    fn serialize_event_tagged() { +        let dispatch_event = DispatchEvent::GiftCodeUpdate; + +        let value = serde_json::to_value(&dispatch_event); +        assert!(value.is_ok()); +        let value = value.unwrap(); + +        let kind = value.get("t").and_then(serde_json::Value::as_str); +        assert_eq!(kind, Some("GIFT_CODE_UPDATE")); +    } + +    #[test] +    fn deserialize_event_tagged() { +        let json = json!({ +            "t": "GIFT_CODE_UPDATE", +            "d": {} +        }); + +        let dispatch_event = serde_json::from_value::<DispatchEventTagged>(json); +        assert!(dispatch_event.is_ok()); + +        let dispatch_event_tagged = dispatch_event.unwrap(); + +        assert_eq!( +            DispatchEventTagged(DispatchEvent::GiftCodeUpdate), +            dispatch_event_tagged +        ); +    } +}  | 
