summaryrefslogtreecommitdiff
path: root/exes/gateway
diff options
context:
space:
mode:
Diffstat (limited to 'exes/gateway')
-rw-r--r--exes/gateway/src/main.rs72
1 files changed, 46 insertions, 26 deletions
diff --git a/exes/gateway/src/main.rs b/exes/gateway/src/main.rs
index 7957b08..f2a4f93 100644
--- a/exes/gateway/src/main.rs
+++ b/exes/gateway/src/main.rs
@@ -6,18 +6,24 @@ use shared::{
nats_crate::Client,
payloads::{CachePayload, DispatchEventTagged, Tracing},
};
+use tokio::sync::oneshot;
use std::{convert::TryFrom, pin::Pin};
use twilight_gateway::{Event, Shard};
mod config;
-use futures::{Future, StreamExt};
+use futures::{Future, StreamExt, select};
use twilight_model::gateway::event::DispatchEvent;
+use futures::FutureExt;
struct GatewayServer {}
impl Component for GatewayServer {
type Config = GatewayConfig;
const SERVICE_NAME: &'static str = "gateway";
- fn start(&self, settings: Settings<Self::Config>) -> AnyhowResultFuture<()> {
+ fn start(
+ &self,
+ settings: Settings<Self::Config>,
+ stop: oneshot::Receiver<()>,
+ ) -> AnyhowResultFuture<()> {
Box::pin(async move {
let (shard, mut events) = Shard::builder(settings.token.to_owned(), settings.intents)
.shard(settings.shard, settings.shard_total)?
@@ -29,34 +35,48 @@ impl Component for GatewayServer {
shard.start().await?;
- while let Some(event) = events.next().await {
- match event {
- Event::Ready(ready) => {
- info!("Logged in as {}", ready.user.name);
- }
+ let mut stop = stop.fuse();
+ loop {
- _ => {
- let name = event.kind().name();
- if let Ok(dispatch_event) = DispatchEvent::try_from(event) {
- let data = CachePayload {
- tracing: Tracing {
- node_id: "".to_string(),
- span: None,
- },
- data: DispatchEventTagged {
- data: dispatch_event,
- },
- };
- let value = serde_json::to_string(&data)?;
- debug!("nats send: {}", value);
- let bytes = bytes::Bytes::from(value);
- nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes)
- .await?;
+ select! {
+ event = events.next().fuse() => {
+ if let Some(event) = event {
+ match event {
+ Event::Ready(ready) => {
+ info!("Logged in as {}", ready.user.name);
+ }
+
+ _ => {
+ let name = event.kind().name();
+ if let Ok(dispatch_event) = DispatchEvent::try_from(event) {
+ let data = CachePayload {
+ tracing: Tracing {
+ node_id: "".to_string(),
+ span: None,
+ },
+ data: DispatchEventTagged {
+ data: dispatch_event,
+ },
+ };
+ let value = serde_json::to_string(&data)?;
+ debug!("nats send: {}", value);
+ let bytes = bytes::Bytes::from(value);
+ nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes)
+ .await?;
+ }
+ }
+ }
+ } else {
+ break
}
- }
- }
+ },
+ _ = stop => break
+ };
}
+ info!("stopping shard...");
+ shard.shutdown();
+
Ok(())
})
}