summaryrefslogtreecommitdiff
path: root/gateway/src
diff options
context:
space:
mode:
Diffstat (limited to 'gateway/src')
-rw-r--r--gateway/src/client/connexion.rs148
-rw-r--r--gateway/src/client/error.rs20
-rw-r--r--gateway/src/client/mod.rs5
-rw-r--r--gateway/src/client/payloads/payloads.rs3
-rw-r--r--gateway/src/client/shard.rs6
-rw-r--r--gateway/src/client/structs.rs35
-rw-r--r--gateway/src/client/traits/message_handler.rs3
-rw-r--r--gateway/src/client/traits/mod.rs1
-rw-r--r--gateway/src/client/utils.rs2
-rw-r--r--gateway/src/cluster_manager.rs46
-rw-r--r--gateway/src/config.rs33
-rw-r--r--gateway/src/main.rs26
12 files changed, 190 insertions, 138 deletions
diff --git a/gateway/src/client/connexion.rs b/gateway/src/client/connexion.rs
index 5cd13eb..8d68158 100644
--- a/gateway/src/client/connexion.rs
+++ b/gateway/src/client/connexion.rs
@@ -4,12 +4,13 @@ use super::{
state::{Stage, State},
utils::get_gateway_url,
};
-use flate2::write::ZlibDecoder;
use futures_util::{SinkExt, StreamExt};
-use log::{debug, error, info, trace, warn};
+use log::{error, info, trace, warn};
use std::{str::from_utf8, time::Duration};
use tokio::{net::TcpStream, select, time::Instant};
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::{self, Message, handshake::client::Request}};
+use crate::client::structs::ClientConfig;
+use tokio::sync::mpsc;
#[derive(Debug)]
pub enum CloseReason {
@@ -18,55 +19,35 @@ pub enum CloseReason {
ErrorEncountered(&'static str),
ConnexionError(tungstenite::Error),
}
-
-pub struct Config {
- pub token: String,
- pub compress: bool,
-}
-
pub enum HandleResult {
Success,
Error(CloseReason),
}
-/// This struct represents a single connexion to the gateway,
-/// it does not have any retry logic or reconnexion mechanism,
-/// everything is handled in the Shard struct.
-/// The purpose of this struct is to handle the encoding,
-/// compression and other gateway-transport related stuff.
-/// All the messages are send through another struct implementing
-/// the MessageHandler trait.
pub struct Connexion {
state: State,
- config: Config,
- zlib: ZlibDecoder<Vec<u8>>,
+ config: ClientConfig,
connexion: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
- nats: Option<async_nats::Connection>,
+ terminate: Option<mpsc::Sender<CloseReason>>,
}
impl Connexion {
- /// Creates a new instance of a discord websocket connexion using the options
- /// this is used internally by the shard struct to initialize a single
- /// websocket connexion. This instance is not initialized by default.
- /// a websocket connexion like this can be re-used multiple times
- /// to allow reconnexion mechanisms.
- pub async fn new(config: Config) -> Self {
+ pub fn new(config: ClientConfig) -> Self {
Connexion {
state: State::default(),
connexion: None,
config,
- zlib: ZlibDecoder::<Vec<u8>>::new(vec![]),
- nats: None,
+ terminate: None
}
}
/// Terminate the connexion and the "start" method related to it.
- async fn _terminate_websocket(&mut self, message: CloseReason) {
+ async fn _terminate_websocket(&mut self, message: &CloseReason) {
if let Some(connexion) = &mut self.connexion {
if let Err(err) = connexion.close(None).await {
error!("failed to close socket {}", err);
} else {
- info!("closed the socket: {:?}", message)
+ info!("closed the socket: {:?}", message);
}
} else {
warn!("a termination request was sent without a connexion openned")
@@ -82,7 +63,6 @@ impl Connexion {
// we reset the state before starting the connection
self.state = State::default();
let request = Request::builder()
- .header("User-Agant", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36")
.uri(get_gateway_url(false, "json", 9))
.body(())
.unwrap();
@@ -93,26 +73,34 @@ impl Connexion {
return CloseReason::ConnexionError(err);
}
self.connexion = Some(connexion_result.unwrap().0);
- self.nats = Some(async_nats::connect("localhost:4222").await.unwrap());
+
+ let (tx, mut rx) = mpsc::channel::<CloseReason>(1);
+ self.terminate = Some(tx);
+
// this is the loop that will maintain the whole connexion
loop {
if let Some(connexion) = &mut self.connexion {
// if we do not have a hello message received yet, then we do not use the heartbeat interval
// and we just wait for messages to arrive
if self.state.stage == Stage::Unknown {
- let msg = connexion.next().await;
- if let HandleResult::Error(reason) = self._handle_message(&msg).await {
- return reason;
+ select! {
+ msg = connexion.next() => self._handle_message(&msg).await,
+ Some(reason) = rx.recv() => {
+ // gateway termination requested
+ self._terminate_websocket(&reason);
+ return reason
+ }
}
} else {
let timer = self.state.interval.as_mut().unwrap().tick();
select! {
- msg = connexion.next() => {
- if let HandleResult::Error(reason) = self._handle_message(&msg).await {
- return reason
- }
- },
- _ = timer => self._do_heartbeat().await
+ msg = connexion.next() => self._handle_message(&msg).await,
+ _ = timer => self._do_heartbeat().await,
+ Some(reason) = rx.recv() => {
+ // gateway termination requested
+ self._terminate_websocket(&reason);
+ return reason
+ }
}
}
} else {
@@ -125,36 +113,34 @@ impl Connexion {
async fn _handle_message(
&mut self,
data: &Option<Result<Message, tokio_tungstenite::tungstenite::Error>>,
- ) -> HandleResult {
+ ) {
if let Some(message) = data {
match message {
Ok(message) => match message {
Message::Text(text) => {
self._handle_discord_message(&text).await;
- HandleResult::Success
}
Message::Binary(message) => {
self._handle_discord_message(from_utf8(message).unwrap())
.await;
- HandleResult::Success
}
- Message::Close(_) => {
- error!("discord connexion closed");
- HandleResult::Error(CloseReason::ConnexionEnded)
+ Message::Close(code) => {
+ error!("discord connexion closed: {:?}", code);
+ self.terminate.as_ref().unwrap().send(CloseReason::ConnexionEnded).await.unwrap();
}
- _ => HandleResult::Error(CloseReason::ErrorEncountered(
+ _ => self.terminate.as_ref().unwrap().send(CloseReason::ErrorEncountered(
"unsupported message type encountered",
- )),
+ )).await.unwrap(),
},
- Err(_error) => HandleResult::Error(CloseReason::ErrorEncountered(
+ Err(_error) => self.terminate.as_ref().unwrap().send(CloseReason::ErrorEncountered(
"error while reading a message",
- )),
+ )).await.unwrap(),
}
} else {
- HandleResult::Error(CloseReason::ErrorEncountered(
+ self.terminate.as_ref().unwrap().send(CloseReason::ErrorEncountered(
"error while reading a message",
- ))
+ )).await.unwrap()
}
}
@@ -171,18 +157,19 @@ impl Connexion {
OpCodes::Dispatch => {
let t = message.t.unwrap();
trace!("dispatch message received: {:?}", t);
- let topic = format!("nova.gateway.{}", t);
- if let Err(e) = self.nats.as_ref().unwrap().publish(
- &topic,
- &serde_json::to_vec(&message.d).unwrap(),
- ).await {
- error!("failed to publish message {}", e);
- }
},
- OpCodes::PresenceUpdate => todo!(),
- OpCodes::VoiceStateUpdate => todo!(),
- OpCodes::Reconnect => todo!(),
- OpCodes::InvalidSession => todo!(),
+ OpCodes::PresenceUpdate => {
+ println!("presence update message received: {:?}", message.d);
+ },
+ OpCodes::VoiceStateUpdate => {
+ println!("voice update");
+ }
+ OpCodes::Reconnect => {
+ println!("reconnect {:?}", message.d);
+ },
+ OpCodes::InvalidSession => {
+ println!("invalid session: {:?}", message.d);
+ },
OpCodes::Hello => {
if let Ok(hello) = serde_json::from_value::<Hello>(message.d) {
info!("server sent hello {:?}", hello);
@@ -192,32 +179,24 @@ impl Connexion {
Duration::from_millis(hello.heartbeat_interval),
));
self.state.stage = Stage::Initialized;
+ let mut shard: Option<[i64; 2]> = None;
+ if let Some(sharding) = &self.config.shard {
+ shard = Some([sharding.current_shard.clone(), sharding.total_shards.clone()]);
+ info!("shard information: {:?}", shard);
+ }
self._send(&MessageBase {
t: None,
op: OpCodes::Identify,
s: None,
d: serde_json::to_value(&Identify{
token: self.config.token.clone(),
- intents: 1 << 0 |
- 1 << 1 |
- 1 << 2 |
- 1 << 3 |
- 1 << 4 |
- 1 << 5 |
- 1 << 6 |
- 1 << 7 |
- 1 << 8 |
- 1 << 9 |
- 1 << 10 |
- 1 << 11 |
- 1 << 12 |
- 1 << 13 |
- 1 << 14,
+ intents: self.config.intents.clone().bits(),
properties: IdentifyProprerties {
os: "Linux".into(),
browser: "Nova".into(),
device: "Linux".into(),
- }
+ },
+ shard: shard,
}).unwrap(),
}).await;
// do login
@@ -237,10 +216,9 @@ impl Connexion {
async fn _do_heartbeat(&mut self) {
if !self.state.last_heartbeat_acknowledged {
- self._terminate_websocket(CloseReason::ErrorEncountered(
+ self.terminate.as_ref().unwrap().send(CloseReason::ErrorEncountered(
"the server did not acknowledged the last heartbeat",
- ))
- .await;
+ )).await.unwrap();
return;
}
self.state.last_heartbeat_acknowledged = false;
@@ -261,16 +239,16 @@ impl Connexion {
if let Ok(json) = serde_json::to_vec(data) {
if let Err(error) = connexion.send(Message::Binary(json)).await {
error!("failed to write to socket: {}", error);
- self._terminate_websocket(CloseReason::ErrorEncountered(
+ self.terminate.as_ref().unwrap().send(CloseReason::ErrorEncountered(
"failed to write to the socket",
))
- .await;
+ .await.unwrap();
}
} else {
- self._terminate_websocket(CloseReason::ErrorEncountered(
+ self.terminate.as_ref().unwrap().send(CloseReason::ErrorEncountered(
"failed to serialize the message",
))
- .await;
+ .await.unwrap();
}
}
}
diff --git a/gateway/src/client/error.rs b/gateway/src/client/error.rs
deleted file mode 100644
index bac6894..0000000
--- a/gateway/src/client/error.rs
+++ /dev/null
@@ -1,20 +0,0 @@
-#[derive(Debug)]
-struct MyError(String);
-
-impl fmt::Display for MyError {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- write!(f, "There is an error: {}", self.0)
- }
-}
-
-impl Error for NovaError {}
-
-pub fn run() -> Result<(), Box<dyn Error>> {
- let condition = true;
-
- if condition {
- return Err(Box::new(MyError("Oops".into())));
- }
-
- Ok(())
-} \ No newline at end of file
diff --git a/gateway/src/client/mod.rs b/gateway/src/client/mod.rs
index 179c40d..3ffd489 100644
--- a/gateway/src/client/mod.rs
+++ b/gateway/src/client/mod.rs
@@ -1,6 +1,5 @@
-pub mod connexion;
mod utils;
mod state;
-mod shard;
pub mod payloads;
-pub mod traits; \ No newline at end of file
+pub mod connexion;
+pub mod structs; \ No newline at end of file
diff --git a/gateway/src/client/payloads/payloads.rs b/gateway/src/client/payloads/payloads.rs
index cf519df..0fec3dc 100644
--- a/gateway/src/client/payloads/payloads.rs
+++ b/gateway/src/client/payloads/payloads.rs
@@ -22,6 +22,7 @@ pub struct IdentifyProprerties {
#[derive(Debug, Serialize, Deserialize)]
pub struct Identify {
pub token: String,
- pub intents: i64,
+ pub intents: u16,
pub properties: IdentifyProprerties,
+ pub shard: Option<[i64; 2]>,
} \ No newline at end of file
diff --git a/gateway/src/client/shard.rs b/gateway/src/client/shard.rs
deleted file mode 100644
index fb1ceda..0000000
--- a/gateway/src/client/shard.rs
+++ /dev/null
@@ -1,6 +0,0 @@
-
-
-
-struct Shard {
-
-} \ No newline at end of file
diff --git a/gateway/src/client/structs.rs b/gateway/src/client/structs.rs
new file mode 100644
index 0000000..1f186c6
--- /dev/null
+++ b/gateway/src/client/structs.rs
@@ -0,0 +1,35 @@
+use enumflags2::{bitflags, BitFlags};
+
+#[bitflags]
+#[repr(u16)]
+#[derive(Clone, Copy, Debug)]
+pub enum Intents {
+ Guilds = 1 << 0,
+ GuildMembers = 1 << 1,
+ GuildBans = 1 << 2,
+ GuildEmojisAndStickers = 1 << 3,
+ GuildIntegrations = 1 << 4,
+ GuildWebhoks = 1 << 5,
+ GuildInvites = 1 << 6,
+ GuildVoiceStates = 1 << 7,
+ GuildPresences = 1 << 8,
+ GuildMessages = 1 << 9,
+ GuildMessagesReactions = 1 << 10,
+ GuildMessageTyping = 1 << 11,
+ DirectMessages = 1 << 12,
+ DirectMessagesReactions = 1 << 13,
+ DirectMessageTyping = 1 << 14,
+}
+
+pub struct Sharding {
+ pub total_shards: i64,
+ pub current_shard: i64
+}
+
+/// Config for the client connection.
+pub struct ClientConfig {
+ pub token: String,
+ pub large_threshold: Option<u64>,
+ pub shard: Option<Sharding>,
+ pub intents: BitFlags<Intents>
+} \ No newline at end of file
diff --git a/gateway/src/client/traits/message_handler.rs b/gateway/src/client/traits/message_handler.rs
deleted file mode 100644
index a5bfd20..0000000
--- a/gateway/src/client/traits/message_handler.rs
+++ /dev/null
@@ -1,3 +0,0 @@
-/// This trait is used by the Connexion<H> struct
-/// It implements a basic interface for handling events.
-pub trait MessageHandler {} \ No newline at end of file
diff --git a/gateway/src/client/traits/mod.rs b/gateway/src/client/traits/mod.rs
deleted file mode 100644
index 98d0c32..0000000
--- a/gateway/src/client/traits/mod.rs
+++ /dev/null
@@ -1 +0,0 @@
-pub mod message_handler; \ No newline at end of file
diff --git a/gateway/src/client/utils.rs b/gateway/src/client/utils.rs
index 023b6b9..141740e 100644
--- a/gateway/src/client/utils.rs
+++ b/gateway/src/client/utils.rs
@@ -1,3 +1,5 @@
+
+/// Formats a url of connection to the gateway
pub fn get_gateway_url (compress: bool, encoding: &str, v: i16) -> String {
return format!(
"wss://gateway.discord.gg/?v={}&encoding={}&compress={}",
diff --git a/gateway/src/cluster_manager.rs b/gateway/src/cluster_manager.rs
new file mode 100644
index 0000000..8900979
--- /dev/null
+++ b/gateway/src/cluster_manager.rs
@@ -0,0 +1,46 @@
+use crate::client::connexion::Connexion;
+use crate::client::structs::*;
+use crate::config::Config;
+use log::info;
+use futures::future::select_all;
+
+pub struct ClusterManager {
+ gateway_connexions: Vec<Connexion>,
+ config: Config,
+}
+
+impl ClusterManager {
+
+ pub fn new(config: crate::config::Config) -> ClusterManager {
+ // create the memory for containing all the gateways
+ let gateways: Vec<Connexion> = Vec::with_capacity(config.clustering.shard_count as usize);
+
+ ClusterManager {
+ gateway_connexions: gateways,
+ config,
+ }
+ }
+
+ pub async fn start(mut self) {
+ info!("Starting cluster manager...");
+ info!("I am cluster {}/{} managing {} discord shards", self.config.clustering.cluster_id+1, self.config.clustering.cluster_size, self.config.clustering.shard_count);
+
+ for i in 0..self.config.clustering.shard_count {
+ let shard_id = self.config.clustering.cluster_id * self.config.clustering.cluster_size + i;
+ info!("Starting shard {} for cluster {} for discord shard {}", i, self.config.clustering.cluster_id, shard_id);
+ self.gateway_connexions.push(Connexion::new(ClientConfig{
+ token: self.config.discord.token.clone(),
+ intents: self.config.discord.intents,
+ large_threshold: self.config.discord.large_threshold,
+ shard: Some(Sharding {
+ total_shards: self.config.clustering.cluster_size * self.config.clustering.shard_count,
+ current_shard: shard_id,
+ }),
+ }));
+ }
+ let tasks = self.gateway_connexions.into_iter().map(|item| Box::pin(item.start()));
+ select_all(tasks).await;
+
+ info!("one shard crashed, we need a restart");
+ }
+} \ No newline at end of file
diff --git a/gateway/src/config.rs b/gateway/src/config.rs
new file mode 100644
index 0000000..6dc2c28
--- /dev/null
+++ b/gateway/src/config.rs
@@ -0,0 +1,33 @@
+use enumflags2::BitFlags;
+use crate::client::structs::Intents;
+use serde::Deserialize;
+
+/// Config for the client connection.
+#[derive(Debug, Deserialize, Clone, Default)]
+pub struct ClusterClientConfig {
+ pub token: String,
+ pub large_threshold: Option<u64>,
+ pub intents: BitFlags<Intents>
+}
+
+/// Configuration for the cluster manager
+#[derive(Debug, Deserialize, Clone, Default)]
+pub struct ClusterClientSharding {
+ pub cluster_size: i64,
+ pub cluster_id: i64,
+ pub shard_count: i64
+}
+
+/// Configuration for the output of the cluster
+#[derive(Debug, Deserialize, Clone, Default)]
+pub struct ClusterRelay {
+ pub relay_instances: i64
+}
+
+/// Configuration for the gateway component
+#[derive(Debug, Deserialize, Clone, Default)]
+pub struct Config {
+ pub discord: ClusterClientConfig,
+ pub clustering: ClusterClientSharding,
+ pub relaying: ClusterRelay,
+}
diff --git a/gateway/src/main.rs b/gateway/src/main.rs
index 709e26b..4e4836c 100644
--- a/gateway/src/main.rs
+++ b/gateway/src/main.rs
@@ -1,24 +1,12 @@
-use std::env;
-
-use client::{connexion::Config, traits::message_handler::MessageHandler};
-extern crate serde_json;
-extern crate serde_repr;
-
mod client;
+mod config;
+mod cluster_manager;
-struct Handler {}
-impl MessageHandler for Handler {}
+use common::config::Settings;
#[tokio::main]
async fn main() {
- pretty_env_logger::init();
- for _ in 0..1 {
- tokio::spawn(async move {
- let con = client::connexion::Connexion::new(Config {
- token: env::var("DISCORD_TOKEN").expect("A valid token is required").into(),
- compress: false,
- }).await;
- println!("{:?}", con.start().await);
- }).await.unwrap();
- }
-} \ No newline at end of file
+ let settings: Settings<config::Config> = Settings::new("gateway").unwrap();
+ let manager = cluster_manager::ClusterManager::new(settings.config);
+ manager.start().await;
+}