]> git.puffer.fish Git - matthieu/nova.git/commitdiff
work on the gateway
authorMatthieu <matthieu@developershouse.xyz>
Fri, 17 Sep 2021 20:17:22 +0000 (00:17 +0400)
committerMatthieu <matthieu@developershouse.xyz>
Fri, 17 Sep 2021 20:17:22 +0000 (00:17 +0400)
28 files changed:
.github/workflows/bazel-build.yml
Cargo.lock
common/rust/src/error.rs [new file with mode: 0644]
common/rust/src/lib.rs
common/rust/src/payloads.rs
gateway/Cargo.toml
gateway/config/default.toml [new file with mode: 0644]
gateway/src/client/connection/actions.rs [deleted file]
gateway/src/client/connection/mod.rs
gateway/src/client/connection/state.rs [deleted file]
gateway/src/client/connection/stream.rs
gateway/src/client/connection/utils.rs [new file with mode: 0644]
gateway/src/client/error_utils.rs [new file with mode: 0644]
gateway/src/client/mod.rs
gateway/src/client/payloads/dispatch.rs [new file with mode: 0644]
gateway/src/client/payloads/gateway.rs [new file with mode: 0644]
gateway/src/client/payloads/message.rs [deleted file]
gateway/src/client/payloads/mod.rs
gateway/src/client/payloads/payloads.rs [deleted file]
gateway/src/client/payloads/payloads/hello.rs [new file with mode: 0644]
gateway/src/client/payloads/payloads/identify.rs [new file with mode: 0644]
gateway/src/client/payloads/payloads/mod.rs [new file with mode: 0644]
gateway/src/client/shard/actions.rs
gateway/src/client/shard/connection.rs
gateway/src/client/shard/mod.rs
gateway/src/client/shard/state.rs
gateway/src/main.rs
webhook/src/handler/handler.rs

index 7648c7d8680b55271d232b3028727f06c15df858..2baaea1dba843163a370c5a1642309fddb23f514 100644 (file)
@@ -61,7 +61,7 @@ jobs:
       - name: Execute build script\r
         shell: bash\r
         run: |\r
-           bazel build //:package\r
+           bazel build //:packages\r
 \r
       - name: Bazel artifacts\r
         uses: actions/upload-artifact@v2\r
@@ -75,5 +75,5 @@ jobs:
       - name: Publish docker images\r
         shell: bash\r
         run: |\r
-           bazel run --define docker_repo=ghcr.io --define docker_tag=${{ steps.extract_branch.outputs.branch }} //:publish\r
+           bazel run --define docker_repo=ghcr.io --define docker_tag=${{ steps.extract_branch.outputs.branch }} //:container_publish\r
         if: matrix.os == 'ubuntu-latest'\r
index 218fe32df68e600e238465105462d529ca583b1f..a5a9375a87bd448f84ab9384e1c15413bd1adcb7 100644 (file)
@@ -674,6 +674,7 @@ version = "0.1.0"
 dependencies = [
  "async-nats",
  "async-stream",
+ "async-trait",
  "common",
  "enumflags2",
  "flate2",
diff --git a/common/rust/src/error.rs b/common/rust/src/error.rs
new file mode 100644 (file)
index 0000000..dcb7a54
--- /dev/null
@@ -0,0 +1,18 @@
+use std::fmt;
+
+pub struct NovaError {
+    pub message: String,
+}
+
+impl fmt::Display for NovaError {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "An error occured wihind the nova system: {}", self.message) // user-facing output
+    }
+}
+
+impl fmt::Debug for NovaError {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "{{ file: {}, line: {} }}", file!(), line!()) // programmer-facing output
+    }
+}
+
index 24e16ec097ec0c92822f0e189ff71d7fab3ba009..943d7ccdbae5abd414cd1f5208bf5dc7b27121b0 100644 (file)
@@ -3,4 +3,5 @@
 pub mod config;
 pub mod monitoring;
 pub mod nats;
