summaryrefslogtreecommitdiff
path: root/exes/ratelimit/src/buckets/redis_lock.rs
diff options
context:
space:
mode:
Diffstat (limited to 'exes/ratelimit/src/buckets/redis_lock.rs')
-rw-r--r--exes/ratelimit/src/buckets/redis_lock.rs131
1 files changed, 77 insertions, 54 deletions
diff --git a/exes/ratelimit/src/buckets/redis_lock.rs b/exes/ratelimit/src/buckets/redis_lock.rs
index fdae149..333d726 100644
--- a/exes/ratelimit/src/buckets/redis_lock.rs
+++ b/exes/ratelimit/src/buckets/redis_lock.rs
@@ -1,4 +1,6 @@
use std::{
+ future::Future,
+ pin::Pin,
sync::{atomic::AtomicU64, Arc},
time::{Duration, SystemTime},
};
@@ -7,7 +9,9 @@ use redis::{aio::MultiplexedConnection, AsyncCommands};
use tokio::sync::Mutex;
use tracing::debug;
-/// This is flawed and needs to be replaced sometime with the real RedisLock algorithm
+use super::GlobalLock;
+
+/// This is flawed and needs to be replaced sometime with the real `RedisLock` algorithm
#[derive(Debug)]
pub struct RedisLock {
redis: Mutex<MultiplexedConnection>,
@@ -15,70 +19,89 @@ pub struct RedisLock {
}
impl RedisLock {
- /// Set the global ratelimit as exhausted.
- pub async fn lock_for(self: &Arc<Self>, duration: Duration) {
- debug!("locking globally for {}", duration.as_secs());
- let _: () = self
- .redis
- .lock()
- .await
- .set_ex(
- "nova:rls:lock",
- 1,
- (duration.as_secs() + 1).try_into().unwrap(),
- )
- .await
- .unwrap();
-
- self.is_locked.store(
- (SystemTime::now() + duration)
- .duration_since(SystemTime::UNIX_EPOCH)
- .unwrap()
- .as_millis() as u64,
- std::sync::atomic::Ordering::Relaxed,
- );
+ #[must_use]
+ pub fn new(redis: MultiplexedConnection) -> Arc<Self> {
+ Arc::new(Self {
+ redis: Mutex::new(redis),
+ is_locked: AtomicU64::new(0),
+ })
}
+}
+
+impl GlobalLock for RedisLock {
+ fn lock_for<'a>(
+ self: &'a Arc<Self>,
+ duration: Duration,
+ ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
+ Box::pin(async move {
+ debug!("locking globally for {}", duration.as_secs());
+ let _: () = self
+ .redis
+ .lock()
+ .await
+ .set_ex(
+ "nova:rls:lock",
+ 1,
+ (duration.as_secs() + 1).try_into().unwrap(),
+ )
+ .await
+ .unwrap();
- pub async fn locked_for(self: &Arc<Self>) -> Option<Duration> {
- let load = self.is_locked.load(std::sync::atomic::Ordering::Relaxed);
- if load != 0 {
- if load
- > SystemTime::now()
+ // Integer truncating is expected
+ #[allow(clippy::cast_possible_truncation)]
+ self.is_locked.store(
+ (SystemTime::now() + duration)
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
- .as_millis() as u64
- {
- return Some(Duration::from_millis(load));
- } else {
+ .as_millis() as u64,
+ std::sync::atomic::Ordering::Relaxed,
+ );
+ })
+ }
+
+ fn is_locked<'a>(
+ self: &'a Arc<Self>,
+ ) -> Pin<Box<dyn Future<Output = Option<Duration>> + Send + 'a>> {
+ Box::pin(async move {
+ let load = self.is_locked.load(std::sync::atomic::Ordering::Relaxed);
+ if load != 0 {
+ // Integer truncating is expected
+ #[allow(clippy::cast_possible_truncation)]
+ if load
+ > SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)
+ .unwrap()
+ .as_millis() as u64
+ {
+ return Some(Duration::from_millis(load));
+ }
self.is_locked
.store(0, std::sync::atomic::Ordering::Relaxed);
}
- }
- let result = self.redis.lock().await.ttl::<_, i64>("nova:rls:lock").await;
- match result {
- Ok(remaining_time) => {
- if remaining_time > 0 {
- let duration = Duration::from_secs(remaining_time as u64);
- debug!("external global lock detected, locking");
- self.lock_for(duration).await;
- Some(duration)
- } else {
- None
+ let result = self.redis.lock().await.ttl::<_, i64>("nova:rls:lock").await;
+ match result {
+ Ok(remaining_time) => {
+ if remaining_time > 0 {
+ // Sign loss is allowed since we know it's a positive number
+ // because a ttl is always positive when the key exists and have a ttl
+ // otherwise redis *will* return a negative number, hence the check for
+ // a positive sign.
+ #[allow(clippy::cast_sign_loss)]
+ let duration = Duration::from_secs(remaining_time as u64);
+ debug!("external global lock detected, locking");
+ self.lock_for(duration).await;
+ Some(duration)
+ } else {
+ None
+ }
}
- }
- Err(error) => {
- debug!("redis call failed: {}", error);
+ Err(error) => {
+ debug!("redis call failed: {}", error);
- None
+ None
+ }
}
- }
- }
-
- pub fn new(redis: MultiplexedConnection) -> Arc<Self> {
- Arc::new(Self {
- redis: Mutex::new(redis),
- is_locked: AtomicU64::new(0),
})
}
}