summaryrefslogtreecommitdiff
path: root/exes/rest/src/ratelimit_client/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'exes/rest/src/ratelimit_client/mod.rs')
-rw-r--r--exes/rest/src/ratelimit_client/mod.rs130
1 files changed, 79 insertions, 51 deletions
diff --git a/exes/rest/src/ratelimit_client/mod.rs b/exes/rest/src/ratelimit_client/mod.rs
index ea34ad9..7493af9 100644
--- a/exes/rest/src/ratelimit_client/mod.rs
+++ b/exes/rest/src/ratelimit_client/mod.rs
@@ -1,10 +1,10 @@
-use self::remote_hashring::{HashRingWrapper, VNode};
-use futures_util::Future;
+use self::remote_hashring::{HashRingWrapper, MetadataMap, VNode};
+use opentelemetry::global;
use proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::{Data, Headers};
use proto::nova::ratelimit::ratelimiter::BucketSubmitTicketRequest;
-use shared::log::debug;
use std::collections::HashMap;
use std::fmt::Debug;
+use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::UNIX_EPOCH;
@@ -12,6 +12,9 @@ use std::time::{Duration, SystemTime};
use tokio::sync::oneshot::{self};
use tokio::sync::{broadcast, mpsc, RwLock};
use tokio_stream::wrappers::ReceiverStream;
+use tonic::Request;
+use tracing::{debug, debug_span, Instrument, Span, instrument};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
mod remote_hashring;
@@ -45,7 +48,7 @@ impl RemoteRatelimiter {
let mut write = self.remotes.write().await;
- for ip in ["localhost"] {
+ for ip in ["ratelimit"] {
let a = VNode::new(ip.into()).await?;
write.add(a.clone());
}
@@ -82,55 +85,80 @@ impl RemoteRatelimiter {
obj
}
+ #[instrument(name = "ticket task")]
pub fn ticket(&self, path: String) -> IssueTicket {
let remotes = self.remotes.clone();
let (tx, rx) = oneshot::channel::<HashMap<String, String>>();
-
- Box::pin(async move {
- // Get node managing this path
- let mut node = (*remotes.read().await.get(&path).unwrap()).clone();
-
- // Buffers for the gRPC streaming channel.
- let (send, remote) = mpsc::channel(5);
- let (do_request, wait) = oneshot::channel();
- // Tonic requires a stream to be used; Since we use a mpsc channel, we can create a stream from it
- let stream = ReceiverStream::new(remote);
-
- // Start the grpc streaming
- let ticket = node.submit_ticket(stream).await?;
-
- // First, send the request
- send.send(BucketSubmitTicketRequest {
- data: Some(Data::Path(path)),
- })
- .await?;
-
- // We continuously listen for events in the channel.
- tokio::spawn(async move {
- let message = ticket.into_inner().message().await.unwrap().unwrap();
-
- if message.accepted == 1 {
- do_request.send(()).unwrap();
- let headers = rx.await.unwrap();
-
- send.send(BucketSubmitTicketRequest {
- data: Some(Data::Headers(Headers {
- precise_time: SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("time went backwards")
- .as_millis() as u64,
- headers,
- })),
- })
- .await
- .unwrap();
- }
- });
-
- // Wait for the message to be sent
- wait.await?;
-
- Ok(tx)
- })
+ Box::pin(
+ async move {
+ // Get node managing this path
+ let mut node = (*remotes.read().await.get(&path).unwrap()).clone();
+
+ // Buffers for the gRPC streaming channel.
+ let (send, remote) = mpsc::channel(5);
+ let (do_request, wait) = oneshot::channel();
+ // Tonic requires a stream to be used; Since we use a mpsc channel, we can create a stream from it
+ let stream = ReceiverStream::new(remote);
+
+ let mut request = Request::new(stream);
+
+ let span = debug_span!("remote request");
+ let context = span.context();
+ global::get_text_map_propagator(|propagator| {
+ propagator.inject_context(&context, &mut MetadataMap(request.metadata_mut()))
+ });
+
+ // Start the grpc streaming
+ let ticket = node.submit_ticket(request).await?;
+
+ // First, send the request
+ send.send(BucketSubmitTicketRequest {
+ data: Some(Data::Path(path)),
+ })
+ .await?;
+
+ // We continuously listen for events in the channel.
+ let span = debug_span!("stream worker");
+ tokio::spawn(
+ async move {
+ let span = debug_span!("waiting for ticket upstream");
+ let message = ticket
+ .into_inner()
+ .message()
+ .instrument(span)
+ .await
+ .unwrap()
+ .unwrap();
+
+ if message.accepted == 1 {
+ debug!("request ticket was accepted");
+ do_request.send(()).unwrap();
+ let span = debug_span!("waiting for response headers");
+ let headers = rx.instrument(span).await.unwrap();
+
+ send.send(BucketSubmitTicketRequest {
+ data: Some(Data::Headers(Headers {
+ precise_time: SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("time went backwards")
+ .as_millis()
+ as u64,
+ headers,
+ })),
+ })
+ .await
+ .unwrap();
+ }
+ }
+ .instrument(span),
+ );
+
+ // Wait for the message to be sent
+ wait.await?;
+
+ Ok(tx)
+ }
+ .instrument(Span::current()),
+ )
}
}