-pub mod payloads;
\ No newline at end of file
+pub mod payloads;
+pub mod error;
\ No newline at end of file
index 195707771d1db4dc3919a8366742f8391c317759..6eb35c8d9b2dc3ff72cfc67f03712700cc68eb08 100644 (file)
@@ -4,6 +4,12 @@ use serde::{Deserialize, Serialize};
 #[derive(Serialize, Deserialize, Debug, Clone)]
 #[serde(bound(deserialize = "T: Deserialize<'de> + std::default::Default + Clone"))]
 pub struct CachePayload<T> {
-    pub tracing: (),
+    pub tracing: Tracing,
     pub data: T
 }
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct Tracing {
+    pub node_id: String,
+    pub span: Option<String>
+}
\ No newline at end of file
index 70da62e9333c93e48c86b8a198d46f8ff093ac21..b72a91edf1c10498869d0f8430d8b50a055a5e75 100644 (file)
@@ -21,4 +21,5 @@ async-nats = "0.10.1"
 enumflags2 = { version ="0.7.1", features = ["serde"] }
 common = { path = "../common/rust" }
 tokio-scoped = "0.1.0"
-futures = "0.3.17"
\ No newline at end of file
+futures = "0.3.17"
+async-trait = "0.1.51"
\ No newline at end of file
diff --git a/gateway/config/default.toml b/gateway/config/default.toml
new file mode 100644 (file)
index 0000000..d999fc9
--- /dev/null
@@ -0,0 +1,7 @@
+[monitoring]
+enabled = false
+
+[nats]
+host = "localhost"
+
+[gateway]
diff --git a/gateway/src/client/connection/actions.rs b/gateway/src/client/connection/actions.rs
deleted file mode 100644 (file)
index ed57613..0000000
+++ /dev/null
@@ -1,8 +0,0 @@
-use super::{Connection, state::ConnectionState};
-
-impl Connection {
-    /// Returns the current state of the connection.
-    pub fn state(&self) -> ConnectionState {
-        return self.state.clone();
-    }
-}
\ No newline at end of file
index a5867a77ad5c799899ac2133dcc714e733d1b1da..24ef334f2766f433dc8522ad07daf1fb6d8a1ff8 100644 (file)
@@ -1,24 +1,36 @@
+use super::{error_utils::GatewayError, utils::get_gateway_url};
 use tokio::net::TcpStream;
-use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
-
+use tokio_tungstenite::{
+    connect_async, tungstenite::handshake::client::Request, MaybeTlsStream, WebSocketStream,
+};
 mod stream;
-mod state;
-mod actions;
+mod utils;
 
 /// Underlying representation of a Discord event stream
 /// that streams the Event payloads to the shard structure
 pub struct Connection {
     /// The channel given by tokio_tungstenite that represents the websocket connection
     connection: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
-    /// The state of the connection
-    state: state::ConnectionState,
 }
 
 impl Connection {
     pub fn new() -> Self {
-        Connection {
-            connection: None,
-            state: state::ConnectionState::default()
+        Connection { connection: None }
+    }
+
+    pub async fn start(&mut self) -> Result<(), GatewayError> {
+        let request = Request::builder()
+            .uri(get_gateway_url(false, "json", 9))
+            .body(())
+            .unwrap();
+
+        let connection_result = connect_async(request).await;
+        // we connect outselves to the websocket server
+        if let Err(err) = connection_result {
+            Err(GatewayError::from(err))
+        } else {
+            self.connection = Some(connection_result.unwrap().0);
+            Ok(())
         }
     }
-}
\ No newline at end of file
+}
diff --git a/gateway/src/client/connection/state.rs b/gateway/src/client/connection/state.rs
deleted file mode 100644 (file)
index 972d79e..0000000
+++ /dev/null
@@ -1,10 +0,0 @@
-/// This struct represents the state of a connection
-#[derive(Debug, Clone)]
-pub struct ConnectionState {}
-impl Default for ConnectionState {
-    fn default() -> Self {
-        Self {  }
-    }
-}
-
-impl ConnectionState {}
\ No newline at end of file
index 9bfeacee27c4cf3a6b5597702b6cf582f0e84587..6a6f5c9dc00308b7580bace38e7c54457710a53b 100644 (file)
@@ -1,36 +1,88 @@
+use super::Connection;
+use crate::client::{error_utils::GatewayError};
+use futures::{FutureExt, Sink, SinkExt, Stream, StreamExt};
+use log::info;
+use serde::Serialize;
 use std::{
     pin::Pin,
     task::{Context, Poll},
 };
