diff options
Diffstat (limited to 'exes/gateway')
| -rw-r--r-- | exes/gateway/src/main.rs | 72 |
1 files changed, 46 insertions, 26 deletions
diff --git a/exes/gateway/src/main.rs b/exes/gateway/src/main.rs index 7957b08..f2a4f93 100644 --- a/exes/gateway/src/main.rs +++ b/exes/gateway/src/main.rs @@ -6,18 +6,24 @@ use shared::{ nats_crate::Client, payloads::{CachePayload, DispatchEventTagged, Tracing}, }; +use tokio::sync::oneshot; use std::{convert::TryFrom, pin::Pin}; use twilight_gateway::{Event, Shard}; mod config; -use futures::{Future, StreamExt}; +use futures::{Future, StreamExt, select}; use twilight_model::gateway::event::DispatchEvent; +use futures::FutureExt; struct GatewayServer {} impl Component for GatewayServer { type Config = GatewayConfig; const SERVICE_NAME: &'static str = "gateway"; - fn start(&self, settings: Settings<Self::Config>) -> AnyhowResultFuture<()> { + fn start( + &self, + settings: Settings<Self::Config>, + stop: oneshot::Receiver<()>, + ) -> AnyhowResultFuture<()> { Box::pin(async move { let (shard, mut events) = Shard::builder(settings.token.to_owned(), settings.intents) .shard(settings.shard, settings.shard_total)? @@ -29,34 +35,48 @@ impl Component for GatewayServer { shard.start().await?; - while let Some(event) = events.next().await { - match event { - Event::Ready(ready) => { - info!("Logged in as {}", ready.user.name); - } + let mut stop = stop.fuse(); + loop { - _ => { - let name = event.kind().name(); - if let Ok(dispatch_event) = DispatchEvent::try_from(event) { - let data = CachePayload { - tracing: Tracing { - node_id: "".to_string(), - span: None, - }, - data: DispatchEventTagged { - data: dispatch_event, - }, - }; - let value = serde_json::to_string(&data)?; - debug!("nats send: {}", value); - let bytes = bytes::Bytes::from(value); - nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes) - .await?; + select! { + event = events.next().fuse() => { + if let Some(event) = event { + match event { + Event::Ready(ready) => { + info!("Logged in as {}", ready.user.name); + } + + _ => { + let name = event.kind().name(); + if let Ok(dispatch_event) = DispatchEvent::try_from(event) { + let data = CachePayload { + tracing: Tracing { + node_id: "".to_string(), + span: None, + }, + data: DispatchEventTagged { + data: dispatch_event, + }, + }; + let value = serde_json::to_string(&data)?; + debug!("nats send: {}", value); + let bytes = bytes::Bytes::from(value); + nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes) + .await?; + } + } + } + } else { + break } - } - } + }, + _ = stop => break + }; } + info!("stopping shard..."); + shard.shutdown(); + Ok(()) }) } |
