summaryrefslogtreecommitdiff
path: root/exes/gateway/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'exes/gateway/src/lib.rs')
-rw-r--r--exes/gateway/src/lib.rs53
1 files changed, 34 insertions, 19 deletions
diff --git a/exes/gateway/src/lib.rs b/exes/gateway/src/lib.rs
index d7a4cee..014b72a 100644
--- a/exes/gateway/src/lib.rs
+++ b/exes/gateway/src/lib.rs
@@ -1,19 +1,28 @@
+use async_nats::{Client, HeaderMap, HeaderValue};
use config::GatewayConfig;
use leash::{AnyhowResultFuture, Component};
+use opentelemetry::{global, propagation::Injector};
use shared::{
config::Settings,
- log::{debug, info},
- nats_crate::Client,
- payloads::{CachePayload, DispatchEventTagged, Tracing},
+ payloads::{CachePayload, DispatchEventTagged},
};
-use std::{convert::TryFrom, pin::Pin};
-use tokio::sync::oneshot;
+use std::{convert::TryFrom, future::Future, pin::Pin, str::FromStr};
+use tokio::{select, sync::oneshot};
+use tokio_stream::StreamExt;
+use tracing_opentelemetry::OpenTelemetrySpanExt;
use twilight_gateway::{Event, Shard};
pub mod config;
-use futures::FutureExt;
-use futures::{select, Future, StreamExt};
+use tracing::{debug, info, trace_span};
use twilight_model::gateway::event::DispatchEvent;
+struct MetadataMap<'a>(&'a mut HeaderMap);
+
+impl<'a> Injector for MetadataMap<'a> {
+ fn set(&mut self, key: &str, value: String) {
+ self.0.insert(key, HeaderValue::from_str(&value).unwrap())
+ }
+}
+
pub struct GatewayServer {}
impl Component for GatewayServer {
type Config = GatewayConfig;
@@ -22,7 +31,7 @@ impl Component for GatewayServer {
fn start(
&self,
settings: Settings<Self::Config>,
- stop: oneshot::Receiver<()>,
+ mut stop: oneshot::Receiver<()>,
) -> AnyhowResultFuture<()> {
Box::pin(async move {
let (shard, mut events) = Shard::builder(settings.token.to_owned(), settings.intents)
@@ -33,35 +42,41 @@ impl Component for GatewayServer {
settings.nats,
)
.await?;
-
shard.start().await?;
- let mut stop = stop.fuse();
loop {
select! {
- event = events.next().fuse() => {
+ event = events.next() => {
+
if let Some(event) = event {
match event {
Event::Ready(ready) => {
info!("Logged in as {}", ready.user.name);
- }
+ },
_ => {
+
let name = event.kind().name();
if let Ok(dispatch_event) = DispatchEvent::try_from(event) {
+ debug!("handling event {}", name.unwrap());
+
let data = CachePayload {
- tracing: Tracing {
- node_id: "".to_string(),
- span: None,
- },
data: DispatchEventTagged {
data: dispatch_event,
},
};
let value = serde_json::to_string(&data)?;
- debug!("nats send: {}", value);
let bytes = bytes::Bytes::from(value);
- nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes)
+
+ let span = trace_span!("nats send");
+
+ let mut header_map = HeaderMap::new();
+ let context = span.context();
+ global::get_text_map_propagator(|propagator| {
+ propagator.inject_context(&context, &mut MetadataMap(&mut header_map))
+ });
+
+ nats.publish_with_headers(format!("nova.cache.dispatch.{}", name.unwrap()), header_map, bytes)
.await?;
}
}
@@ -70,7 +85,7 @@ impl Component for GatewayServer {
break
}
},
- _ = stop => break
+ _ = (&mut stop) => break
};
}