-
-use super::Connection;
-use crate::client::payloads::message::MessageBase;
-use futures::{Sink, Stream};
+use tokio_tungstenite::tungstenite::Message;
 
 impl Stream for Connection {
-    type Item = MessageBase;
+    type Item = Result<crate::client::payloads::gateway::Message, GatewayError>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        // first, when a poll is called, we check if the connection is still open
+        if let Some(conn) = &mut self.connection {
+            // we need to wait poll the message using the tokio_tungstenite stream
+            let message = conn.poll_next_unpin(cx);
 
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        Poll::Pending
+            match message {
+                Poll::Ready(packet) => {
+                    // if data is available, we can continue
+                    match packet {
+                        Some(result) => match result {
+                            Ok(message) => {
+                                match Box::pin(self._handle_message(&message)).poll_unpin(cx) {
+                                    Poll::Ready(data) => match data {
+                                        Ok(d) => Poll::Ready(Some(Ok(d))),
+                                        Err(e) => Poll::Ready(Some(Err(e))),
+                                    },
+                                    // unknown behaviour?
+                                    Poll::Pending => unimplemented!(),
+                                }
+                            }
+                            Err(e) => Poll::Ready(Some(Err(GatewayError::from(e)))),
+                        },
+                        // if no message is available, we return none, it's the end of the stream
+                        None => {
+                            info!("tokio_tungstenite stream finished successfully");
+                            Box::pin(conn.close(None)).poll_unpin(cx);
+                            self.connection = None;
+                            Poll::Ready(None)
+                        }
+                    }
+                }
+                // if the message is pending, we return the same result
+                Poll::Pending => Poll::Pending,
+            }
+        } else {
+            Poll::Pending
+        }
     }
 }
 
-impl Sink<MessageBase> for Connection {
-    type Error = ();
+impl<T: Serialize> Sink<crate::client::payloads::gateway::FullMessage<T>> for Connection {
+    type Error = GatewayError;
 
-    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-        todo!()
+    #[allow(dead_code)]
+    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        if let Some(_) = &self.connection {
+            // a connection exists, we can send data
+            Poll::Ready(Ok(()))
+        } else {
+            Poll::Pending
+        }
     }
 
-    fn start_send(self: Pin<&mut Self>, item: MessageBase) -> Result<(), Self::Error> {
-        todo!()
+    #[allow(dead_code)]
+    fn start_send(mut self: Pin<&mut Self>, item: crate::client::payloads::gateway::FullMessage<T>) -> Result<(), Self::Error> {
+        if let Some(conn) = &mut self.connection {
+            let message = serde_json::to_string(&item);
+            conn.start_send_unpin(Message::Text(message.unwrap()))
+                .unwrap();
+        }
+        Ok(())
     }
 
-    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-        todo!()
+    #[allow(dead_code)]
+    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
     }
 
-    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-        todo!()
+    #[allow(dead_code)]
+    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
     }
 }
