+++ /dev/null
-version: 2.1
-
-orbs:
- win: circleci/windows@2.2.0
-
-commands:
- setup-bazel:
- description: |
- Setup the Bazel build system used for building Android projects
- steps:
- - run:
- name: Install bazel from go
- command: |
- go install github.com/bazelbuild/bazelisk
- - run: docker info
- setup-ci:
- description: |
- Prepare all the requirements for the CI
- steps:
- - checkout
- - setup-bazel
- #- run:
- # name: Docker login
- # command: echo "$GHCR_PASS" | docker login ghcr.io -u "$GHCR_USER" --password-stdin
-jobs:
- build-linux-arm64:
- machine:
- image: ubuntu-2004:202101-01
- resource_class: arm.medium
- steps:
- - setup-ci
- - restore_cache:
- keys:
- - bazel-cache-linux-arm-{{ .Branch }}
- - run:
- name: "Test"
- command: "bazelisk test //:tests || true"
-
- - store_artifacts:
- path: ~/project/bazel-testlogs/*
-
- - store_test_results:
- path: ~/project/bazel-testlogs/rest
- - store_test_results:
- path: ~/project/bazel-testlogs/webhook
-
- - run:
- name: "Build"
- command: "bazelisk build //:packages"
- - run:
- name: "Move artifacts"
- command: |
- mkdir ~/project/artifacts
- mv ~/project/bazel-bin/packages* ~/project/artifacts
- - store_artifacts:
- path: ~/project/artifacts
- # - run:
- # name: Publish docker images
- # command: |
- # bazel run --define docker_repo=ghcr.io --define docker_tag=arm-{{ .Branch }} //:container_publish
-
- build-linux-x86:
- machine:
- image: ubuntu-2004:202010-01
- steps:
- - setup-ci
- - save_cache:
- paths:
- - ~/.cache/bazel
- key: bazel-cache-linux-x86-{{ .Branch }}
- - run:
- name: "Test"
- command: "bazelisk test //:tests || true"
-
- - store_test_results:
- path: ~/project/bazel-testlogs/rest
- - store_test_results:
- path: ~/project/bazel-testlogs/webhook
-
- - store_artifacts:
- path: ~/project/bazel-testlogs/*
-
- - run:
- name: "Build"
- command: "bazelisk build //:packages"
- - run:
- name: "Move artifacts"
- command: |
- mkdir ~/project/artifacts
- mv ~/project/bazel-bin/packages* ~/project/artifacts
- - store_artifacts:
- path: ~/project/artifacts
- # - run:
- # name: Publish docker images
- # command: |
- # bazel run --define docker_repo=ghcr.io --define docker_tag={{ .Branch }} //:container_publish
-
- build-windows:
- executor:
- name: win/default
- shell: powershell.exe
- steps:
- - run: systeminfo
- - checkout
- - run: choco install bazelisk
- - restore_cache:
- keys:
- - bazel-cache-windows-{{ .Branch }}
- - run:
- name: "Test"
- command: |
- bazelisk test //:tests | Out-Null
-
- - store_test_results:
- path: ~/project/bazel-testlogs/rest
- - store_test_results:
- path: ~/project/bazel-testlogs/webhook
- - store_artifacts:
- path: ~/project/bazel-testlogs/*
-
- - run:
- name: "Build"
- command: "bazelisk build //:packages"
- - run:
- name: "Move artifacts"
- command: |
- mkdir ~/project/artifacts
- mv ~/project/bazel-bin/packages* ~/project/artifacts
- - store_artifacts:
- path: ~/project/artifacts
-
-workflows:
- build-workflow:
- jobs:
- - build-linux-x86
- - build-linux-arm64
- - build-windows
- # - build-macos
use crate::{config::Config, ratelimit::Ratelimiter};
+use common::log::debug;
use futures_util::future::TryFutureExt;
use hyper::{
client::HttpConnector, header::HeaderValue, http::uri::Parts, service::Service, Body, Client,
let ratelimiter = self.ratelimiter.clone();
return Box::pin(async move {
- match ratelimiter.check(&req).await {
+ match ratelimiter.before_request(&req).await {
Ok(allowed) => match allowed {
- true => {
- Ok(client
- .request(req)
- .map_ok(move |res| {
- if let Some(bucket) = res.headers().get("x-ratelimit-bucket") {
-
- println!("bucket ratelimit! {:?} : {:?}", path, bucket);
- }
- res
- }).await.unwrap())
- },
- false => {
+ crate::ratelimit::RatelimiterResponse::Ratelimited => {
+ debug!("ratelimited");
Ok(Response::builder().body("ratelimited".into()).unwrap())
- },
- },
- Err(_) => {
- Ok(Response::builder().body("server error".into()).unwrap())
+ }
+ _ => {
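+ // NoSuchUrl or Pass: forward the request and let after_request record the returned bucket headers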
+ debug!("forwarding request");
+ match client.request(req).await {
+ Ok(response) => {
+ ratelimiter.after_request(&path, &response).await;
+ Ok(response)
+ }
+ Err(e) => Err(e),
+ }
+ }
},
+ Err(e) => Ok(Response::builder()
+ .body(format!("server error: {}", e).into())
+ .unwrap()),
}
});
}
pub fn new(config: Arc<Config>, ratelimiter: Arc<Ratelimiter>) -> Self {
let https = HttpsConnector::new();
let client = Client::builder().build::<_, hyper::Body>(https);
- ServiceProxy { client, config, ratelimiter }
+ ServiceProxy {
+ client,
+ config,
+ ratelimiter,
+ }
}
}
-use common::{error::NovaError, redis_crate::{AsyncCommands, RedisError, aio::Connection}};
-use hyper::{Body, Request};
+use common::{
+ error::NovaError,
+ log::debug,
+ redis_crate::{aio::Connection, AsyncCommands},
+};
+use hyper::{Body, Request, Response};
+use std::{
+ convert::TryInto,
+ sync::Arc,
+ time::{SystemTime, UNIX_EPOCH},
+};
use tokio::sync::Mutex;
-use std::sync::Arc;
use xxhash_rust::xxh32::xxh32;
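+/// Decision returned by the ratelimiter before a request is forwarded upstream.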
+pub enum RatelimiterResponse {
+ NoSuchUrl,
+ Ratelimited,
+ Pass,
+}
+
pub struct Ratelimiter {
- redis: Arc<Mutex<Connection>>
+ redis: Arc<Mutex<Connection>>,
}
impl Ratelimiter {
pub fn new(redis: Arc<Mutex<Connection>>) -> Ratelimiter {
- return Ratelimiter {
- redis
- }
+ Ratelimiter { redis }
}
- pub async fn check(&self,request: &Request<Body>) -> Result<bool, NovaError> {
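+ /// Checks the global per-second counter and the bucket lock for this url before the request is forwarded.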
+ pub async fn before_request(
+ &self,
+ request: &Request<Body>,
+ ) -> Result<RatelimiterResponse, NovaError> {
// we lookup if the route hash is stored in the redis table
let path = request.uri().path();
let hash = xxh32(path.as_bytes(), 32);
- let key = format!("nova:rest:ratelimit:url_store:{}", hash);
let mut redis = self.redis.lock().await;
- let value: Result<String, RedisError> = redis.get(key).await;
- match value {
- Ok(response) => {
- Ok(false)
+ let start = SystemTime::now();
+ let since_the_epoch = start
+ .duration_since(UNIX_EPOCH)
+ .expect("Time went backwards");
+
+ // global rate limit
+ match redis
+ .get::<String, Option<i32>>(format!(
+ "nova:rest:ratelimit:global:{}",
+ since_the_epoch.as_secs()
+ ))
+ .await
+ {
+ Ok(value) => {
+ match value {
+ Some(value) => {
+ debug!("incr: {}", value);
+ if value >= 49 {
+ return Ok(RatelimiterResponse::Ratelimited);
+ }
+ }
+ None => {
+ let key =
+ format!("nova:rest:ratelimit:global:{}", since_the_epoch.as_secs());
+ // init global ratelimit
+ redis.set_ex::<String, i32, ()>(key, 0, 2).await.unwrap();
+ }
+ }
+ }
+ Err(_) => {
+ return Err(NovaError::from("internal error"));
+ }
+ };
+
+ // we lookup the corresponding bucket for this url
+ match redis
+ .get::<String, Option<String>>(format!("nova:rest:ratelimit:url_bucket:{}", hash))
+ .await
+ {
+ Ok(bucket) => match bucket {
+ Some(bucket) => {
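+ // a lock key exists while this bucket is exhausted; reject until it expires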
+ match redis
+ .exists::<String, bool>(format!("nova:rest:ratelimit:lock:{}", bucket))
+ .await
+ {
+ Ok(exists) => {
+ if exists {
+ Ok(RatelimiterResponse::Ratelimited)
+ } else {
+ Ok(RatelimiterResponse::Pass)
+ }
+ }
+ Err(_) => Err(NovaError::from("unable to reach the server")),
+ }
+ }
+ None => Ok(RatelimiterResponse::NoSuchUrl),
},
- Err(error) => Err(NovaError::from("failed to issue redis request")),
+ Err(_) => Err(NovaError::from("internal error")),
+ }
+ }
+
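+ /// Extracts the bucket name, remaining request count and reset delay (in seconds) from the ratelimit headers, if present.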
+ fn parse_headers(&self, response: &Response<Body>) -> Option<(String, i32, i32)> {
+ if let Some(bucket) = response.headers().get("X-RateLimit-Bucket") {
+ let bucket = bucket.to_str().unwrap().to_string();
+
+ let remaining = response.headers().get("X-RateLimit-Remaining").unwrap();
+ let reset = response.headers().get("X-RateLimit-Reset-After").unwrap();
+
+ let remaining_i32 = remaining.to_str().unwrap().parse::<i32>().unwrap();
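+ // X-RateLimit-Reset-After is a fractional number of seconds; round up so the lock never expires early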
+ let reset_secs = reset.to_str().unwrap().parse::<f32>().unwrap().ceil() as i32;
+ return Some((bucket, remaining_i32, reset_secs));
+ } else {
+ None
+ }
+ }
+
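+ /// Records the outcome of a forwarded request: bumps the global counter and stores the bucket mapping and lock derived from the response headers.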
+ pub async fn after_request(&self, path: &str, response: &Response<Body>) {
+ // hash the path the same way as before_request so we update the same bucket mapping
+ let hash = xxh32(path.as_bytes(), 32);
+
+ let mut redis = self.redis.lock().await;
+
+ let start = SystemTime::now();
+ let since_the_epoch = start
+ .duration_since(UNIX_EPOCH)
+ .expect("Time went backwards");
+
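+ // bump the per-second global counter (initialised in before_request with a 2 second TTL)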
+ redis
+ .incr::<String, i32, ()>(
+ format!("nova:rest:ratelimit:global:{}", since_the_epoch.as_secs()),
+ 1,
+ )
+ .await
+ .unwrap();
+ if let Some((bucket, remaining, reset)) = self.parse_headers(response) {
+ if remaining <= 1 {
+ // we set a lock for the bucket until the timeout passes
+ redis
+ .set_ex::<String, bool, ()>(
+ format!("nova:rest:ratelimit:lock:{}", bucket),
+ true,
+ reset.try_into().unwrap(),
+ )
+ .await
+ .unwrap();
+ }
+
+ redis
+ .set_ex::<String, String, ()>(
+ format!("nova:rest:ratelimit:url_bucket:{}", hash),
+ bucket,
+ reset.try_into().unwrap(),
+ )
+ .await
+ .unwrap();
}
}
}