bazel-*\r
.vscode\r
ratelimiter/target\r
-target/
\ No newline at end of file
+target/\r
+**/local*
\ No newline at end of file
"futures-core",
"futures-util",
"log",
+ "num",
+ "num-derive",
+ "num-traits 0.2.14",
"pretty_env_logger",
"serde 1.0.130",
"serde_json",
"rand 0.8.4",
]
+[[package]]
+name = "num"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "43db66d1170d347f9a065114077f7dccb00c1b9478c89384490a3425279a4606"
+dependencies = [
+ "num-bigint",
+ "num-complex",
+ "num-integer",
+ "num-iter",
+ "num-rational",
+ "num-traits 0.2.14",
+]
+
+[[package]]
+name = "num-bigint"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "74e768dff5fb39a41b3bcd30bb25cf989706c90d028d1ad71971987aa309d535"
+dependencies = [
+ "autocfg",
+ "num-integer",
+ "num-traits 0.2.14",
+]
+
+[[package]]
+name = "num-complex"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "26873667bbbb7c5182d4a37c1add32cdf09f841af72da53318fdb81543c15085"
+dependencies = [
+ "num-traits 0.2.14",
+]
+
+[[package]]
+name = "num-derive"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "876a53fff98e03a936a674b29568b0e605f06b29372c2489ff4de23f1949743d"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "num-integer"
version = "0.1.44"
"num-traits 0.2.14",
]
+[[package]]
+name = "num-iter"
+version = "0.1.42"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b2021c8337a54d21aca0d59a92577a029af9431cb59b909b03252b9c164fad59"
+dependencies = [
+ "autocfg",
+ "num-integer",
+ "num-traits 0.2.14",
+]
+
+[[package]]
+name = "num-rational"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d41702bd167c2df5520b384281bc111a4b5efcf7fbc4c9c222c815b07e0a6a6a"
+dependencies = [
+ "autocfg",
+ "num-bigint",
+ "num-integer",
+ "num-traits 0.2.14",
+]
+
[[package]]
name = "num-traits"
version = "0.1.43"
load("@io_bazel_rules_docker//toolchains/docker:toolchain.bzl", "toolchain_configure")
load("@io_bazel_rules_docker//repositories:repositories.bzl", "repositories")
load("@io_bazel_rules_docker//repositories:deps.bzl", "deps")
-load("@io_bazel_rules_docker//container:container.bzl", "container_pull")
+load("@io_bazel_rules_docker//container:container.bzl", "container_pull", "container_image")
load("@io_bazel_rules_docker//docker/package_managers:download_pkgs.bzl", "download_pkgs")
load("@io_bazel_rules_docker//docker/package_managers:install_pkgs.bzl", "install_pkgs")
-load("@io_bazel_rules_docker//container:container.bzl", "container_image")
load(
"@io_bazel_rules_docker//go:image.bzl",
use std::fmt;
+#[derive(Debug)]
pub struct NovaError {
pub message: String,
}
write!(f, "An error occured wihind the nova system: {}", self.message) // user-facing output
}
}
-
-impl fmt::Debug for NovaError {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- write!(f, "{{ file: {}, line: {} }}", file!(), line!()) // programmer-facing output
- }
-}
-
common = { path = "../common/rust" }
tokio-scoped = "0.1.0"
futures = "0.3.17"
-async-trait = "0.1.51"
\ No newline at end of file
+async-trait = "0.1.51"
+num-traits = "0.2"
+num-derive = "0.3"
+num = "0.4"
\ No newline at end of file
host = "localhost"
[gateway]
+max_reconnects = 5
+reconnect_delay_growth_factor = 1.25
+reconnect_delay_minimum = 5000
+reconnect_delay_maximum = 60000
+intents = 32767
\ No newline at end of file
+++ /dev/null
-use super::{error_utils::GatewayError, utils::get_gateway_url};
-use tokio::net::TcpStream;
-use tokio_tungstenite::{
- connect_async, tungstenite::handshake::client::Request, MaybeTlsStream, WebSocketStream,
-};
-mod stream;
-mod utils;
-
-/// Underlying representation of a Discord event stream
-/// that streams the Event payloads to the shard structure
-pub struct Connection {
- /// The channel given by tokio_tungstenite that represents the websocket connection
- connection: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
-}
-
-impl Connection {
- pub fn new() -> Self {
- Connection { connection: None }
- }
-
- pub async fn start(&mut self) -> Result<(), GatewayError> {
- let request = Request::builder()
- .uri(get_gateway_url(false, "json", 9))
- .body(())
- .unwrap();
-
- let connection_result = connect_async(request).await;
- // we connect outselves to the websocket server
- if let Err(err) = connection_result {
- Err(GatewayError::from(err))
- } else {
- self.connection = Some(connection_result.unwrap().0);
- Ok(())
- }
- }
-}
+++ /dev/null
-use super::Connection;
-use crate::client::{error_utils::GatewayError};
-use futures::{FutureExt, Sink, SinkExt, Stream, StreamExt};
-use log::info;
-use serde::Serialize;
-use std::{
- pin::Pin,
- task::{Context, Poll},
-};
-use tokio_tungstenite::tungstenite::Message;
-
-impl Stream for Connection {
- type Item = Result<crate::client::payloads::gateway::Message, GatewayError>;
-
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- // first, when a poll is called, we check if the connection is still open
- if let Some(conn) = &mut self.connection {
- // we need to wait poll the message using the tokio_tungstenite stream
- let message = conn.poll_next_unpin(cx);
-
- match message {
- Poll::Ready(packet) => {
- // if data is available, we can continue
- match packet {
- Some(result) => match result {
- Ok(message) => {
- match Box::pin(self._handle_message(&message)).poll_unpin(cx) {
- Poll::Ready(data) => match data {
- Ok(d) => Poll::Ready(Some(Ok(d))),
- Err(e) => Poll::Ready(Some(Err(e))),
- },
- // unknown behaviour?
- Poll::Pending => unimplemented!(),
- }
- }
- Err(e) => Poll::Ready(Some(Err(GatewayError::from(e)))),
- },
- // if no message is available, we return none, it's the end of the stream
- None => {
- info!("tokio_tungstenite stream finished successfully");
- Box::pin(conn.close(None)).poll_unpin(cx);
- self.connection = None;
- Poll::Ready(None)
- }
- }
- }
- // if the message is pending, we return the same result
- Poll::Pending => Poll::Pending,
- }
- } else {
- Poll::Pending
- }
- }
-}
-
-impl<T: Serialize> Sink<crate::client::payloads::gateway::FullMessage<T>> for Connection {
- type Error = GatewayError;
-
- #[allow(dead_code)]
- fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- if let Some(_) = &self.connection {
- // a connection exists, we can send data
- Poll::Ready(Ok(()))
- } else {
- Poll::Pending
- }
- }
-
- #[allow(dead_code)]
- fn start_send(mut self: Pin<&mut Self>, item: crate::client::payloads::gateway::FullMessage<T>) -> Result<(), Self::Error> {
- if let Some(conn) = &mut self.connection {
- let message = serde_json::to_string(&item);
- conn.start_send_unpin(Message::Text(message.unwrap()))
- .unwrap();
- }
- Ok(())
- }
-
- #[allow(dead_code)]
- fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
-
- #[allow(dead_code)]
- fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
-}
+++ /dev/null
-use super::Connection;
-use crate::client::{error_utils::GatewayError};
-use std::str::from_utf8;
-use tokio_tungstenite::tungstenite::Message;
-
-impl Connection {
- pub(crate) async fn _handle_message(
- &mut self,
- data: &Message,
- ) -> Result<crate::client::payloads::gateway::Message, GatewayError> {
- match data {
- Message::Text(text) => self._handle_discord_message(&text).await,
- Message::Binary(message) => {
- self._handle_discord_message(from_utf8(message).unwrap())
- .await
- }
- _ => Err(GatewayError::from("unknown error".to_string())),
- }
- }
-
- async fn _handle_discord_message(
- &mut self,
- raw_message: &str,
- ) -> Result<crate::client::payloads::gateway::Message, GatewayError> {
- let a: Result<crate::client::payloads::gateway::Message, serde_json::Error> = serde_json::from_str(raw_message);
- let message = a.unwrap();
- Ok(message)
- }
-}
+++ /dev/null
-use common::error::NovaError;
-
-#[derive(Debug)]
-pub struct GatewayError(NovaError);
-
-impl From<tokio_tungstenite::tungstenite::Error> for GatewayError {
- fn from(e: tokio_tungstenite::tungstenite::Error) -> Self {
- GatewayError {
- 0: NovaError {
- message: e.to_string(),
- },
- }
- }
-}
-
-impl From<String> for GatewayError {
- fn from(e: String) -> Self {
- GatewayError {
- 0: NovaError {
- message: e,
- },
- }
- }
-}
+++ /dev/null
-pub mod connection;
-pub mod payloads;
-pub mod shard;
-pub mod utils;
-mod error_utils;
\ No newline at end of file
+++ /dev/null
-use serde::{Deserialize, Serialize};
-use serde_json::Value;
-
-#[derive(Serialize, Deserialize, Clone, Debug)]
-pub struct Ready {
- #[serde(rename = "v")]
- version: u64,
- user: Value,
- guilds: Vec<Value>,
- session_id: String,
- shard: Option<[i64;2]>,
- application: Value,
-}
-
-#[derive(Serialize, Deserialize, Clone, Debug)]
-#[serde(tag = "t", content = "d")]
-pub enum Dispatch {
- #[serde(rename = "READY")]
- Ready(Ready),
-}
\ No newline at end of file
+++ /dev/null
-use super::dispatch::Dispatch;
-use super::payloads::hello::Hello;
-use serde::{Deserialize, Serialize};
-use serde_json::Value;
-use serde_repr::{Deserialize_repr, Serialize_repr};
-
-macro_rules! num_to_enum {
- ($num:expr => $enm:ident<$tpe:ty>{ $($fld:ident),+ }; $err:expr) => ({
- match $num {
- $(_ if $num == $enm::$fld as $tpe => { $enm::$fld })+
- _ => $err
- }
- });
-}
-
-#[derive(Serialize_repr, Deserialize_repr, PartialEq, Debug)]
-#[repr(u8)]
-pub enum OpCodes {
- Dispatch = 0,
- Heartbeat = 1,
- Identify = 2,
- PresenceUpdate = 3,
- VoiceStateUpdate = 4,
- Resume = 6,
- Reconnect = 7,
- RequestGuildMembers = 8,
- InvalidSession = 9,
- Hello = 10,
- HeartbeatACK = 11,
-}
-
-#[derive(Serialize, Deserialize, PartialEq, Debug)]
-#[serde(bound(deserialize = "T: Deserialize<'de>"))]
-pub struct FullMessage<T> {
- #[serde(rename = "d")]
- pub dispatch_type: Option<String>,
- #[serde(rename = "s")]
- pub sequence: Option<OpCodes>,
- pub op: OpCodes,
- #[serde(rename = "d")]
- pub data: T,
-}
-
-pub enum Message {
- Dispatch(FullMessage<Dispatch>),
- Reconnect(FullMessage<()>),
- InvalidSession(FullMessage<bool>),
- Hello(FullMessage<Hello>),
- HeartbeatACK(FullMessage<()>),
-}
-
-impl<'de> serde::Deserialize<'de> for Message {
- fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
- let value = Value::deserialize(d)?;
- let val = value.get("op").and_then(Value::as_u64).unwrap();
- let op_code = num_to_enum!(
- val => OpCodes<u64>{
- Dispatch,
- Heartbeat,
- Identify,
- PresenceUpdate,
- VoiceStateUpdate,
- Resume,
- Reconnect,
- RequestGuildMembers,
- InvalidSession,
- Hello,
- HeartbeatACK
- };
- panic!("Cannot convert number to `MyEnum`")
- );
-
- match op_code {
- OpCodes::Dispatch => Ok(Message::Dispatch(FullMessage::deserialize(value).unwrap())),
- OpCodes::Reconnect => Ok(Message::Reconnect(FullMessage::deserialize(value).unwrap())),
- OpCodes::InvalidSession => Ok(Message::InvalidSession(
- FullMessage::deserialize(value).unwrap(),
- )),
- OpCodes::Hello => Ok(Message::Hello(FullMessage::deserialize(value).unwrap())),
- OpCodes::HeartbeatACK => Ok(Message::HeartbeatACK(
- FullMessage::deserialize(value).unwrap(),
- )),
- _ => panic!("Cannot convert"),
- }
- }
-}
+++ /dev/null
-pub mod payloads;
-pub mod dispatch;
-pub mod gateway;
\ No newline at end of file
+++ /dev/null
-use serde::{Serialize, Deserialize};
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct Hello {
- #[serde(rename = "heartbeat_interval")]
- pub heartbeat_interval: u64
-}
+++ /dev/null
-use serde::{Deserialize, Serialize};
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct IdentifyProprerties {
- #[serde(rename = "$os")]
- pub os: String,
- #[serde(rename = "$browser")]
- pub browser: String,
- #[serde(rename = "$device")]
- pub device: String,
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct Identify {
- pub token: String,
- pub intents: u16,
- pub properties: IdentifyProprerties,
- pub shard: Option<[i64; 2]>,
-}
\ No newline at end of file
+++ /dev/null
-pub mod hello;
-pub mod identify;
\ No newline at end of file
+++ /dev/null
-use enumflags2::{bitflags, BitFlags};
-
-#[bitflags]
-#[repr(u16)]
-#[derive(Clone, Copy, Debug)]
-pub enum Intents {
- Guilds = 1 << 0,
- GuildMembers = 1 << 1,
- GuildBans = 1 << 2,
- GuildEmojisAndStickers = 1 << 3,
- GuildIntegrations = 1 << 4,
- GuildWebhoks = 1 << 5,
- GuildInvites = 1 << 6,
- GuildVoiceStates = 1 << 7,
- GuildPresences = 1 << 8,
- GuildMessages = 1 << 9,
- GuildMessagesReactions = 1 << 10,
- GuildMessageTyping = 1 << 11,
- DirectMessages = 1 << 12,
- DirectMessagesReactions = 1 << 13,
- DirectMessageTyping = 1 << 14,
-}
-
-pub struct Sharding {
- pub total_shards: i64,
- pub current_shard: i64
-}
-
-/// Config for the client connection.
-pub struct ClientConfig {
- pub token: String,
- pub large_threshold: Option<u64>,
- pub shard: Option<Sharding>,
- pub intents: BitFlags<Intents>
-}
\ No newline at end of file
+++ /dev/null
-use futures::SinkExt;
-use log::error;
-use serde_json::Value;
-
-use crate::client::payloads::gateway::{FullMessage, OpCodes};
-
-use super::Shard;
-
-/// Implement the available actions for nova in the gateway.
-impl Shard {
- /// Updates the presence of the current shard.
- #[allow(dead_code)]
- pub async fn presence_update(&mut self) -> Result<(), ()> {
- if let Some(connection) = &mut self.connection {
- connection
- .send(FullMessage {
- dispatch_type: None,
- sequence: None,
- op: OpCodes::PresenceUpdate,
- // todo: proper payload for this
- data: Value::Null,
- })
- .await
- .unwrap();
- } else {
- error!("the connection is not open")
- }
- Ok(())
- }
- /// Updates the voice status of the current shard in a certain channel.
- #[allow(dead_code)]
- pub async fn voice_state_update(&mut self) -> Result<(), ()> {
- if let Some(connection) = &mut self.connection {
- connection
- .send(FullMessage {
- dispatch_type: None,
- sequence: None,
- op: OpCodes::VoiceStateUpdate,
- // todo: proper payload for this
- data: Value::Null,
- })
- .await
- .unwrap();
- } else {
- error!("the connection is not open")
- }
- Ok(())
- }
- /// Ask discord for more informations about offline guild members.
- #[allow(dead_code)]
- pub async fn request_guild_members(&mut self) -> Result<(), ()> {
- if let Some(connection) = &mut self.connection {
- connection
- .send(FullMessage {
- dispatch_type: None,
- sequence: None,
- op: OpCodes::RequestGuildMembers,
- // todo: proper payload for this
- data: Value::Null,
- })
- .await
- .unwrap();
- } else {
- error!("the connection is not open")
- }
- Ok(())
- }
-}
+++ /dev/null
-use super::Shard;
-use crate::client::connection::Connection;
-use log::info;
-
-impl Shard {
- pub async fn start(self: &mut Self) {
- let mut should_exit = false;
-
- while !should_exit {
- info!("Starting connection for shard");
- // create the new connection
- let mut connection = Connection::new();
- connection.start().await.unwrap();
- self.connection = Some(connection);
- should_exit = true;
- }
- }
-}
+++ /dev/null
-use self::state::SessionState;
-
-use super::connection::Connection;
-mod actions;
-mod connection;
-mod state;
-
-/// Represents a shard & all the reconnection logic related to it
-pub struct Shard {
- connection: Option<Connection>,
- state: SessionState,
-}
-
-impl Shard {
- /// Creates a new shard instance
- pub fn new() -> Self {
- Shard {
- connection: None,
- state: SessionState::default(),
- }
- }
-}
+++ /dev/null
-use std::time::Instant;
-
-/// This struct represents the state of a session
-pub struct SessionState {
- pub sequence: u64,
- pub session_id: String,
-}
-
-impl Default for SessionState {
- fn default() -> Self {
- Self {
- sequence: Default::default(),
- session_id: Default::default(),
- }
- }
-}
-
-/// This struct represents the state of a connection
-#[derive(Debug, Clone)]
-pub struct ConnectionState {
- pub sequence: u64,
- pub last_heartbeat_acknowledged: bool,
- pub last_heartbeat_time: Instant,
-
-}
-impl Default for ConnectionState {
- fn default() -> Self {
- Self {
- sequence: 0,
- last_heartbeat_acknowledged: true,
- last_heartbeat_time: Instant::now(),
- }
- }
-}
-
-impl ConnectionState {}
\ No newline at end of file
+++ /dev/null
-
-/// Formats a url of connection to the gateway
-pub fn get_gateway_url (compress: bool, encoding: &str, v: i16) -> String {
- return format!(
- "wss://gateway.discord.gg/?v={}&encoding={}&compress={}",
- v, encoding,
- if compress { "zlib-stream" } else { "" }
- );
-}
\ No newline at end of file
--- /dev/null
+use tokio::net::TcpStream;
+use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::http::Request};
+
+use crate::{error::GatewayError, utils::get_gateway_url};
+
+mod stream;
+mod utils;
+
+/// Underlying representation of a Discord event stream
+/// that streams the Event payloads to the shard structure
+pub struct Connection {
+ /// The channel given by tokio_tungstenite that represents the websocket connection
+ connection: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
+}
+
+impl Connection {
+ pub fn new() -> Self {
+ Connection { connection: None }
+ }
+
+ pub async fn start(&mut self) -> Result<(), GatewayError> {
+ let request = Request::builder()
+ .uri(get_gateway_url(false, "json", 9))
+ .body(())
+ .unwrap();
+
+ let connection_result = connect_async(request).await;
+ // we connect outselves to the websocket server
+ if let Err(err) = connection_result {
+ Err(GatewayError::from(err))
+ } else {
+ self.connection = Some(connection_result.unwrap().0);
+ Ok(())
+ }
+ }
+}
\ No newline at end of file
--- /dev/null
+use crate::{error::GatewayError, payloads::gateway::BaseMessage};
+
+use super::Connection;
+use futures::{FutureExt, Sink, SinkExt, Stream, StreamExt};
+use log::info;
+use serde::Serialize;
+use std::{
+ pin::Pin,
+ task::{Context, Poll},
+};
+use tokio_tungstenite::tungstenite::Message;
+
+/// Implementation of the Stream trait for the Connection
+impl Stream for Connection {
+ type Item = Result<crate::payloads::gateway::Message, GatewayError>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ // first, when a poll is called, we check if the connection is still open
+ if let Some(conn) = &mut self.connection {
+ // we need to wait poll the message using the tokio_tungstenite stream
+ let message = conn.poll_next_unpin(cx);
+
+ match message {
+ Poll::Ready(packet) => {
+ // if data is available, we can continue
+ match packet {
+ Some(result) => match result {
+ Ok(message) => {
+ match Box::pin(self._handle_message(&message)).poll_unpin(cx) {
+ Poll::Ready(data) => match data {
+ Ok(d) => Poll::Ready(Some(Ok(d))),
+ Err(e) => Poll::Ready(Some(Err(e))),
+ },
+ // unknown behaviour?
+ Poll::Pending => unreachable!(),
+ }
+ }
+ Err(e) => Poll::Ready(Some(Err(GatewayError::from(e)))),
+ },
+ // if no message is available, we return none, it's the end of the stream
+ None => {
+ info!("tokio_tungstenite stream finished successfully");
+ let _ = Box::pin(conn.close(None)).poll_unpin(cx);
+ self.connection = None;
+ Poll::Ready(None)
+ }
+ }
+ }
+ // if the message is pending, we return the same result
+ Poll::Pending => Poll::Pending,
+ }
+ } else {
+ Poll::Pending
+ }
+ }
+}
+
+/// Implementation of the Sink trait for the Connection
+impl<T: Serialize> Sink<BaseMessage<T>> for Connection {
+ type Error = tokio_tungstenite::tungstenite::Error;
+
+ #[allow(dead_code)]
+ fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ if let Some(conn) = &mut self.connection {
+ // a connection exists, we can send data
+ conn.poll_ready_unpin(cx)
+ } else {
+ Poll::Pending
+ }
+ }
+
+ #[allow(dead_code)]
+ fn start_send(mut self: Pin<&mut Self>, item: BaseMessage<T>) -> Result<(), Self::Error> {
+ if let Some(conn) = &mut self.connection {
+ let message = serde_json::to_string(&item);
+ conn.start_send_unpin(Message::Text(message.unwrap()))
+ .unwrap();
+ }
+ Ok(())
+ }
+
+ #[allow(dead_code)]
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ if let Some(conn) = &mut self.connection {
+ conn.poll_flush_unpin(cx)
+ } else {
+ Poll::Pending
+ }
+ }
+
+ #[allow(dead_code)]
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ if let Some(conn) = &mut self.connection {
+ conn.poll_close_unpin(cx)
+ } else {
+ Poll::Pending
+ }
+ }
+}
--- /dev/null
+use std::str::from_utf8;
+use tokio_tungstenite::tungstenite::Message;
+use log::info;
+
+use crate::error::GatewayError;
+
+use super::Connection;
+
+impl Connection {
+
+ /// Handles the websocket events and calls the _handle_discord_message function for the deserialization.
+ pub(super) async fn _handle_message(
+ &mut self,
+ data: &Message,
+ ) -> Result<crate::payloads::gateway::Message, GatewayError> {
+ match data {
+ Message::Text(text) => self._handle_discord_message(&text).await,
+ Message::Binary(message) => {
+ match from_utf8(message) {
+ Ok(data) => self._handle_discord_message(data).await,
+ Err(err) => Err(GatewayError::from(err.to_string())),
+ }
+ },
+ Message::Close(close_frame) => {
+ info!("Discord connection closed {:?}", close_frame);
+ Err(GatewayError::from("connection closed".to_string()))
+ },
+ _ => Err(GatewayError::from(format!("unknown variant of message specified to the handler {}", data).to_string())),
+ }
+ }
+
+ /// Handle the decompression and deserialization process of a discord payload.
+ pub(super) async fn _handle_discord_message(
+ &mut self,
+ raw_message: &str,
+ ) -> Result<crate::payloads::gateway::Message, GatewayError> {
+ match serde_json::from_str(raw_message) {
+ Ok(message) => Ok(message),
+ Err(err) => Err(GatewayError::from(err.to_string())),
+ }
+ }
+}
--- /dev/null
+use common::error::NovaError;
+
+#[derive(Debug)]
+pub struct GatewayError(NovaError);
+
+impl From<tokio_tungstenite::tungstenite::Error> for GatewayError {
+ fn from(e: tokio_tungstenite::tungstenite::Error) -> Self {
+ GatewayError {
+ 0: NovaError {
+ message: e.to_string(),
+ },
+ }
+ }
+}
+
+impl From<String> for GatewayError {
+ fn from(e: String) -> Self {
+ GatewayError {
+ 0: NovaError {
+ message: e,
+ },
+ }
+ }
+}
-mod client;
-
-use client::connection::Connection;
use common::config::Settings;
-use futures::StreamExt;
-use log::info;
-use serde_json::Value;
-
-use crate::client::payloads::{dispatch::Dispatch, gateway::{FullMessage, Message, OpCodes}, payloads::identify::{Identify, IdentifyProprerties}};
+use shard::{Shard, ShardConfig};
+#[macro_use]
+extern crate num_derive;
-#[tokio::main]
-async fn main() {
- let settings: Settings<Value> = Settings::new("gateway").unwrap();
+pub mod connection;
+mod error;
+mod utils;
+mod shard;
+mod payloads;
- let mut conn = Connection::new();
- conn.start().await.unwrap();
- loop {
- if let Some(val) = conn.next().await {
- let data = val.as_ref().unwrap();
- match data {
- Message::Dispatch(dispatch) => {
- match &dispatch.data {
- Dispatch::Ready(_ready) => {
-
- },
- }
- },
- Message::Reconnect(_) => todo!(),
- Message::InvalidSession(_) => todo!(),
- Message::Hello(_hello) => {
- info!("Server said hello! {:?}", _hello);
- },
- Message::HeartbeatACK(_) => todo!(),
- }
- } else {
- break;
- }
- }
+#[tokio::main]
+async fn main() {
+ let settings: Settings<ShardConfig> = Settings::new("gateway").unwrap();
+ let mut shard = Shard::new(settings.config);
+ shard.start().await;
}
--- /dev/null
+use log::info;
+use serde::{Deserialize, Deserializer};
+
+use serde_json::Value;
+
+use super::{events::ready::Ready, opcodes::OpCodes};
+
+/// Represents an unknown event not handled by the gateway itself
+#[derive(Clone, Debug, PartialEq, Deserialize)]
+pub struct UnknownDispatch {
+ pub t: String,
+ pub d: Value,
+ pub s: i64,
+ pub op: OpCodes,
+}
+
+#[derive(Clone, Debug, PartialEq, Deserialize)]
+#[serde(tag = "t", content = "d")]
+#[serde(remote = "Dispatch")]
+pub enum Dispatch {
+ #[serde(rename = "READY")]
+ Ready(Ready),
+ #[serde(rename = "RESUMED")]
+ Resumed(()),
+
+ #[serde(skip_deserializing)]
+ Other(UnknownDispatch),
+}
+
+impl<'de> Deserialize<'de> for Dispatch {
+ fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+ where D: Deserializer<'de>
+ {
+ info!("hey");
+ let s = UnknownDispatch::deserialize(deserializer)?;
+ Ok(Self::Other(s))
+ }
+}
--- /dev/null
+pub mod ready;
\ No newline at end of file
--- /dev/null
+use serde::Deserialize;
+use serde_json::Value;
+
+#[derive(Deserialize, Clone, Debug, PartialEq)]
+pub struct Ready {
+ #[serde(rename = "v")]
+ pub version: u64,
+ pub user: Value,
+ pub guilds: Vec<Value>,
+ pub session_id: String,
+ pub shard: Option<[i64;2]>,
+ pub application: Value,
+}
--- /dev/null
+use super::{dispatch::Dispatch, opcodes::{OpCodes, hello::Hello}};
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use serde::de::Error;
+
+#[derive(Serialize, Deserialize, PartialEq, Debug)]
+#[serde(bound(deserialize = "T: Deserialize<'de> + std::fmt::Debug"))]
+pub struct BaseMessage<T> {
+ pub t: Option<String>,
+ #[serde(rename = "s")]
+ pub sequence: Option<u64>,
+ pub op: OpCodes,
+ #[serde(rename = "d")]
+ pub data: T,
+}
+
+#[derive(Debug)]
+pub enum Message {
+ Dispatch(BaseMessage<Dispatch>),
+ Reconnect(BaseMessage<()>),
+ InvalidSession(BaseMessage<bool>),
+ Hello(BaseMessage<Hello>),
+ HeartbeatACK(BaseMessage<()>),
+}
+
+impl<'de> serde::Deserialize<'de> for Message {
+ fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> where D::Error : Error {
+ let value = Value::deserialize(d)?;
+ let val = value.get("op").and_then(Value::as_u64).unwrap();
+
+ if let Some(op) = num::FromPrimitive::from_u64(val) {
+ match op {
+ OpCodes::Dispatch => {
+ match Dispatch::deserialize(&value) {
+ Ok(data) => {
+
+ let mut t = None;
+ if let Some(t_value) = &value.get("t") {
+ // this is safe since we know this is a string
+ t = Some(t_value.to_string());
+ }
+ let mut sequence = None;
+
+ if let Some(sequence_value) = value.get("s") {
+ if let Some(sequence_uint) = sequence_value.as_u64() {
+ sequence = Some(sequence_uint);
+ }
+ }
+
+ Ok(Message::Dispatch(BaseMessage {
+ op,
+ t,
+ sequence,
+ data
+ }))
+ },
+ Err(e) => Err(Error::custom(e)),
+ }
+ },
+
+ OpCodes::Reconnect => {
+ match BaseMessage::deserialize(value) {
+ Ok(data) => Ok(Message::Reconnect(data)),
+ Err(e) => Err(Error::custom(e)),
+ }
+ },
+ OpCodes::InvalidSession => {
+ match BaseMessage::deserialize(value) {
+ Ok(data) => Ok(Message::InvalidSession(data)),
+ Err(e) => Err(Error::custom(e)),
+ }
+ },
+ OpCodes::Hello => {
+ match BaseMessage::deserialize(value) {
+ Ok(data) => Ok(Message::Hello(data)),
+ Err(e) => Err(Error::custom(e)),
+ }
+ },
+ OpCodes::HeartbeatACK => {
+ match BaseMessage::deserialize(value) {
+ Ok(data) => Ok(Message::HeartbeatACK(data)),
+ Err(e) => Err(Error::custom(e)),
+ }
+ },
+ _ => panic!("Cannot convert"),
+ }
+ } else {
+ todo!();
+ }
+ }
+}
--- /dev/null
+pub mod opcodes;
+pub mod dispatch;
+pub mod gateway;
+pub mod events;
--- /dev/null
+use serde::{Serialize, Deserialize};
+
+/// The first message sent by the gateway to initialize the heartbeating
+#[derive(Debug, Serialize, Deserialize)]
+pub struct Hello {
+ #[serde(rename = "heartbeat_interval")]
+ pub heartbeat_interval: u64
+}
--- /dev/null
+use enumflags2::{BitFlags, bitflags};
+use serde::{Deserialize, Serialize};
+use super::presence::PresenceUpdate;
+
+
+#[bitflags]
+#[repr(u16)]
+#[derive(Clone, Copy, Debug)]
+pub enum Intents {
+ Guilds = 1 << 0,
+ GuildMembers = 1 << 1,
+ GuildBans = 1 << 2,
+ GuildEmojisAndStickers = 1 << 3,
+ GuildIntegrations = 1 << 4,
+ GuildWebhoks = 1 << 5,
+ GuildInvites = 1 << 6,
+ GuildVoiceStates = 1 << 7,
+ GuildPresences = 1 << 8,
+ GuildMessages = 1 << 9,
+ GuildMessagesReactions = 1 << 10,
+ GuildMessageTyping = 1 << 11,
+ DirectMessages = 1 << 12,
+ DirectMessagesReactions = 1 << 13,
+ DirectMessageTyping = 1 << 14,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct IdentifyProprerties {
+ #[serde(rename = "$os")]
+ pub os: String,
+ #[serde(rename = "$browser")]
+ pub browser: String,
+ #[serde(rename = "$device")]
+ pub device: String,
+}
+
+/// Messages sent by the shard to log-in to the gateway.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct Identify {
+ pub token: String,
+ pub properties: IdentifyProprerties,
+ pub compress: Option<bool>,
+ pub large_threshold: Option<u64>,
+ pub shard: Option<[u64; 2]>,
+ pub presence: Option<PresenceUpdate>,
+ pub intents: BitFlags<Intents>,
+}
\ No newline at end of file
--- /dev/null
+pub mod hello;
+pub mod identify;
+pub mod resume;
+pub mod presence;
+use serde_repr::{Deserialize_repr, Serialize_repr};
+
+#[derive(Serialize_repr, Deserialize_repr, PartialEq, Debug, Clone, FromPrimitive, ToPrimitive)]
+#[repr(u8)]
+
+pub enum OpCodes {
+ Dispatch = 0,
+ Heartbeat = 1,
+ Identify = 2,
+ PresenceUpdate = 3,
+ VoiceStateUpdate = 4,
+ Resume = 6,
+ Reconnect = 7,
+ RequestGuildMembers = 8,
+ InvalidSession = 9,
+ Hello = 10,
+ HeartbeatACK = 11,
+}
\ No newline at end of file
--- /dev/null
+use serde_repr::{Deserialize_repr, Serialize_repr};
+use serde::{Deserialize, Serialize};
+#[derive(Serialize_repr, Deserialize_repr, Debug)]
+#[repr(u8)]
+pub enum ActivityType {
+ Game = 0,
+ Streaming = 1,
+ Listening = 2,
+ Watching = 3,
+ Custom = 4,
+ Competing = 5
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct ActivityTimestamps {
+ start: u64,
+ end: u64,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct ActivityEmoji {
+ name: String,
+ id: Option<String>,
+ animated: Option<bool>
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct Activity {
+ name: String,
+ #[serde(rename = "type")]
+ t: ActivityType,
+
+ url: Option<String>,
+ created_at: i64,
+ timestamp: Option<ActivityTimestamps>,
+ application_id: Option<String>,
+ details: Option<String>,
+ state: Option<String>,
+ emoji: Option<ActivityEmoji>,
+ // todo: implement more
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub enum PresenceStatus {
+ #[serde(rename = "online")]
+ Online,
+ #[serde(rename = "dnd")]
+ Dnd,
+ #[serde(rename = "idle")]
+ Idle,
+ #[serde(rename = "invisible")]
+ Invisible,
+ #[serde(rename = "offline")]
+ Offline
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct PresenceUpdate {
+ since: u64,
+ activities: Vec<Activity>,
+ status: PresenceStatus,
+ afk: bool,
+}
--- /dev/null
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct Resume {
+ pub token: String,
+ pub session_id: String,
+ pub seq: u64,
+}
\ No newline at end of file
--- /dev/null
+use std::env;
+
+use futures::SinkExt;
+use log::{debug, error};
+use serde::Serialize;
+use serde_json::Value;
+use std::fmt::Debug;
+
+use crate::{error::GatewayError, payloads::{gateway::BaseMessage, opcodes::{OpCodes, identify::{Identify, IdentifyProprerties}, presence::PresenceUpdate, resume::Resume}}};
+
+use super::Shard;
+
+/// Implement the available actions for nova in the gateway.
+impl Shard {
+
+ /// sends a message through the websocket
+ pub async fn _send<T: Serialize + Debug>(&mut self, message: BaseMessage<T>) -> Result<(), GatewayError> {
+ debug!("senging message {:?}", message);
+ if let Some(connection) = &mut self.connection {
+ if let Err(e) = connection.conn.send(message).await {
+ error!("failed to send message {:?}", e);
+ Err(GatewayError::from(e))
+ } else {
+ Ok(())
+ }
+ } else {
+ Err(GatewayError::from("no open connection".to_string()))
+ }
+ }
+
+ pub async fn _identify(&mut self) -> Result<(), GatewayError> {
+ if let Some(state) = self.state.clone() {
+ self._send(BaseMessage{
+ t: None,
+ sequence: None,
+ op: OpCodes::Resume,
+ data: Resume {
+ token: self.config.token.clone(),
+ seq: state.sequence,
+ session_id: state.session_id.clone(),
+ },
+ }).await
+ } else {
+ self._send(BaseMessage{
+ t: None,
+ sequence: None,
+ op: OpCodes::Identify,
+ data: Identify {
+ token: self.config.token.clone(),
+ intents: self.config.intents,
+ properties: IdentifyProprerties {
+ os: env::consts::OS.to_string(),
+ browser: "Nova".to_string(),
+ device: "Nova".to_string(),
+ },
+ shard: Some([0,2]),
+ compress: Some(false),
+ large_threshold: Some(500),
+ presence: None,
+ },
+ }).await
+ }
+ }
+
+ pub async fn _disconnect(&mut self) {}
+
+ /// Updates the presence of the current shard.
+ #[allow(dead_code)]
+ pub async fn presence_update(&mut self, update: PresenceUpdate) -> Result<(), GatewayError> {
+ self._send(BaseMessage{
+ t: None,
+ sequence: None,
+ op: OpCodes::PresenceUpdate,
+ data: update,
+ }).await
+ }
+ /// Updates the voice status of the current shard in a certain channel.
+ #[allow(dead_code)]
+ pub async fn voice_state_update(&mut self) -> Result<(), GatewayError> {
+ if let Some(connection) = &mut self.connection {
+ connection.conn
+ .send(BaseMessage {
+ t: None,
+ sequence: None,
+ op: OpCodes::VoiceStateUpdate,
+ // todo: proper payload for this
+ data: Value::Null,
+ })
+ .await?
+ } else {
+ error!("the connection is not open")
+ }
+ Ok(())
+ }
+ /// Ask discord for more informations about offline guild members.
+ #[allow(dead_code)]
+ pub async fn request_guild_members(&mut self) -> Result<(), GatewayError> {
+ if let Some(connection) = &mut self.connection {
+ connection.conn
+ .send(BaseMessage {
+ t: None,
+ sequence: None,
+ op: OpCodes::RequestGuildMembers,
+ // todo: proper payload for this
+ data: Value::Null,
+ })
+ .await?
+ } else {
+ error!("the connection is not open")
+ }
+ Ok(())
+ }
+}
--- /dev/null
+use std::{
+ cmp::{max, min},
+ convert::TryInto,
+ time::Duration,
+};
+
+use crate::{
+ connection::Connection,
+ payloads::{
+ dispatch::Dispatch,
+ gateway::{BaseMessage, Message},
+ },
+ shard::state::SessionState,
+};
+
+use super::{state::ConnectionState, ConnectionWithState, Shard};
+use futures::StreamExt;
+use log::{error, info};
+use tokio::{select, time::sleep};
+
+impl Shard {
+ pub async fn start(self: &mut Self) {
+ let mut reconnects = 1;
+ info!("Starting shard");
+
+ while reconnects < self.config.max_reconnects {
+ info!("Starting connection for shard");
+ self._shard_task().await;
+ // when the shard got disconnected, the shard task ends
+ reconnects += 1;
+
+ // wait reconnects min(max(reconnects * reconnect_delay_growth_factor, reconnect_delay_minimum),reconnect_delay_maximum)
+ if reconnects < self.config.max_reconnects {
+ let time = min(
+ self.config.reconnect_delay_maximum,
+ max(
+ ((reconnects as f32) * self.config.reconnect_delay_growth_factor) as usize,
+ self.config.reconnect_delay_minimum,
+ ),
+ );
+ info!(
+ "The shard got disconnected, waiting for reconnect ({}ms)",
+ time
+ );
+ sleep(Duration::from_millis(time.try_into().unwrap())).await;
+ }
+ }
+ info!(
+ "The shard got disconnected too many times and reached the maximum {}",
+ self.config.max_reconnects
+ );
+ }
+
+ async fn _shard_task(&mut self) {
+ // create the new connection
+ let mut connection = Connection::new();
+ connection.start().await.unwrap();
+ self.connection = Some(ConnectionWithState {
+ conn: connection,
+ state: ConnectionState::default(),
+ });
+ loop {
+ if let Some(connection) = &mut self.connection {
+ select!(
+ payload = connection.conn.next() => {
+ match payload {
+ Some(data) => match data {
+ Ok(message) => self._handle(&message).await,
+ Err(error) => {
+ error!("An error occured while being connected to Discord: {:?}", error);
+ return;
+ },
+ },
+ None => {
+ info!("Connection terminated");
+ return;
+ },
+ }
+ }
+ )
+ }
+ }
+ }
+
+ fn _util_set_seq(&mut self, seq: Option<u64>) {
+ if let Some(seq) = seq {
+ if let Some(state) = &mut self.state {
+ state.sequence = seq;
+ }
+ }
+ }
+
+ async fn _handle(&mut self, message: &Message) {
+ match message {
+ Message::Dispatch(msg) => {
+ self._util_set_seq(msg.sequence);
+ self._dispatch(&msg).await;
+ }
+ // we need to reconnect to the gateway
+ Message::Reconnect(msg) => {
+ self._util_set_seq(msg.sequence);
+ info!("gateway disconnect requested");
+ self._disconnect().await;
+ }
+ Message::InvalidSession(msg) => {
+ self._util_set_seq(msg.sequence);
+ info!("invalid session");
+ let data = msg.data;
+ if !data {
+ info!("session removed");
+ // reset the session data
+ self.state = None;
+ if let Err(e) = self._identify().await {
+ error!("error while sending identify: {:?}", e);
+ }
+ }
+ }
+ Message::HeartbeatACK(msg) => {
+ self._util_set_seq(msg.sequence);
+ info!("heartbeat ack received");
+ }
+ Message::Hello(msg) => {
+ self._util_set_seq(msg.sequence);
+ info!("server hello received");
+ if let Err(e) = self._identify().await {
+ error!("error while sending identify: {:?}", e);
+ }
+ },
+ }
+ }
+
+ async fn _dispatch(&mut self, dispatch: &BaseMessage<Dispatch>) {
+ match &dispatch.data {
+ Dispatch::Ready(ready) => {
+ info!("received gateway dispatch hello");
+ info!(
+ "Logged in as {}",
+ ready.user.get("username").unwrap().to_string()
+ );
+ self.state = Some(SessionState {
+ sequence: dispatch.sequence.unwrap(),
+ session_id: ready.session_id.clone(),
+ });
+ }
+ Dispatch::Resumed(_) => {
+ info!("session resumed");
+ }
+ Dispatch::Other(data) => {}
+ }
+ }
+}
--- /dev/null
+use enumflags2::BitFlags;
+use serde::{Deserialize, Serialize};
+use crate::{connection::Connection, payloads::opcodes::identify::Intents};
+use self::state::{ConnectionState, SessionState};
+mod actions;
+mod connection;
+mod state;
+
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
+pub struct Sharding {
+ pub total_shards: i64,
+ pub current_shard: i64
+}
+
+
+#[derive(Debug, Deserialize, Serialize, Default, Clone)]
+pub struct ShardConfig {
+ pub max_reconnects: usize,
+ pub reconnect_delay_growth_factor: f32,
+ pub reconnect_delay_minimum: usize,
+ pub reconnect_delay_maximum: usize,
+ pub token: String,
+
+ pub large_threshold: Option<u64>,
+ pub shard: Option<Sharding>,
+ pub intents: BitFlags<Intents>
+}
+
+struct ConnectionWithState {
+ conn: Connection,
+ state: ConnectionState,
+}
+
+/// Represents a shard & all the reconnection logic related to it
+pub struct Shard {
+ connection: Option<ConnectionWithState>,
+ state: Option<SessionState>,
+ config: ShardConfig
+}
+
+impl Shard {
+ /// Creates a new shard instance
+ pub fn new(config: ShardConfig) -> Self {
+ Shard {
+ connection: None,
+ state: None,
+ config,
+ }
+ }
+}
--- /dev/null
+use std::time::Instant;
+
+/// This struct represents the state of a session
+#[derive(Clone, Debug)]
+pub struct SessionState {
+ pub sequence: u64,
+ pub session_id: String,
+}
+
+impl Default for SessionState {
+ fn default() -> Self {
+ Self {
+ sequence: Default::default(),
+ session_id: Default::default(),
+ }
+ }
+}
+
+/// This struct represents the state of a connection
+#[derive(Debug, Clone)]
+pub struct ConnectionState {
+ pub sequence: u64,
+ pub last_heartbeat_acknowledged: bool,
+ pub last_heartbeat_time: Instant,
+
+}
+impl Default for ConnectionState {
+ fn default() -> Self {
+ Self {
+ sequence: 0,
+ last_heartbeat_acknowledged: true,
+ last_heartbeat_time: Instant::now(),
+ }
+ }
+}
+
+impl ConnectionState {}
\ No newline at end of file
--- /dev/null
+/// Formats a url of connection to the gateway
+pub fn get_gateway_url (compress: bool, encoding: &str, v: i16) -> String {
+ return format!(
+ "wss://gateway.discord.gg/?v={}&encoding={}&compress={}",
+ v, encoding,
+ if compress { "zlib-stream" } else { "" }
+ );
+}
\ No newline at end of file