diff --git a/gateway/src/client/connection/utils.rs b/gateway/src/client/connection/utils.rs
new file mode 100644 (file)
index 0000000..49ccbcc
--- /dev/null
@@ -0,0 +1,29 @@
+use super::Connection;
+use crate::client::{error_utils::GatewayError};
+use std::str::from_utf8;
+use tokio_tungstenite::tungstenite::Message;
+
+impl Connection {
+    pub(crate) async fn _handle_message(
+        &mut self,
+        data: &Message,
+    ) -> Result<crate::client::payloads::gateway::Message, GatewayError> {
+        match data {
+            Message::Text(text) => self._handle_discord_message(&text).await,
+            Message::Binary(message) => {
+                self._handle_discord_message(from_utf8(message).unwrap())
+                    .await
+            }
+            _ => Err(GatewayError::from("unknown error".to_string())),
+        }
+    }
+
+    async fn _handle_discord_message(
+        &mut self,
+        raw_message: &str,
+    ) -> Result<crate::client::payloads::gateway::Message, GatewayError> {
+        let a: Result<crate::client::payloads::gateway::Message, serde_json::Error> = serde_json::from_str(raw_message);
+        let message = a.unwrap();
+        Ok(message)
+    }
+}
diff --git a/gateway/src/client/error_utils.rs b/gateway/src/client/error_utils.rs
new file mode 100644 (file)
index 0000000..603caab
--- /dev/null
@@ -0,0 +1,24 @@
+use common::error::NovaError;
+
+#[derive(Debug)]
+pub struct GatewayError(NovaError);
+
+impl From<tokio_tungstenite::tungstenite::Error> for GatewayError {
+    fn from(e: tokio_tungstenite::tungstenite::Error) -> Self {
+        GatewayError {
+            0: NovaError {
+                message: e.to_string(),
+            },
+        }
+    }
+}
+
+impl From<String> for GatewayError {
+    fn from(e: String) -> Self {
+        GatewayError {
+            0: NovaError {
+                message: e,
+            },
+        }
+    }
+}
index 5e99ebcd99f1513eb472eb719768ce49203c6bff..51d8995a03cc1739e91f72fd43a40dcce1e08b8b 100644 (file)
@@ -1,3 +1,5 @@
 pub mod connection;
 pub mod payloads;
