summaryrefslogtreecommitdiff
path: root/gateway/src/connection/stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'gateway/src/connection/stream.rs')
-rw-r--r--gateway/src/connection/stream.rs99
1 files changed, 99 insertions, 0 deletions
diff --git a/gateway/src/connection/stream.rs b/gateway/src/connection/stream.rs
new file mode 100644
index 0000000..dbfab60
--- /dev/null
+++ b/gateway/src/connection/stream.rs
@@ -0,0 +1,99 @@
+use crate::{error::GatewayError, payloads::gateway::BaseMessage};
+
+use super::Connection;
+use futures::{FutureExt, Sink, SinkExt, Stream, StreamExt};
+use log::info;
+use serde::Serialize;
+use std::{
+ pin::Pin,
+ task::{Context, Poll},
+};
+use tokio_tungstenite::tungstenite::Message;
+
+/// Implementation of the Stream trait for the Connection
+impl Stream for Connection {
+ type Item = Result<crate::payloads::gateway::Message, GatewayError>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ // first, when a poll is called, we check if the connection is still open
+ if let Some(conn) = &mut self.connection {
+ // we need to wait poll the message using the tokio_tungstenite stream
+ let message = conn.poll_next_unpin(cx);
+
+ match message {
+ Poll::Ready(packet) => {
+ // if data is available, we can continue
+ match packet {
+ Some(result) => match result {
+ Ok(message) => {
+ match Box::pin(self._handle_message(&message)).poll_unpin(cx) {
+ Poll::Ready(data) => match data {
+ Ok(d) => Poll::Ready(Some(Ok(d))),
+ Err(e) => Poll::Ready(Some(Err(e))),
+ },
+ // unknown behaviour?
+ Poll::Pending => unreachable!(),
+ }
+ }
+ Err(e) => Poll::Ready(Some(Err(GatewayError::from(e)))),
+ },
+ // if no message is available, we return none, it's the end of the stream
+ None => {
+ info!("tokio_tungstenite stream finished successfully");
+ let _ = Box::pin(conn.close(None)).poll_unpin(cx);
+ self.connection = None;
+ Poll::Ready(None)
+ }
+ }
+ }
+ // if the message is pending, we return the same result
+ Poll::Pending => Poll::Pending,
+ }
+ } else {
+ Poll::Pending
+ }
+ }
+}
+
+/// Implementation of the Sink trait for the Connection
+impl<T: Serialize> Sink<BaseMessage<T>> for Connection {
+ type Error = tokio_tungstenite::tungstenite::Error;
+
+ #[allow(dead_code)]
+ fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ if let Some(conn) = &mut self.connection {
+ // a connection exists, we can send data
+ conn.poll_ready_unpin(cx)
+ } else {
+ Poll::Pending
+ }
+ }
+
+ #[allow(dead_code)]
+ fn start_send(mut self: Pin<&mut Self>, item: BaseMessage<T>) -> Result<(), Self::Error> {
+ if let Some(conn) = &mut self.connection {
+ let message = serde_json::to_string(&item);
+ conn.start_send_unpin(Message::Text(message.unwrap()))
+ .unwrap();
+ }
+ Ok(())
+ }
+
+ #[allow(dead_code)]
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ if let Some(conn) = &mut self.connection {
+ conn.poll_flush_unpin(cx)
+ } else {
+ Poll::Pending
+ }
+ }
+
+ #[allow(dead_code)]
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ if let Some(conn) = &mut self.connection {
+ conn.poll_close_unpin(cx)
+ } else {
+ Poll::Pending
+ }
+ }
+}