summaryrefslogtreecommitdiff
path: root/exes/ratelimit/src/grpc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'exes/ratelimit/src/grpc.rs')
-rw-r--r--exes/ratelimit/src/grpc.rs60
1 files changed, 45 insertions, 15 deletions
diff --git a/exes/ratelimit/src/grpc.rs b/exes/ratelimit/src/grpc.rs
index a75c329..fbcf3b7 100644
--- a/exes/ratelimit/src/grpc.rs
+++ b/exes/ratelimit/src/grpc.rs
@@ -1,11 +1,13 @@
-
+use opentelemetry::{global, propagation::Extractor};
+use proto::nova::ratelimit::ratelimiter::{
+ ratelimiter_server::Ratelimiter, BucketSubmitTicketRequest, BucketSubmitTicketResponse,
+};
use std::pin::Pin;
-
-use futures_util::Stream;
-use proto::nova::ratelimit::ratelimiter::{ratelimiter_server::Ratelimiter, BucketSubmitTicketResponse, BucketSubmitTicketRequest};
use tokio::sync::mpsc;
-use tokio_stream::{wrappers::ReceiverStream, StreamExt};
+use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{Request, Response, Status, Streaming};
+use tracing::{debug, debug_span, info, Instrument};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
use twilight_http_ratelimiting::{ticket::TicketReceiver, RatelimitHeaders};
use crate::redis_global_local_bucket_ratelimiter::RedisGlobalLocalBucketRatelimiter;
@@ -20,9 +22,28 @@ impl RLServer {
}
}
+struct MetadataMap<'a>(&'a tonic::metadata::MetadataMap);
+
+impl<'a> Extractor for MetadataMap<'a> {
+ /// Get a value for a key from the MetadataMap. If the value can't be converted to &str, returns None
+ fn get(&self, key: &str) -> Option<&str> {
+ self.0.get(key).and_then(|metadata| metadata.to_str().ok())
+ }
+
+ /// Collect all the keys from the MetadataMap.
+ fn keys(&self) -> Vec<&str> {
+ self.0
+ .keys()
+ .map(|key| match key {
+ tonic::metadata::KeyRef::Ascii(v) => v.as_str(),
+ tonic::metadata::KeyRef::Binary(v) => v.as_str(),
+ })
+ .collect::<Vec<_>>()
+ }
+}
+
#[tonic::async_trait]
impl Ratelimiter for RLServer {
-
type SubmitTicketStream =
Pin<Box<dyn Stream<Item = Result<BucketSubmitTicketResponse, Status>> + Send>>;
@@ -30,6 +51,14 @@ impl Ratelimiter for RLServer {
&self,
req: Request<Streaming<BucketSubmitTicketRequest>>,
) -> Result<Response<Self::SubmitTicketStream>, Status> {
+ let parent_cx =
+ global::get_text_map_propagator(|prop| prop.extract(&MetadataMap(req.metadata())));
+ // Generate a tracing span as usual
+ let span = tracing::span!(tracing::Level::INFO, "request process");
+
+ // Assign parent trace from external context
+ span.set_parent(parent_cx);
+
let mut in_stream = req.into_inner();
let (tx, rx) = mpsc::channel(128);
let imrl = self.ratelimiter.clone();
@@ -45,29 +74,30 @@ impl Ratelimiter for RLServer {
match result.data.unwrap() {
proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::Data::Path(path) => {
- let a = imrl.ticket(path).await.unwrap();
+ let span = debug_span!("requesting ticket");
+ let a = imrl.ticket(path).instrument(span).await.unwrap();
receiver = Some(a);
-
tx.send(Ok(BucketSubmitTicketResponse {
accepted: 1
})).await.unwrap();
-
},
proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::Data::Headers(b) => {
if let Some(recv) = receiver {
- let recv = recv.await.unwrap();
+ let span = debug_span!("waiting for headers data");
+ let recv = recv.instrument(span).await.unwrap();
let rheaders = RatelimitHeaders::from_pairs(b.headers.iter().map(|f| (f.0.as_str(), f.1.as_bytes()))).unwrap();
-
- recv.headers(Some(rheaders)).unwrap();
+ recv.headers(Some(rheaders)).unwrap();
break;
}
},
}
}
- println!("\tstream ended");
- });
+
+ debug!("\tstream ended");
+ info!("request terminated");
+ }.instrument(span));
// echo just write the same data that was received
let out_stream = ReceiverStream::new(rx);
@@ -76,4 +106,4 @@ impl Ratelimiter for RLServer {
Box::pin(out_stream) as Self::SubmitTicketStream
))
}
-} \ No newline at end of file
+}