diff options
Diffstat (limited to 'exes/gateway/src/lib.rs')
| -rw-r--r-- | exes/gateway/src/lib.rs | 53 |
1 files changed, 34 insertions, 19 deletions
diff --git a/exes/gateway/src/lib.rs b/exes/gateway/src/lib.rs index d7a4cee..014b72a 100644 --- a/exes/gateway/src/lib.rs +++ b/exes/gateway/src/lib.rs @@ -1,19 +1,28 @@ +use async_nats::{Client, HeaderMap, HeaderValue}; use config::GatewayConfig; use leash::{AnyhowResultFuture, Component}; +use opentelemetry::{global, propagation::Injector}; use shared::{ config::Settings, - log::{debug, info}, - nats_crate::Client, - payloads::{CachePayload, DispatchEventTagged, Tracing}, + payloads::{CachePayload, DispatchEventTagged}, }; -use std::{convert::TryFrom, pin::Pin}; -use tokio::sync::oneshot; +use std::{convert::TryFrom, future::Future, pin::Pin, str::FromStr}; +use tokio::{select, sync::oneshot}; +use tokio_stream::StreamExt; +use tracing_opentelemetry::OpenTelemetrySpanExt; use twilight_gateway::{Event, Shard}; pub mod config; -use futures::FutureExt; -use futures::{select, Future, StreamExt}; +use tracing::{debug, info, trace_span}; use twilight_model::gateway::event::DispatchEvent; +struct MetadataMap<'a>(&'a mut HeaderMap); + +impl<'a> Injector for MetadataMap<'a> { + fn set(&mut self, key: &str, value: String) { + self.0.insert(key, HeaderValue::from_str(&value).unwrap()) + } +} + pub struct GatewayServer {} impl Component for GatewayServer { type Config = GatewayConfig; @@ -22,7 +31,7 @@ impl Component for GatewayServer { fn start( &self, settings: Settings<Self::Config>, - stop: oneshot::Receiver<()>, + mut stop: oneshot::Receiver<()>, ) -> AnyhowResultFuture<()> { Box::pin(async move { let (shard, mut events) = Shard::builder(settings.token.to_owned(), settings.intents) @@ -33,35 +42,41 @@ impl Component for GatewayServer { settings.nats, ) .await?; - shard.start().await?; - let mut stop = stop.fuse(); loop { select! { - event = events.next().fuse() => { + event = events.next() => { + if let Some(event) = event { match event { Event::Ready(ready) => { info!("Logged in as {}", ready.user.name); - } + }, _ => { + let name = event.kind().name(); if let Ok(dispatch_event) = DispatchEvent::try_from(event) { + debug!("handling event {}", name.unwrap()); + let data = CachePayload { - tracing: Tracing { - node_id: "".to_string(), - span: None, - }, data: DispatchEventTagged { data: dispatch_event, }, }; let value = serde_json::to_string(&data)?; - debug!("nats send: {}", value); let bytes = bytes::Bytes::from(value); - nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes) + + let span = trace_span!("nats send"); + + let mut header_map = HeaderMap::new(); + let context = span.context(); + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&context, &mut MetadataMap(&mut header_map)) + }); + + nats.publish_with_headers(format!("nova.cache.dispatch.{}", name.unwrap()), header_map, bytes) .await?; } } @@ -70,7 +85,7 @@ impl Component for GatewayServer { break } }, - _ = stop => break + _ = (&mut stop) => break }; } |
