diff options
Diffstat (limited to 'gateway/src')
| -rw-r--r-- | gateway/src/client/connexion.rs | 148 | ||||
| -rw-r--r-- | gateway/src/client/error.rs | 20 | ||||
| -rw-r--r-- | gateway/src/client/mod.rs | 5 | ||||
| -rw-r--r-- | gateway/src/client/payloads/payloads.rs | 3 | ||||
| -rw-r--r-- | gateway/src/client/shard.rs | 6 | ||||
| -rw-r--r-- | gateway/src/client/structs.rs | 35 | ||||
| -rw-r--r-- | gateway/src/client/traits/message_handler.rs | 3 | ||||
| -rw-r--r-- | gateway/src/client/traits/mod.rs | 1 | ||||
| -rw-r--r-- | gateway/src/client/utils.rs | 2 | ||||
| -rw-r--r-- | gateway/src/cluster_manager.rs | 46 | ||||
| -rw-r--r-- | gateway/src/config.rs | 33 | ||||
| -rw-r--r-- | gateway/src/main.rs | 26 |
12 files changed, 190 insertions, 138 deletions
diff --git a/gateway/src/client/connexion.rs b/gateway/src/client/connexion.rs index 5cd13eb..8d68158 100644 --- a/gateway/src/client/connexion.rs +++ b/gateway/src/client/connexion.rs @@ -4,12 +4,13 @@ use super::{ state::{Stage, State}, utils::get_gateway_url, }; -use flate2::write::ZlibDecoder; use futures_util::{SinkExt, StreamExt}; -use log::{debug, error, info, trace, warn}; +use log::{error, info, trace, warn}; use std::{str::from_utf8, time::Duration}; use tokio::{net::TcpStream, select, time::Instant}; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::{self, Message, handshake::client::Request}}; +use crate::client::structs::ClientConfig; +use tokio::sync::mpsc; #[derive(Debug)] pub enum CloseReason { @@ -18,55 +19,35 @@ pub enum CloseReason { ErrorEncountered(&'static str), ConnexionError(tungstenite::Error), } - -pub struct Config { - pub token: String, - pub compress: bool, -} - pub enum HandleResult { Success, Error(CloseReason), } -/// This struct represents a single connexion to the gateway, -/// it does not have any retry logic or reconnexion mechanism, -/// everything is handled in the Shard struct. -/// The purpose of this struct is to handle the encoding, -/// compression and other gateway-transport related stuff. -/// All the messages are send through another struct implementing -/// the MessageHandler trait. pub struct Connexion { state: State, - config: Config, - zlib: ZlibDecoder<Vec<u8>>, + config: ClientConfig, connexion: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>, - nats: Option<async_nats::Connection>, + terminate: Option<mpsc::Sender<CloseReason>>, } impl Connexion { - /// Creates a new instance of a discord websocket connexion using the options - /// this is used internally by the shard struct to initialize a single - /// websocket connexion. This instance is not initialized by default. - /// a websocket connexion like this can be re-used multiple times - /// to allow reconnexion mechanisms. - pub async fn new(config: Config) -> Self { + pub fn new(config: ClientConfig) -> Self { Connexion { state: State::default(), connexion: None, config, - zlib: ZlibDecoder::<Vec<u8>>::new(vec![]), - nats: None, + terminate: None } } /// Terminate the connexion and the "start" method related to it. - async fn _terminate_websocket(&mut self, message: CloseReason) { + async fn _terminate_websocket(&mut self, message: &CloseReason) { if let Some(connexion) = &mut self.connexion { if let Err(err) = connexion.close(None).await { error!("failed to close socket {}", err); } else { - info!("closed the socket: {:?}", message) + info!("closed the socket: {:?}", message); } } else { warn!("a termination request was sent without a connexion openned") @@ -82,7 +63,6 @@ impl Connexion { // we reset the state before starting the connection self.state = State::default(); let request = Request::builder() - .header("User-Agant", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36") .uri(get_gateway_url(false, "json", 9)) .body(()) .unwrap(); @@ -93,26 +73,34 @@ impl Connexion { return CloseReason::ConnexionError(err); } self.connexion = Some(connexion_result.unwrap().0); - self.nats = Some(async_nats::connect("localhost:4222").await.unwrap()); + + let (tx, mut rx) = mpsc::channel::<CloseReason>(1); + self.terminate = Some(tx); + // this is the loop that will maintain the whole connexion loop { if let Some(connexion) = &mut self.connexion { // if we do not have a hello message received yet, then we do not use the heartbeat interval // and we just wait for messages to arrive if self.state.stage == Stage::Unknown { - let msg = connexion.next().await; - if let HandleResult::Error(reason) = self._handle_message(&msg).await { - return reason; + select! { + msg = connexion.next() => self._handle_message(&msg).await, + Some(reason) = rx.recv() => { + // gateway termination requested + self._terminate_websocket(&reason); + return reason + } } } else { let timer = self.state.interval.as_mut().unwrap().tick(); select! { - msg = connexion.next() => { - if let HandleResult::Error(reason) = self._handle_message(&msg).await { - return reason - } - }, - _ = timer => self._do_heartbeat().await + msg = connexion.next() => self._handle_message(&msg).await, + _ = timer => self._do_heartbeat().await, + Some(reason) = rx.recv() => { + // gateway termination requested + self._terminate_websocket(&reason); + return reason + } } } } else { @@ -125,36 +113,34 @@ impl Connexion { async fn _handle_message( &mut self, data: &Option<Result<Message, tokio_tungstenite::tungstenite::Error>>, - ) -> HandleResult { + ) { if let Some(message) = data { match message { Ok(message) => match message { Message::Text(text) => { self._handle_discord_message(&text).await; - HandleResult::Success } Message::Binary(message) => { self._handle_discord_message(from_utf8(message).unwrap()) .await; - HandleResult::Success } - Message::Close(_) => { - error!("discord connexion closed"); - HandleResult::Error(CloseReason::ConnexionEnded) + Message::Close(code) => { + error!("discord connexion closed: {:?}", code); + self.terminate.as_ref().unwrap().send(CloseReason::ConnexionEnded).await.unwrap(); } - _ => HandleResult::Error(CloseReason::ErrorEncountered( + _ => self.terminate.as_ref().unwrap().send(CloseReason::ErrorEncountered( "unsupported message type encountered", - )), + )).await.unwrap(), }, - Err(_error) => HandleResult::Error(CloseReason::ErrorEncountered( + Err(_error) => self.terminate.as_ref().unwrap().send(CloseReason::ErrorEncountered( "error while reading a message", - )), + )).await.unwrap(), } } else { - HandleResult::Error(CloseReason::ErrorEncountered( + self.terminate.as_ref().unwrap().send(CloseReason::ErrorEncountered( "error while reading a message", - )) + )).await.unwrap() } } @@ -171,18 +157,19 @@ impl Connexion { OpCodes::Dispatch => { let t = message.t.unwrap(); trace!("dispatch message received: {:?}", t); - let topic = format!("nova.gateway.{}", t); - if let Err(e) = self.nats.as_ref().unwrap().publish( - &topic, - &serde_json::to_vec(&message.d).unwrap(), - ).await { - error!("failed to publish message {}", e); - } }, - OpCodes::PresenceUpdate => todo!(), - OpCodes::VoiceStateUpdate => todo!(), - OpCodes::Reconnect => todo!(), - OpCodes::InvalidSession => todo!(), + OpCodes::PresenceUpdate => { + println!("presence update message received: {:?}", message.d); + }, + OpCodes::VoiceStateUpdate => { + println!("voice update"); + } + OpCodes::Reconnect => { + println!("reconnect {:?}", message.d); + }, + OpCodes::InvalidSession => { + println!("invalid session: {:?}", message.d); + }, OpCodes::Hello => { if let Ok(hello) = serde_json::from_value::<Hello>(message.d) { info!("server sent hello {:?}", hello); @@ -192,32 +179,24 @@ impl Connexion { Duration::from_millis(hello.heartbeat_interval), )); self.state.stage = Stage::Initialized; + let mut shard: Option<[i64; 2]> = None; + if let Some(sharding) = &self.config.shard { + shard = Some([sharding.current_shard.clone(), sharding.total_shards.clone()]); + info!("shard information: {:?}", shard); + } self._send(&MessageBase { t: None, op: OpCodes::Identify, s: None, d: serde_json::to_value(&Identify{ token: self.config.token.clone(), - intents: 1 << 0 | - 1 << 1 | - 1 << 2 | - 1 << 3 | - 1 << 4 | - 1 << 5 | - 1 << 6 | - 1 << 7 | - 1 << 8 | - 1 << 9 | - 1 << 10 | - 1 << 11 | - 1 << 12 | - 1 << 13 | - 1 << 14, + intents: self.config.intents.clone().bits(), properties: IdentifyProprerties { os: "Linux".into(), browser: "Nova".into(), device: "Linux".into(), - } + }, + shard: shard, }).unwrap(), }).await; // do login @@ -237,10 +216,9 @@ impl Connexion { async fn _do_heartbeat(&mut self) { if !self.state.last_heartbeat_acknowledged { - self._terminate_websocket(CloseReason::ErrorEncountered( + self.terminate.as_ref().unwrap().send(CloseReason::ErrorEncountered( "the server did not acknowledged the last heartbeat", - )) - .await; + )).await.unwrap(); return; } self.state.last_heartbeat_acknowledged = false; @@ -261,16 +239,16 @@ impl Connexion { if let Ok(json) = serde_json::to_vec(data) { if let Err(error) = connexion.send(Message::Binary(json)).await { error!("failed to write to socket: {}", error); - self._terminate_websocket(CloseReason::ErrorEncountered( + self.terminate.as_ref().unwrap().send(CloseReason::ErrorEncountered( "failed to write to the socket", )) - .await; + .await.unwrap(); } } else { - self._terminate_websocket(CloseReason::ErrorEncountered( + self.terminate.as_ref().unwrap().send(CloseReason::ErrorEncountered( "failed to serialize the message", )) - .await; + .await.unwrap(); } } } diff --git a/gateway/src/client/error.rs b/gateway/src/client/error.rs deleted file mode 100644 index bac6894..0000000 --- a/gateway/src/client/error.rs +++ /dev/null @@ -1,20 +0,0 @@ -#[derive(Debug)] -struct MyError(String); - -impl fmt::Display for MyError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "There is an error: {}", self.0) - } -} - -impl Error for NovaError {} - -pub fn run() -> Result<(), Box<dyn Error>> { - let condition = true; - - if condition { - return Err(Box::new(MyError("Oops".into()))); - } - - Ok(()) -}
\ No newline at end of file diff --git a/gateway/src/client/mod.rs b/gateway/src/client/mod.rs index 179c40d..3ffd489 100644 --- a/gateway/src/client/mod.rs +++ b/gateway/src/client/mod.rs @@ -1,6 +1,5 @@ -pub mod connexion; mod utils; mod state; -mod shard; pub mod payloads; -pub mod traits;
\ No newline at end of file +pub mod connexion; +pub mod structs;
\ No newline at end of file diff --git a/gateway/src/client/payloads/payloads.rs b/gateway/src/client/payloads/payloads.rs index cf519df..0fec3dc 100644 --- a/gateway/src/client/payloads/payloads.rs +++ b/gateway/src/client/payloads/payloads.rs @@ -22,6 +22,7 @@ pub struct IdentifyProprerties { #[derive(Debug, Serialize, Deserialize)] pub struct Identify { pub token: String, - pub intents: i64, + pub intents: u16, pub properties: IdentifyProprerties, + pub shard: Option<[i64; 2]>, }
\ No newline at end of file diff --git a/gateway/src/client/shard.rs b/gateway/src/client/shard.rs deleted file mode 100644 index fb1ceda..0000000 --- a/gateway/src/client/shard.rs +++ /dev/null @@ -1,6 +0,0 @@ - - - -struct Shard { - -}
\ No newline at end of file diff --git a/gateway/src/client/structs.rs b/gateway/src/client/structs.rs new file mode 100644 index 0000000..1f186c6 --- /dev/null +++ b/gateway/src/client/structs.rs @@ -0,0 +1,35 @@ +use enumflags2::{bitflags, BitFlags}; + +#[bitflags] +#[repr(u16)] +#[derive(Clone, Copy, Debug)] +pub enum Intents { + Guilds = 1 << 0, + GuildMembers = 1 << 1, + GuildBans = 1 << 2, + GuildEmojisAndStickers = 1 << 3, + GuildIntegrations = 1 << 4, + GuildWebhoks = 1 << 5, + GuildInvites = 1 << 6, + GuildVoiceStates = 1 << 7, + GuildPresences = 1 << 8, + GuildMessages = 1 << 9, + GuildMessagesReactions = 1 << 10, + GuildMessageTyping = 1 << 11, + DirectMessages = 1 << 12, + DirectMessagesReactions = 1 << 13, + DirectMessageTyping = 1 << 14, +} + +pub struct Sharding { + pub total_shards: i64, + pub current_shard: i64 +} + +/// Config for the client connection. +pub struct ClientConfig { + pub token: String, + pub large_threshold: Option<u64>, + pub shard: Option<Sharding>, + pub intents: BitFlags<Intents> +}
\ No newline at end of file diff --git a/gateway/src/client/traits/message_handler.rs b/gateway/src/client/traits/message_handler.rs deleted file mode 100644 index a5bfd20..0000000 --- a/gateway/src/client/traits/message_handler.rs +++ /dev/null @@ -1,3 +0,0 @@ -/// This trait is used by the Connexion<H> struct -/// It implements a basic interface for handling events. -pub trait MessageHandler {}
\ No newline at end of file diff --git a/gateway/src/client/traits/mod.rs b/gateway/src/client/traits/mod.rs deleted file mode 100644 index 98d0c32..0000000 --- a/gateway/src/client/traits/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod message_handler;
\ No newline at end of file diff --git a/gateway/src/client/utils.rs b/gateway/src/client/utils.rs index 023b6b9..141740e 100644 --- a/gateway/src/client/utils.rs +++ b/gateway/src/client/utils.rs @@ -1,3 +1,5 @@ + +/// Formats a url of connection to the gateway pub fn get_gateway_url (compress: bool, encoding: &str, v: i16) -> String { return format!( "wss://gateway.discord.gg/?v={}&encoding={}&compress={}", diff --git a/gateway/src/cluster_manager.rs b/gateway/src/cluster_manager.rs new file mode 100644 index 0000000..8900979 --- /dev/null +++ b/gateway/src/cluster_manager.rs @@ -0,0 +1,46 @@ +use crate::client::connexion::Connexion; +use crate::client::structs::*; +use crate::config::Config; +use log::info; +use futures::future::select_all; + +pub struct ClusterManager { + gateway_connexions: Vec<Connexion>, + config: Config, +} + +impl ClusterManager { + + pub fn new(config: crate::config::Config) -> ClusterManager { + // create the memory for containing all the gateways + let gateways: Vec<Connexion> = Vec::with_capacity(config.clustering.shard_count as usize); + + ClusterManager { + gateway_connexions: gateways, + config, + } + } + + pub async fn start(mut self) { + info!("Starting cluster manager..."); + info!("I am cluster {}/{} managing {} discord shards", self.config.clustering.cluster_id+1, self.config.clustering.cluster_size, self.config.clustering.shard_count); + + for i in 0..self.config.clustering.shard_count { + let shard_id = self.config.clustering.cluster_id * self.config.clustering.cluster_size + i; + info!("Starting shard {} for cluster {} for discord shard {}", i, self.config.clustering.cluster_id, shard_id); + self.gateway_connexions.push(Connexion::new(ClientConfig{ + token: self.config.discord.token.clone(), + intents: self.config.discord.intents, + large_threshold: self.config.discord.large_threshold, + shard: Some(Sharding { + total_shards: self.config.clustering.cluster_size * self.config.clustering.shard_count, + current_shard: shard_id, + }), + })); + } + let tasks = self.gateway_connexions.into_iter().map(|item| Box::pin(item.start())); + select_all(tasks).await; + + info!("one shard crashed, we need a restart"); + } +}
\ No newline at end of file diff --git a/gateway/src/config.rs b/gateway/src/config.rs new file mode 100644 index 0000000..6dc2c28 --- /dev/null +++ b/gateway/src/config.rs @@ -0,0 +1,33 @@ +use enumflags2::BitFlags; +use crate::client::structs::Intents; +use serde::Deserialize; + +/// Config for the client connection. +#[derive(Debug, Deserialize, Clone, Default)] +pub struct ClusterClientConfig { + pub token: String, + pub large_threshold: Option<u64>, + pub intents: BitFlags<Intents> +} + +/// Configuration for the cluster manager +#[derive(Debug, Deserialize, Clone, Default)] +pub struct ClusterClientSharding { + pub cluster_size: i64, + pub cluster_id: i64, + pub shard_count: i64 +} + +/// Configuration for the output of the cluster +#[derive(Debug, Deserialize, Clone, Default)] +pub struct ClusterRelay { + pub relay_instances: i64 +} + +/// Configuration for the gateway component +#[derive(Debug, Deserialize, Clone, Default)] +pub struct Config { + pub discord: ClusterClientConfig, + pub clustering: ClusterClientSharding, + pub relaying: ClusterRelay, +} diff --git a/gateway/src/main.rs b/gateway/src/main.rs index 709e26b..4e4836c 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -1,24 +1,12 @@ -use std::env; - -use client::{connexion::Config, traits::message_handler::MessageHandler}; -extern crate serde_json; -extern crate serde_repr; - mod client; +mod config; +mod cluster_manager; -struct Handler {} -impl MessageHandler for Handler {} +use common::config::Settings; #[tokio::main] async fn main() { - pretty_env_logger::init(); - for _ in 0..1 { - tokio::spawn(async move { - let con = client::connexion::Connexion::new(Config { - token: env::var("DISCORD_TOKEN").expect("A valid token is required").into(), - compress: false, - }).await; - println!("{:?}", con.start().await); - }).await.unwrap(); - } -}
\ No newline at end of file + let settings: Settings<config::Config> = Settings::new("gateway").unwrap(); + let manager = cluster_manager::ClusterManager::new(settings.config); + manager.start().await; +} |
