..Default::default()
};
- cbindgen::generate_with_config(&crate_dir, config)
+ cbindgen::generate_with_config(crate_dir, config)
.unwrap()
- .write_to_file(&output_file);
+ .write_to_file(output_file);
}
+#![allow(clippy::missing_safety_doc)]
+
extern crate libc;
use anyhow::Result;
use config::{Config, Environment, File};
-use std::net::ToSocketAddrs;
-
use futures_util::FutureExt;
use grpc::RLServer;
use leash::{AnyhowResultFuture, Component};
use proto::nova::ratelimit::ratelimiter::ratelimiter_server::RatelimiterServer;
use redis_global_local_bucket_ratelimiter::RedisGlobalLocalBucketRatelimiter;
-use shared::{config::Settings, redis_crate::Client};
+use shared::{config::Settings, redis_crate::aio::MultiplexedConnection};
+use std::future::Future;
+use std::{net::ToSocketAddrs, pin::Pin};
use tokio::sync::oneshot;
use tonic::transport::Server;
stop: oneshot::Receiver<()>,
) -> AnyhowResultFuture<()> {
Box::pin(async move {
- // let config = Arc::new(settings.config);
- let redis: Client = settings.redis.into();
- let server = RLServer::new(RedisGlobalLocalBucketRatelimiter::new(redis.into()));
+ let redis = Into::<
+ Pin<Box<dyn Future<Output = anyhow::Result<MultiplexedConnection>> + Send>>,
+ >::into(settings.redis)
+ .await?;
+
+ let server = RLServer::new(RedisGlobalLocalBucketRatelimiter::new(redis));
Server::builder()
.add_service(RatelimiterServer::new(server))
use self::bucket::{Bucket, BucketQueueTask};
-use shared::redis_crate::{Client, Commands};
+use shared::redis_crate::aio::MultiplexedConnection;
+use shared::redis_crate::{AsyncCommands};
+use tokio::sync::Mutex;
use twilight_http_ratelimiting::ticket::{self, TicketNotifier};
use twilight_http_ratelimiting::GetTicketFuture;
mod bucket;
use futures_util::future;
use std::{
collections::hash_map::{Entry, HashMap},
- sync::{Arc, Mutex},
+ sync::Arc,
time::Duration,
};
#[derive(Debug)]
-struct RedisLockPair(tokio::sync::Mutex<Client>);
+struct RedisLockPair(Mutex<MultiplexedConnection>);
impl RedisLockPair {
/// Set the global ratelimit as exhausted.
1,
(duration.as_secs() + 1).try_into().unwrap(),
)
+ .await
.unwrap();
}
pub async fn is_locked(&self) -> bool {
- self.0.lock().await.exists("nova:rls:lock").unwrap()
+ self.0.lock().await.exists("nova:rls:lock").await.unwrap()
}
}
#[derive(Clone, Debug)]
pub struct RedisGlobalLocalBucketRatelimiter {
- buckets: Arc<Mutex<HashMap<String, Arc<Bucket>>>>,
+ buckets: Arc<std::sync::Mutex<HashMap<String, Arc<Bucket>>>>,
global: Arc<RedisLockPair>,
}
impl RedisGlobalLocalBucketRatelimiter {
#[must_use]
- pub fn new(redis: tokio::sync::Mutex<Client>) -> Self {
+ pub fn new(redis: MultiplexedConnection) -> Self {
Self {
buckets: Arc::default(),
- global: Arc::new(RedisLockPair(redis)),
+ global: Arc::new(RedisLockPair(Mutex::new(redis))),
}
}
};
let request_path = request.uri().path();
- let (api_path, trimmed_path) = normalize_path(&request_path);
+ let (api_path, trimmed_path) = normalize_path(request_path);
let mut uri_string = format!("https://discord.com{}{}", api_path, trimmed_path);
if let Some(query) = request.uri().query() {
}
}
+type IssueTicket = Pin<
+ Box<
+ dyn Future<Output = anyhow::Result<oneshot::Sender<HashMap<String, String>>>>
+ + Send
+ + 'static,
+ >,
+>;
+
impl RemoteRatelimiter {
async fn get_ratelimiters(&self) -> Result<(), anyhow::Error> {
// get list of dns responses
/*let responses = dns_lookup::lookup_host("localhost")
- .unwrap()
- .into_iter()
- .map(|f| f.to_string());*/
+ .unwrap()
+ .into_iter()
+ .map(|f| f.to_string());*/
let mut write = self.remotes.write().await;
write.add(a.clone());
}
- return Ok(());
+ Ok(())
}
#[must_use]
obj
}
- pub fn ticket(
- &self,
- path: String,
- ) -> Pin<
- Box<
- dyn Future<Output = anyhow::Result<oneshot::Sender<HashMap<String, String>>>>
- + Send
- + 'static,
- >,
- > {
+ pub fn ticket(&self, path: String) -> IssueTicket {
let remotes = self.remotes.clone();
let (tx, rx) = oneshot::channel::<HashMap<String, String>>();
D: Deserializer<'de>,
{
let str = String::deserialize(deserializer)?;
- let public_key = PublicKey::from_bytes(&hex::decode(&str).unwrap()).unwrap();
+ let public_key = PublicKey::from_bytes(&hex::decode(str).unwrap()).unwrap();
Ok(public_key)
}
}
}
-impl Into<Response<Body>> for WebhookError {
- fn into(self) -> Response<Body> {
+impl From<WebhookError> for Response<Body> {
+ fn from(value: WebhookError) -> Self {
Response::builder()
- .status(self.code)
- .body(self.message.into())
+ .status(value.code)
+ .body(value.message.into())
.unwrap()
}
}
+++ /dev/null
-use super::error::WebhookError;
-use super::signature::validate_signature;
-use crate::config::WebhookConfig;
-use ed25519_dalek::PublicKey;
-use hyper::{
- body::{to_bytes, Bytes},
- service::Service,
- Body, Method, Request, Response, StatusCode,
-};
-use shared::nats_crate::Client;
-use shared::{
- log::{debug, error},
- payloads::{CachePayload, DispatchEventTagged, Tracing},
-};
-use std::{
- future::Future,
- pin::Pin,
- str::from_utf8,
- task::{Context, Poll},
-};
-use twilight_model::gateway::event::DispatchEvent;
-use twilight_model::{
- application::interaction::{Interaction, InteractionType},
- gateway::payload::incoming::InteractionCreate,
-};
-
-/// Hyper service used to handle the discord webhooks
-#[derive(Clone)]
-pub struct WebhookService {
- pub config: WebhookConfig,
- pub nats: Client,
-}
-
-impl WebhookService {
- async fn check_request(req: Request<Body>, pk: PublicKey) -> Result<Bytes, WebhookError> {
- if req.method() == Method::POST {
- let signature = if let Some(sig) = req.headers().get("X-Signature-Ed25519") {
- sig.to_owned()
- } else {
- return Err(WebhookError::new(
- StatusCode::BAD_REQUEST,
- "missing signature header",
- ));
- };
-
- let timestamp = if let Some(timestamp) = req.headers().get("X-Signature-Timestamp") {
- timestamp.to_owned()
- } else {
- return Err(WebhookError::new(
- StatusCode::BAD_REQUEST,
- "missing timestamp header",
- ));
- };
- let data = to_bytes(req.into_body()).await?;
-
- if validate_signature(
- &pk,
- &[timestamp.as_bytes().to_vec(), data.to_vec()].concat(),
- signature.to_str()?,
- ) {
- Ok(data)
- } else {
- Err(WebhookError::new(
- StatusCode::UNAUTHORIZED,
- "invalid signature",
- ))
- }
- } else {
- Err(WebhookError::new(StatusCode::NOT_FOUND, "not found"))
- }
- }
-
- async fn process_request(
- req: Request<Body>,
- nats: Client,
- pk: PublicKey,
- ) -> Result<Response<Body>, WebhookError> {
- match Self::check_request(req, pk).await {
- Ok(data) => {
- let utf8 = from_utf8(&data);
- match utf8 {
- Ok(data) => match serde_json::from_str::<Interaction>(data) {
- Ok(value) => {
- match value.kind {
- InteractionType::Ping => Ok(Response::builder()
- .header("Content-Type", "application/json")
- .body(r#"{"type":1}"#.into())
- .unwrap()),
- _ => {
- debug!("calling nats");
- // this should hopefully not fail ?
-
- let data = CachePayload {
- tracing: Tracing {
- node_id: "".to_string(),
- span: None,
- },
- data: DispatchEventTagged {
- data: DispatchEvent::InteractionCreate(Box::new(
- InteractionCreate(value),
- )),
- },
- };
-
- let payload = serde_json::to_string(&data).unwrap();
-
- match nats
- .request(
- "nova.cache.dispatch.INTERACTION_CREATE".to_string(),
- Bytes::from(payload),
- )
- .await
- {
- Ok(response) => Ok(Response::builder()
- .header("Content-Type", "application/json")
- .body(Body::from(response.payload))
- .unwrap()),
-
- Err(error) => {
- error!("failed to request nats: {}", error);
- Err(WebhookError::new(
- StatusCode::INTERNAL_SERVER_ERROR,
- "failed to request nats",
- ))
- }
- }
- }
- }
- }
-
- Err(error) => {
- error!("invalid json body: {}", error);
- Err(WebhookError::new(
- StatusCode::BAD_REQUEST,
- "invalid json body",
- ))
- }
- },
-
- Err(_) => Err(WebhookError::new(StatusCode::BAD_REQUEST, "not utf-8 body")),
- }
- }
- Err(error) => Err(error),
- }
- }
-}
-
-/// Implementation of the service
-impl Service<hyper::Request<Body>> for WebhookService {
- type Response = hyper::Response<Body>;
- type Error = hyper::Error;
- type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
-
- fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
-
- fn call(&mut self, req: Request<Body>) -> Self::Future {
- let future = Self::process_request(req, self.nats.clone(), self.config.discord.public_key);
- Box::pin(async move {
- let response = future.await;
-
- match response {
- Ok(r) => Ok(r),
- Err(e) => Ok(e.into()),
- }
- })
- }
-}
+use crate::config::WebhookConfig;
+use ed25519_dalek::PublicKey;
+use error::WebhookError;
+use hyper::{
+ body::{to_bytes, Bytes},
+ service::Service,
+ Body, Method, Request, Response, StatusCode,
+};
+use shared::nats_crate::Client;
+use shared::{
+ log::{debug, error},
+ payloads::{CachePayload, DispatchEventTagged, Tracing},
+};
+use signature::validate_signature;
+use std::{
+ future::Future,
+ pin::Pin,
+ str::from_utf8,
+ task::{Context, Poll},
+};
+use twilight_model::gateway::event::DispatchEvent;
+use twilight_model::{
+ application::interaction::{Interaction, InteractionType},
+ gateway::payload::incoming::InteractionCreate,
+};
+
mod error;
-pub mod handler;
pub mod make_service;
mod signature;
#[cfg(test)]
pub mod tests;
+
+/// Hyper service used to handle the discord webhooks
+#[derive(Clone)]
+pub struct WebhookService {
+ pub config: WebhookConfig,
+ pub nats: Client,
+}
+
+impl WebhookService {
+ async fn check_request(req: Request<Body>, pk: PublicKey) -> Result<Bytes, WebhookError> {
+ if req.method() == Method::POST {
+ let signature = if let Some(sig) = req.headers().get("X-Signature-Ed25519") {
+ sig.to_owned()
+ } else {
+ return Err(WebhookError::new(
+ StatusCode::BAD_REQUEST,
+ "missing signature header",
+ ));
+ };
+
+ let timestamp = if let Some(timestamp) = req.headers().get("X-Signature-Timestamp") {
+ timestamp.to_owned()
+ } else {
+ return Err(WebhookError::new(
+ StatusCode::BAD_REQUEST,
+ "missing timestamp header",
+ ));
+ };
+ let data = to_bytes(req.into_body()).await?;
+
+ if validate_signature(
+ &pk,
+ &[timestamp.as_bytes().to_vec(), data.to_vec()].concat(),
+ signature.to_str()?,
+ ) {
+ Ok(data)
+ } else {
+ Err(WebhookError::new(
+ StatusCode::UNAUTHORIZED,
+ "invalid signature",
+ ))
+ }
+ } else {
+ Err(WebhookError::new(StatusCode::NOT_FOUND, "not found"))
+ }
+ }
+
+ async fn process_request(
+ req: Request<Body>,
+ nats: Client,
+ pk: PublicKey,
+ ) -> Result<Response<Body>, WebhookError> {
+ match Self::check_request(req, pk).await {
+ Ok(data) => {
+ let utf8 = from_utf8(&data);
+ match utf8 {
+ Ok(data) => match serde_json::from_str::<Interaction>(data) {
+ Ok(value) => {
+ match value.kind {
+ InteractionType::Ping => Ok(Response::builder()
+ .header("Content-Type", "application/json")
+ .body(r#"{"type":1}"#.into())
+ .unwrap()),
+ _ => {
+ debug!("calling nats");
+ // this should hopefully not fail ?
+
+ let data = CachePayload {
+ tracing: Tracing {
+ node_id: "".to_string(),
+ span: None,
+ },
+ data: DispatchEventTagged {
+ data: DispatchEvent::InteractionCreate(Box::new(
+ InteractionCreate(value),
+ )),
+ },
+ };
+
+ let payload = serde_json::to_string(&data).unwrap();
+
+ match nats
+ .request(
+ "nova.cache.dispatch.INTERACTION_CREATE".to_string(),
+ Bytes::from(payload),
+ )
+ .await
+ {
+ Ok(response) => Ok(Response::builder()
+ .header("Content-Type", "application/json")
+ .body(Body::from(response.payload))
+ .unwrap()),
+
+ Err(error) => {
+ error!("failed to request nats: {}", error);
+ Err(WebhookError::new(
+ StatusCode::INTERNAL_SERVER_ERROR,
+ "failed to request nats",
+ ))
+ }
+ }
+ }
+ }
+ }
+
+ Err(error) => {
+ error!("invalid json body: {}", error);
+ Err(WebhookError::new(
+ StatusCode::BAD_REQUEST,
+ "invalid json body",
+ ))
+ }
+ },
+
+ Err(_) => Err(WebhookError::new(StatusCode::BAD_REQUEST, "not utf-8 body")),
+ }
+ }
+ Err(error) => Err(error),
+ }
+ }
+}
+
+/// Implementation of the service
+impl Service<hyper::Request<Body>> for WebhookService {
+ type Response = hyper::Response<Body>;
+ type Error = hyper::Error;
+ type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
+
+ fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn call(&mut self, req: Request<Body>) -> Self::Future {
+ let future = Self::process_request(req, self.nats.clone(), self.config.discord.public_key);
+ Box::pin(async move {
+ let response = future.await;
+
+ match response {
+ Ok(r) => Ok(r),
+ Err(e) => Ok(e.into()),
+ }
+ })
+ }
+}
.unwrap_or_else(|v: Vec<T>| panic!("Expected a Vec of length {} but it was {}", N, v.len()))
}
-pub fn validate_signature(public_key: &PublicKey, data: &Vec<u8>, hex_signature: &str) -> bool {
+pub fn validate_signature(public_key: &PublicKey, data: &[u8], hex_signature: &str) -> bool {
SIGNATURE_COUNTER.inc();
let timer = SIGNATURE_TIME_HISTOGRAM.with_label_values(&["webhook_main"]).start_timer();
use crate::{
config::WebhookConfig,
- handler::{handler::WebhookService, make_service::MakeSvc},
+ handler::{make_service::MakeSvc, WebhookService},
};
use hyper::Server;
use leash::{AnyhowResultFuture, Component};
let bind = settings.server.listening_adress;
info!("NAts connected!");
- let nats =
- Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>> + Send>>>::into(settings.nats)
-
- .await?;
+ let nats = Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>> + Send>>>::into(
+ settings.nats,
+ )
+ .await?;
let make_service = MakeSvc::new(WebhookService {
config: settings.config,
let server = Server::bind(&bind).serve(make_service);
- server.with_graceful_shutdown(async {
- stop.await.expect("should not fail");
- }).await?;
+ server
+ .with_graceful_shutdown(async {
+ stop.await.expect("should not fail");
+ })
+ .await?;
Ok(())
})
fn new() -> Self {
Self {}
}
-}
\ No newline at end of file
+}
pub redis: crate::redis::RedisConfiguration,
}
-impl<'de, T: Clone + DeserializeOwned + Default> Settings<T>
+impl<T: Clone + DeserializeOwned + Default> Settings<T>
{
pub fn new(service_name: &str) -> Result<Settings<T>, GenericError> {
let mut builder = Config::builder();
pub url: String,
}
-// Allows the configuration to directly create a nats connection
-impl Into<Client> for RedisConfiguration {
- fn into(self) -> Client {
- redis::Client::open(self.url).unwrap()
- }
-}
-
impl From<RedisConfiguration>
- for Pin<Box<dyn Future<Output = anyhow::Result<MultiplexedConnection>>>>
+ for Pin<Box<dyn Future<Output = anyhow::Result<MultiplexedConnection>> + Send>>
{
fn from(value: RedisConfiguration) -> Self {
Box::pin(async move {