-pub mod shard;
\ No newline at end of file
+pub mod shard;
+pub mod utils;
+mod error_utils;
\ No newline at end of file
diff --git a/gateway/src/client/payloads/dispatch.rs b/gateway/src/client/payloads/dispatch.rs
new file mode 100644 (file)
index 0000000..62893b1
--- /dev/null
@@ -0,0 +1,20 @@
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+
+#[derive(Serialize, Deserialize, Clone, Debug)]
+pub struct Ready {
+    #[serde(rename = "v")]
+    version: u64,
+    user: Value,
+    guilds: Vec<Value>,
+    session_id: String,
+    shard: Option<[i64;2]>,
+    application: Value,
+}
+
+#[derive(Serialize, Deserialize, Clone, Debug)]
+#[serde(tag = "t", content = "d")]
+pub enum Dispatch {
+    #[serde(rename = "READY")]
+    Ready(Ready),
+}
\ No newline at end of file
diff --git a/gateway/src/client/payloads/gateway.rs b/gateway/src/client/payloads/gateway.rs
new file mode 100644 (file)
index 0000000..788a05b
--- /dev/null
@@ -0,0 +1,86 @@
+use super::dispatch::Dispatch;
+use super::payloads::hello::Hello;
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use serde_repr::{Deserialize_repr, Serialize_repr};
+
+macro_rules! num_to_enum {
+    ($num:expr => $enm:ident<$tpe:ty>{ $($fld:ident),+ }; $err:expr) => ({
+        match $num {
+            $(_ if $num == $enm::$fld as $tpe => { $enm::$fld })+
+            _ => $err
+        }
+    });
+}
+
+#[derive(Serialize_repr, Deserialize_repr, PartialEq, Debug)]
+#[repr(u8)]
+pub enum OpCodes {
+    Dispatch = 0,
+    Heartbeat = 1,
+    Identify = 2,
+    PresenceUpdate = 3,
+    VoiceStateUpdate = 4,
+    Resume = 6,
+    Reconnect = 7,
+    RequestGuildMembers = 8,
+    InvalidSession = 9,
+    Hello = 10,
+    HeartbeatACK = 11,
+}
+
+#[derive(Serialize, Deserialize, PartialEq, Debug)]
+#[serde(bound(deserialize = "T: Deserialize<'de>"))]
+pub struct FullMessage<T> {
+    #[serde(rename = "d")]
+    pub dispatch_type: Option<String>,
+    #[serde(rename = "s")]
+    pub sequence: Option<OpCodes>,
+    pub op: OpCodes,
+    #[serde(rename = "d")]
+    pub data: T,
+}
+
+pub enum Message {
+    Dispatch(FullMessage<Dispatch>),
+    Reconnect(FullMessage<()>),
+    InvalidSession(FullMessage<bool>),
+    Hello(FullMessage<Hello>),
+    HeartbeatACK(FullMessage<()>),
+}
+
+impl<'de> serde::Deserialize<'de> for Message {
+    fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
+        let value = Value::deserialize(d)?;
+        let val = value.get("op").and_then(Value::as_u64).unwrap();
+        let op_code = num_to_enum!(
+            val => OpCodes<u64>{
+                Dispatch,
+                Heartbeat,
+                Identify,
+                PresenceUpdate,
+                VoiceStateUpdate,
+                Resume,
+                Reconnect,
+                RequestGuildMembers,
+                InvalidSession,
+                Hello,
+                HeartbeatACK
+            };
+            panic!("Cannot convert number to `MyEnum`")
+        );
+
+        match op_code {
+            OpCodes::Dispatch => Ok(Message::Dispatch(FullMessage::deserialize(value).unwrap())),
+            OpCodes::Reconnect => Ok(Message::Reconnect(FullMessage::deserialize(value).unwrap())),
+            OpCodes::InvalidSession => Ok(Message::InvalidSession(
+                FullMessage::deserialize(value).unwrap(),
+            )),
+            OpCodes::Hello => Ok(Message::Hello(FullMessage::deserialize(value).unwrap())),
+            OpCodes::HeartbeatACK => Ok(Message::HeartbeatACK(
+                FullMessage::deserialize(value).unwrap(),
+            )),
+            _ => panic!("Cannot convert"),
+        }
+    }
+}
diff --git a/gateway/src/client/payloads/message.rs b/gateway/src/client/payloads/message.rs
deleted file mode 100644 (file)
index 966136f..0000000
+++ /dev/null
@@ -1,36 +0,0 @@
-use serde_json::Value;
-use serde_repr::{Serialize_repr, Deserialize_repr};
-use serde::{Deserialize, Serialize};
-
-
-#[derive(Serialize_repr, Deserialize_repr, PartialEq, Debug)]
-#[repr(u8)]
-pub enum OpCodes {
-    Dispatch = 0,
-    Heartbeat = 1,
-    Identify = 2,
-    PresenceUpdate = 3,
-    VoiceStateUpdate = 4,
-    Resume = 6,
-    Reconnect = 7,
-    RequestGuildMembers = 8,
-    InvalidSession = 9,
-    Hello = 10,
-    HeartbeatACK = 11,
-}
-
-#[derive(Serialize, Deserialize)]
-pub enum Dispatch {
-    #[serde(rename = "READY")]
-    Ready,
-    #[serde(rename = "RESUMED")]
-    Resumed,
-}
-
-#[derive(Serialize, Deserialize, PartialEq, Debug)]
-pub struct MessageBase {
-    pub t: Option<String>,
-    pub s: Option<i64>,
-    pub op: OpCodes,
-    pub d: Value
-}
index d0a5e38b585ee14100a0aa7a6b23934d31934566..e43a323e5abc90adcaab5e5f4179d16f70eb597c 100644 (file)
@@ -1,2 +1,3 @@
 pub mod payloads;
