-use crate::client::payloads::{message::OpCodes, payloads::Hello};
-
+use crate::client::payloads::{message::OpCodes, payloads::{Hello, Identify, IdentifyProprerties}};
use super::{
payloads::message::MessageBase,
state::{Stage, State},
utils::get_gateway_url,
};
-use futures_util::{
- SinkExt, StreamExt,
-};
-use log::{error, info, warn};
+use flate2::write::ZlibDecoder;
+use futures_util::{SinkExt, StreamExt};
+use log::{debug, error, info, warn};
use std::{str::from_utf8, time::Duration};
-use tokio::{
- net::TcpStream,
- select,
- time::{Instant},
-};
-use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::{self, Message}};
+use tokio::{net::TcpStream, select, time::Instant};
+use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::{self, Message, handshake::client::Request}};
#[derive(Debug)]
pub enum CloseReason {
ConnexionError(tungstenite::Error),
}
+pub struct Config {
+ pub token: String,
+ pub compress: bool,
+}
+
pub enum HandleResult {
Success,
Error(CloseReason),
/// the MessageHandler trait.
pub struct Connexion {
state: State,
+ config: Config,
+ zlib: ZlibDecoder<Vec<u8>>,
connexion: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
+ nats: Option<async_nats::Connection>,
}
impl Connexion {
/// websocket connexion. This instance is not initialized by default.
/// a websocket connexion like this can be re-used multiple times
/// to allow reconnexion mechanisms.
- pub async fn new() -> Self {
+ pub async fn new(config: Config) -> Self {
Connexion {
state: State::default(),
connexion: None,
+ config,
+ zlib: ZlibDecoder::<Vec<u8>>::new(vec![]),
+ nats: None,
}
}
} else {
// we reset the state before starting the connection
self.state = State::default();
+ let request = Request::builder()
+ .header("User-Agant", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36")
+ .uri(get_gateway_url(false, "json", 9))
+ .body(())
+ .unwrap();
- let connexion_result = connect_async(get_gateway_url(false, "json", 9)).await;
+ let connexion_result = connect_async(request).await;
// we connect outselves to the websocket server
if let Err(err) = connexion_result {
- return CloseReason::ConnexionError(err)
+ return CloseReason::ConnexionError(err);
}
self.connexion = Some(connexion_result.unwrap().0);
-
+ self.nats = Some(async_nats::connect("localhost:4222").await.unwrap());
// this is the loop that will maintain the whole connexion
loop {
if let Some(connexion) = &mut self.connexion {
if self.state.stage == Stage::Unknown {
let msg = connexion.next().await;
if let HandleResult::Error(reason) = self._handle_message(&msg).await {
- return reason
+ return reason;
}
} else {
let timer = self.state.interval.as_mut().unwrap().tick();
}
}
} else {
- return CloseReason::ConnexionEnded
+ return CloseReason::ConnexionEnded;
}
}
}
HandleResult::Error(CloseReason::ConnexionEnded)
}
- _ => {
- HandleResult::Error(CloseReason::ErrorEncountered("unsupported message type encountered"))
- }
+ _ => HandleResult::Error(CloseReason::ErrorEncountered(
+ "unsupported message type encountered",
+ )),
},
- Err(_error) => {
- HandleResult::Error(CloseReason::ErrorEncountered("error while reading a message"))
- }
+ Err(_error) => HandleResult::Error(CloseReason::ErrorEncountered(
+ "error while reading a message",
+ )),
}
} else {
- HandleResult::Error(CloseReason::ErrorEncountered("error while reading a message"))
+ HandleResult::Error(CloseReason::ErrorEncountered(
+ "error while reading a message",
+ ))
}
}
}
match message.op {
- OpCodes::Dispatch => todo!(),
- OpCodes::Heartbeat => todo!(),
- OpCodes::Identify => todo!(),
+ OpCodes::Dispatch => {
+ let t = message.t.unwrap();
+ info!("dispatch message received: {:?}", t);
+ let topic = format!("nova.gateway.{}", t);
+ self.nats.as_ref().unwrap().publish(
+ &topic,
+ &serde_json::to_vec(&message.d).unwrap(),
+ ).await.unwrap();
+ },
OpCodes::PresenceUpdate => todo!(),
OpCodes::VoiceStateUpdate => todo!(),
- OpCodes::Resume => todo!(),
OpCodes::Reconnect => todo!(),
- OpCodes::RequestGuildMembers => todo!(),
OpCodes::InvalidSession => todo!(),
OpCodes::Hello => {
if let Ok(hello) = serde_json::from_value::<Hello>(message.d) {
Duration::from_millis(hello.heartbeat_interval),
));
self.state.stage = Stage::Initialized;
+ self._send(&MessageBase {
+ t: None,
+ op: OpCodes::Identify,
+ s: None,
+ d: serde_json::to_value(&Identify{
+ token: self.config.token.clone(),
+ intents: 1 << 0 |
+ 1 << 1 |
+ 1 << 2 |
+ 1 << 3 |
+ 1 << 4 |
+ 1 << 5 |
+ 1 << 6 |
+ 1 << 7 |
+ 1 << 8 |
+ 1 << 9 |
+ 1 << 10 |
+ 1 << 11 |
+ 1 << 12 |
+ 1 << 13 |
+ 1 << 14,
+ properties: IdentifyProprerties {
+ os: "Linux".into(),
+ browser: "Nova".into(),
+ device: "Linux".into(),
+ }
+ }).unwrap(),
+ }).await;
+ // do login
+ // todo: session logic
}
}
OpCodes::HeartbeatACK => {
);
self.state.last_heartbeat_acknowledged = true;
}
+ _ => {} // invalid payloads
}
}
async fn _do_heartbeat(&mut self) {
if !self.state.last_heartbeat_acknowledged {
- self._terminate_websocket(CloseReason::ErrorEncountered("the server did not acknowledged the last heartbeat")).await;
+ self._terminate_websocket(CloseReason::ErrorEncountered(
+ "the server did not acknowledged the last heartbeat",
+ ))
+ .await;
return;
}
self.state.last_heartbeat_acknowledged = false;
info!("sending heartbeat");
- self._send(
- serde_json::to_vec(&MessageBase {
- t: None,
- d: serde_json::to_value(self.state.sequence).unwrap(),
- s: None,
- op: OpCodes::Heartbeat,
- })
- .unwrap(),
- )
+ self._send(&MessageBase {
+ t: None,
+ d: serde_json::to_value(self.state.sequence).unwrap(),
+ s: None,
+ op: OpCodes::Heartbeat,
+ })
.await;
self.state.last_heartbeat_time = std::time::Instant::now();
}
- async fn _send(&mut self, data: Vec<u8>) {
+ async fn _send(&mut self, data: &MessageBase) {
if let Some(connexion) = &mut self.connexion {
- if let Err(error) = connexion.send(Message::Binary(data)).await {
- error!("failed to write to socket: {}", error);
- self._terminate_websocket(CloseReason::ErrorEncountered("failed to write to the socket")).await;
+ if let Ok(json) = serde_json::to_vec(data) {
+ if let Err(error) = connexion.send(Message::Binary(json)).await {
+ error!("failed to write to socket: {}", error);
+ self._terminate_websocket(CloseReason::ErrorEncountered(
+ "failed to write to the socket",
+ ))
+ .await;
+ }
+ } else {
+ self._terminate_websocket(CloseReason::ErrorEncountered(
+ "failed to serialize the message",
+ ))
+ .await;
}
}
}