diff options
Diffstat (limited to 'gateway/lib')
| -rw-r--r-- | gateway/lib/BUILD.bazel | 16 | ||||
| -rw-r--r-- | gateway/lib/gateway/gateway.go | 7 | ||||
| -rw-r--r-- | gateway/lib/gateway/transporters/rabbitmq.go | 60 | ||||
| -rw-r--r-- | gateway/lib/logger.go | 25 | ||||
| -rw-r--r-- | gateway/lib/prometheus.go | 20 |
5 files changed, 38 insertions, 90 deletions
diff --git a/gateway/lib/BUILD.bazel b/gateway/lib/BUILD.bazel index ee3e303..e69de29 100644 --- a/gateway/lib/BUILD.bazel +++ b/gateway/lib/BUILD.bazel @@ -1,16 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") - -go_library( - name = "lib", - srcs = [ - "logger.go", - "prometheus.go", - ], - importpath = "github.com/discordnova/nova/gateway/lib", - visibility = ["//visibility:public"], - deps = [ - "@com_github_prometheus_client_golang//prometheus/promhttp", - "@com_github_rs_zerolog//:zerolog", - "@com_github_rs_zerolog//log", - ], -) diff --git a/gateway/lib/gateway/gateway.go b/gateway/lib/gateway/gateway.go index 6f1abc3..ee9d13e 100644 --- a/gateway/lib/gateway/gateway.go +++ b/gateway/lib/gateway/gateway.go @@ -8,11 +8,11 @@ import ( "time" "github.com/boz/go-throttle" + gatewayTypes "github.com/discordnova/nova/common/discord/types/payloads/gateway" "github.com/discordnova/nova/common/discord/types/payloads/gateway/commands" "github.com/discordnova/nova/common/discord/types/payloads/gateway/events" "github.com/discordnova/nova/common/discord/types/structures" "github.com/discordnova/nova/common/discord/types/types" - gatewayTypes "github.com/discordnova/nova/common/discord/types/payloads/gateway" "github.com/discordnova/nova/common/gateway" "github.com/gorilla/websocket" "github.com/prometheus/client_golang/prometheus" @@ -363,7 +363,10 @@ func (discord *GatewayConnector) dispatch(message *gatewayTypes.Payload) { log.Err(err).Msg("failed to serialize the outgoing nova message") } - err = discord.options.Transporter.PushDispatchEvent(newName, data) + discord.options.Transporter.PushChannel() <- gateway.PushData{ + Data: data, + Name: newName, + } if err != nil { log.Err(err).Msg("failed to send the event to the nova event broker") diff --git a/gateway/lib/gateway/transporters/rabbitmq.go b/gateway/lib/gateway/transporters/rabbitmq.go index 96c3738..1a163ad 100644 --- a/gateway/lib/gateway/transporters/rabbitmq.go +++ b/gateway/lib/gateway/transporters/rabbitmq.go @@ -3,14 +3,14 @@ package transporters import ( "time" + "github.com/discordnova/nova/common/gateway" "github.com/rs/zerolog/log" "github.com/streadway/amqp" - "github.com/discordnova/nova/common/gateway" ) type RabbitMqTransporter struct { - connection *amqp.Connection - sendChannel *amqp.Channel + pullChannel chan []byte + pushChannel chan gateway.PushData } // NewRabbitMqTransporter creates a rabbitmq transporter using a given url @@ -19,13 +19,13 @@ func NewRabbitMqTransporter(url string) (gateway.Transporter, error) { conn, err := amqp.Dial(url) if err != nil { - return &RabbitMqTransporter{}, err + return nil, err } send, err := conn.Channel() if err != nil { - return &RabbitMqTransporter{}, err + return nil, err } err = send.ExchangeDeclare( @@ -39,32 +39,38 @@ func NewRabbitMqTransporter(url string) (gateway.Transporter, error) { ) if err != nil { - return &RabbitMqTransporter{}, err + return nil, err } - return RabbitMqTransporter{ - connection: conn, - sendChannel: send, + pullChannel, pushChannel := make(chan []byte), make(chan gateway.PushData) + + go func() { + for { + data := <-pushChannel + send.Publish( + "nova_gateway_dispatch", + data.Name, + false, + false, + amqp.Publishing{ + Priority: 1, + Timestamp: time.Now(), + Type: data.Name, + Body: data.Data, + }, + ) + } + }() + + return &RabbitMqTransporter{ + pullChannel: pullChannel, + pushChannel: pushChannel, }, nil } -// PushDispatchEvent dispatches a general event to all the bot workers. -func (transporter RabbitMqTransporter) PushDispatchEvent(t string, data []byte) error { - return transporter.sendChannel.Publish( - "nova_gateway_dispatch", - t, - false, - false, - amqp.Publishing{ - Priority: 1, - Timestamp: time.Now(), - Type: t, - Body: data, - }, - ) +func (t RabbitMqTransporter) PushChannel() chan gateway.PushData { + return t.pushChannel } - -// PushEventCache dispatches a cache specific events to all the cache workers. -func (transporter RabbitMqTransporter) PushEventCache(t string, data []byte) error { - return nil +func (t RabbitMqTransporter) PullChannel() chan []byte { + return t.pullChannel } diff --git a/gateway/lib/logger.go b/gateway/lib/logger.go deleted file mode 100644 index 1220805..0000000 --- a/gateway/lib/logger.go +++ /dev/null @@ -1,25 +0,0 @@ -package lib - -import ( - "flag" - "os" - - "github.com/rs/zerolog" - "github.com/rs/zerolog/log" -) - -var ( - debug *bool = flag.Bool("debug", false, "enables the debug mode") - pretty *bool = flag.Bool("pretty", false, "enables the pretty log messages") -) - -func SetupLogger() { - if *pretty { - log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}) - } - zerolog.TimeFieldFormat = zerolog.TimeFormatUnix - zerolog.SetGlobalLevel(zerolog.InfoLevel) - if *debug { - zerolog.SetGlobalLevel(zerolog.DebugLevel) - } -} diff --git a/gateway/lib/prometheus.go b/gateway/lib/prometheus.go deleted file mode 100644 index 98d7371..0000000 --- a/gateway/lib/prometheus.go +++ /dev/null @@ -1,20 +0,0 @@ -package lib - -import ( - "fmt" - "net/http" - - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/rs/zerolog/log" -) - -// CreatePrometheus creates a webserver instance that returns the metrics of the -// current program reported using promauto. -func CreatePrometheus(port int) { - http.Handle("/metrics", promhttp.Handler()) - err := http.ListenAndServe(fmt.Sprintf(":%d", port), nil) - - if err != nil { - log.Err(err).Msgf("failed to start the prometheus reporting on the port :%d", port) - } -} |
