summaryrefslogtreecommitdiff
path: root/gateway/lib
diff options
context:
space:
mode:
Diffstat (limited to 'gateway/lib')
-rw-r--r--gateway/lib/BUILD.bazel16
-rw-r--r--gateway/lib/gateway/gateway.go7
-rw-r--r--gateway/lib/gateway/transporters/rabbitmq.go60
-rw-r--r--gateway/lib/logger.go25
-rw-r--r--gateway/lib/prometheus.go20
5 files changed, 38 insertions, 90 deletions
diff --git a/gateway/lib/BUILD.bazel b/gateway/lib/BUILD.bazel
index ee3e303..e69de29 100644
--- a/gateway/lib/BUILD.bazel
+++ b/gateway/lib/BUILD.bazel
@@ -1,16 +0,0 @@
-load("@io_bazel_rules_go//go:def.bzl", "go_library")
-
-go_library(
- name = "lib",
- srcs = [
- "logger.go",
- "prometheus.go",
- ],
- importpath = "github.com/discordnova/nova/gateway/lib",
- visibility = ["//visibility:public"],
- deps = [
- "@com_github_prometheus_client_golang//prometheus/promhttp",
- "@com_github_rs_zerolog//:zerolog",
- "@com_github_rs_zerolog//log",
- ],
-)
diff --git a/gateway/lib/gateway/gateway.go b/gateway/lib/gateway/gateway.go
index 6f1abc3..ee9d13e 100644
--- a/gateway/lib/gateway/gateway.go
+++ b/gateway/lib/gateway/gateway.go
@@ -8,11 +8,11 @@ import (
"time"
"github.com/boz/go-throttle"
+ gatewayTypes "github.com/discordnova/nova/common/discord/types/payloads/gateway"
"github.com/discordnova/nova/common/discord/types/payloads/gateway/commands"
"github.com/discordnova/nova/common/discord/types/payloads/gateway/events"
"github.com/discordnova/nova/common/discord/types/structures"
"github.com/discordnova/nova/common/discord/types/types"
- gatewayTypes "github.com/discordnova/nova/common/discord/types/payloads/gateway"
"github.com/discordnova/nova/common/gateway"
"github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus"
@@ -363,7 +363,10 @@ func (discord *GatewayConnector) dispatch(message *gatewayTypes.Payload) {
log.Err(err).Msg("failed to serialize the outgoing nova message")
}
- err = discord.options.Transporter.PushDispatchEvent(newName, data)
+ discord.options.Transporter.PushChannel() <- gateway.PushData{
+ Data: data,
+ Name: newName,
+ }
if err != nil {
log.Err(err).Msg("failed to send the event to the nova event broker")
diff --git a/gateway/lib/gateway/transporters/rabbitmq.go b/gateway/lib/gateway/transporters/rabbitmq.go
index 96c3738..1a163ad 100644
--- a/gateway/lib/gateway/transporters/rabbitmq.go
+++ b/gateway/lib/gateway/transporters/rabbitmq.go
@@ -3,14 +3,14 @@ package transporters
import (
"time"
+ "github.com/discordnova/nova/common/gateway"
"github.com/rs/zerolog/log"
"github.com/streadway/amqp"
- "github.com/discordnova/nova/common/gateway"
)
type RabbitMqTransporter struct {
- connection *amqp.Connection
- sendChannel *amqp.Channel
+ pullChannel chan []byte
+ pushChannel chan gateway.PushData
}
// NewRabbitMqTransporter creates a rabbitmq transporter using a given url
@@ -19,13 +19,13 @@ func NewRabbitMqTransporter(url string) (gateway.Transporter, error) {
conn, err := amqp.Dial(url)
if err != nil {
- return &RabbitMqTransporter{}, err
+ return nil, err
}
send, err := conn.Channel()
if err != nil {
- return &RabbitMqTransporter{}, err
+ return nil, err
}
err = send.ExchangeDeclare(
@@ -39,32 +39,38 @@ func NewRabbitMqTransporter(url string) (gateway.Transporter, error) {
)
if err != nil {
- return &RabbitMqTransporter{}, err
+ return nil, err
}
- return RabbitMqTransporter{
- connection: conn,
- sendChannel: send,
+ pullChannel, pushChannel := make(chan []byte), make(chan gateway.PushData)
+
+ go func() {
+ for {
+ data := <-pushChannel
+ send.Publish(
+ "nova_gateway_dispatch",
+ data.Name,
+ false,
+ false,
+ amqp.Publishing{
+ Priority: 1,
+ Timestamp: time.Now(),
+ Type: data.Name,
+ Body: data.Data,
+ },
+ )
+ }
+ }()
+
+ return &RabbitMqTransporter{
+ pullChannel: pullChannel,
+ pushChannel: pushChannel,
}, nil
}
-// PushDispatchEvent dispatches a general event to all the bot workers.
-func (transporter RabbitMqTransporter) PushDispatchEvent(t string, data []byte) error {
- return transporter.sendChannel.Publish(
- "nova_gateway_dispatch",
- t,
- false,
- false,
- amqp.Publishing{
- Priority: 1,
- Timestamp: time.Now(),
- Type: t,
- Body: data,
- },
- )
+func (t RabbitMqTransporter) PushChannel() chan gateway.PushData {
+ return t.pushChannel
}
-
-// PushEventCache dispatches a cache specific events to all the cache workers.
-func (transporter RabbitMqTransporter) PushEventCache(t string, data []byte) error {
- return nil
+func (t RabbitMqTransporter) PullChannel() chan []byte {
+ return t.pullChannel
}
diff --git a/gateway/lib/logger.go b/gateway/lib/logger.go
deleted file mode 100644
index 1220805..0000000
--- a/gateway/lib/logger.go
+++ /dev/null
@@ -1,25 +0,0 @@
-package lib
-
-import (
- "flag"
- "os"
-
- "github.com/rs/zerolog"
- "github.com/rs/zerolog/log"
-)
-
-var (
- debug *bool = flag.Bool("debug", false, "enables the debug mode")
- pretty *bool = flag.Bool("pretty", false, "enables the pretty log messages")
-)
-
-func SetupLogger() {
- if *pretty {
- log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
- }
- zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
- zerolog.SetGlobalLevel(zerolog.InfoLevel)
- if *debug {
- zerolog.SetGlobalLevel(zerolog.DebugLevel)
- }
-}
diff --git a/gateway/lib/prometheus.go b/gateway/lib/prometheus.go
deleted file mode 100644
index 98d7371..0000000
--- a/gateway/lib/prometheus.go
+++ /dev/null
@@ -1,20 +0,0 @@
-package lib
-
-import (
- "fmt"
- "net/http"
-
- "github.com/prometheus/client_golang/prometheus/promhttp"
- "github.com/rs/zerolog/log"
-)
-
-// CreatePrometheus creates a webserver instance that returns the metrics of the
-// current program reported using promauto.
-func CreatePrometheus(port int) {
- http.Handle("/metrics", promhttp.Handler())
- err := http.ListenAndServe(fmt.Sprintf(":%d", port), nil)
-
- if err != nil {
- log.Err(err).Msgf("failed to start the prometheus reporting on the port :%d", port)
- }
-}