summaryrefslogtreecommitdiff
path: root/exes/gateway/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'exes/gateway/src/lib.rs')
-rw-r--r--exes/gateway/src/lib.rs94
1 files changed, 56 insertions, 38 deletions
diff --git a/exes/gateway/src/lib.rs b/exes/gateway/src/lib.rs
index ec3337b..e54bb5c 100644
--- a/exes/gateway/src/lib.rs
+++ b/exes/gateway/src/lib.rs
@@ -1,5 +1,17 @@
+#![deny(
+ clippy::all,
+ clippy::correctness,
+ clippy::suspicious,
+ clippy::style,
+ clippy::complexity,
+ clippy::perf,
+ clippy::pedantic,
+ clippy::nursery,
+ unsafe_code
+)]
+#![allow(clippy::redundant_pub_crate)]
use async_nats::{Client, HeaderMap, HeaderValue};
-use config::GatewayConfig;
+use config::Gateway;
use leash::{AnyhowResultFuture, Component};
use opentelemetry::{global, propagation::Injector};
use shared::{
@@ -12,21 +24,21 @@ use tokio_stream::StreamExt;
use tracing_opentelemetry::OpenTelemetrySpanExt;
use twilight_gateway::{Event, Shard};
pub mod config;
-use tracing::{debug, info, trace_span};
+use tracing::{debug, error, info, trace_span};
use twilight_model::gateway::event::DispatchEvent;
struct MetadataMap<'a>(&'a mut HeaderMap);
impl<'a> Injector for MetadataMap<'a> {
fn set(&mut self, key: &str, value: String) {
- self.0.insert(key, HeaderValue::from_str(&value).unwrap())
+ self.0.insert(key, HeaderValue::from_str(&value).unwrap());
}
}
pub struct GatewayServer {}
impl Component for GatewayServer {
- type Config = GatewayConfig;
+ type Config = Gateway;
const SERVICE_NAME: &'static str = "gateway";
fn start(
@@ -35,7 +47,7 @@ impl Component for GatewayServer {
mut stop: oneshot::Receiver<()>,
) -> AnyhowResultFuture<()> {
Box::pin(async move {
- let (shard, mut events) = Shard::builder(settings.token.to_owned(), settings.intents)
+ let (shard, mut events) = Shard::builder(settings.token.clone(), settings.intents)
.shard(settings.shard, settings.shard_total)?
.build();
@@ -48,40 +60,10 @@ impl Component for GatewayServer {
loop {
select! {
event = events.next() => {
-
if let Some(event) = event {
- match event {
- Event::Ready(ready) => {
- info!("Logged in as {}", ready.user.name);
- },
-
- _ => {
-
- let name = event.kind().name();
- if let Ok(dispatch_event) = DispatchEvent::try_from(event) {
- debug!("handling event {}", name.unwrap());
-
- let data = CachePayload {
- data: DispatchEventTagged {
- data: dispatch_event,
- },
- };
- let value = serde_json::to_string(&data)?;
- let bytes = bytes::Bytes::from(value);
-
- let span = trace_span!("nats send");
-
- let mut header_map = HeaderMap::new();
- let context = span.context();
- global::get_text_map_propagator(|propagator| {
- propagator.inject_context(&context, &mut MetadataMap(&mut header_map))
- });
-
- nats.publish_with_headers(format!("nova.cache.dispatch.{}", name.unwrap()), header_map, bytes)
- .await?;
- }
- }
- }
+ let _ = handle_event(event, &nats)
+ .await
+ .map_err(|err| error!(error = ?err, "event publish failed"));
} else {
break
}
@@ -101,3 +83,39 @@ impl Component for GatewayServer {
Self {}
}
}
+
+async fn handle_event(event: Event, nats: &Client) -> anyhow::Result<()> {
+ if let Event::Ready(ready) = event {
+ info!("Logged in as {}", ready.user.name);
+ } else {
+ let name = event.kind().name();
+ if let Ok(dispatch_event) = DispatchEvent::try_from(event) {
+ debug!("handling event {}", name.unwrap());
+
+ let data = CachePayload {
+ data: DispatchEventTagged {
+ data: dispatch_event,
+ },
+ };
+ let value = serde_json::to_string(&data)?;
+ let bytes = bytes::Bytes::from(value);
+
+ let span = trace_span!("nats send");
+
+ let mut header_map = HeaderMap::new();
+ let context = span.context();
+ global::get_text_map_propagator(|propagator| {
+ propagator.inject_context(&context, &mut MetadataMap(&mut header_map));
+ });
+
+ nats.publish_with_headers(
+ format!("nova.cache.dispatch.{}", name.unwrap()),
+ header_map,
+ bytes,
+ )
+ .await?;
+ }
+ }
+
+ Ok(())
+}