]> git.puffer.fish Git - matthieu/nova.git/commitdiff
base ratelimiter (need to implement it in lua script later)
authorMatthieu <matthieu@developershouse.xyz>
Fri, 15 Oct 2021 13:33:06 +0000 (17:33 +0400)
committerMatthieu <matthieu@developershouse.xyz>
Fri, 15 Oct 2021 13:33:06 +0000 (17:33 +0400)
.circleci/config.yml [deleted file]
config/default.json
rest/src/proxy/mod.rs
rest/src/ratelimit/mod.rs

diff --git a/.circleci/config.yml b/.circleci/config.yml
deleted file mode 100644 (file)
index 2b2ed11..0000000
+++ /dev/null
@@ -1,138 +0,0 @@
-version: 2.1
-
-orbs:
-  win: circleci/windows@2.2.0
-
-commands:
-  setup-bazel:
-    description: |
-      Setup the Bazel build system used for building Android projects
-    steps:
-      - run:
-          name: Install bazel from go
-          command: |
-            go install github.com/bazelbuild/bazelisk
-      - run: docker info
-  setup-ci:
-    description: |
-      Prepare all the requirements for the CI
-    steps:
-      - checkout
-      - setup-bazel
-      #- run:
-      #    name: Docker login
-      #    command: echo "$GHCR_PASS" | docker login ghcr.io -u "$GHCR_USER" --password-stdin
-jobs:
-  build-linux-arm64:
-    machine:
-      image: ubuntu-2004:202101-01
-    resource_class: arm.medium
-    steps:
-      - setup-ci
-      - restore_cache:
-          keys:
-            - bazel-cache-linux-arm-{{ .Branch }}
-      - run:
-          name: "Test"
-          command: "bazelisk test //:tests || true"
-
-      - store_artifacts:
-          path: ~/project/bazel-testlogs/*
-      
-      - store_test_results:
-          path: ~/project/bazel-testlogs/rest
-      - store_test_results:
-          path: ~/project/bazel-testlogs/webhook
-
-      - run:
-          name: "Build"
-          command: "bazelisk build //:packages"
-      - run:
-          name: "Move artifacts"
-          command: |
-            mkdir ~/project/artifacts
-            mv ~/project/bazel-bin/packages* ~/project/artifacts
-      - store_artifacts:
-          path: ~/project/artifacts
-      # - run:
-      #     name: Publish docker images
-      #     command: |
-      #      bazel run --define docker_repo=ghcr.io --define docker_tag=arm-{{ .Branch }} //:container_publish
-
-  build-linux-x86:
-    machine:
-      image: ubuntu-2004:202010-01
-    steps:
-      - setup-ci
-      - save_cache:
-          paths:
-            - ~/.cache/bazel
-          key: bazel-cache-linux-x86-{{ .Branch }}
-      - run:
-          name: "Test"
-          command: "bazelisk test //:tests || true"
-
-      - store_test_results:
-          path: ~/project/bazel-testlogs/rest
-      - store_test_results:
-          path: ~/project/bazel-testlogs/webhook
-
-      - store_artifacts:
-          path: ~/project/bazel-testlogs/*
-      
-      - run:
-          name: "Build"
-          command: "bazelisk build //:packages"
-      - run:
-          name: "Move artifacts"
-          command: |
-            mkdir ~/project/artifacts
-            mv ~/project/bazel-bin/packages* ~/project/artifacts
-      - store_artifacts:
-          path: ~/project/artifacts
-      # - run:
-      #     name: Publish docker images
-      #     command: |
-      #       bazel run --define docker_repo=ghcr.io --define docker_tag={{ .Branch }} //:container_publish
-
-  build-windows:
-    executor:
-      name: win/default
-      shell: powershell.exe
-    steps:
-      - run: systeminfo
-      - checkout
-      - run: choco install bazelisk
-      - restore_cache:
-          keys:
-            - bazel-cache-windows-{{ .Branch }}
-      - run:
-          name: "Test"
-          command: |
-            bazelisk test //:tests | Out-Null
-
-      - store_test_results:
-          path: ~/project/bazel-testlogs/rest
-      - store_test_results:
-          path: ~/project/bazel-testlogs/webhook
-      - store_artifacts:
-          path: ~/project/bazel-testlogs/*
-
-      - run:
-          name: "Build"
-          command: "bazelisk build //:packages"
-      - run:
-          name: "Move artifacts"
-          command: |
-            mkdir ~/project/artifacts
-            mv ~/project/bazel-bin/packages* ~/project/artifacts
-      - store_artifacts:
-          path: ~/project/artifacts
-
-workflows:
-  build-workflow:
-    jobs:
-      - build-linux-x86
-      - build-linux-arm64
-      - build-windows
-      # - build-macos
index d30823d432c221ec7522283509a1de6863a1bad8..70d00d314e7e2d72544e205ad8cd6499ba33daf5 100644 (file)
@@ -13,5 +13,8 @@
         "discord": {
             "token": "<your token>"
         }
+    },
+    "redis": {
+        "url": "redis://localhost"
     }
 }
\ No newline at end of file
index ad1abbae3167c4b364023f38035e346b2030a9d2..dc0be03e39146b90688b7c59f2a5cf5a76366d01 100644 (file)
@@ -1,4 +1,5 @@
 use crate::{config::Config, ratelimit::Ratelimiter};
+use common::log::debug;
 use futures_util::future::TryFutureExt;
 use hyper::{
     client::HttpConnector, header::HeaderValue, http::uri::Parts, service::Service, Body, Client,
@@ -59,26 +60,26 @@ impl Service<Request<Body>> for ServiceProxy {
         let ratelimiter = self.ratelimiter.clone();
 
         return Box::pin(async move {
-            match ratelimiter.check(&req).await {
+            match ratelimiter.before_request(&req).await {
                 Ok(allowed) => match allowed {
-                    true => {
-                        Ok(client
-                        .request(req)
-                        .map_ok(move |res| {
-                            if let Some(bucket) = res.headers().get("x-ratelimit-bucket") {
-                                
-                                println!("bucket ratelimit! {:?} : {:?}", path, bucket);
-                            }
-                            res
-                        }).await.unwrap())
-                    },
-                    false => {
+                    crate::ratelimit::RatelimiterResponse::Ratelimited => {
+                        debug!("ratelimited");
                         Ok(Response::builder().body("ratelimited".into()).unwrap())
-                    },
-                },
-                Err(_) => {
-                    Ok(Response::builder().body("server error".into()).unwrap())
+                    }
+                    _ => {
+                        debug!("forwarding request");
+                        match client.request(req).await {
+                            Ok(response) => {
+                                ratelimiter.after_request(&path, &response).await;
+                                Ok(response)
+                            }
+                            Err(e) => Err(e),
+                        }
+                    }
                 },
+                Err(e) => Ok(Response::builder()
+                    .body(format!("server error: {}", e).into())
+                    .unwrap()),
             }
         });
     }
@@ -88,6 +89,10 @@ impl ServiceProxy {
     pub fn new(config: Arc<Config>, ratelimiter: Arc<Ratelimiter>) -> Self {
         let https = HttpsConnector::new();
         let client = Client::builder().build::<_, hyper::Body>(https);
-        ServiceProxy { client, config, ratelimiter }
+        ServiceProxy {
+            client,
+            config,
+            ratelimiter,
+        }
     }
 }
index 07db64395f370aa0c4018f8b6490e5bf4324735e..86fc7afff4d4472f5cb0cca0d41ae63a58bfdb81 100644 (file)
-use common::{error::NovaError, redis_crate::{AsyncCommands, RedisError, aio::Connection}};
-use hyper::{Body, Request};
+use common::{
+    error::NovaError,
+    log::debug,
+    redis_crate::{aio::Connection, AsyncCommands},
+};
+use hyper::{Body, Request, Response};
+use std::{
+    convert::TryInto,
+    sync::Arc,
+    time::{SystemTime, UNIX_EPOCH},
+};
 use tokio::sync::Mutex;
-use std::sync::Arc;
 use xxhash_rust::xxh32::xxh32;
 
+pub enum RatelimiterResponse {
+    NoSuchUrl,
+    Ratelimited,
+    Pass,
+}
+
 pub struct Ratelimiter {
-    redis: Arc<Mutex<Connection>>
+    redis: Arc<Mutex<Connection>>,
 }
 
 impl Ratelimiter {
     pub fn new(redis: Arc<Mutex<Connection>>) -> Ratelimiter {
-        return Ratelimiter {
-            redis
-        }
+        return Ratelimiter { redis };
     }
 
-    pub async fn check(&self,request: &Request<Body>) -> Result<bool, NovaError> {
+    pub async fn before_request(
+        &self,
+        request: &Request<Body>,
+    ) -> Result<RatelimiterResponse, NovaError> {
         // we lookup if the route hash is stored in the redis table
         let path = request.uri().path();
         let hash = xxh32(path.as_bytes(), 32);
-        let key = format!("nova:rest:ratelimit:url_store:{}", hash);
         let mut redis = self.redis.lock().await;
-        let value: Result<String, RedisError> = redis.get(key).await;
 
-        match value {
-            Ok(response) => {
-                Ok(false)
+        let start = SystemTime::now();
+        let since_the_epoch = start
+            .duration_since(UNIX_EPOCH)
+            .expect("Time went backwards");
+
+        // global rate litmit
+        match redis
+            .get::<String, Option<i32>>(format!(
+                "nova:rest:ratelimit:global:{}",
+                since_the_epoch.as_secs()
+            ))
+            .await
+        {
+            Ok(value) => {
+                match value {
+                    Some(value) => {
+                        debug!("incr: {}", value);
+                        if value >= 49 {
+                            return Ok(RatelimiterResponse::Ratelimited);
+                        }
+                    }
+                    None => {
+                        let key =
+                            format!("nova:rest:ratelimit:global:{}", since_the_epoch.as_secs());
+                        // init global ratelimit
+                        redis.set_ex::<String, i32, ()>(key, 0, 2).await.unwrap();
+                    }
+                }
+            }
+            Err(_) => {
+                return Err(NovaError::from("internal error"));
+            }
+        };
+
+        // we lookup the corresponding bucket for this url
+        match redis
+            .get::<String, Option<String>>(format!("nova:rest:ratelimit:url_bucket:{}", hash))
+            .await
+        {
+            Ok(bucket) => match bucket {
+                Some(bucket) => {
+                    match redis
+                        .exists::<String, bool>(format!("nova:rest:ratelimit:lock:{}", bucket))
+                        .await
+                    {
+                        Ok(exists) => {
+                            if exists {
+                                Ok(RatelimiterResponse::Ratelimited)
+                            } else {
+                                Ok(RatelimiterResponse::Pass)
+                            }
+                        }
+                        Err(_) => Err(NovaError::from("unable to reach the server")),
+                    }
+                }
+                None => Ok(RatelimiterResponse::NoSuchUrl),
             },
-            Err(error) => Err(NovaError::from("failed to issue redis request")),
+            Err(_) => Err(NovaError::from("internal error")),
+        }
+    }
+
+    fn parse_headers(&self, response: &Response<Body>) -> Option<(String, i32, i32)> {
+        if let Some(bucket) = response.headers().get("X-RateLimit-Bucket") {
+            let bucket = bucket.to_str().unwrap().to_string();
+
+            let remaining = response.headers().get("X-RateLimit-Remaining").unwrap();
+            let reset = response.headers().get("X-RateLimit-Reset-After").unwrap();
+
+            let remaining_i32 = remaining.to_str().unwrap().parse::<i32>().unwrap();
+            let reset_ms_i32 = reset.to_str().unwrap().parse::<f32>().unwrap().ceil() as i32;
+            return Some((bucket, remaining_i32, reset_ms_i32));
+        } else {
+            None
+        }
+    }
+
+    pub async fn after_request(&self, path: &str, response: &Response<Body>) {
+        let hash = xxh32(path.as_bytes(), 32);
+        // verified earlier
+
+        let mut redis = self.redis.lock().await;
+
+        let start = SystemTime::now();
+        let since_the_epoch = start
+            .duration_since(UNIX_EPOCH)
+            .expect("Time went backwards");
+
+        redis
+            .incr::<String, i32, ()>(
+                format!("nova:rest:ratelimit:global:{}", since_the_epoch.as_secs()),
+                1,
+            )
+            .await
+            .unwrap();
+        if let Some((bucket, remaining, reset)) = self.parse_headers(response) {
+            if remaining <= 1 {
+                // we set a lock for the bucket until the timeout passes
+                redis
+                    .set_ex::<String, bool, ()>(
+                        format!("nova:rest:ratelimit:lock:{}", bucket),
+                        true,
+                        reset.try_into().unwrap(),
+                    )
+                    .await
+                    .unwrap();
+            }
+
+            redis
+                .set_ex::<String, String, ()>(
+                    format!("nova:rest:ratelimit:url_bucket:{}", hash),
+                    bucket,
+                    reset.try_into().unwrap(),
+                )
+                .await
+                .unwrap();
         }
     }
 }