From 8e959936dce532d5d61845a716b6d962a3d6c265 Mon Sep 17 00:00:00 2001
From: Matthieu
Date: Fri, 15 Oct 2021 17:33:06 +0400
Subject: [PATCH] base ratelimiter (need to implement it in lua script later)

---
 .circleci/config.yml      | 138 ----------------------------------
 config/default.json       |   3 +
 rest/src/proxy/mod.rs     |  41 ++++++-----
 rest/src/ratelimit/mod.rs | 151 ++++++++++++++++++++++++++++++++++----
 4 files changed, 163 insertions(+), 170 deletions(-)
 delete mode 100644 .circleci/config.yml

diff --git a/.circleci/config.yml b/.circleci/config.yml
deleted file mode 100644
index 2b2ed11..0000000
--- a/.circleci/config.yml
+++ /dev/null
@@ -1,138 +0,0 @@
-version: 2.1
-
-orbs:
-  win: circleci/windows@2.2.0
-
-commands:
-  setup-bazel:
-    description: |
-      Setup the Bazel build system used for building Android projects
-    steps:
-      - run:
-          name: Install bazel from go
-          command: |
-            go install github.com/bazelbuild/bazelisk
-      - run: docker info
-  setup-ci:
-    description: |
-      Prepare all the requirements for the CI
-    steps:
-      - checkout
-      - setup-bazel
-      #- run:
-      #    name: Docker login
-      #    command: echo "$GHCR_PASS" | docker login ghcr.io -u "$GHCR_USER" --password-stdin
-jobs:
-  build-linux-arm64:
-    machine:
-      image: ubuntu-2004:202101-01
-    resource_class: arm.medium
-    steps:
-      - setup-ci
-      - restore_cache:
-          keys:
-            - bazel-cache-linux-arm-{{ .Branch }}
-      - run:
-          name: "Test"
-          command: "bazelisk test //:tests || true"
-
-      - store_artifacts:
-          path: ~/project/bazel-testlogs/*
-
-      - store_test_results:
-          path: ~/project/bazel-testlogs/rest
-      - store_test_results:
-          path: ~/project/bazel-testlogs/webhook
-
-      - run:
-          name: "Build"
-          command: "bazelisk build //:packages"
-      - run:
-          name: "Move artifacts"
-          command: |
-            mkdir ~/project/artifacts
-            mv ~/project/bazel-bin/packages* ~/project/artifacts
-      - store_artifacts:
-          path: ~/project/artifacts
-      # - run:
-      #     name: Publish docker images
-      #     command: |
-      #       bazel run --define docker_repo=ghcr.io --define docker_tag=arm-{{ .Branch }} //:container_publish
-
-  build-linux-x86:
-    machine:
-      image: ubuntu-2004:202010-01
-    steps:
-      - setup-ci
-      - save_cache:
-          paths:
-            - ~/.cache/bazel
-          key: bazel-cache-linux-x86-{{ .Branch }}
-      - run:
-          name: "Test"
-          command: "bazelisk test //:tests || true"
-
-      - store_test_results:
-          path: ~/project/bazel-testlogs/rest
-      - store_test_results:
-          path: ~/project/bazel-testlogs/webhook
-
-      - store_artifacts:
-          path: ~/project/bazel-testlogs/*
-
-      - run:
-          name: "Build"
-          command: "bazelisk build //:packages"
-      - run:
-          name: "Move artifacts"
-          command: |
-            mkdir ~/project/artifacts
-            mv ~/project/bazel-bin/packages* ~/project/artifacts
-      - store_artifacts:
-          path: ~/project/artifacts
-      # - run:
-      #     name: Publish docker images
-      #     command: |
-      #       bazel run --define docker_repo=ghcr.io --define docker_tag={{ .Branch }} //:container_publish
-
-  build-windows:
-    executor:
-      name: win/default
-      shell: powershell.exe
-    steps:
-      - run: systeminfo
-      - checkout
-      - run: choco install bazelisk
-      - restore_cache:
-          keys:
-            - bazel-cache-windows-{{ .Branch }}
-      - run:
-          name: "Test"
-          command: |
-            bazelisk test //:tests | Out-Null
-
-      - store_test_results:
-          path: ~/project/bazel-testlogs/rest
-      - store_test_results:
-          path: ~/project/bazel-testlogs/webhook
-      - store_artifacts:
-          path: ~/project/bazel-testlogs/*
-
-      - run:
-          name: "Build"
-          command: "bazelisk build //:packages"
-      - run:
-          name: "Move artifacts"
-          command: |
-            mkdir ~/project/artifacts
-            mv ~/project/bazel-bin/packages* ~/project/artifacts
-      - store_artifacts:
-          path: ~/project/artifacts
-
-workflows:
-  build-workflow:
-    jobs:
-      - build-linux-x86
-      - build-linux-arm64
-      - build-windows
-      # - build-macos
diff --git a/config/default.json b/config/default.json
index d30823d..70d00d3 100644
--- a/config/default.json
+++ b/config/default.json
@@ -13,5 +13,8 @@
         "discord": {
             "token": ""
         }
+    },
+    "redis": {
+        "url": "redis://localhost"
     }
 }
\ No newline at end of file
diff --git a/rest/src/proxy/mod.rs b/rest/src/proxy/mod.rs
index ad1abba..dc0be03 100644
--- a/rest/src/proxy/mod.rs
+++ b/rest/src/proxy/mod.rs
@@ -1,4 +1,5 @@
 use crate::{config::Config, ratelimit::Ratelimiter};
+use common::log::debug;
 use futures_util::future::TryFutureExt;
 use hyper::{
     client::HttpConnector, header::HeaderValue, http::uri::Parts, service::Service, Body, Client,
@@ -59,26 +60,26 @@ impl Service<Request<Body>> for ServiceProxy {
         let ratelimiter = self.ratelimiter.clone();
 
         return Box::pin(async move {
-            match ratelimiter.check(&req).await {
+            match ratelimiter.before_request(&req).await {
                 Ok(allowed) => match allowed {
-                    true => {
-                        Ok(client
-                            .request(req)
-                            .map_ok(move |res| {
-                                if let Some(bucket) = res.headers().get("x-ratelimit-bucket") {
-
-                                    println!("bucket ratelimit! {:?} : {:?}", path, bucket);
-                                }
-                                res
-                            }).await.unwrap())
-                    },
-                    false => {
+                    crate::ratelimit::RatelimiterResponse::Ratelimited => {
+                        debug!("ratelimited");
                         Ok(Response::builder().body("ratelimited".into()).unwrap())
-                    },
-                },
-                Err(_) => {
-                    Ok(Response::builder().body("server error".into()).unwrap())
+                    }
+                    _ => {
+                        debug!("forwarding request");
+                        match client.request(req).await {
+                            Ok(response) => {
+                                ratelimiter.after_request(&path, &response).await;
+                                Ok(response)
+                            }
+                            Err(e) => Err(e),
+                        }
+                    }
                 },
+                Err(e) => Ok(Response::builder()
+                    .body(format!("server error: {}", e).into())
+                    .unwrap()),
             }
         });
     }
@@ -88,6 +89,10 @@ impl ServiceProxy {
     pub fn new(config: Arc<Config>, ratelimiter: Arc<Ratelimiter>) -> Self {
         let https = HttpsConnector::new();
         let client = Client::builder().build::<_, hyper::Body>(https);
-        ServiceProxy { client, config, ratelimiter }
+        ServiceProxy {
+            client,
+            config,
+            ratelimiter,
+        }
     }
 }
diff --git a/rest/src/ratelimit/mod.rs b/rest/src/ratelimit/mod.rs
index 07db643..86fc7af 100644
--- a/rest/src/ratelimit/mod.rs
+++ b/rest/src/ratelimit/mod.rs
@@ -1,33 +1,156 @@
-use common::{error::NovaError, redis_crate::{AsyncCommands, RedisError, aio::Connection}};
-use hyper::{Body, Request};
+use common::{
+    error::NovaError,
+    log::debug,
+    redis_crate::{aio::Connection, AsyncCommands},
+};
+use hyper::{Body, Request, Response};
+use std::{
+    convert::TryInto,
+    sync::Arc,
+    time::{SystemTime, UNIX_EPOCH},
+};
 use tokio::sync::Mutex;
-use std::sync::Arc;
 use xxhash_rust::xxh32::xxh32;
 
+pub enum RatelimiterResponse {
+    NoSuchUrl,
+    Ratelimited,
+    Pass,
+}
+
 pub struct Ratelimiter {
-    redis: Arc<Mutex<Connection>>
+    redis: Arc<Mutex<Connection>>,
 }
 
 impl Ratelimiter {
     pub fn new(redis: Arc<Mutex<Connection>>) -> Ratelimiter {
-        return Ratelimiter {
-            redis
-        }
+        return Ratelimiter { redis };
     }
 
-    pub async fn check(&self, request: &Request<Body>) -> Result<bool, NovaError> {
+    pub async fn before_request(
+        &self,
+        request: &Request<Body>,
+    ) -> Result<RatelimiterResponse, NovaError> {
         // we lookup if the route hash is stored in the redis table
         let path = request.uri().path();
         let hash = xxh32(path.as_bytes(), 32);
-        let key = format!("nova:rest:ratelimit:url_store:{}", hash);
         let mut redis = self.redis.lock().await;
-        let value: Result<String, RedisError> = redis.get(key).await;
-        match value {
-            Ok(response) => {
-                Ok(false)
+        let start = SystemTime::now();
+        let since_the_epoch = start
+            .duration_since(UNIX_EPOCH)
+            .expect("Time went backwards");
+
+        // global rate limit
+        match redis
+            .get::<String, Option<u64>>(format!(
+                "nova:rest:ratelimit:global:{}",
+                since_the_epoch.as_secs()
+            ))
+            .await
+        {
+            Ok(value) => {
+                match value {
+                    Some(value) => {
+                        debug!("incr: {}", value);
+                        if value >= 49 {
+                            return Ok(RatelimiterResponse::Ratelimited);
+                        }
+                    }
+                    None => {
+                        let key =
+                            format!("nova:rest:ratelimit:global:{}", since_the_epoch.as_secs());
+                        // init global ratelimit
+                        redis.set_ex::<String, u64, ()>(key, 0, 2).await.unwrap();
+                    }
+                }
+            }
+            Err(_) => {
+                return Err(NovaError::from("internal error"));
+            }
+        };
+
+        // we look up the corresponding bucket for this url
+        match redis
+            .get::<String, Option<String>>(format!("nova:rest:ratelimit:url_bucket:{}", hash))
+            .await
+        {
+            Ok(bucket) => match bucket {
+                Some(bucket) => {
+                    match redis
+                        .exists::<String, bool>(format!("nova:rest:ratelimit:lock:{}", bucket))
+                        .await
+                    {
+                        Ok(exists) => {
+                            if exists {
+                                Ok(RatelimiterResponse::Ratelimited)
+                            } else {
+                                Ok(RatelimiterResponse::Pass)
+                            }
+                        }
+                        Err(_) => Err(NovaError::from("unable to reach the server")),
+                    }
+                }
+                None => Ok(RatelimiterResponse::NoSuchUrl),
             },
-            Err(error) => Err(NovaError::from("failed to issue redis request")),
-        }
-    }
+            Err(_) => Err(NovaError::from("internal error")),
+        }
+    }
+
+    fn parse_headers(&self, response: &Response<Body>) -> Option<(String, i32, i32)> {
+        if let Some(bucket) = response.headers().get("X-RateLimit-Bucket") {
+            let bucket = bucket.to_str().unwrap().to_string();
+
+            let remaining = response.headers().get("X-RateLimit-Remaining").unwrap();
+            let reset = response.headers().get("X-RateLimit-Reset-After").unwrap();
+
+            let remaining_i32 = remaining.to_str().unwrap().parse::<i32>().unwrap();
+            let reset_ms_i32 = reset.to_str().unwrap().parse::<f64>().unwrap().ceil() as i32;
+            return Some((bucket, remaining_i32, reset_ms_i32));
+        } else {
+            None
+        }
+    }
+
+    pub async fn after_request(&self, path: &str, response: &Response<Body>) {
+        let hash = xxh32(path.as_bytes(), 32);
+        // verified earlier
+
+        let mut redis = self.redis.lock().await;
+
+        let start = SystemTime::now();
+        let since_the_epoch = start
+            .duration_since(UNIX_EPOCH)
+            .expect("Time went backwards");
+
+        redis
+            .incr::<String, u64, u64>(
+                format!("nova:rest:ratelimit:global:{}", since_the_epoch.as_secs()),
+                1,
+            )
+            .await
+            .unwrap();
+        if let Some((bucket, remaining, reset)) = self.parse_headers(response) {
+            if remaining <= 1 {
+                // we set a lock for the bucket until the timeout passes
+                redis
+                    .set_ex::<String, bool, ()>(
+                        format!("nova:rest:ratelimit:lock:{}", bucket),
+                        true,
+                        reset.try_into().unwrap(),
+                    )
+                    .await
+                    .unwrap();
+            }
+
+            redis
+                .set_ex::<String, String, ()>(
+                    format!("nova:rest:ratelimit:url_bucket:{}", hash),
+                    bucket,
+                    reset.try_into().unwrap(),
+                )
+                .await
+                .unwrap();
+        }
+    }
 }
-- 
2.39.5
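
Follow-up sketch for the TODO in the subject line: moving the global check into a Redis Lua script would make the read-and-bump of the per-second counter atomic instead of spreading it over separate GET, SET_EX, and INCR round trips. The snippet below only illustrates that direction and is not part of the patch; it assumes the `redis` crate (re-exported here as `common::redis_crate`), a global budget of roughly 50 requests per second in line with the `>= 49` check above, and a hypothetical helper name `check_global`.

    use redis::{aio::Connection, RedisResult, Script};

    /// Hypothetical helper: atomically bump the per-second global counter and
    /// report whether the request may proceed (assumed ~50 req/s budget).
    pub async fn check_global(conn: &mut Connection, epoch_secs: u64) -> RedisResult<bool> {
        // INCR and EXPIRE run inside a single Lua script, so two concurrent
        // requests can never both observe a missing counter and slip through.
        let script = Script::new(
            r#"
            local current = redis.call('INCR', KEYS[1])
            if current == 1 then
                redis.call('EXPIRE', KEYS[1], 2)
            end
            return current
            "#,
        );

        let count: u64 = script
            .key(format!("nova:rest:ratelimit:global:{}", epoch_secs))
            .invoke_async(conn)
            .await?;

        Ok(count <= 50)
    }

With something along these lines, before_request could drop its separate GET and SET_EX calls for the global limit and after_request would no longer need its own INCR; the per-bucket lock handling could be folded into the same script in a later pass.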