-pub mod message;
\ No newline at end of file
+pub mod dispatch;
+pub mod gateway;
\ No newline at end of file
diff --git a/gateway/src/client/payloads/payloads.rs b/gateway/src/client/payloads/payloads.rs
deleted file mode 100644 (file)
index 0fec3dc..0000000
+++ /dev/null
@@ -1,28 +0,0 @@
-use serde::{Serialize, Deserialize};
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct Hello {
-    #[serde(rename = "heartbeat_interval")]
-    pub heartbeat_interval: u64
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct HeartbeatACK {}
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct IdentifyProprerties {
-    #[serde(rename = "$os")]
-    pub os: String,
-    #[serde(rename = "$browser")]
-    pub browser: String,
-    #[serde(rename = "$device")]
-    pub device: String,
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct Identify {
-    pub token: String,
-    pub intents: u16,
-    pub properties: IdentifyProprerties,
-    pub shard: Option<[i64; 2]>,
-}
\ No newline at end of file
diff --git a/gateway/src/client/payloads/payloads/hello.rs b/gateway/src/client/payloads/payloads/hello.rs
new file mode 100644 (file)
index 0000000..0690a61
--- /dev/null
@@ -0,0 +1,7 @@
+use serde::{Serialize, Deserialize};
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct Hello {
+    #[serde(rename = "heartbeat_interval")]
+    pub heartbeat_interval: u64
+}
diff --git a/gateway/src/client/payloads/payloads/identify.rs b/gateway/src/client/payloads/payloads/identify.rs
new file mode 100644 (file)
index 0000000..83f038a
--- /dev/null
@@ -0,0 +1,19 @@
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct IdentifyProprerties {
+    #[serde(rename = "$os")]
+    pub os: String,
+    #[serde(rename = "$browser")]
+    pub browser: String,
+    #[serde(rename = "$device")]
+    pub device: String,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct Identify {
+    pub token: String,
+    pub intents: u16,
+    pub properties: IdentifyProprerties,
+    pub shard: Option<[i64; 2]>,
+}
\ No newline at end of file
diff --git a/gateway/src/client/payloads/payloads/mod.rs b/gateway/src/client/payloads/payloads/mod.rs
new file mode 100644 (file)
index 0000000..aa5a6de
--- /dev/null
@@ -0,0 +1,2 @@
+pub mod hello;
+pub mod identify;
\ No newline at end of file
index cb212bf0b3ac751928fa967ec725fcd366f801cb..cb29ace5c5b21b80bc826a2a62e132a2fd7fed80 100644 (file)
@@ -2,7 +2,7 @@ use futures::SinkExt;
 use log::error;
 use serde_json::Value;
 
-use crate::client::payloads::message::{MessageBase, OpCodes};
+use crate::client::payloads::gateway::{FullMessage, OpCodes};
 
 use super::Shard;
 
@@ -13,14 +13,15 @@ impl Shard {
     pub async fn presence_update(&mut self) -> Result<(), ()> {
         if let Some(connection) = &mut self.connection {
             connection
-                .send(MessageBase {
-                    t: None,
-                    s: None,
+                .send(FullMessage {
+                    dispatch_type: None,
+                    sequence: None,
                     op: OpCodes::PresenceUpdate,
                     // todo: proper payload for this
-                    d: Value::Null,
+                    data: Value::Null,
                 })
-                .await?;
+                .await
+                .unwrap();
         } else {
             error!("the connection is not open")
         }
@@ -31,14 +32,15 @@ impl Shard {
     pub async fn voice_state_update(&mut self) -> Result<(), ()> {
         if let Some(connection) = &mut self.connection {
             connection
-                .send(MessageBase {
-                    t: None,
-                    s: None,
+                .send(FullMessage {
+                    dispatch_type: None,
+                    sequence: None,
                     op: OpCodes::VoiceStateUpdate,
                     // todo: proper payload for this
-                    d: Value::Null,
+                    data: Value::Null,
                 })
-                .await?;
+                .await
+                .unwrap();
         } else {
             error!("the connection is not open")
         }
@@ -49,14 +51,15 @@ impl Shard {
     pub async fn request_guild_members(&mut self) -> Result<(), ()> {
         if let Some(connection) = &mut self.connection {
             connection
-                .send(MessageBase {
-                    t: None,
-                    s: None,
+                .send(FullMessage {
+                    dispatch_type: None,
+                    sequence: None,
                     op: OpCodes::RequestGuildMembers,
                     // todo: proper payload for this
-                    d: Value::Null,
+                    data: Value::Null,
                 })
-                .await?;
+                .await
+                .unwrap();
         } else {
             error!("the connection is not open")
         }
index ca0ef078fc960df792129e7a8152e567965cdd2f..3395ff2e1a293e21c01a8aff19f18a849054783f 100644 (file)
@@ -3,13 +3,15 @@ use crate::client::connection::Connection;
 use log::info;
 
 impl Shard {
-    async fn start(self: &mut Self) {
+    pub async fn start(self: &mut Self) {
         let mut should_exit = false;
 
         while !should_exit {
             info!("Starting connection for shard");
             // create the new connection
-            self.connection = Some(Connection::new());
+            let mut connection = Connection::new();
+            connection.start().await.unwrap();
+            self.connection = Some(connection);
             should_exit = true;
         }
     }
index 1962d07ea0d63e856c6b2dcafce253da8e4cae7f..aec93d6a8d42b8907fbf8a270c7838b617ff9536 100644 (file)
@@ -1,5 +1,3 @@
-use std::sync::Arc;
-
 use self::state::SessionState;
 
 use super::connection::Connection;
index 6c10871657c21c551d9e981ab48b99d2a54ebba3..4d40911a06c12d2ad83ed59cec94f2387af7e3c8 100644 (file)
@@ -1,3 +1,5 @@
+use std::time::Instant;
+
 /// This struct represents the state of a session
 pub struct SessionState {
     pub sequence: u64,
@@ -12,3 +14,23 @@ impl Default for SessionState {
         }
     }
 }
+
+/// This struct represents the state of a connection
+#[derive(Debug, Clone)]
+pub struct ConnectionState {
+    pub sequence: u64,
+    pub last_heartbeat_acknowledged: bool,
+    pub last_heartbeat_time: Instant,
+    
+}
+impl Default for ConnectionState {
+    fn default() -> Self {
+        Self {
+            sequence: 0,
+            last_heartbeat_acknowledged: true,
+            last_heartbeat_time: Instant::now(),
+        }
+    }
+}
+
+impl ConnectionState {}
\ No newline at end of file
index 97334878e0e4a67c578849372433b381114c0a38..003a9035c5df1315191f1576b933567e3c1af391 100644 (file)
@@ -1,8 +1,41 @@
 mod client;
 
+use client::connection::Connection;
 use common::config::Settings;
+use futures::StreamExt;
+use log::info;
+use serde_json::Value;
+
+use crate::client::payloads::{dispatch::Dispatch, gateway::{FullMessage, Message, OpCodes}, payloads::identify::{Identify, IdentifyProprerties}};
 
 #[tokio::main]
-async fn main() {    
-    let settings: Settings<()> = Settings::new("gateway").unwrap();
+async fn main() {
+    let settings: Settings<Value> = Settings::new("gateway").unwrap();
+
+    let mut conn = Connection::new();
+    conn.start().await.unwrap();
+
+    loop {
+        if let Some(val) = conn.next().await {
+            let data = val.as_ref().unwrap();
+            match data {
+                Message::Dispatch(dispatch) => {
+                    match &dispatch.data {
+                        Dispatch::Ready(_ready) => {
+                            
+                        },
+                    }
+                },
+                Message::Reconnect(_) => todo!(),
+                Message::InvalidSession(_) => todo!(),
+                Message::Hello(_hello) => {
+                    info!("Server said hello! {:?}", _hello);
+                },
+                Message::HeartbeatACK(_) => todo!(),
+            }
+
+        } else {
+            break;
+        }
+    }
 }
index 923c650c5fb3c465937d5bd3f34c01f761883225..b993aaaab81a59127d8d297fae46ada3793616ee 100644 (file)
@@ -88,7 +88,10 @@ impl Service<Request<Body>> for HandlerService {
                         },
                         _ => {
                             let payload = serde_json::to_string(&common::payloads::CachePayload {
-                                tracing: (),
+                                tracing: common::payloads::Tracing {
+                                    node_id: "".to_string(),
+                                    span: None,
+                                },
                                 data: value,
                             }).unwrap();