]> git.puffer.fish Git - matthieu/nova.git/commitdiff
add base dispatch handling
authorMatthieu <matthieu@developershouse.xyz>
Tue, 7 Sep 2021 21:51:36 +0000 (01:51 +0400)
committerMatthieu <matthieu@developershouse.xyz>
Tue, 7 Sep 2021 21:51:36 +0000 (01:51 +0400)
gateway/Cargo.toml
gateway/cargo/BUILD.bazel
gateway/src/client/connexion.rs
gateway/src/client/payloads/message.rs
gateway/src/client/payloads/payloads.rs
gateway/src/main.rs

index 7b1e35dcd6f6414817ac79020aa785e36800e1d1..f73d70a8c1893f041b574112412685f82df2b395 100644 (file)
@@ -5,7 +5,7 @@ edition = "2018"
 
 [dependencies]
 tokio = { version = "1", features = ["full"] }
-tokio-tungstenite = { version = "*", features = ["rustls"] }
+tokio-tungstenite = { version = "*", features = ["rustls-tls"] }
 url = "2.2.2"
 futures-util = "0.3.17"
 log = { version = "0.4", features = ["std"] }
@@ -16,3 +16,5 @@ tokio-stream = "0.1.7"
 async-stream = "0.3.2"
 futures-core = "0.3.17"
 serde_repr = "0.1"
+flate2 = "1.0"
+async-nats = "0.10.1"
\ No newline at end of file
index a6d9cf339edf2a18515547b0bd01c1c30e919e16..bd87c7cfe4a832818af6fbd8c3baa3f8bd9316f8 100644 (file)
@@ -12,6 +12,15 @@ licenses([
 ])
 
 # Aliased targets
+alias(
+    name = "async_nats",
+    actual = "@raze__async_nats__0_10_1//:async_nats",
+    tags = [
+        "cargo-raze",
+        "manual",
+    ],
+)
+
 alias(
     name = "async_stream",
     actual = "@raze__async_stream__0_3_2//:async_stream",
@@ -21,6 +30,15 @@ alias(
     ],
 )
 
