diff options
Diffstat (limited to 'exes/ratelimit/src/grpc.rs')
| -rw-r--r-- | exes/ratelimit/src/grpc.rs | 60 |
1 files changed, 45 insertions, 15 deletions
diff --git a/exes/ratelimit/src/grpc.rs b/exes/ratelimit/src/grpc.rs index a75c329..fbcf3b7 100644 --- a/exes/ratelimit/src/grpc.rs +++ b/exes/ratelimit/src/grpc.rs @@ -1,11 +1,13 @@ - +use opentelemetry::{global, propagation::Extractor}; +use proto::nova::ratelimit::ratelimiter::{ + ratelimiter_server::Ratelimiter, BucketSubmitTicketRequest, BucketSubmitTicketResponse, +}; use std::pin::Pin; - -use futures_util::Stream; -use proto::nova::ratelimit::ratelimiter::{ratelimiter_server::Ratelimiter, BucketSubmitTicketResponse, BucketSubmitTicketRequest}; use tokio::sync::mpsc; -use tokio_stream::{wrappers::ReceiverStream, StreamExt}; +use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tonic::{Request, Response, Status, Streaming}; +use tracing::{debug, debug_span, info, Instrument}; +use tracing_opentelemetry::OpenTelemetrySpanExt; use twilight_http_ratelimiting::{ticket::TicketReceiver, RatelimitHeaders}; use crate::redis_global_local_bucket_ratelimiter::RedisGlobalLocalBucketRatelimiter; @@ -20,9 +22,28 @@ impl RLServer { } } +struct MetadataMap<'a>(&'a tonic::metadata::MetadataMap); + +impl<'a> Extractor for MetadataMap<'a> { + /// Get a value for a key from the MetadataMap. If the value can't be converted to &str, returns None + fn get(&self, key: &str) -> Option<&str> { + self.0.get(key).and_then(|metadata| metadata.to_str().ok()) + } + + /// Collect all the keys from the MetadataMap. + fn keys(&self) -> Vec<&str> { + self.0 + .keys() + .map(|key| match key { + tonic::metadata::KeyRef::Ascii(v) => v.as_str(), + tonic::metadata::KeyRef::Binary(v) => v.as_str(), + }) + .collect::<Vec<_>>() + } +} + #[tonic::async_trait] impl Ratelimiter for RLServer { - type SubmitTicketStream = Pin<Box<dyn Stream<Item = Result<BucketSubmitTicketResponse, Status>> + Send>>; @@ -30,6 +51,14 @@ impl Ratelimiter for RLServer { &self, req: Request<Streaming<BucketSubmitTicketRequest>>, ) -> Result<Response<Self::SubmitTicketStream>, Status> { + let parent_cx = + global::get_text_map_propagator(|prop| prop.extract(&MetadataMap(req.metadata()))); + // Generate a tracing span as usual + let span = tracing::span!(tracing::Level::INFO, "request process"); + + // Assign parent trace from external context + span.set_parent(parent_cx); + let mut in_stream = req.into_inner(); let (tx, rx) = mpsc::channel(128); let imrl = self.ratelimiter.clone(); @@ -45,29 +74,30 @@ impl Ratelimiter for RLServer { match result.data.unwrap() { proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::Data::Path(path) => { - let a = imrl.ticket(path).await.unwrap(); + let span = debug_span!("requesting ticket"); + let a = imrl.ticket(path).instrument(span).await.unwrap(); receiver = Some(a); - tx.send(Ok(BucketSubmitTicketResponse { accepted: 1 })).await.unwrap(); - }, proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::Data::Headers(b) => { if let Some(recv) = receiver { - let recv = recv.await.unwrap(); + let span = debug_span!("waiting for headers data"); + let recv = recv.instrument(span).await.unwrap(); let rheaders = RatelimitHeaders::from_pairs(b.headers.iter().map(|f| (f.0.as_str(), f.1.as_bytes()))).unwrap(); - - recv.headers(Some(rheaders)).unwrap(); + recv.headers(Some(rheaders)).unwrap(); break; } }, } } - println!("\tstream ended"); - }); + + debug!("\tstream ended"); + info!("request terminated"); + }.instrument(span)); // echo just write the same data that was received let out_stream = ReceiverStream::new(rx); @@ -76,4 +106,4 @@ impl Ratelimiter for RLServer { Box::pin(out_stream) as Self::SubmitTicketStream )) } -}
\ No newline at end of file +} |
