From 88300c45202a228d54c1e99dd3b295ef3fb9aabd Mon Sep 17 00:00:00 2001 From: Matthieu Date: Sat, 18 Sep 2021 00:17:22 +0400 Subject: [PATCH] work on the gateway --- .github/workflows/bazel-build.yml | 4 +- Cargo.lock | 1 + common/rust/src/error.rs | 18 ++++ common/rust/src/lib.rs | 3 +- common/rust/src/payloads.rs | 8 +- gateway/Cargo.toml | 3 +- gateway/config/default.toml | 7 ++ gateway/src/client/connection/actions.rs | 8 -- gateway/src/client/connection/mod.rs | 32 ++++--- gateway/src/client/connection/state.rs | 10 --- gateway/src/client/connection/stream.rs | 86 +++++++++++++++---- gateway/src/client/connection/utils.rs | 29 +++++++ gateway/src/client/error_utils.rs | 24 ++++++ gateway/src/client/mod.rs | 4 +- gateway/src/client/payloads/dispatch.rs | 20 +++++ gateway/src/client/payloads/gateway.rs | 86 +++++++++++++++++++ gateway/src/client/payloads/message.rs | 36 -------- gateway/src/client/payloads/mod.rs | 3 +- gateway/src/client/payloads/payloads/hello.rs | 7 ++ .../{payloads.rs => payloads/identify.rs} | 11 +-- gateway/src/client/payloads/payloads/mod.rs | 2 + gateway/src/client/shard/actions.rs | 35 ++++---- gateway/src/client/shard/connection.rs | 6 +- gateway/src/client/shard/mod.rs | 2 - gateway/src/client/shard/state.rs | 22 +++++ gateway/src/main.rs | 37 +++++++- webhook/src/handler/handler.rs | 5 +- 27 files changed, 388 insertions(+), 121 deletions(-) create mode 100644 common/rust/src/error.rs create mode 100644 gateway/config/default.toml delete mode 100644 gateway/src/client/connection/actions.rs delete mode 100644 gateway/src/client/connection/state.rs create mode 100644 gateway/src/client/connection/utils.rs create mode 100644 gateway/src/client/error_utils.rs create mode 100644 gateway/src/client/payloads/dispatch.rs create mode 100644 gateway/src/client/payloads/gateway.rs delete mode 100644 gateway/src/client/payloads/message.rs create mode 100644 gateway/src/client/payloads/payloads/hello.rs rename gateway/src/client/payloads/{payloads.rs => payloads/identify.rs} (63%) create mode 100644 gateway/src/client/payloads/payloads/mod.rs diff --git a/.github/workflows/bazel-build.yml b/.github/workflows/bazel-build.yml index 7648c7d..2baaea1 100644 --- a/.github/workflows/bazel-build.yml +++ b/.github/workflows/bazel-build.yml @@ -61,7 +61,7 @@ jobs: - name: Execute build script shell: bash run: | - bazel build //:package + bazel build //:packages - name: Bazel artifacts uses: actions/upload-artifact@v2 @@ -75,5 +75,5 @@ jobs: - name: Publish docker images shell: bash run: | - bazel run --define docker_repo=ghcr.io --define docker_tag=${{ steps.extract_branch.outputs.branch }} //:publish + bazel run --define docker_repo=ghcr.io --define docker_tag=${{ steps.extract_branch.outputs.branch }} //:container_publish if: matrix.os == 'ubuntu-latest' diff --git a/Cargo.lock b/Cargo.lock index 218fe32..a5a9375 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -674,6 +674,7 @@ version = "0.1.0" dependencies = [ "async-nats", "async-stream", + "async-trait", "common", "enumflags2", "flate2", diff --git a/common/rust/src/error.rs b/common/rust/src/error.rs new file mode 100644 index 0000000..dcb7a54 --- /dev/null +++ b/common/rust/src/error.rs @@ -0,0 +1,18 @@ +use std::fmt; + +pub struct NovaError { + pub message: String, +} + +impl fmt::Display for NovaError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "An error occured wihind the nova system: {}", self.message) // user-facing output + } +} + +impl fmt::Debug for NovaError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{{ file: {}, line: {} }}", file!(), line!()) // programmer-facing output + } +} + diff --git a/common/rust/src/lib.rs b/common/rust/src/lib.rs index 24e16ec..943d7cc 100644 --- a/common/rust/src/lib.rs +++ b/common/rust/src/lib.rs @@ -3,4 +3,5 @@ pub mod config; pub mod monitoring; pub mod nats; -pub mod payloads; \ No newline at end of file +pub mod payloads; +pub mod error; \ No newline at end of file diff --git a/common/rust/src/payloads.rs b/common/rust/src/payloads.rs index 1957077..6eb35c8 100644 --- a/common/rust/src/payloads.rs +++ b/common/rust/src/payloads.rs @@ -4,6 +4,12 @@ use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(bound(deserialize = "T: Deserialize<'de> + std::default::Default + Clone"))] pub struct CachePayload { - pub tracing: (), + pub tracing: Tracing, pub data: T } + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Tracing { + pub node_id: String, + pub span: Option +} \ No newline at end of file diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index 70da62e..b72a91e 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -21,4 +21,5 @@ async-nats = "0.10.1" enumflags2 = { version ="0.7.1", features = ["serde"] } common = { path = "../common/rust" } tokio-scoped = "0.1.0" -futures = "0.3.17" \ No newline at end of file +futures = "0.3.17" +async-trait = "0.1.51" \ No newline at end of file diff --git a/gateway/config/default.toml b/gateway/config/default.toml new file mode 100644 index 0000000..d999fc9 --- /dev/null +++ b/gateway/config/default.toml @@ -0,0 +1,7 @@ +[monitoring] +enabled = false + +[nats] +host = "localhost" + +[gateway] diff --git a/gateway/src/client/connection/actions.rs b/gateway/src/client/connection/actions.rs deleted file mode 100644 index ed57613..0000000 --- a/gateway/src/client/connection/actions.rs +++ /dev/null @@ -1,8 +0,0 @@ -use super::{Connection, state::ConnectionState}; - -impl Connection { - /// Returns the current state of the connection. - pub fn state(&self) -> ConnectionState { - return self.state.clone(); - } -} \ No newline at end of file diff --git a/gateway/src/client/connection/mod.rs b/gateway/src/client/connection/mod.rs index a5867a7..24ef334 100644 --- a/gateway/src/client/connection/mod.rs +++ b/gateway/src/client/connection/mod.rs @@ -1,24 +1,36 @@ +use super::{error_utils::GatewayError, utils::get_gateway_url}; use tokio::net::TcpStream; -use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; - +use tokio_tungstenite::{ + connect_async, tungstenite::handshake::client::Request, MaybeTlsStream, WebSocketStream, +}; mod stream; -mod state; -mod actions; +mod utils; /// Underlying representation of a Discord event stream /// that streams the Event payloads to the shard structure pub struct Connection { /// The channel given by tokio_tungstenite that represents the websocket connection connection: Option>>, - /// The state of the connection - state: state::ConnectionState, } impl Connection { pub fn new() -> Self { - Connection { - connection: None, - state: state::ConnectionState::default() + Connection { connection: None } + } + + pub async fn start(&mut self) -> Result<(), GatewayError> { + let request = Request::builder() + .uri(get_gateway_url(false, "json", 9)) + .body(()) + .unwrap(); + + let connection_result = connect_async(request).await; + // we connect outselves to the websocket server + if let Err(err) = connection_result { + Err(GatewayError::from(err)) + } else { + self.connection = Some(connection_result.unwrap().0); + Ok(()) } } -} \ No newline at end of file +} diff --git a/gateway/src/client/connection/state.rs b/gateway/src/client/connection/state.rs deleted file mode 100644 index 972d79e..0000000 --- a/gateway/src/client/connection/state.rs +++ /dev/null @@ -1,10 +0,0 @@ -/// This struct represents the state of a connection -#[derive(Debug, Clone)] -pub struct ConnectionState {} -impl Default for ConnectionState { - fn default() -> Self { - Self { } - } -} - -impl ConnectionState {} \ No newline at end of file diff --git a/gateway/src/client/connection/stream.rs b/gateway/src/client/connection/stream.rs index 9bfeace..6a6f5c9 100644 --- a/gateway/src/client/connection/stream.rs +++ b/gateway/src/client/connection/stream.rs @@ -1,36 +1,88 @@ +use super::Connection; +use crate::client::{error_utils::GatewayError}; +use futures::{FutureExt, Sink, SinkExt, Stream, StreamExt}; +use log::info; +use serde::Serialize; use std::{ pin::Pin, task::{Context, Poll}, }; - -use super::Connection; -use crate::client::payloads::message::MessageBase; -use futures::{Sink, Stream}; +use tokio_tungstenite::tungstenite::Message; impl Stream for Connection { - type Item = MessageBase; + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // first, when a poll is called, we check if the connection is still open + if let Some(conn) = &mut self.connection { + // we need to wait poll the message using the tokio_tungstenite stream + let message = conn.poll_next_unpin(cx); - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Poll::Pending + match message { + Poll::Ready(packet) => { + // if data is available, we can continue + match packet { + Some(result) => match result { + Ok(message) => { + match Box::pin(self._handle_message(&message)).poll_unpin(cx) { + Poll::Ready(data) => match data { + Ok(d) => Poll::Ready(Some(Ok(d))), + Err(e) => Poll::Ready(Some(Err(e))), + }, + // unknown behaviour? + Poll::Pending => unimplemented!(), + } + } + Err(e) => Poll::Ready(Some(Err(GatewayError::from(e)))), + }, + // if no message is available, we return none, it's the end of the stream + None => { + info!("tokio_tungstenite stream finished successfully"); + Box::pin(conn.close(None)).poll_unpin(cx); + self.connection = None; + Poll::Ready(None) + } + } + } + // if the message is pending, we return the same result + Poll::Pending => Poll::Pending, + } + } else { + Poll::Pending + } } } -impl Sink for Connection { - type Error = (); +impl Sink> for Connection { + type Error = GatewayError; - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - todo!() + #[allow(dead_code)] + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + if let Some(_) = &self.connection { + // a connection exists, we can send data + Poll::Ready(Ok(())) + } else { + Poll::Pending + } } - fn start_send(self: Pin<&mut Self>, item: MessageBase) -> Result<(), Self::Error> { - todo!() + #[allow(dead_code)] + fn start_send(mut self: Pin<&mut Self>, item: crate::client::payloads::gateway::FullMessage) -> Result<(), Self::Error> { + if let Some(conn) = &mut self.connection { + let message = serde_json::to_string(&item); + conn.start_send_unpin(Message::Text(message.unwrap())) + .unwrap(); + } + Ok(()) } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - todo!() + #[allow(dead_code)] + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - todo!() + #[allow(dead_code)] + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } } diff --git a/gateway/src/client/connection/utils.rs b/gateway/src/client/connection/utils.rs new file mode 100644 index 0000000..49ccbcc --- /dev/null +++ b/gateway/src/client/connection/utils.rs @@ -0,0 +1,29 @@ +use super::Connection; +use crate::client::{error_utils::GatewayError}; +use std::str::from_utf8; +use tokio_tungstenite::tungstenite::Message; + +impl Connection { + pub(crate) async fn _handle_message( + &mut self, + data: &Message, + ) -> Result { + match data { + Message::Text(text) => self._handle_discord_message(&text).await, + Message::Binary(message) => { + self._handle_discord_message(from_utf8(message).unwrap()) + .await + } + _ => Err(GatewayError::from("unknown error".to_string())), + } + } + + async fn _handle_discord_message( + &mut self, + raw_message: &str, + ) -> Result { + let a: Result = serde_json::from_str(raw_message); + let message = a.unwrap(); + Ok(message) + } +} diff --git a/gateway/src/client/error_utils.rs b/gateway/src/client/error_utils.rs new file mode 100644 index 0000000..603caab --- /dev/null +++ b/gateway/src/client/error_utils.rs @@ -0,0 +1,24 @@ +use common::error::NovaError; + +#[derive(Debug)] +pub struct GatewayError(NovaError); + +impl From for GatewayError { + fn from(e: tokio_tungstenite::tungstenite::Error) -> Self { + GatewayError { + 0: NovaError { + message: e.to_string(), + }, + } + } +} + +impl From for GatewayError { + fn from(e: String) -> Self { + GatewayError { + 0: NovaError { + message: e, + }, + } + } +} diff --git a/gateway/src/client/mod.rs b/gateway/src/client/mod.rs index 5e99ebc..51d8995 100644 --- a/gateway/src/client/mod.rs +++ b/gateway/src/client/mod.rs @@ -1,3 +1,5 @@ pub mod connection; pub mod payloads; -pub mod shard; \ No newline at end of file +pub mod shard; +pub mod utils; +mod error_utils; \ No newline at end of file diff --git a/gateway/src/client/payloads/dispatch.rs b/gateway/src/client/payloads/dispatch.rs new file mode 100644 index 0000000..62893b1 --- /dev/null +++ b/gateway/src/client/payloads/dispatch.rs @@ -0,0 +1,20 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct Ready { + #[serde(rename = "v")] + version: u64, + user: Value, + guilds: Vec, + session_id: String, + shard: Option<[i64;2]>, + application: Value, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(tag = "t", content = "d")] +pub enum Dispatch { + #[serde(rename = "READY")] + Ready(Ready), +} \ No newline at end of file diff --git a/gateway/src/client/payloads/gateway.rs b/gateway/src/client/payloads/gateway.rs new file mode 100644 index 0000000..788a05b --- /dev/null +++ b/gateway/src/client/payloads/gateway.rs @@ -0,0 +1,86 @@ +use super::dispatch::Dispatch; +use super::payloads::hello::Hello; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use serde_repr::{Deserialize_repr, Serialize_repr}; + +macro_rules! num_to_enum { + ($num:expr => $enm:ident<$tpe:ty>{ $($fld:ident),+ }; $err:expr) => ({ + match $num { + $(_ if $num == $enm::$fld as $tpe => { $enm::$fld })+ + _ => $err + } + }); +} + +#[derive(Serialize_repr, Deserialize_repr, PartialEq, Debug)] +#[repr(u8)] +pub enum OpCodes { + Dispatch = 0, + Heartbeat = 1, + Identify = 2, + PresenceUpdate = 3, + VoiceStateUpdate = 4, + Resume = 6, + Reconnect = 7, + RequestGuildMembers = 8, + InvalidSession = 9, + Hello = 10, + HeartbeatACK = 11, +} + +#[derive(Serialize, Deserialize, PartialEq, Debug)] +#[serde(bound(deserialize = "T: Deserialize<'de>"))] +pub struct FullMessage { + #[serde(rename = "d")] + pub dispatch_type: Option, + #[serde(rename = "s")] + pub sequence: Option, + pub op: OpCodes, + #[serde(rename = "d")] + pub data: T, +} + +pub enum Message { + Dispatch(FullMessage), + Reconnect(FullMessage<()>), + InvalidSession(FullMessage), + Hello(FullMessage), + HeartbeatACK(FullMessage<()>), +} + +impl<'de> serde::Deserialize<'de> for Message { + fn deserialize>(d: D) -> Result { + let value = Value::deserialize(d)?; + let val = value.get("op").and_then(Value::as_u64).unwrap(); + let op_code = num_to_enum!( + val => OpCodes{ + Dispatch, + Heartbeat, + Identify, + PresenceUpdate, + VoiceStateUpdate, + Resume, + Reconnect, + RequestGuildMembers, + InvalidSession, + Hello, + HeartbeatACK + }; + panic!("Cannot convert number to `MyEnum`") + ); + + match op_code { + OpCodes::Dispatch => Ok(Message::Dispatch(FullMessage::deserialize(value).unwrap())), + OpCodes::Reconnect => Ok(Message::Reconnect(FullMessage::deserialize(value).unwrap())), + OpCodes::InvalidSession => Ok(Message::InvalidSession( + FullMessage::deserialize(value).unwrap(), + )), + OpCodes::Hello => Ok(Message::Hello(FullMessage::deserialize(value).unwrap())), + OpCodes::HeartbeatACK => Ok(Message::HeartbeatACK( + FullMessage::deserialize(value).unwrap(), + )), + _ => panic!("Cannot convert"), + } + } +} diff --git a/gateway/src/client/payloads/message.rs b/gateway/src/client/payloads/message.rs deleted file mode 100644 index 966136f..0000000 --- a/gateway/src/client/payloads/message.rs +++ /dev/null @@ -1,36 +0,0 @@ -use serde_json::Value; -use serde_repr::{Serialize_repr, Deserialize_repr}; -use serde::{Deserialize, Serialize}; - - -#[derive(Serialize_repr, Deserialize_repr, PartialEq, Debug)] -#[repr(u8)] -pub enum OpCodes { - Dispatch = 0, - Heartbeat = 1, - Identify = 2, - PresenceUpdate = 3, - VoiceStateUpdate = 4, - Resume = 6, - Reconnect = 7, - RequestGuildMembers = 8, - InvalidSession = 9, - Hello = 10, - HeartbeatACK = 11, -} - -#[derive(Serialize, Deserialize)] -pub enum Dispatch { - #[serde(rename = "READY")] - Ready, - #[serde(rename = "RESUMED")] - Resumed, -} - -#[derive(Serialize, Deserialize, PartialEq, Debug)] -pub struct MessageBase { - pub t: Option, - pub s: Option, - pub op: OpCodes, - pub d: Value -} diff --git a/gateway/src/client/payloads/mod.rs b/gateway/src/client/payloads/mod.rs index d0a5e38..e43a323 100644 --- a/gateway/src/client/payloads/mod.rs +++ b/gateway/src/client/payloads/mod.rs @@ -1,2 +1,3 @@ pub mod payloads; -pub mod message; \ No newline at end of file +pub mod dispatch; +pub mod gateway; \ No newline at end of file diff --git a/gateway/src/client/payloads/payloads/hello.rs b/gateway/src/client/payloads/payloads/hello.rs new file mode 100644 index 0000000..0690a61 --- /dev/null +++ b/gateway/src/client/payloads/payloads/hello.rs @@ -0,0 +1,7 @@ +use serde::{Serialize, Deserialize}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct Hello { + #[serde(rename = "heartbeat_interval")] + pub heartbeat_interval: u64 +} diff --git a/gateway/src/client/payloads/payloads.rs b/gateway/src/client/payloads/payloads/identify.rs similarity index 63% rename from gateway/src/client/payloads/payloads.rs rename to gateway/src/client/payloads/payloads/identify.rs index 0fec3dc..83f038a 100644 --- a/gateway/src/client/payloads/payloads.rs +++ b/gateway/src/client/payloads/payloads/identify.rs @@ -1,13 +1,4 @@ -use serde::{Serialize, Deserialize}; - -#[derive(Debug, Serialize, Deserialize)] -pub struct Hello { - #[serde(rename = "heartbeat_interval")] - pub heartbeat_interval: u64 -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct HeartbeatACK {} +use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize)] pub struct IdentifyProprerties { diff --git a/gateway/src/client/payloads/payloads/mod.rs b/gateway/src/client/payloads/payloads/mod.rs new file mode 100644 index 0000000..aa5a6de --- /dev/null +++ b/gateway/src/client/payloads/payloads/mod.rs @@ -0,0 +1,2 @@ +pub mod hello; +pub mod identify; \ No newline at end of file diff --git a/gateway/src/client/shard/actions.rs b/gateway/src/client/shard/actions.rs index cb212bf..cb29ace 100644 --- a/gateway/src/client/shard/actions.rs +++ b/gateway/src/client/shard/actions.rs @@ -2,7 +2,7 @@ use futures::SinkExt; use log::error; use serde_json::Value; -use crate::client::payloads::message::{MessageBase, OpCodes}; +use crate::client::payloads::gateway::{FullMessage, OpCodes}; use super::Shard; @@ -13,14 +13,15 @@ impl Shard { pub async fn presence_update(&mut self) -> Result<(), ()> { if let Some(connection) = &mut self.connection { connection - .send(MessageBase { - t: None, - s: None, + .send(FullMessage { + dispatch_type: None, + sequence: None, op: OpCodes::PresenceUpdate, // todo: proper payload for this - d: Value::Null, + data: Value::Null, }) - .await?; + .await + .unwrap(); } else { error!("the connection is not open") } @@ -31,14 +32,15 @@ impl Shard { pub async fn voice_state_update(&mut self) -> Result<(), ()> { if let Some(connection) = &mut self.connection { connection - .send(MessageBase { - t: None, - s: None, + .send(FullMessage { + dispatch_type: None, + sequence: None, op: OpCodes::VoiceStateUpdate, // todo: proper payload for this - d: Value::Null, + data: Value::Null, }) - .await?; + .await + .unwrap(); } else { error!("the connection is not open") } @@ -49,14 +51,15 @@ impl Shard { pub async fn request_guild_members(&mut self) -> Result<(), ()> { if let Some(connection) = &mut self.connection { connection - .send(MessageBase { - t: None, - s: None, + .send(FullMessage { + dispatch_type: None, + sequence: None, op: OpCodes::RequestGuildMembers, // todo: proper payload for this - d: Value::Null, + data: Value::Null, }) - .await?; + .await + .unwrap(); } else { error!("the connection is not open") } diff --git a/gateway/src/client/shard/connection.rs b/gateway/src/client/shard/connection.rs index ca0ef07..3395ff2 100644 --- a/gateway/src/client/shard/connection.rs +++ b/gateway/src/client/shard/connection.rs @@ -3,13 +3,15 @@ use crate::client::connection::Connection; use log::info; impl Shard { - async fn start(self: &mut Self) { + pub async fn start(self: &mut Self) { let mut should_exit = false; while !should_exit { info!("Starting connection for shard"); // create the new connection - self.connection = Some(Connection::new()); + let mut connection = Connection::new(); + connection.start().await.unwrap(); + self.connection = Some(connection); should_exit = true; } } diff --git a/gateway/src/client/shard/mod.rs b/gateway/src/client/shard/mod.rs index 1962d07..aec93d6 100644 --- a/gateway/src/client/shard/mod.rs +++ b/gateway/src/client/shard/mod.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use self::state::SessionState; use super::connection::Connection; diff --git a/gateway/src/client/shard/state.rs b/gateway/src/client/shard/state.rs index 6c10871..4d40911 100644 --- a/gateway/src/client/shard/state.rs +++ b/gateway/src/client/shard/state.rs @@ -1,3 +1,5 @@ +use std::time::Instant; + /// This struct represents the state of a session pub struct SessionState { pub sequence: u64, @@ -12,3 +14,23 @@ impl Default for SessionState { } } } + +/// This struct represents the state of a connection +#[derive(Debug, Clone)] +pub struct ConnectionState { + pub sequence: u64, + pub last_heartbeat_acknowledged: bool, + pub last_heartbeat_time: Instant, + +} +impl Default for ConnectionState { + fn default() -> Self { + Self { + sequence: 0, + last_heartbeat_acknowledged: true, + last_heartbeat_time: Instant::now(), + } + } +} + +impl ConnectionState {} \ No newline at end of file diff --git a/gateway/src/main.rs b/gateway/src/main.rs index 9733487..003a903 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -1,8 +1,41 @@ mod client; +use client::connection::Connection; use common::config::Settings; +use futures::StreamExt; +use log::info; +use serde_json::Value; + +use crate::client::payloads::{dispatch::Dispatch, gateway::{FullMessage, Message, OpCodes}, payloads::identify::{Identify, IdentifyProprerties}}; #[tokio::main] -async fn main() { - let settings: Settings<()> = Settings::new("gateway").unwrap(); +async fn main() { + let settings: Settings = Settings::new("gateway").unwrap(); + + let mut conn = Connection::new(); + conn.start().await.unwrap(); + + loop { + if let Some(val) = conn.next().await { + let data = val.as_ref().unwrap(); + match data { + Message::Dispatch(dispatch) => { + match &dispatch.data { + Dispatch::Ready(_ready) => { + + }, + } + }, + Message::Reconnect(_) => todo!(), + Message::InvalidSession(_) => todo!(), + Message::Hello(_hello) => { + info!("Server said hello! {:?}", _hello); + }, + Message::HeartbeatACK(_) => todo!(), + } + + } else { + break; + } + } } diff --git a/webhook/src/handler/handler.rs b/webhook/src/handler/handler.rs index 923c650..b993aaa 100644 --- a/webhook/src/handler/handler.rs +++ b/webhook/src/handler/handler.rs @@ -88,7 +88,10 @@ impl Service> for HandlerService { }, _ => { let payload = serde_json::to_string(&common::payloads::CachePayload { - tracing: (), + tracing: common::payloads::Tracing { + node_id: "".to_string(), + span: None, + }, data: value, }).unwrap(); -- 2.39.5