summaryrefslogtreecommitdiff
path: root/exes/ratelimit/src/lib.rs
blob: d1bd6e03a27f0b7fe222ba9848e07341fdf3f288 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#![deny(
    clippy::all,
    clippy::correctness,
    clippy::suspicious,
    clippy::style,
    clippy::complexity,
    clippy::perf,
    clippy::pedantic,
    clippy::nursery,
    unsafe_code
)]

use buckets::redis_lock::RedisLock;
use config::Ratelimit;
use grpc::RLServer;
use leash::{AnyhowResultFuture, Component};
use proto::nova::ratelimit::ratelimiter::ratelimiter_server::RatelimiterServer;
use redis::aio::MultiplexedConnection;
use shared::config::Settings;
use std::future::Future;
use std::pin::Pin;
use tokio::sync::oneshot;
use tonic::transport::Server;

pub mod buckets;
mod config;
mod grpc;

pub struct RatelimiterServerComponent {}
impl Component for RatelimiterServerComponent {
    type Config = Ratelimit;
    const SERVICE_NAME: &'static str = "ratelimiter";

    fn start(
        &self,
        settings: Settings<Self::Config>,
        stop: oneshot::Receiver<()>,
    ) -> AnyhowResultFuture<()> {
        Box::pin(async move {
            let listening_address = settings.server.listening_adress;
            let redis = Into::<
                Pin<Box<dyn Future<Output = anyhow::Result<MultiplexedConnection>> + Send>>,
            >::into(settings.redis)
            .await?;

            let server = RLServer::new(RedisLock::new(redis));

            Server::builder()
                .add_service(RatelimiterServer::new(server))
                .serve_with_shutdown(listening_address, async move {
                    let _ = stop.await;
                })
                .await?;

            Ok(())
        })
    }

    fn new() -> Self {
        Self {}
    }
}