diff options
Diffstat (limited to 'exes/gateway/src/lib.rs')
| -rw-r--r-- | exes/gateway/src/lib.rs | 94 | 
1 files changed, 56 insertions, 38 deletions
diff --git a/exes/gateway/src/lib.rs b/exes/gateway/src/lib.rs index ec3337b..e54bb5c 100644 --- a/exes/gateway/src/lib.rs +++ b/exes/gateway/src/lib.rs @@ -1,5 +1,17 @@ +#![deny( +    clippy::all, +    clippy::correctness, +    clippy::suspicious, +    clippy::style, +    clippy::complexity, +    clippy::perf, +    clippy::pedantic, +    clippy::nursery, +    unsafe_code +)] +#![allow(clippy::redundant_pub_crate)]  use async_nats::{Client, HeaderMap, HeaderValue}; -use config::GatewayConfig; +use config::Gateway;  use leash::{AnyhowResultFuture, Component};  use opentelemetry::{global, propagation::Injector};  use shared::{ @@ -12,21 +24,21 @@ use tokio_stream::StreamExt;  use tracing_opentelemetry::OpenTelemetrySpanExt;  use twilight_gateway::{Event, Shard};  pub mod config; -use tracing::{debug, info, trace_span}; +use tracing::{debug, error, info, trace_span};  use twilight_model::gateway::event::DispatchEvent;  struct MetadataMap<'a>(&'a mut HeaderMap);  impl<'a> Injector for MetadataMap<'a> {      fn set(&mut self, key: &str, value: String) { -        self.0.insert(key, HeaderValue::from_str(&value).unwrap()) +        self.0.insert(key, HeaderValue::from_str(&value).unwrap());      }  }  pub struct GatewayServer {}  impl Component for GatewayServer { -    type Config = GatewayConfig; +    type Config = Gateway;      const SERVICE_NAME: &'static str = "gateway";      fn start( @@ -35,7 +47,7 @@ impl Component for GatewayServer {          mut stop: oneshot::Receiver<()>,      ) -> AnyhowResultFuture<()> {          Box::pin(async move { -            let (shard, mut events) = Shard::builder(settings.token.to_owned(), settings.intents) +            let (shard, mut events) = Shard::builder(settings.token.clone(), settings.intents)                  .shard(settings.shard, settings.shard_total)?                  .build(); @@ -48,40 +60,10 @@ impl Component for GatewayServer {              loop {                  select! {                      event = events.next() => { -                          if let Some(event) = event { -                            match event { -                                Event::Ready(ready) => { -                                    info!("Logged in as {}", ready.user.name); -                                }, - -                                _ => { - -                                    let name = event.kind().name(); -                                    if let Ok(dispatch_event) = DispatchEvent::try_from(event) { -                                        debug!("handling event {}", name.unwrap()); - -                                        let data = CachePayload { -                                            data: DispatchEventTagged { -                                                data: dispatch_event, -                                            }, -                                        }; -                                        let value = serde_json::to_string(&data)?; -                                        let bytes = bytes::Bytes::from(value); - -                                        let span = trace_span!("nats send"); - -                                        let mut header_map = HeaderMap::new(); -                                        let context = span.context(); -                                        global::get_text_map_propagator(|propagator| { -                                            propagator.inject_context(&context, &mut MetadataMap(&mut header_map)) -                                        }); - -                                        nats.publish_with_headers(format!("nova.cache.dispatch.{}", name.unwrap()), header_map, bytes) -                                            .await?; -                                    } -                                } -                            } +                           let _ = handle_event(event, &nats) +                            .await +                            .map_err(|err| error!(error = ?err, "event publish failed"));                          } else {                              break                          } @@ -101,3 +83,39 @@ impl Component for GatewayServer {          Self {}      }  } + +async fn handle_event(event: Event, nats: &Client) -> anyhow::Result<()> { +    if let Event::Ready(ready) = event { +        info!("Logged in as {}", ready.user.name); +    } else { +        let name = event.kind().name(); +        if let Ok(dispatch_event) = DispatchEvent::try_from(event) { +            debug!("handling event {}", name.unwrap()); + +            let data = CachePayload { +                data: DispatchEventTagged { +                    data: dispatch_event, +                }, +            }; +            let value = serde_json::to_string(&data)?; +            let bytes = bytes::Bytes::from(value); + +            let span = trace_span!("nats send"); + +            let mut header_map = HeaderMap::new(); +            let context = span.context(); +            global::get_text_map_propagator(|propagator| { +                propagator.inject_context(&context, &mut MetadataMap(&mut header_map)); +            }); + +            nats.publish_with_headers( +                format!("nova.cache.dispatch.{}", name.unwrap()), +                header_map, +                bytes, +            ) +            .await?; +        } +    } + +    Ok(()) +}  | 
