summaryrefslogtreecommitdiff
path: root/gateway/src
diff options
context:
space:
mode:
Diffstat (limited to 'gateway/src')
-rw-r--r--gateway/src/connection/stream.rs2
-rw-r--r--gateway/src/connection/utils.rs2
-rw-r--r--gateway/src/error.rs4
-rw-r--r--gateway/src/management/mod.rs0
-rw-r--r--gateway/src/payloads/dispatch.rs2
-rw-r--r--gateway/src/payloads/events/resume.rs0
-rw-r--r--gateway/src/payloads/gateway.rs64
-rw-r--r--gateway/src/shard/actions.rs2
-rw-r--r--gateway/src/shard/connection.rs10
9 files changed, 40 insertions, 46 deletions
diff --git a/gateway/src/connection/stream.rs b/gateway/src/connection/stream.rs
index 767feec..5a12daf 100644
--- a/gateway/src/connection/stream.rs
+++ b/gateway/src/connection/stream.rs
@@ -2,7 +2,7 @@ use crate::{error::GatewayError, payloads::gateway::BaseMessage};
use super::Connection;
use futures::{FutureExt, Sink, SinkExt, Stream, StreamExt};
-use log::info;
+use common::log::info;
use serde::Serialize;
use std::{
pin::Pin,
diff --git a/gateway/src/connection/utils.rs b/gateway/src/connection/utils.rs
index fb07229..bb425da 100644
--- a/gateway/src/connection/utils.rs
+++ b/gateway/src/connection/utils.rs
@@ -1,6 +1,6 @@
use std::str::from_utf8;
use tokio_tungstenite::tungstenite::Message;
-use log::info;
+use common::log::info;
use crate::error::GatewayError;
diff --git a/gateway/src/error.rs b/gateway/src/error.rs
index 603caab..eb3a245 100644
--- a/gateway/src/error.rs
+++ b/gateway/src/error.rs
@@ -16,9 +16,7 @@ impl From<tokio_tungstenite::tungstenite::Error> for GatewayError {
impl From<String> for GatewayError {
fn from(e: String) -> Self {
GatewayError {
- 0: NovaError {
- message: e,
- },
+ 0: NovaError { message: e },
}
}
}
diff --git a/gateway/src/management/mod.rs b/gateway/src/management/mod.rs
deleted file mode 100644
index e69de29..0000000
--- a/gateway/src/management/mod.rs
+++ /dev/null
diff --git a/gateway/src/payloads/dispatch.rs b/gateway/src/payloads/dispatch.rs
index be5f237..9eca9c5 100644
--- a/gateway/src/payloads/dispatch.rs
+++ b/gateway/src/payloads/dispatch.rs
@@ -1,5 +1,3 @@
-use futures::io::Read;
-use log::info;
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value;
diff --git a/gateway/src/payloads/events/resume.rs b/gateway/src/payloads/events/resume.rs
deleted file mode 100644
index e69de29..0000000
--- a/gateway/src/payloads/events/resume.rs
+++ /dev/null
diff --git a/gateway/src/payloads/gateway.rs b/gateway/src/payloads/gateway.rs
index 6ec2285..4f24890 100644
--- a/gateway/src/payloads/gateway.rs
+++ b/gateway/src/payloads/gateway.rs
@@ -1,7 +1,10 @@
-use super::{dispatch::Dispatch, opcodes::{OpCodes, hello::Hello}};
+use super::{
+ dispatch::Dispatch,
+ opcodes::{hello::Hello, OpCodes},
+};
+use serde::de::Error;
use serde::{Deserialize, Serialize};
use serde_json::Value;
-use serde::de::Error;
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
#[serde(bound(deserialize = "T: Deserialize<'de> + std::fmt::Debug"))]
@@ -24,7 +27,10 @@ pub enum Message {
}
impl<'de> serde::Deserialize<'de> for Message {
- fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> where D::Error : Error {
+ fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error>
+ where
+ D::Error: Error,
+ {
let value = Value::deserialize(d)?;
let val = value.get("op").and_then(Value::as_u64).unwrap();
@@ -36,42 +42,32 @@ impl<'de> serde::Deserialize<'de> for Message {
let sequence = value.get("s").unwrap().as_u64();
// we need to find a better solution than clone
- match serde_json::from_value(value) {
- Ok(data) => {
- Ok(Message::Dispatch(BaseMessage {
- op,
- t,
- sequence,
- data
- }))
- },
- Err(e) => Err(Error::custom(e)),
- }
- },
-
- OpCodes::Reconnect => {
- match BaseMessage::deserialize(value) {
- Ok(data) => Ok(Message::Reconnect(data)),
+ match Dispatch::deserialize(value) {
+ Ok(data) => Ok(Message::Dispatch(BaseMessage {
+ op,
+ t,
+ sequence,
+ data,
+ })),
Err(e) => Err(Error::custom(e)),
}
+ }
+
+ OpCodes::Reconnect => match BaseMessage::deserialize(value) {
+ Ok(data) => Ok(Message::Reconnect(data)),
+ Err(e) => Err(Error::custom(e)),
},
- OpCodes::InvalidSession => {
- match BaseMessage::deserialize(value) {
- Ok(data) => Ok(Message::InvalidSession(data)),
- Err(e) => Err(Error::custom(e)),
- }
+ OpCodes::InvalidSession => match BaseMessage::deserialize(value) {
+ Ok(data) => Ok(Message::InvalidSession(data)),
+ Err(e) => Err(Error::custom(e)),
},
- OpCodes::Hello => {
- match BaseMessage::deserialize(value) {
- Ok(data) => Ok(Message::Hello(data)),
- Err(e) => Err(Error::custom(e)),
- }
+ OpCodes::Hello => match BaseMessage::deserialize(value) {
+ Ok(data) => Ok(Message::Hello(data)),
+ Err(e) => Err(Error::custom(e)),
},
- OpCodes::HeartbeatACK => {
- match BaseMessage::deserialize(value) {
- Ok(data) => Ok(Message::HeartbeatACK(data)),
- Err(e) => Err(Error::custom(e)),
- }
+ OpCodes::HeartbeatACK => match BaseMessage::deserialize(value) {
+ Ok(data) => Ok(Message::HeartbeatACK(data)),
+ Err(e) => Err(Error::custom(e)),
},
_ => panic!("Cannot convert"),
}
diff --git a/gateway/src/shard/actions.rs b/gateway/src/shard/actions.rs
index 86e5f98..b6ef038 100644
--- a/gateway/src/shard/actions.rs
+++ b/gateway/src/shard/actions.rs
@@ -1,7 +1,7 @@
use std::env;
use futures::SinkExt;
-use log::{debug, error, info};
+use common::log::{debug, error, info};
use serde::Serialize;
use serde_json::Value;
use std::fmt::Debug;
diff --git a/gateway/src/shard/connection.rs b/gateway/src/shard/connection.rs
index a0fa98a..8f8ddc6 100644
--- a/gateway/src/shard/connection.rs
+++ b/gateway/src/shard/connection.rs
@@ -7,8 +7,8 @@ use crate::{connection::Connection, error::GatewayError, payloads::{
use super::{state::ConnectionState, ConnectionWithState, Shard};
use futures::StreamExt;
-use log::{error, info};
-use tokio::{select, time::{Instant, sleep}};
+use common::log::{error, info};
+use tokio::{select, time::{Instant, interval_at, sleep}};
impl Shard {
pub async fn start(self: &mut Self) {
@@ -159,7 +159,7 @@ impl Shard {
info!("Server hello received");
self._util_set_seq(msg.sequence);
if let Some(conn) = &mut self.connection {
- conn.state.interval = Some(tokio::time::interval_at(
+ conn.state.interval = Some(interval_at(
Instant::now() + Duration::from_millis(msg.data.heartbeat_interval),
Duration::from_millis(msg.data.heartbeat_interval),
));
@@ -185,7 +185,9 @@ impl Shard {
session_id: ready.session_id.clone(),
});
}
- Dispatch::Other(data) => { }
+ Dispatch::Other(_data) => {
+ // todo: build dispatch & forward to nats
+ }
}
}
}