From 91f9396d8232af25beef2bfde184fba841dd5dcb Mon Sep 17 00:00:00 2001 From: Matthieu Date: Wed, 8 Sep 2021 01:51:36 +0400 Subject: [PATCH] add base dispatch handling --- gateway/Cargo.toml | 4 +- gateway/cargo/BUILD.bazel | 18 +++ gateway/src/client/connexion.rs | 145 +++++++++++++++++------- gateway/src/client/payloads/message.rs | 8 ++ gateway/src/client/payloads/payloads.rs | 19 +++- gateway/src/main.rs | 11 +- 6 files changed, 156 insertions(+), 49 deletions(-) diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index 7b1e35d..f73d70a 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -5,7 +5,7 @@ edition = "2018" [dependencies] tokio = { version = "1", features = ["full"] } -tokio-tungstenite = { version = "*", features = ["rustls"] } +tokio-tungstenite = { version = "*", features = ["rustls-tls"] } url = "2.2.2" futures-util = "0.3.17" log = { version = "0.4", features = ["std"] } @@ -16,3 +16,5 @@ tokio-stream = "0.1.7" async-stream = "0.3.2" futures-core = "0.3.17" serde_repr = "0.1" +flate2 = "1.0" +async-nats = "0.10.1" \ No newline at end of file diff --git a/gateway/cargo/BUILD.bazel b/gateway/cargo/BUILD.bazel index a6d9cf3..bd87c7c 100644 --- a/gateway/cargo/BUILD.bazel +++ b/gateway/cargo/BUILD.bazel @@ -12,6 +12,15 @@ licenses([ ]) # Aliased targets +alias( + name = "async_nats", + actual = "@raze__async_nats__0_10_1//:async_nats", + tags = [ + "cargo-raze", + "manual", + ], +) + alias( name = "async_stream", actual = "@raze__async_stream__0_3_2//:async_stream", @@ -21,6 +30,15 @@ alias( ], ) +alias( + name = "flate2", + actual = "@raze__flate2__1_0_21//:flate2", + tags = [ + "cargo-raze", + "manual", + ], +) + alias( name = "futures_core", actual = "@raze__futures_core__0_3_17//:futures_core", diff --git a/gateway/src/client/connexion.rs b/gateway/src/client/connexion.rs index 069bbf9..f40178b 100644 --- a/gateway/src/client/connexion.rs +++ b/gateway/src/client/connexion.rs @@ -1,21 +1,15 @@ -use crate::client::payloads::{message::OpCodes, payloads::Hello}; - +use crate::client::payloads::{message::OpCodes, payloads::{Hello, Identify, IdentifyProprerties}}; use super::{ payloads::message::MessageBase, state::{Stage, State}, utils::get_gateway_url, }; -use futures_util::{ - SinkExt, StreamExt, -}; -use log::{error, info, warn}; +use flate2::write::ZlibDecoder; +use futures_util::{SinkExt, StreamExt}; +use log::{debug, error, info, warn}; use std::{str::from_utf8, time::Duration}; -use tokio::{ - net::TcpStream, - select, - time::{Instant}, -}; -use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::{self, Message}}; +use tokio::{net::TcpStream, select, time::Instant}; +use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::{self, Message, handshake::client::Request}}; #[derive(Debug)] pub enum CloseReason { @@ -25,6 +19,11 @@ pub enum CloseReason { ConnexionError(tungstenite::Error), } +pub struct Config { + pub token: String, + pub compress: bool, +} + pub enum HandleResult { Success, Error(CloseReason), @@ -39,7 +38,10 @@ pub enum HandleResult { /// the MessageHandler trait. pub struct Connexion { state: State, + config: Config, + zlib: ZlibDecoder>, connexion: Option>>, + nats: Option, } impl Connexion { @@ -48,10 +50,13 @@ impl Connexion { /// websocket connexion. This instance is not initialized by default. /// a websocket connexion like this can be re-used multiple times /// to allow reconnexion mechanisms. - pub async fn new() -> Self { + pub async fn new(config: Config) -> Self { Connexion { state: State::default(), connexion: None, + config, + zlib: ZlibDecoder::>::new(vec![]), + nats: None, } } @@ -76,14 +81,19 @@ impl Connexion { } else { // we reset the state before starting the connection self.state = State::default(); + let request = Request::builder() + .header("User-Agant", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36") + .uri(get_gateway_url(false, "json", 9)) + .body(()) + .unwrap(); - let connexion_result = connect_async(get_gateway_url(false, "json", 9)).await; + let connexion_result = connect_async(request).await; // we connect outselves to the websocket server if let Err(err) = connexion_result { - return CloseReason::ConnexionError(err) + return CloseReason::ConnexionError(err); } self.connexion = Some(connexion_result.unwrap().0); - + self.nats = Some(async_nats::connect("localhost:4222").await.unwrap()); // this is the loop that will maintain the whole connexion loop { if let Some(connexion) = &mut self.connexion { @@ -92,7 +102,7 @@ impl Connexion { if self.state.stage == Stage::Unknown { let msg = connexion.next().await; if let HandleResult::Error(reason) = self._handle_message(&msg).await { - return reason + return reason; } } else { let timer = self.state.interval.as_mut().unwrap().tick(); @@ -106,7 +116,7 @@ impl Connexion { } } } else { - return CloseReason::ConnexionEnded + return CloseReason::ConnexionEnded; } } } @@ -133,16 +143,18 @@ impl Connexion { HandleResult::Error(CloseReason::ConnexionEnded) } - _ => { - HandleResult::Error(CloseReason::ErrorEncountered("unsupported message type encountered")) - } + _ => HandleResult::Error(CloseReason::ErrorEncountered( + "unsupported message type encountered", + )), }, - Err(_error) => { - HandleResult::Error(CloseReason::ErrorEncountered("error while reading a message")) - } + Err(_error) => HandleResult::Error(CloseReason::ErrorEncountered( + "error while reading a message", + )), } } else { - HandleResult::Error(CloseReason::ErrorEncountered("error while reading a message")) + HandleResult::Error(CloseReason::ErrorEncountered( + "error while reading a message", + )) } } @@ -156,14 +168,18 @@ impl Connexion { } match message.op { - OpCodes::Dispatch => todo!(), - OpCodes::Heartbeat => todo!(), - OpCodes::Identify => todo!(), + OpCodes::Dispatch => { + let t = message.t.unwrap(); + info!("dispatch message received: {:?}", t); + let topic = format!("nova.gateway.{}", t); + self.nats.as_ref().unwrap().publish( + &topic, + &serde_json::to_vec(&message.d).unwrap(), + ).await.unwrap(); + }, OpCodes::PresenceUpdate => todo!(), OpCodes::VoiceStateUpdate => todo!(), - OpCodes::Resume => todo!(), OpCodes::Reconnect => todo!(), - OpCodes::RequestGuildMembers => todo!(), OpCodes::InvalidSession => todo!(), OpCodes::Hello => { if let Ok(hello) = serde_json::from_value::(message.d) { @@ -174,6 +190,36 @@ impl Connexion { Duration::from_millis(hello.heartbeat_interval), )); self.state.stage = Stage::Initialized; + self._send(&MessageBase { + t: None, + op: OpCodes::Identify, + s: None, + d: serde_json::to_value(&Identify{ + token: self.config.token.clone(), + intents: 1 << 0 | + 1 << 1 | + 1 << 2 | + 1 << 3 | + 1 << 4 | + 1 << 5 | + 1 << 6 | + 1 << 7 | + 1 << 8 | + 1 << 9 | + 1 << 10 | + 1 << 11 | + 1 << 12 | + 1 << 13 | + 1 << 14, + properties: IdentifyProprerties { + os: "Linux".into(), + browser: "Nova".into(), + device: "Linux".into(), + } + }).unwrap(), + }).await; + // do login + // todo: session logic } } OpCodes::HeartbeatACK => { @@ -183,35 +229,46 @@ impl Connexion { ); self.state.last_heartbeat_acknowledged = true; } + _ => {} // invalid payloads } } async fn _do_heartbeat(&mut self) { if !self.state.last_heartbeat_acknowledged { - self._terminate_websocket(CloseReason::ErrorEncountered("the server did not acknowledged the last heartbeat")).await; + self._terminate_websocket(CloseReason::ErrorEncountered( + "the server did not acknowledged the last heartbeat", + )) + .await; return; } self.state.last_heartbeat_acknowledged = false; info!("sending heartbeat"); - self._send( - serde_json::to_vec(&MessageBase { - t: None, - d: serde_json::to_value(self.state.sequence).unwrap(), - s: None, - op: OpCodes::Heartbeat, - }) - .unwrap(), - ) + self._send(&MessageBase { + t: None, + d: serde_json::to_value(self.state.sequence).unwrap(), + s: None, + op: OpCodes::Heartbeat, + }) .await; self.state.last_heartbeat_time = std::time::Instant::now(); } - async fn _send(&mut self, data: Vec) { + async fn _send(&mut self, data: &MessageBase) { if let Some(connexion) = &mut self.connexion { - if let Err(error) = connexion.send(Message::Binary(data)).await { - error!("failed to write to socket: {}", error); - self._terminate_websocket(CloseReason::ErrorEncountered("failed to write to the socket")).await; + if let Ok(json) = serde_json::to_vec(data) { + if let Err(error) = connexion.send(Message::Binary(json)).await { + error!("failed to write to socket: {}", error); + self._terminate_websocket(CloseReason::ErrorEncountered( + "failed to write to the socket", + )) + .await; + } + } else { + self._terminate_websocket(CloseReason::ErrorEncountered( + "failed to serialize the message", + )) + .await; } } } diff --git a/gateway/src/client/payloads/message.rs b/gateway/src/client/payloads/message.rs index 4b2a657..966136f 100644 --- a/gateway/src/client/payloads/message.rs +++ b/gateway/src/client/payloads/message.rs @@ -19,6 +19,14 @@ pub enum OpCodes { HeartbeatACK = 11, } +#[derive(Serialize, Deserialize)] +pub enum Dispatch { + #[serde(rename = "READY")] + Ready, + #[serde(rename = "RESUMED")] + Resumed, +} + #[derive(Serialize, Deserialize, PartialEq, Debug)] pub struct MessageBase { pub t: Option, diff --git a/gateway/src/client/payloads/payloads.rs b/gateway/src/client/payloads/payloads.rs index bcbdeb0..cf519df 100644 --- a/gateway/src/client/payloads/payloads.rs +++ b/gateway/src/client/payloads/payloads.rs @@ -7,4 +7,21 @@ pub struct Hello { } #[derive(Debug, Serialize, Deserialize)] -pub struct HeartbeatACK {} \ No newline at end of file +pub struct HeartbeatACK {} + +#[derive(Debug, Serialize, Deserialize)] +pub struct IdentifyProprerties { + #[serde(rename = "$os")] + pub os: String, + #[serde(rename = "$browser")] + pub browser: String, + #[serde(rename = "$device")] + pub device: String, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Identify { + pub token: String, + pub intents: i64, + pub properties: IdentifyProprerties, +} \ No newline at end of file diff --git a/gateway/src/main.rs b/gateway/src/main.rs index ada348d..709e26b 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -1,4 +1,6 @@ -use client::traits::message_handler::MessageHandler; +use std::env; + +use client::{connexion::Config, traits::message_handler::MessageHandler}; extern crate serde_json; extern crate serde_repr; @@ -12,8 +14,11 @@ async fn main() { pretty_env_logger::init(); for _ in 0..1 { tokio::spawn(async move { - let con = client::connexion::Connexion::new().await; - con.start().await; + let con = client::connexion::Connexion::new(Config { + token: env::var("DISCORD_TOKEN").expect("A valid token is required").into(), + compress: false, + }).await; + println!("{:?}", con.start().await); }).await.unwrap(); } } \ No newline at end of file -- 2.39.5