1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
  | 
use async_nats::{Client, HeaderMap, HeaderValue};
use config::GatewayConfig;
use leash::{AnyhowResultFuture, Component};
use opentelemetry::{global, propagation::Injector};
use shared::{
    config::Settings,
    payloads::{CachePayload, DispatchEventTagged},
};
use std::{convert::TryFrom, future::Future, pin::Pin, str::FromStr};
use tokio::{select, sync::oneshot};
use tokio_stream::StreamExt;
use tracing_opentelemetry::OpenTelemetrySpanExt;
use twilight_gateway::{Event, Shard};
pub mod config;
use tracing::{debug, info, trace_span};
use twilight_model::gateway::event::DispatchEvent;
struct MetadataMap<'a>(&'a mut HeaderMap);
impl<'a> Injector for MetadataMap<'a> {
    fn set(&mut self, key: &str, value: String) {
        self.0.insert(key, HeaderValue::from_str(&value).unwrap())
    }
}
/// Gateway service type. It has no fields; its behavior is provided by the
/// [`Component`] implementation in this file.
pub struct GatewayServer {}
impl Component for GatewayServer {
    type Config = GatewayConfig;
    const SERVICE_NAME: &'static str = "gateway";
    fn start(
        &self,
        settings: Settings<Self::Config>,
        mut stop: oneshot::Receiver<()>,
    ) -> AnyhowResultFuture<()> {
        Box::pin(async move {
            let (shard, mut events) = Shard::builder(settings.token.to_owned(), settings.intents)
                .shard(settings.shard, settings.shard_total)?
                .build();
            let nats = Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>> + Send>>>::into(
                settings.nats,
            )
            .await?;
            shard.start().await?;
            loop {
                select! {
                    event = events.next() => {
                        if let Some(event) = event {
                            match event {
                                Event::Ready(ready) => {
                                    info!("Logged in as {}", ready.user.name);
                                },
                                _ => {
                                    let name = event.kind().name();
                                    if let Ok(dispatch_event) = DispatchEvent::try_from(event) {
                                        debug!("handling event {}", name.unwrap());
                                        let data = CachePayload {
                                            data: DispatchEventTagged {
                                                data: dispatch_event,
                                            },
                                        };
                                        let value = serde_json::to_string(&data)?;
                                        let bytes = bytes::Bytes::from(value);
                                        let span = trace_span!("nats send");
                                        let mut header_map = HeaderMap::new();
                                        let context = span.context();
                                        global::get_text_map_propagator(|propagator| {
                                            propagator.inject_context(&context, &mut MetadataMap(&mut header_map))
                                        });
                                        nats.publish_with_headers(format!("nova.cache.dispatch.{}", name.unwrap()), header_map, bytes)
                                            .await?;
                                    }
                                }
                            }
                        } else {
                            break
                        }
                    },
                    _ = (&mut stop) => break
                };
            }
            info!("stopping shard...");
            shard.shutdown();
            Ok(())
        })
    }
    fn new() -> Self {
        Self {}
    }
}
  |