diff options
Diffstat (limited to 'exes/gateway/src/lib.rs')
| -rw-r--r-- | exes/gateway/src/lib.rs | 87 |
1 files changed, 87 insertions, 0 deletions
diff --git a/exes/gateway/src/lib.rs b/exes/gateway/src/lib.rs new file mode 100644 index 0000000..d7a4cee --- /dev/null +++ b/exes/gateway/src/lib.rs @@ -0,0 +1,87 @@ +use config::GatewayConfig; +use leash::{AnyhowResultFuture, Component}; +use shared::{ + config::Settings, + log::{debug, info}, + nats_crate::Client, + payloads::{CachePayload, DispatchEventTagged, Tracing}, +}; +use std::{convert::TryFrom, pin::Pin}; +use tokio::sync::oneshot; +use twilight_gateway::{Event, Shard}; +pub mod config; +use futures::FutureExt; +use futures::{select, Future, StreamExt}; +use twilight_model::gateway::event::DispatchEvent; + +pub struct GatewayServer {} +impl Component for GatewayServer { + type Config = GatewayConfig; + const SERVICE_NAME: &'static str = "gateway"; + + fn start( + &self, + settings: Settings<Self::Config>, + stop: oneshot::Receiver<()>, + ) -> AnyhowResultFuture<()> { + Box::pin(async move { + let (shard, mut events) = Shard::builder(settings.token.to_owned(), settings.intents) + .shard(settings.shard, settings.shard_total)? + .build(); + + let nats = Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>> + Send>>>::into( + settings.nats, + ) + .await?; + + shard.start().await?; + + let mut stop = stop.fuse(); + loop { + select! { + event = events.next().fuse() => { + if let Some(event) = event { + match event { + Event::Ready(ready) => { + info!("Logged in as {}", ready.user.name); + } + + _ => { + let name = event.kind().name(); + if let Ok(dispatch_event) = DispatchEvent::try_from(event) { + let data = CachePayload { + tracing: Tracing { + node_id: "".to_string(), + span: None, + }, + data: DispatchEventTagged { + data: dispatch_event, + }, + }; + let value = serde_json::to_string(&data)?; + debug!("nats send: {}", value); + let bytes = bytes::Bytes::from(value); + nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes) + .await?; + } + } + } + } else { + break + } + }, + _ = stop => break + }; + } + + info!("stopping shard..."); + shard.shutdown(); + + Ok(()) + }) + } + + fn new() -> Self { + Self {} + } +} |
