diff options
Diffstat (limited to 'exes/ratelimit/src/buckets/async_queue.rs')
| -rw-r--r-- | exes/ratelimit/src/buckets/async_queue.rs | 57 |
1 files changed, 45 insertions, 12 deletions
diff --git a/exes/ratelimit/src/buckets/async_queue.rs b/exes/ratelimit/src/buckets/async_queue.rs index 70b4ebd..6ae51c8 100644 --- a/exes/ratelimit/src/buckets/async_queue.rs +++ b/exes/ratelimit/src/buckets/async_queue.rs @@ -1,33 +1,46 @@ use tokio::sync::{ mpsc::{self, UnboundedReceiver, UnboundedSender}, - oneshot::Sender, Mutex, }; -/// Queue of ratelimit requests for a bucket. +/// Simple async fifo (first in fist out) queue based on unbounded channels +/// +/// # Usage +/// ``` +/// # use ratelimit::buckets::async_queue::AsyncQueue; +/// # tokio_test::block_on(async { +/// let queue = AsyncQueue::<i64>::default(); +/// // Pushing into the queue is syncronous +/// queue.push(123); +/// +/// // Popping from the queue is asyncronous +/// let value = queue.pop().await; +/// +/// // Our value should be the same! +/// assert_eq!(value, Some(123)); +/// # }); +/// ``` #[derive(Debug)] -pub struct AsyncQueue { - /// Receiver for the ratelimit requests. - rx: Mutex<UnboundedReceiver<Sender<()>>>, - /// Sender for the ratelimit requests. - tx: UnboundedSender<Sender<()>>, +pub struct AsyncQueue<T: Send> { + rx: Mutex<UnboundedReceiver<T>>, + tx: UnboundedSender<T>, } -impl AsyncQueue { - /// Add a new ratelimit request to the queue. - pub fn push(&self, tx: Sender<()>) { +impl<T: Send> AsyncQueue<T> { + /// Add a new item to the queue + pub fn push(&self, tx: T) { let _sent = self.tx.send(tx); } /// Receive the first incoming ratelimit request. - pub async fn pop(&self) -> Option<Sender<()>> { + pub async fn pop(&self) -> Option<T> { let mut rx = self.rx.lock().await; rx.recv().await } } -impl Default for AsyncQueue { +impl<T: Send> Default for AsyncQueue<T> { fn default() -> Self { let (tx, rx) = mpsc::unbounded_channel(); @@ -37,3 +50,23 @@ impl Default for AsyncQueue { } } } + +#[cfg(test)] +mod tests { + use crate::buckets::async_queue::AsyncQueue; + + #[test_log::test(tokio::test)] + async fn should_queue_dequeue_fifo() { + let queue = AsyncQueue::<i64>::default(); + + // queue data + for i in 0..2_000_000 { + queue.push(i); + } + + for i in 0..2_000_000 { + let result = queue.pop().await.unwrap(); + assert_eq!(i, result); + } + } +} |
