diff options
Diffstat (limited to 'exes/rest/src/ratelimit_client/mod.rs')
| -rw-r--r-- | exes/rest/src/ratelimit_client/mod.rs | 130 |
1 files changed, 79 insertions, 51 deletions
diff --git a/exes/rest/src/ratelimit_client/mod.rs b/exes/rest/src/ratelimit_client/mod.rs index ea34ad9..7493af9 100644 --- a/exes/rest/src/ratelimit_client/mod.rs +++ b/exes/rest/src/ratelimit_client/mod.rs @@ -1,10 +1,10 @@ -use self::remote_hashring::{HashRingWrapper, VNode}; -use futures_util::Future; +use self::remote_hashring::{HashRingWrapper, MetadataMap, VNode}; +use opentelemetry::global; use proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::{Data, Headers}; use proto::nova::ratelimit::ratelimiter::BucketSubmitTicketRequest; -use shared::log::debug; use std::collections::HashMap; use std::fmt::Debug; +use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::time::UNIX_EPOCH; @@ -12,6 +12,9 @@ use std::time::{Duration, SystemTime}; use tokio::sync::oneshot::{self}; use tokio::sync::{broadcast, mpsc, RwLock}; use tokio_stream::wrappers::ReceiverStream; +use tonic::Request; +use tracing::{debug, debug_span, Instrument, Span, instrument}; +use tracing_opentelemetry::OpenTelemetrySpanExt; mod remote_hashring; @@ -45,7 +48,7 @@ impl RemoteRatelimiter { let mut write = self.remotes.write().await; - for ip in ["localhost"] { + for ip in ["ratelimit"] { let a = VNode::new(ip.into()).await?; write.add(a.clone()); } @@ -82,55 +85,80 @@ impl RemoteRatelimiter { obj } + #[instrument(name = "ticket task")] pub fn ticket(&self, path: String) -> IssueTicket { let remotes = self.remotes.clone(); let (tx, rx) = oneshot::channel::<HashMap<String, String>>(); - - Box::pin(async move { - // Get node managing this path - let mut node = (*remotes.read().await.get(&path).unwrap()).clone(); - - // Buffers for the gRPC streaming channel. - let (send, remote) = mpsc::channel(5); - let (do_request, wait) = oneshot::channel(); - // Tonic requires a stream to be used; Since we use a mpsc channel, we can create a stream from it - let stream = ReceiverStream::new(remote); - - // Start the grpc streaming - let ticket = node.submit_ticket(stream).await?; - - // First, send the request - send.send(BucketSubmitTicketRequest { - data: Some(Data::Path(path)), - }) - .await?; - - // We continuously listen for events in the channel. - tokio::spawn(async move { - let message = ticket.into_inner().message().await.unwrap().unwrap(); - - if message.accepted == 1 { - do_request.send(()).unwrap(); - let headers = rx.await.unwrap(); - - send.send(BucketSubmitTicketRequest { - data: Some(Data::Headers(Headers { - precise_time: SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("time went backwards") - .as_millis() as u64, - headers, - })), - }) - .await - .unwrap(); - } - }); - - // Wait for the message to be sent - wait.await?; - - Ok(tx) - }) + Box::pin( + async move { + // Get node managing this path + let mut node = (*remotes.read().await.get(&path).unwrap()).clone(); + + // Buffers for the gRPC streaming channel. + let (send, remote) = mpsc::channel(5); + let (do_request, wait) = oneshot::channel(); + // Tonic requires a stream to be used; Since we use a mpsc channel, we can create a stream from it + let stream = ReceiverStream::new(remote); + + let mut request = Request::new(stream); + + let span = debug_span!("remote request"); + let context = span.context(); + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&context, &mut MetadataMap(request.metadata_mut())) + }); + + // Start the grpc streaming + let ticket = node.submit_ticket(request).await?; + + // First, send the request + send.send(BucketSubmitTicketRequest { + data: Some(Data::Path(path)), + }) + .await?; + + // We continuously listen for events in the channel. + let span = debug_span!("stream worker"); + tokio::spawn( + async move { + let span = debug_span!("waiting for ticket upstream"); + let message = ticket + .into_inner() + .message() + .instrument(span) + .await + .unwrap() + .unwrap(); + + if message.accepted == 1 { + debug!("request ticket was accepted"); + do_request.send(()).unwrap(); + let span = debug_span!("waiting for response headers"); + let headers = rx.instrument(span).await.unwrap(); + + send.send(BucketSubmitTicketRequest { + data: Some(Data::Headers(Headers { + precise_time: SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("time went backwards") + .as_millis() + as u64, + headers, + })), + }) + .await + .unwrap(); + } + } + .instrument(span), + ); + + // Wait for the message to be sent + wait.await?; + + Ok(tx) + } + .instrument(Span::current()), + ) } } |
