summaryrefslogtreecommitdiff
path: root/exes/gateway/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'exes/gateway/src/lib.rs')
-rw-r--r--exes/gateway/src/lib.rs87
1 files changed, 87 insertions, 0 deletions
diff --git a/exes/gateway/src/lib.rs b/exes/gateway/src/lib.rs
new file mode 100644
index 0000000..d7a4cee
--- /dev/null
+++ b/exes/gateway/src/lib.rs
@@ -0,0 +1,87 @@
+use config::GatewayConfig;
+use leash::{AnyhowResultFuture, Component};
+use shared::{
+ config::Settings,
+ log::{debug, info},
+ nats_crate::Client,
+ payloads::{CachePayload, DispatchEventTagged, Tracing},
+};
+use std::{convert::TryFrom, pin::Pin};
+use tokio::sync::oneshot;
+use twilight_gateway::{Event, Shard};
+pub mod config;
+use futures::FutureExt;
+use futures::{select, Future, StreamExt};
+use twilight_model::gateway::event::DispatchEvent;
+
+pub struct GatewayServer {}
+impl Component for GatewayServer {
+ type Config = GatewayConfig;
+ const SERVICE_NAME: &'static str = "gateway";
+
+ fn start(
+ &self,
+ settings: Settings<Self::Config>,
+ stop: oneshot::Receiver<()>,
+ ) -> AnyhowResultFuture<()> {
+ Box::pin(async move {
+ let (shard, mut events) = Shard::builder(settings.token.to_owned(), settings.intents)
+ .shard(settings.shard, settings.shard_total)?
+ .build();
+
+ let nats = Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>> + Send>>>::into(
+ settings.nats,
+ )
+ .await?;
+
+ shard.start().await?;
+
+ let mut stop = stop.fuse();
+ loop {
+ select! {
+ event = events.next().fuse() => {
+ if let Some(event) = event {
+ match event {
+ Event::Ready(ready) => {
+ info!("Logged in as {}", ready.user.name);
+ }
+
+ _ => {
+ let name = event.kind().name();
+ if let Ok(dispatch_event) = DispatchEvent::try_from(event) {
+ let data = CachePayload {
+ tracing: Tracing {
+ node_id: "".to_string(),
+ span: None,
+ },
+ data: DispatchEventTagged {
+ data: dispatch_event,
+ },
+ };
+ let value = serde_json::to_string(&data)?;
+ debug!("nats send: {}", value);
+ let bytes = bytes::Bytes::from(value);
+ nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes)
+ .await?;
+ }
+ }
+ }
+ } else {
+ break
+ }
+ },
+ _ = stop => break
+ };
+ }
+
+ info!("stopping shard...");
+ shard.shutdown();
+
+ Ok(())
+ })
+ }
+
+ fn new() -> Self {
+ Self {}
+ }
+}