summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthieu <matthieu@developershouse.xyz>2021-09-08 01:51:36 +0400
committerMatthieu <matthieu@developershouse.xyz>2021-09-08 01:51:36 +0400
commit91f9396d8232af25beef2bfde184fba841dd5dcb (patch)
tree4104fb52eb80c739cd5d47d946e508e6f23079d5
parent0a4329de1d13447581767793f2d505edfef4c1bf (diff)
add base dispatch handling
-rw-r--r--gateway/Cargo.toml4
-rw-r--r--gateway/cargo/BUILD.bazel18
-rw-r--r--gateway/src/client/connexion.rs145
-rw-r--r--gateway/src/client/payloads/message.rs8
-rw-r--r--gateway/src/client/payloads/payloads.rs19
-rw-r--r--gateway/src/main.rs11
6 files changed, 156 insertions, 49 deletions
diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml
index 7b1e35d..f73d70a 100644
--- a/gateway/Cargo.toml
+++ b/gateway/Cargo.toml
@@ -5,7 +5,7 @@ edition = "2018"
[dependencies]
tokio = { version = "1", features = ["full"] }
-tokio-tungstenite = { version = "*", features = ["rustls"] }
+tokio-tungstenite = { version = "*", features = ["rustls-tls"] }
url = "2.2.2"
futures-util = "0.3.17"
log = { version = "0.4", features = ["std"] }
@@ -16,3 +16,5 @@ tokio-stream = "0.1.7"
async-stream = "0.3.2"
futures-core = "0.3.17"
serde_repr = "0.1"
+flate2 = "1.0"
+async-nats = "0.10.1" \ No newline at end of file
diff --git a/gateway/cargo/BUILD.bazel b/gateway/cargo/BUILD.bazel
index a6d9cf3..bd87c7c 100644
--- a/gateway/cargo/BUILD.bazel
+++ b/gateway/cargo/BUILD.bazel
@@ -13,6 +13,15 @@ licenses([
# Aliased targets
alias(
+ name = "async_nats",
+ actual = "@raze__async_nats__0_10_1//:async_nats",
+ tags = [
+ "cargo-raze",
+ "manual",
+ ],
+)
+
+alias(
name = "async_stream",
actual = "@raze__async_stream__0_3_2//:async_stream",
tags = [
@@ -22,6 +31,15 @@ alias(
)
alias(
+ name = "flate2",
+ actual = "@raze__flate2__1_0_21//:flate2",
+ tags = [
+ "cargo-raze",
+ "manual",
+ ],
+)
+
+alias(
name = "futures_core",
actual = "@raze__futures_core__0_3_17//:futures_core",
tags = [
diff --git a/gateway/src/client/connexion.rs b/gateway/src/client/connexion.rs
index 069bbf9..f40178b 100644
--- a/gateway/src/client/connexion.rs
+++ b/gateway/src/client/connexion.rs
@@ -1,21 +1,15 @@
-use crate::client::payloads::{message::OpCodes, payloads::Hello};
-
+use crate::client::payloads::{message::OpCodes, payloads::{Hello, Identify, IdentifyProprerties}};
use super::{
payloads::message::MessageBase,
state::{Stage, State},
utils::get_gateway_url,
};
-use futures_util::{
- SinkExt, StreamExt,
-};
-use log::{error, info, warn};
+use flate2::write::ZlibDecoder;
+use futures_util::{SinkExt, StreamExt};
+use log::{debug, error, info, warn};
use std::{str::from_utf8, time::Duration};
-use tokio::{
- net::TcpStream,
- select,
- time::{Instant},
-};
-use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::{self, Message}};
+use tokio::{net::TcpStream, select, time::Instant};
+use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::{self, Message, handshake::client::Request}};
#[derive(Debug)]
pub enum CloseReason {
@@ -25,6 +19,11 @@ pub enum CloseReason {
ConnexionError(tungstenite::Error),
}
+pub struct Config {
+ pub token: String,
+ pub compress: bool,
+}
+
pub enum HandleResult {
Success,
Error(CloseReason),
@@ -39,7 +38,10 @@ pub enum HandleResult {
/// the MessageHandler trait.
pub struct Connexion {
state: State,
+ config: Config,
+ zlib: ZlibDecoder<Vec<u8>>,
connexion: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
+ nats: Option<async_nats::Connection>,
}
impl Connexion {
@@ -48,10 +50,13 @@ impl Connexion {
/// websocket connexion. This instance is not initialized by default.
/// a websocket connexion like this can be re-used multiple times
/// to allow reconnexion mechanisms.
- pub async fn new() -> Self {
+ pub async fn new(config: Config) -> Self {
Connexion {
state: State::default(),
connexion: None,
+ config,
+ zlib: ZlibDecoder::<Vec<u8>>::new(vec![]),
+ nats: None,
}
}
@@ -76,14 +81,19 @@ impl Connexion {
} else {
// we reset the state before starting the connection
self.state = State::default();
+ let request = Request::builder()
+ .header("User-Agant", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36")
+ .uri(get_gateway_url(false, "json", 9))
+ .body(())
+ .unwrap();
- let connexion_result = connect_async(get_gateway_url(false, "json", 9)).await;
+ let connexion_result = connect_async(request).await;
// we connect outselves to the websocket server
if let Err(err) = connexion_result {
- return CloseReason::ConnexionError(err)
+ return CloseReason::ConnexionError(err);
}
self.connexion = Some(connexion_result.unwrap().0);
-
+ self.nats = Some(async_nats::connect("localhost:4222").await.unwrap());
// this is the loop that will maintain the whole connexion
loop {
if let Some(connexion) = &mut self.connexion {
@@ -92,7 +102,7 @@ impl Connexion {
if self.state.stage == Stage::Unknown {
let msg = connexion.next().await;
if let HandleResult::Error(reason) = self._handle_message(&msg).await {
- return reason
+ return reason;
}
} else {
let timer = self.state.interval.as_mut().unwrap().tick();
@@ -106,7 +116,7 @@ impl Connexion {
}
}
} else {
- return CloseReason::ConnexionEnded
+ return CloseReason::ConnexionEnded;
}
}
}
@@ -133,16 +143,18 @@ impl Connexion {
HandleResult::Error(CloseReason::ConnexionEnded)
}
- _ => {
- HandleResult::Error(CloseReason::ErrorEncountered("unsupported message type encountered"))
- }
+ _ => HandleResult::Error(CloseReason::ErrorEncountered(
+ "unsupported message type encountered",
+ )),
},
- Err(_error) => {
- HandleResult::Error(CloseReason::ErrorEncountered("error while reading a message"))
- }
+ Err(_error) => HandleResult::Error(CloseReason::ErrorEncountered(
+ "error while reading a message",
+ )),
}
} else {
- HandleResult::Error(CloseReason::ErrorEncountered("error while reading a message"))
+ HandleResult::Error(CloseReason::ErrorEncountered(
+ "error while reading a message",
+ ))
}
}
@@ -156,14 +168,18 @@ impl Connexion {
}
match message.op {
- OpCodes::Dispatch => todo!(),
- OpCodes::Heartbeat => todo!(),
- OpCodes::Identify => todo!(),
+ OpCodes::Dispatch => {
+ let t = message.t.unwrap();
+ info!("dispatch message received: {:?}", t);
+ let topic = format!("nova.gateway.{}", t);
+ self.nats.as_ref().unwrap().publish(
+ &topic,
+ &serde_json::to_vec(&message.d).unwrap(),
+ ).await.unwrap();
+ },
OpCodes::PresenceUpdate => todo!(),
OpCodes::VoiceStateUpdate => todo!(),
- OpCodes::Resume => todo!(),
OpCodes::Reconnect => todo!(),
- OpCodes::RequestGuildMembers => todo!(),
OpCodes::InvalidSession => todo!(),
OpCodes::Hello => {
if let Ok(hello) = serde_json::from_value::<Hello>(message.d) {
@@ -174,6 +190,36 @@ impl Connexion {
Duration::from_millis(hello.heartbeat_interval),
));
self.state.stage = Stage::Initialized;
+ self._send(&MessageBase {
+ t: None,
+ op: OpCodes::Identify,
+ s: None,
+ d: serde_json::to_value(&Identify{
+ token: self.config.token.clone(),
+ intents: 1 << 0 |
+ 1 << 1 |
+ 1 << 2 |
+ 1 << 3 |
+ 1 << 4 |
+ 1 << 5 |
+ 1 << 6 |
+ 1 << 7 |
+ 1 << 8 |
+ 1 << 9 |
+ 1 << 10 |
+ 1 << 11 |
+ 1 << 12 |
+ 1 << 13 |
+ 1 << 14,
+ properties: IdentifyProprerties {
+ os: "Linux".into(),
+ browser: "Nova".into(),
+ device: "Linux".into(),
+ }
+ }).unwrap(),
+ }).await;
+ // do login
+ // todo: session logic
}
}
OpCodes::HeartbeatACK => {
@@ -183,35 +229,46 @@ impl Connexion {
);
self.state.last_heartbeat_acknowledged = true;
}
+ _ => {} // invalid payloads
}
}
async fn _do_heartbeat(&mut self) {
if !self.state.last_heartbeat_acknowledged {
- self._terminate_websocket(CloseReason::ErrorEncountered("the server did not acknowledged the last heartbeat")).await;
+ self._terminate_websocket(CloseReason::ErrorEncountered(
+ "the server did not acknowledged the last heartbeat",
+ ))
+ .await;
return;
}
self.state.last_heartbeat_acknowledged = false;
info!("sending heartbeat");
- self._send(
- serde_json::to_vec(&MessageBase {
- t: None,
- d: serde_json::to_value(self.state.sequence).unwrap(),
- s: None,
- op: OpCodes::Heartbeat,
- })
- .unwrap(),
- )
+ self._send(&MessageBase {
+ t: None,
+ d: serde_json::to_value(self.state.sequence).unwrap(),
+ s: None,
+ op: OpCodes::Heartbeat,
+ })
.await;
self.state.last_heartbeat_time = std::time::Instant::now();
}
- async fn _send(&mut self, data: Vec<u8>) {
+ async fn _send(&mut self, data: &MessageBase) {
if let Some(connexion) = &mut self.connexion {
- if let Err(error) = connexion.send(Message::Binary(data)).await {
- error!("failed to write to socket: {}", error);
- self._terminate_websocket(CloseReason::ErrorEncountered("failed to write to the socket")).await;
+ if let Ok(json) = serde_json::to_vec(data) {
+ if let Err(error) = connexion.send(Message::Binary(json)).await {
+ error!("failed to write to socket: {}", error);
+ self._terminate_websocket(CloseReason::ErrorEncountered(
+ "failed to write to the socket",
+ ))
+ .await;
+ }
+ } else {
+ self._terminate_websocket(CloseReason::ErrorEncountered(
+ "failed to serialize the message",
+ ))
+ .await;
}
}
}
diff --git a/gateway/src/client/payloads/message.rs b/gateway/src/client/payloads/message.rs
index 4b2a657..966136f 100644
--- a/gateway/src/client/payloads/message.rs
+++ b/gateway/src/client/payloads/message.rs
@@ -19,6 +19,14 @@ pub enum OpCodes {
HeartbeatACK = 11,
}
+#[derive(Serialize, Deserialize)]
+pub enum Dispatch {
+ #[serde(rename = "READY")]
+ Ready,
+ #[serde(rename = "RESUMED")]
+ Resumed,
+}
+
#[derive(Serialize, Deserialize, PartialEq, Debug)]
pub struct MessageBase {
pub t: Option<String>,
diff --git a/gateway/src/client/payloads/payloads.rs b/gateway/src/client/payloads/payloads.rs
index bcbdeb0..cf519df 100644
--- a/gateway/src/client/payloads/payloads.rs
+++ b/gateway/src/client/payloads/payloads.rs
@@ -7,4 +7,21 @@ pub struct Hello {
}
#[derive(Debug, Serialize, Deserialize)]
-pub struct HeartbeatACK {} \ No newline at end of file
+pub struct HeartbeatACK {}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct IdentifyProprerties {
+ #[serde(rename = "$os")]
+ pub os: String,
+ #[serde(rename = "$browser")]
+ pub browser: String,
+ #[serde(rename = "$device")]
+ pub device: String,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct Identify {
+ pub token: String,
+ pub intents: i64,
+ pub properties: IdentifyProprerties,
+} \ No newline at end of file
diff --git a/gateway/src/main.rs b/gateway/src/main.rs
index ada348d..709e26b 100644
--- a/gateway/src/main.rs
+++ b/gateway/src/main.rs
@@ -1,4 +1,6 @@
-use client::traits::message_handler::MessageHandler;
+use std::env;
+
+use client::{connexion::Config, traits::message_handler::MessageHandler};
extern crate serde_json;
extern crate serde_repr;
@@ -12,8 +14,11 @@ async fn main() {
pretty_env_logger::init();
for _ in 0..1 {
tokio::spawn(async move {
- let con = client::connexion::Connexion::new().await;
- con.start().await;
+ let con = client::connexion::Connexion::new(Config {
+ token: env::var("DISCORD_TOKEN").expect("A valid token is required").into(),
+ compress: false,
+ }).await;
+ println!("{:?}", con.start().await);
}).await.unwrap();
}
} \ No newline at end of file