From: Matthieu Date: Sun, 19 Sep 2021 15:20:44 +0000 (+0400) Subject: fix dispatch deserialization X-Git-Tag: v0.1~64^2^2~9 X-Git-Url: https://git.puffer.fish/?a=commitdiff_plain;h=b7c921906fd8bee3ab19f31e8a93129122b3f522;p=matthieu%2Fnova.git fix dispatch deserialization --- diff --git a/gateway/src/payloads/dispatch.rs b/gateway/src/payloads/dispatch.rs index b2ddd89..be5f237 100644 --- a/gateway/src/payloads/dispatch.rs +++ b/gateway/src/payloads/dispatch.rs @@ -1,38 +1,48 @@ +use futures::io::Read; use log::info; -use serde::{Deserialize, Deserializer}; +use serde::{Deserialize, Deserializer, Serialize}; use serde_json::Value; -use super::{events::ready::Ready, opcodes::OpCodes}; +use super::gateway::BaseMessage; -/// Represents an unknown event not handled by the gateway itself -#[derive(Clone, Debug, PartialEq, Deserialize)] -pub struct UnknownDispatch { - pub t: String, - pub d: Value, - pub s: i64, - pub op: OpCodes, +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct Ready { + #[serde(rename = "v")] + pub version: u64, + pub user: Value, + pub guilds: Vec, + pub session_id: String, + pub shard: Option<[i64;2]>, + pub application: Value, } #[derive(Clone, Debug, PartialEq, Deserialize)] #[serde(tag = "t", content = "d")] -#[serde(remote = "Dispatch")] -pub enum Dispatch { +pub enum FakeDispatch { #[serde(rename = "READY")] Ready(Ready), - #[serde(rename = "RESUMED")] - Resumed(()), + Other(Value), +} - #[serde(skip_deserializing)] - Other(UnknownDispatch), +#[derive(Clone, Debug, PartialEq)] +pub enum Dispatch { + Ready(Ready), + Other(BaseMessage) } impl<'de> Deserialize<'de> for Dispatch { - fn deserialize(deserializer: D) -> Result - where D: Deserializer<'de> + fn deserialize(d: D) -> Result + where + D: Deserializer<'de>, { - info!("hey"); - let s = UnknownDispatch::deserialize(deserializer)?; - Ok(Self::Other(s)) + // todo: error handling + let value = Value::deserialize(d)?; + + if value.get("t").unwrap() == "READY" { + Ok(Dispatch::Ready(Ready::deserialize(value.get("d").unwrap()).unwrap())) + } else { + Ok(Dispatch::Other(BaseMessage::deserialize(value).unwrap())) + } } -} +} \ No newline at end of file diff --git a/gateway/src/payloads/gateway.rs b/gateway/src/payloads/gateway.rs index e8dff96..6ec2285 100644 --- a/gateway/src/payloads/gateway.rs +++ b/gateway/src/payloads/gateway.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use serde::de::Error; -#[derive(Serialize, Deserialize, PartialEq, Debug)] +#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)] #[serde(bound(deserialize = "T: Deserialize<'de> + std::fmt::Debug"))] pub struct BaseMessage { pub t: Option, @@ -31,22 +31,13 @@ impl<'de> serde::Deserialize<'de> for Message { if let Some(op) = num::FromPrimitive::from_u64(val) { match op { OpCodes::Dispatch => { - match Dispatch::deserialize(&value) { - Ok(data) => { - - let mut t = None; - if let Some(t_value) = &value.get("t") { - // this is safe since we know this is a string - t = Some(t_value.to_string()); - } - let mut sequence = None; - - if let Some(sequence_value) = value.get("s") { - if let Some(sequence_uint) = sequence_value.as_u64() { - sequence = Some(sequence_uint); - } - } + // todo: remove unwrap + let t = Some(value.get("t").unwrap().to_string()); + let sequence = value.get("s").unwrap().as_u64(); + // we need to find a better solution than clone + match serde_json::from_value(value) { + Ok(data) => { Ok(Message::Dispatch(BaseMessage { op, t, @@ -85,7 +76,7 @@ impl<'de> serde::Deserialize<'de> for Message { _ => panic!("Cannot convert"), } } else { - todo!(); + Err(Error::custom("unknown opcode")) } } } diff --git a/gateway/src/shard/actions.rs b/gateway/src/shard/actions.rs index 513d1a8..a58d51e 100644 --- a/gateway/src/shard/actions.rs +++ b/gateway/src/shard/actions.rs @@ -1,7 +1,7 @@ use std::env; use futures::SinkExt; -use log::{debug, error}; +use log::{debug, error, info}; use serde::Serialize; use serde_json::Value; use std::fmt::Debug; @@ -15,7 +15,7 @@ impl Shard { /// sends a message through the websocket pub async fn _send(&mut self, message: BaseMessage) -> Result<(), GatewayError> { - debug!("senging message {:?}", message); + debug!("Senging message {:?}", message); if let Some(connection) = &mut self.connection { if let Err(e) = connection.conn.send(message).await { error!("failed to send message {:?}", e); @@ -29,7 +29,8 @@ impl Shard { } pub async fn _identify(&mut self) -> Result<(), GatewayError> { - if let Some(state) = self.state.clone() { + if let Some(state) = self.state.clone() { + info!("Using session"); self._send(BaseMessage{ t: None, sequence: None, @@ -41,6 +42,7 @@ impl Shard { }, }).await } else { + info!("Sending login"); self._send(BaseMessage{ t: None, sequence: None, diff --git a/gateway/src/shard/connection.rs b/gateway/src/shard/connection.rs index 6f8503c..fabcf9e 100644 --- a/gateway/src/shard/connection.rs +++ b/gateway/src/shard/connection.rs @@ -33,10 +33,7 @@ impl Shard { if reconnects < self.config.max_reconnects { let time = min( self.config.reconnect_delay_maximum, - max( - ((reconnects as f32) * self.config.reconnect_delay_growth_factor) as usize, - self.config.reconnect_delay_minimum, - ), + self.config.reconnect_delay_minimum * (((reconnects - 1) as f32) * self.config.reconnect_delay_growth_factor) as usize, ); info!( "The shard got disconnected, waiting for reconnect ({}ms)", @@ -99,7 +96,7 @@ impl Shard { // we need to reconnect to the gateway Message::Reconnect(msg) => { self._util_set_seq(msg.sequence); - info!("gateway disconnect requested"); + info!("Gateway disconnect requested"); self._disconnect().await; } Message::InvalidSession(msg) => { @@ -107,21 +104,21 @@ impl Shard { info!("invalid session"); let data = msg.data; if !data { - info!("session removed"); + info!("Session removed"); // reset the session data self.state = None; if let Err(e) = self._identify().await { - error!("error while sending identify: {:?}", e); + error!("Error while sending identify: {:?}", e); } } } Message::HeartbeatACK(msg) => { self._util_set_seq(msg.sequence); - info!("heartbeat ack received"); + info!("Heartbeat ack received"); } Message::Hello(msg) => { self._util_set_seq(msg.sequence); - info!("server hello received"); + info!("Server hello received"); if let Err(e) = self._identify().await { error!("error while sending identify: {:?}", e); } @@ -132,7 +129,7 @@ impl Shard { async fn _dispatch(&mut self, dispatch: &BaseMessage) { match &dispatch.data { Dispatch::Ready(ready) => { - info!("received gateway dispatch hello"); + info!("Received gateway dispatch ready"); info!( "Logged in as {}", ready.user.get("username").unwrap().to_string() @@ -142,10 +139,9 @@ impl Shard { session_id: ready.session_id.clone(), }); } - Dispatch::Resumed(_) => { - info!("session resumed"); + Dispatch::Other(data) => { + } - Dispatch::Other(data) => {} } } } diff --git a/gateway/src/shard/state.rs b/gateway/src/shard/state.rs index caed268..1fe7c1a 100644 --- a/gateway/src/shard/state.rs +++ b/gateway/src/shard/state.rs @@ -19,7 +19,6 @@ impl Default for SessionState { /// This struct represents the state of a connection #[derive(Debug, Clone)] pub struct ConnectionState { - pub sequence: u64, pub last_heartbeat_acknowledged: bool, pub last_heartbeat_time: Instant, @@ -27,7 +26,6 @@ pub struct ConnectionState { impl Default for ConnectionState { fn default() -> Self { Self { - sequence: 0, last_heartbeat_acknowledged: true, last_heartbeat_time: Instant::now(), }