- name: Execute build script\r
shell: bash\r
run: |\r
- bazel build //:package\r
+ bazel build //:packages\r
\r
- name: Bazel artifacts\r
uses: actions/upload-artifact@v2\r
- name: Publish docker images\r
shell: bash\r
run: |\r
- bazel run --define docker_repo=ghcr.io --define docker_tag=${{ steps.extract_branch.outputs.branch }} //:publish\r
+ bazel run --define docker_repo=ghcr.io --define docker_tag=${{ steps.extract_branch.outputs.branch }} //:container_publish\r
if: matrix.os == 'ubuntu-latest'\r
dependencies = [
"async-nats",
"async-stream",
+ "async-trait",
"common",
"enumflags2",
"flate2",
--- /dev/null
+use std::fmt;
+
+pub struct NovaError {
+ pub message: String,
+}
+
+impl fmt::Display for NovaError {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "An error occured wihind the nova system: {}", self.message) // user-facing output
+ }
+}
+
+impl fmt::Debug for NovaError {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "{{ file: {}, line: {} }}", file!(), line!()) // programmer-facing output
+ }
+}
+
pub mod config;
pub mod monitoring;
pub mod nats;
-pub mod payloads;
\ No newline at end of file
+pub mod payloads;
+pub mod error;
\ No newline at end of file
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(bound(deserialize = "T: Deserialize<'de> + std::default::Default + Clone"))]
pub struct CachePayload<T> {
- pub tracing: (),
+ pub tracing: Tracing,
pub data: T
}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct Tracing {
+ pub node_id: String,
+ pub span: Option<String>
+}
\ No newline at end of file
enumflags2 = { version ="0.7.1", features = ["serde"] }
common = { path = "../common/rust" }
tokio-scoped = "0.1.0"
-futures = "0.3.17"
\ No newline at end of file
+futures = "0.3.17"
+async-trait = "0.1.51"
\ No newline at end of file
--- /dev/null
+[monitoring]
+enabled = false
+
+[nats]
+host = "localhost"
+
+[gateway]
+++ /dev/null
-use super::{Connection, state::ConnectionState};
-
-impl Connection {
- /// Returns the current state of the connection.
- pub fn state(&self) -> ConnectionState {
- return self.state.clone();
- }
-}
\ No newline at end of file
+use super::{error_utils::GatewayError, utils::get_gateway_url};
use tokio::net::TcpStream;
-use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
-
+use tokio_tungstenite::{
+ connect_async, tungstenite::handshake::client::Request, MaybeTlsStream, WebSocketStream,
+};
mod stream;
-mod state;
-mod actions;
+mod utils;
/// Underlying representation of a Discord event stream
/// that streams the Event payloads to the shard structure
pub struct Connection {
/// The channel given by tokio_tungstenite that represents the websocket connection
connection: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
- /// The state of the connection
- state: state::ConnectionState,
}
impl Connection {
pub fn new() -> Self {
- Connection {
- connection: None,
- state: state::ConnectionState::default()
+ Connection { connection: None }
+ }
+
+ pub async fn start(&mut self) -> Result<(), GatewayError> {
+ let request = Request::builder()
+ .uri(get_gateway_url(false, "json", 9))
+ .body(())
+ .unwrap();
+
+ let connection_result = connect_async(request).await;
+ // we connect outselves to the websocket server
+ if let Err(err) = connection_result {
+ Err(GatewayError::from(err))
+ } else {
+ self.connection = Some(connection_result.unwrap().0);
+ Ok(())
}
}
-}
\ No newline at end of file
+}
+++ /dev/null
-/// This struct represents the state of a connection
-#[derive(Debug, Clone)]
-pub struct ConnectionState {}
-impl Default for ConnectionState {
- fn default() -> Self {
- Self { }
- }
-}
-
-impl ConnectionState {}
\ No newline at end of file
+use super::Connection;
+use crate::client::{error_utils::GatewayError};
+use futures::{FutureExt, Sink, SinkExt, Stream, StreamExt};
+use log::info;
+use serde::Serialize;
use std::{
pin::Pin,
task::{Context, Poll},
};
-
-use super::Connection;
-use crate::client::payloads::message::MessageBase;
-use futures::{Sink, Stream};
+use tokio_tungstenite::tungstenite::Message;
impl Stream for Connection {
- type Item = MessageBase;
+ type Item = Result<crate::client::payloads::gateway::Message, GatewayError>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ // first, when a poll is called, we check if the connection is still open
+ if let Some(conn) = &mut self.connection {
+ // we need to wait poll the message using the tokio_tungstenite stream
+ let message = conn.poll_next_unpin(cx);
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- Poll::Pending
+ match message {
+ Poll::Ready(packet) => {
+ // if data is available, we can continue
+ match packet {
+ Some(result) => match result {
+ Ok(message) => {
+ match Box::pin(self._handle_message(&message)).poll_unpin(cx) {
+ Poll::Ready(data) => match data {
+ Ok(d) => Poll::Ready(Some(Ok(d))),
+ Err(e) => Poll::Ready(Some(Err(e))),
+ },
+ // unknown behaviour?
+ Poll::Pending => unimplemented!(),
+ }
+ }
+ Err(e) => Poll::Ready(Some(Err(GatewayError::from(e)))),
+ },
+ // if no message is available, we return none, it's the end of the stream
+ None => {
+ info!("tokio_tungstenite stream finished successfully");
+ Box::pin(conn.close(None)).poll_unpin(cx);
+ self.connection = None;
+ Poll::Ready(None)
+ }
+ }
+ }
+ // if the message is pending, we return the same result
+ Poll::Pending => Poll::Pending,
+ }
+ } else {
+ Poll::Pending
+ }
}
}
-impl Sink<MessageBase> for Connection {
- type Error = ();
+impl<T: Serialize> Sink<crate::client::payloads::gateway::FullMessage<T>> for Connection {
+ type Error = GatewayError;
- fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- todo!()
+ #[allow(dead_code)]
+ fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ if let Some(_) = &self.connection {
+ // a connection exists, we can send data
+ Poll::Ready(Ok(()))
+ } else {
+ Poll::Pending
+ }
}
- fn start_send(self: Pin<&mut Self>, item: MessageBase) -> Result<(), Self::Error> {
- todo!()
+ #[allow(dead_code)]
+ fn start_send(mut self: Pin<&mut Self>, item: crate::client::payloads::gateway::FullMessage<T>) -> Result<(), Self::Error> {
+ if let Some(conn) = &mut self.connection {
+ let message = serde_json::to_string(&item);
+ conn.start_send_unpin(Message::Text(message.unwrap()))
+ .unwrap();
+ }
+ Ok(())
}
- fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- todo!()
+ #[allow(dead_code)]
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
}
- fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- todo!()
+ #[allow(dead_code)]
+ fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
}
}
--- /dev/null
+use super::Connection;
+use crate::client::{error_utils::GatewayError};
+use std::str::from_utf8;
+use tokio_tungstenite::tungstenite::Message;
+
+impl Connection {
+ pub(crate) async fn _handle_message(
+ &mut self,
+ data: &Message,
+ ) -> Result<crate::client::payloads::gateway::Message, GatewayError> {
+ match data {
+ Message::Text(text) => self._handle_discord_message(&text).await,
+ Message::Binary(message) => {
+ self._handle_discord_message(from_utf8(message).unwrap())
+ .await
+ }
+ _ => Err(GatewayError::from("unknown error".to_string())),
+ }
+ }
+
+ async fn _handle_discord_message(
+ &mut self,
+ raw_message: &str,
+ ) -> Result<crate::client::payloads::gateway::Message, GatewayError> {
+ let a: Result<crate::client::payloads::gateway::Message, serde_json::Error> = serde_json::from_str(raw_message);
+ let message = a.unwrap();
+ Ok(message)
+ }
+}
--- /dev/null
+use common::error::NovaError;
+
+#[derive(Debug)]
+pub struct GatewayError(NovaError);
+
+impl From<tokio_tungstenite::tungstenite::Error> for GatewayError {
+ fn from(e: tokio_tungstenite::tungstenite::Error) -> Self {
+ GatewayError {
+ 0: NovaError {
+ message: e.to_string(),
+ },
+ }
+ }
+}
+
+impl From<String> for GatewayError {
+ fn from(e: String) -> Self {
+ GatewayError {
+ 0: NovaError {
+ message: e,
+ },
+ }
+ }
+}
pub mod connection;
pub mod payloads;
-pub mod shard;
\ No newline at end of file
+pub mod shard;
+pub mod utils;
+mod error_utils;
\ No newline at end of file
--- /dev/null
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+
+#[derive(Serialize, Deserialize, Clone, Debug)]
+pub struct Ready {
+ #[serde(rename = "v")]
+ version: u64,
+ user: Value,
+ guilds: Vec<Value>,
+ session_id: String,
+ shard: Option<[i64;2]>,
+ application: Value,
+}
+
+#[derive(Serialize, Deserialize, Clone, Debug)]
+#[serde(tag = "t", content = "d")]
+pub enum Dispatch {
+ #[serde(rename = "READY")]
+ Ready(Ready),
+}
\ No newline at end of file
--- /dev/null
+use super::dispatch::Dispatch;
+use super::payloads::hello::Hello;
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use serde_repr::{Deserialize_repr, Serialize_repr};
+
+macro_rules! num_to_enum {
+ ($num:expr => $enm:ident<$tpe:ty>{ $($fld:ident),+ }; $err:expr) => ({
+ match $num {
+ $(_ if $num == $enm::$fld as $tpe => { $enm::$fld })+
+ _ => $err
+ }
+ });
+}
+
+#[derive(Serialize_repr, Deserialize_repr, PartialEq, Debug)]
+#[repr(u8)]
+pub enum OpCodes {
+ Dispatch = 0,
+ Heartbeat = 1,
+ Identify = 2,
+ PresenceUpdate = 3,
+ VoiceStateUpdate = 4,
+ Resume = 6,
+ Reconnect = 7,
+ RequestGuildMembers = 8,
+ InvalidSession = 9,
+ Hello = 10,
+ HeartbeatACK = 11,
+}
+
+#[derive(Serialize, Deserialize, PartialEq, Debug)]
+#[serde(bound(deserialize = "T: Deserialize<'de>"))]
+pub struct FullMessage<T> {
+ #[serde(rename = "d")]
+ pub dispatch_type: Option<String>,
+ #[serde(rename = "s")]
+ pub sequence: Option<OpCodes>,
+ pub op: OpCodes,
+ #[serde(rename = "d")]
+ pub data: T,
+}
+
+pub enum Message {
+ Dispatch(FullMessage<Dispatch>),
+ Reconnect(FullMessage<()>),
+ InvalidSession(FullMessage<bool>),
+ Hello(FullMessage<Hello>),
+ HeartbeatACK(FullMessage<()>),
+}
+
+impl<'de> serde::Deserialize<'de> for Message {
+ fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
+ let value = Value::deserialize(d)?;
+ let val = value.get("op").and_then(Value::as_u64).unwrap();
+ let op_code = num_to_enum!(
+ val => OpCodes<u64>{
+ Dispatch,
+ Heartbeat,
+ Identify,
+ PresenceUpdate,
+ VoiceStateUpdate,
+ Resume,
+ Reconnect,
+ RequestGuildMembers,
+ InvalidSession,
+ Hello,
+ HeartbeatACK
+ };
+ panic!("Cannot convert number to `MyEnum`")
+ );
+
+ match op_code {
+ OpCodes::Dispatch => Ok(Message::Dispatch(FullMessage::deserialize(value).unwrap())),
+ OpCodes::Reconnect => Ok(Message::Reconnect(FullMessage::deserialize(value).unwrap())),
+ OpCodes::InvalidSession => Ok(Message::InvalidSession(
+ FullMessage::deserialize(value).unwrap(),
+ )),
+ OpCodes::Hello => Ok(Message::Hello(FullMessage::deserialize(value).unwrap())),
+ OpCodes::HeartbeatACK => Ok(Message::HeartbeatACK(
+ FullMessage::deserialize(value).unwrap(),
+ )),
+ _ => panic!("Cannot convert"),
+ }
+ }
+}
+++ /dev/null
-use serde_json::Value;
-use serde_repr::{Serialize_repr, Deserialize_repr};
-use serde::{Deserialize, Serialize};
-
-
-#[derive(Serialize_repr, Deserialize_repr, PartialEq, Debug)]
-#[repr(u8)]
-pub enum OpCodes {
- Dispatch = 0,
- Heartbeat = 1,
- Identify = 2,
- PresenceUpdate = 3,
- VoiceStateUpdate = 4,
- Resume = 6,
- Reconnect = 7,
- RequestGuildMembers = 8,
- InvalidSession = 9,
- Hello = 10,
- HeartbeatACK = 11,
-}
-
-#[derive(Serialize, Deserialize)]
-pub enum Dispatch {
- #[serde(rename = "READY")]
- Ready,
- #[serde(rename = "RESUMED")]
- Resumed,
-}
-
-#[derive(Serialize, Deserialize, PartialEq, Debug)]
-pub struct MessageBase {
- pub t: Option<String>,
- pub s: Option<i64>,
- pub op: OpCodes,
- pub d: Value
-}
pub mod payloads;
-pub mod message;
\ No newline at end of file
+pub mod dispatch;
+pub mod gateway;
\ No newline at end of file
+++ /dev/null
-use serde::{Serialize, Deserialize};
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct Hello {
- #[serde(rename = "heartbeat_interval")]
- pub heartbeat_interval: u64
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct HeartbeatACK {}
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct IdentifyProprerties {
- #[serde(rename = "$os")]
- pub os: String,
- #[serde(rename = "$browser")]
- pub browser: String,
- #[serde(rename = "$device")]
- pub device: String,
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct Identify {
- pub token: String,
- pub intents: u16,
- pub properties: IdentifyProprerties,
- pub shard: Option<[i64; 2]>,
-}
\ No newline at end of file
--- /dev/null
+use serde::{Serialize, Deserialize};
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct Hello {
+ #[serde(rename = "heartbeat_interval")]
+ pub heartbeat_interval: u64
+}
--- /dev/null
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct IdentifyProprerties {
+ #[serde(rename = "$os")]
+ pub os: String,
+ #[serde(rename = "$browser")]
+ pub browser: String,
+ #[serde(rename = "$device")]
+ pub device: String,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct Identify {
+ pub token: String,
+ pub intents: u16,
+ pub properties: IdentifyProprerties,
+ pub shard: Option<[i64; 2]>,
+}
\ No newline at end of file
--- /dev/null
+pub mod hello;
+pub mod identify;
\ No newline at end of file
use log::error;
use serde_json::Value;
-use crate::client::payloads::message::{MessageBase, OpCodes};
+use crate::client::payloads::gateway::{FullMessage, OpCodes};
use super::Shard;
pub async fn presence_update(&mut self) -> Result<(), ()> {
if let Some(connection) = &mut self.connection {
connection
- .send(MessageBase {
- t: None,
- s: None,
+ .send(FullMessage {
+ dispatch_type: None,
+ sequence: None,
op: OpCodes::PresenceUpdate,
// todo: proper payload for this
- d: Value::Null,
+ data: Value::Null,
})
- .await?;
+ .await
+ .unwrap();
} else {
error!("the connection is not open")
}
pub async fn voice_state_update(&mut self) -> Result<(), ()> {
if let Some(connection) = &mut self.connection {
connection
- .send(MessageBase {
- t: None,
- s: None,
+ .send(FullMessage {
+ dispatch_type: None,
+ sequence: None,
op: OpCodes::VoiceStateUpdate,
// todo: proper payload for this
- d: Value::Null,
+ data: Value::Null,
})
- .await?;
+ .await
+ .unwrap();
} else {
error!("the connection is not open")
}
pub async fn request_guild_members(&mut self) -> Result<(), ()> {
if let Some(connection) = &mut self.connection {
connection
- .send(MessageBase {
- t: None,
- s: None,
+ .send(FullMessage {
+ dispatch_type: None,
+ sequence: None,
op: OpCodes::RequestGuildMembers,
// todo: proper payload for this
- d: Value::Null,
+ data: Value::Null,
})
- .await?;
+ .await
+ .unwrap();
} else {
error!("the connection is not open")
}
use log::info;
impl Shard {
- async fn start(self: &mut Self) {
+ pub async fn start(self: &mut Self) {
let mut should_exit = false;
while !should_exit {
info!("Starting connection for shard");
// create the new connection
- self.connection = Some(Connection::new());
+ let mut connection = Connection::new();
+ connection.start().await.unwrap();
+ self.connection = Some(connection);
should_exit = true;
}
}
-use std::sync::Arc;
-
use self::state::SessionState;
use super::connection::Connection;
+use std::time::Instant;
+
/// This struct represents the state of a session
pub struct SessionState {
pub sequence: u64,
}
}
}
+
+/// This struct represents the state of a connection
+#[derive(Debug, Clone)]
+pub struct ConnectionState {
+ pub sequence: u64,
+ pub last_heartbeat_acknowledged: bool,
+ pub last_heartbeat_time: Instant,
+
+}
+impl Default for ConnectionState {
+ fn default() -> Self {
+ Self {
+ sequence: 0,
+ last_heartbeat_acknowledged: true,
+ last_heartbeat_time: Instant::now(),
+ }
+ }
+}
+
+impl ConnectionState {}
\ No newline at end of file
mod client;
+use client::connection::Connection;
use common::config::Settings;
+use futures::StreamExt;
+use log::info;
+use serde_json::Value;
+
+use crate::client::payloads::{dispatch::Dispatch, gateway::{FullMessage, Message, OpCodes}, payloads::identify::{Identify, IdentifyProprerties}};
#[tokio::main]
-async fn main() {
- let settings: Settings<()> = Settings::new("gateway").unwrap();
+async fn main() {
+ let settings: Settings<Value> = Settings::new("gateway").unwrap();
+
+ let mut conn = Connection::new();
+ conn.start().await.unwrap();
+
+ loop {
+ if let Some(val) = conn.next().await {
+ let data = val.as_ref().unwrap();
+ match data {
+ Message::Dispatch(dispatch) => {
+ match &dispatch.data {
+ Dispatch::Ready(_ready) => {
+
+ },
+ }
+ },
+ Message::Reconnect(_) => todo!(),
+ Message::InvalidSession(_) => todo!(),
+ Message::Hello(_hello) => {
+ info!("Server said hello! {:?}", _hello);
+ },
+ Message::HeartbeatACK(_) => todo!(),
+ }
+
+ } else {
+ break;
+ }
+ }
}
},
_ => {
let payload = serde_json::to_string(&common::payloads::CachePayload {
- tracing: (),
+ tracing: common::payloads::Tracing {
+ node_id: "".to_string(),
+ span: None,
+ },
data: value,
}).unwrap();