+alias(
+    name = "flate2",
+    actual = "@raze__flate2__1_0_21//:flate2",
+    tags = [
+        "cargo-raze",
+        "manual",
+    ],
+)
+
 alias(
     name = "futures_core",
     actual = "@raze__futures_core__0_3_17//:futures_core",
index 069bbf9995ce823d9d34436b2f3c1a0f0616e2ba..f40178bfd2cd36db00c282efd062313636bb27a5 100644 (file)
@@ -1,21 +1,15 @@
-use crate::client::payloads::{message::OpCodes, payloads::Hello};
-
+use crate::client::payloads::{message::OpCodes, payloads::{Hello, Identify, IdentifyProprerties}};
 use super::{
     payloads::message::MessageBase,
     state::{Stage, State},
     utils::get_gateway_url,
 };
-use futures_util::{
-    SinkExt, StreamExt,
-};
-use log::{error, info, warn};
+use flate2::write::ZlibDecoder;
+use futures_util::{SinkExt, StreamExt};
+use log::{debug, error, info, warn};
 use std::{str::from_utf8, time::Duration};
-use tokio::{
-    net::TcpStream,
-    select,
-    time::{Instant},
-};
-use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::{self, Message}};
+use tokio::{net::TcpStream, select, time::Instant};
+use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::{self, Message, handshake::client::Request}};
 
 #[derive(Debug)]
 pub enum CloseReason {
@@ -25,6 +19,11 @@ pub enum CloseReason {
     ConnexionError(tungstenite::Error),
 }
 
+pub struct Config {
+    pub token: String,
+    pub compress: bool,
+}
+
 pub enum HandleResult {
     Success,
     Error(CloseReason),
@@ -39,7 +38,10 @@ pub enum HandleResult {
 /// the MessageHandler trait.
 pub struct Connexion {
     state: State,
+    config: Config,
+    zlib: ZlibDecoder<Vec<u8>>,
     connexion: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
+    nats: Option<async_nats::Connection>,
 }
 
 impl Connexion {
@@ -48,10 +50,13 @@ impl Connexion {
     /// websocket connexion. This instance is not initialized by default.
     /// a websocket connexion like this can be re-used multiple times
     /// to allow reconnexion mechanisms.
-    pub async fn new() -> Self {
+    pub async fn new(config: Config) -> Self {
         Connexion {
             state: State::default(),
             connexion: None,
+            config,
+            zlib: ZlibDecoder::<Vec<u8>>::new(vec![]),
+            nats: None,
         }
     }
 
@@ -76,14 +81,19 @@ impl Connexion {
         } else {
             // we reset the state before starting the connection
             self.state = State::default();
+            let request = Request::builder()
+                .header("User-Agant", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36")
+                .uri(get_gateway_url(false, "json", 9))
+                .body(())
+                .unwrap();
 
-            let connexion_result = connect_async(get_gateway_url(false, "json", 9)).await;
+            let connexion_result = connect_async(request).await;
             // we connect outselves to the websocket server
             if let Err(err) = connexion_result {
-                return CloseReason::ConnexionError(err)
+                return CloseReason::ConnexionError(err);
             }
             self.connexion = Some(connexion_result.unwrap().0);
-
+            self.nats = Some(async_nats::connect("localhost:4222").await.unwrap());
             // this is the loop that will maintain the whole connexion
             loop {
                 if let Some(connexion) = &mut self.connexion {
@@ -92,7 +102,7 @@ impl Connexion {
                     if self.state.stage == Stage::Unknown {
                         let msg = connexion.next().await;
                         if let HandleResult::Error(reason) = self._handle_message(&msg).await {
-                            return reason
+                            return reason;
                         }
                     } else {
                         let timer = self.state.interval.as_mut().unwrap().tick();
@@ -106,7 +116,7 @@ impl Connexion {
                         }
                     }
                 } else {
-                    return CloseReason::ConnexionEnded
+                    return CloseReason::ConnexionEnded;
                 }
             }
         }
@@ -133,16 +143,18 @@ impl Connexion {
                         HandleResult::Error(CloseReason::ConnexionEnded)
                     }
 
-                    _ => {
-                        HandleResult::Error(CloseReason::ErrorEncountered("unsupported message type encountered"))
-                    }
+                    _ => HandleResult::Error(CloseReason::ErrorEncountered(
+                        "unsupported message type encountered",
+                    )),
                 },
-                Err(_error) => {
-                    HandleResult::Error(CloseReason::ErrorEncountered("error while reading a message"))
-                }
+                Err(_error) => HandleResult::Error(CloseReason::ErrorEncountered(
+                    "error while reading a message",
+                )),
             }
         } else {
-            HandleResult::Error(CloseReason::ErrorEncountered("error while reading a message"))
+            HandleResult::Error(CloseReason::ErrorEncountered(
+                "error while reading a message",
+            ))
         }
     }
 
@@ -156,14 +168,18 @@ impl Connexion {
         }
 
         match message.op {
-            OpCodes::Dispatch => todo!(),
-            OpCodes::Heartbeat => todo!(),
-            OpCodes::Identify => todo!(),
+            OpCodes::Dispatch => {
+                let t = message.t.unwrap();
+                info!("dispatch message received: {:?}", t);
+                let topic = format!("nova.gateway.{}", t);
+                self.nats.as_ref().unwrap().publish(
+                    &topic,
+                    &serde_json::to_vec(&message.d).unwrap(),
+                ).await.unwrap();
+            },
             OpCodes::PresenceUpdate => todo!(),
             OpCodes::VoiceStateUpdate => todo!(),
-            OpCodes::Resume => todo!(),
             OpCodes::Reconnect => todo!(),
-            OpCodes::RequestGuildMembers => todo!(),
             OpCodes::InvalidSession => todo!(),
             OpCodes::Hello => {
                 if let Ok(hello) = serde_json::from_value::<Hello>(message.d) {
@@ -174,6 +190,36 @@ impl Connexion {
                         Duration::from_millis(hello.heartbeat_interval),
                     ));
                     self.state.stage = Stage::Initialized;
+                    self._send(&MessageBase {
+                        t: None,
+                        op: OpCodes::Identify,
+                        s: None,
+                        d: serde_json::to_value(&Identify{
+                            token: self.config.token.clone(),
+                            intents: 1 << 0 |
+                                1 << 1 |
+                                1 << 2 |
+                                1 << 3 |
+                                1 << 4 |
+                                1 << 5 |
+                                1 << 6 |
+                                1 << 7 |
+                                1 << 8 |
+                                1 << 9 | 
+                                1 << 10 |
+                                1 << 11 |
+                                1 << 12 |
+                                1 << 13 |
+                                1 << 14,
+                            properties: IdentifyProprerties {
+                                os: "Linux".into(),
+                                browser: "Nova".into(),
+                                device: "Linux".into(),
+                            }
+                        }).unwrap(),
+                    }).await;
+                    // do login
+                    // todo: session logic
                 }
             }
             OpCodes::HeartbeatACK => {
@@ -183,35 +229,46 @@ impl Connexion {
                 );
                 self.state.last_heartbeat_acknowledged = true;
             }
+            _ => {} // invalid payloads
         }
     }
 
     async fn _do_heartbeat(&mut self) {
         if !self.state.last_heartbeat_acknowledged {
-            self._terminate_websocket(CloseReason::ErrorEncountered("the server did not acknowledged the last heartbeat")).await;
+            self._terminate_websocket(CloseReason::ErrorEncountered(
+                "the server did not acknowledged the last heartbeat",
+            ))
+            .await;
             return;
         }
         self.state.last_heartbeat_acknowledged = false;
 
         info!("sending heartbeat");
-        self._send(
-            serde_json::to_vec(&MessageBase {
-                t: None,
-                d: serde_json::to_value(self.state.sequence).unwrap(),
-                s: None,
-                op: OpCodes::Heartbeat,
-            })
-            .unwrap(),
-        )
+        self._send(&MessageBase {
+            t: None,
+            d: serde_json::to_value(self.state.sequence).unwrap(),
+            s: None,
+            op: OpCodes::Heartbeat,
+        })
         .await;
         self.state.last_heartbeat_time = std::time::Instant::now();
     }
 
-    async fn _send(&mut self, data: Vec<u8>) {
+    async fn _send(&mut self, data: &MessageBase) {
         if let Some(connexion) = &mut self.connexion {
-            if let Err(error) = connexion.send(Message::Binary(data)).await {
-                error!("failed to write to socket: {}", error);
-                self._terminate_websocket(CloseReason::ErrorEncountered("failed to write to the socket")).await;
+            if let Ok(json) = serde_json::to_vec(data) {
+                if let Err(error) = connexion.send(Message::Binary(json)).await {
+                    error!("failed to write to socket: {}", error);
+                    self._terminate_websocket(CloseReason::ErrorEncountered(
+                        "failed to write to the socket",
+                    ))
+                    .await;
+                }
+            } else {
+                self._terminate_websocket(CloseReason::ErrorEncountered(
+                    "failed to serialize the message",
+                ))
+                .await;
             }
         }
     }
index 4b2a657d73914826705afce3318315376f689350..966136f8b1d9f192d2d23701f2dc19b6c01ee3c8 100644 (file)
@@ -19,6 +19,14 @@ pub enum OpCodes {
     HeartbeatACK = 11,
 }
 
+#[derive(Serialize, Deserialize)]
+pub enum Dispatch {
+    #[serde(rename = "READY")]
+    Ready,
+    #[serde(rename = "RESUMED")]
+    Resumed,
+}
+
 #[derive(Serialize, Deserialize, PartialEq, Debug)]
 pub struct MessageBase {
     pub t: Option<String>,
index bcbdeb04939c20c7e21ba0e6263cedfb239589d8..cf519df5ea8f2fab55bebb6eca6b161f3e24561a 100644 (file)
@@ -7,4 +7,21 @@ pub struct Hello {
 }
 
 #[derive(Debug, Serialize, Deserialize)]
-pub struct HeartbeatACK {}
\ No newline at end of file
+pub struct HeartbeatACK {}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct IdentifyProprerties {
+    #[serde(rename = "$os")]
+    pub os: String,
+    #[serde(rename = "$browser")]
+    pub browser: String,
+    #[serde(rename = "$device")]
+    pub device: String,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct Identify {
+    pub token: String,
+    pub intents: i64,
+    pub properties: IdentifyProprerties,
+}
\ No newline at end of file
index ada348d7ba38f3964108cb2f74e65d3eb10cef7e..709e26b4e64d2ee230811a423294ce8b9aca369f 100644 (file)
@@ -1,4 +1,6 @@
-use client::traits::message_handler::MessageHandler;
+use std::env;
+
+use client::{connexion::Config, traits::message_handler::MessageHandler};
 extern crate serde_json;
 extern crate serde_repr;
 
@@ -12,8 +14,11 @@ async fn main() {
     pretty_env_logger::init();
     for _ in 0..1 {
         tokio::spawn(async move {
-            let con = client::connexion::Connexion::new().await;
-            con.start().await;
+            let con = client::connexion::Connexion::new(Config {
+                token: env::var("DISCORD_TOKEN").expect("A valid token is required").into(),
+                compress: false,
+            }).await;
+            println!("{:?}", con.start().await);
         }).await.unwrap();
     }
 }
\ No newline at end of file