From 779cd4943cc096365259518b578bdf2b8beb05f0 Mon Sep 17 00:00:00 2001 From: MatthieuCoder Date: Thu, 5 Jan 2023 02:55:16 +0400 Subject: [PATCH] fix small things --- src/index.ts | 9 + src/sys/events/client.ts | 19 +- src/sys/events/transport.ts | 342 ++++++++++++++++++------------------ 3 files changed, 191 insertions(+), 179 deletions(-) diff --git a/src/index.ts b/src/index.ts index 2a9ea9e..26ffdf5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -28,8 +28,17 @@ emitter.on('messageCreate', async (message) => { await message.client.channels.createMessage(message.channel_id, { content: `Bonjour! <@${message.author.id}>`, }); + } else if (message.content === '~pid') { + + await message.client.channels.createMessage(message.channel_id, { + content: `Mon pid est ${process.pid}`, + }); } }); +emitter.onMessageCreate(async (message) => { + console.log(message.content); +}); + // We connect ourselves to the nova nats broker. (async () => emitter.start())(); diff --git a/src/sys/events/client.ts b/src/sys/events/client.ts index 562ff47..44de1f9 100644 --- a/src/sys/events/client.ts +++ b/src/sys/events/client.ts @@ -109,16 +109,10 @@ export class Client extends undefinedClient { }) { super(); this.rest = new REST(options.rest).setToken('_'); - this.transport = new Transport(this, options.transport); this.api = new API(this.rest); - // This is safe because this event is emitted by the EventEmitter itself. - this.on('newListener' as any, async (event: EventName) => { - await this.transport.subscribe(event); - }); - // Using a proxy to provide the 'on...' functionality - return new Proxy(this, { + let self = new Proxy(this, { get(self, symbol: keyof typeof Client) { const name = symbol.toString(); if (name.startsWith('on') && name.length > 2) { @@ -130,7 +124,7 @@ export class Client extends undefinedClient { self.on(eventName, fn); } - if (self.api[symbol] && self[symbol as string]) { + if (self.api[symbol] && !self[symbol as string]) { // eslint-disable-next-line @typescript-eslint/no-unsafe-return return self.api[symbol]; } @@ -138,6 +132,15 @@ export class Client extends undefinedClient { return self[symbol as string]; }, }); + + this.transport = new Transport(self, options.transport); + + // This is safe because this event is emitted by the EventEmitter itself. + this.on('newListener' as any, async (event: EventName) => { + await this.transport.subscribe(event); + }); + + return self; } public async start() { diff --git a/src/sys/events/transport.ts b/src/sys/events/transport.ts index cd2d8c2..c3c9f7a 100644 --- a/src/sys/events/transport.ts +++ b/src/sys/events/transport.ts @@ -1,185 +1,185 @@ -import {Buffer} from 'node:buffer'; +import { Buffer } from "node:buffer"; import { - connect, - type ConnectionOptions, - type NatsConnection, - type Subscription, -} from 'nats'; -import globRegex from 'glob-regex'; + connect, + type ConnectionOptions, + type NatsConnection, + type Subscription, +} from "nats"; +import globRegex from "glob-regex"; import { - type APIInteractionResponse, - InteractionResponseType, - type APIInteractionResponseCallbackData, - type GatewayDispatchPayload, - Routes, -} from 'discord-api-types/v10'; -import {type CamelCase} from 'type-fest'; -import {type Client, type EventName, type EventsHandlerArguments} from '.'; + type APIInteractionResponse, + InteractionResponseType, + type APIInteractionResponseCallbackData, + type GatewayDispatchPayload, + Routes, +} from "discord-api-types/v10"; +import { type CamelCase } from "type-fest"; +import { type Client, type EventName, type EventsHandlerArguments } from "."; /** * Options for the nats transport layer */ export type TransportOptions = { - additionalEvents?: Array; - nats?: ConnectionOptions; - queue: string; + additionalEvents?: Array; + nats?: ConnectionOptions; + queue: string; }; /** * Transport implements all the communication to Nova using Nats */ export class Transport { - // Nats connection - private nats: NatsConnection | undefined = null; - // Current subscriptions - private readonly subscriptions = new Map(); - // Current subscribed events - private readonly events = new Set(); - - // Creats a new Transport instance. - constructor( - private readonly emitter: Client, - private readonly config: Partial, - ) {} - - /** - * Starts a new nats client. - */ - public async start() { - this.nats = await connect(this.config?.nats); - - await Promise.all( - [...this.events].map(async (eventName) => this.subscribe(eventName)), - ); - - if (this.config.additionalEvents) { - await Promise.all( - this.config.additionalEvents.map(async (eventName) => - this.subscribe(eventName), - ), - ); - } - } - - /** - * Subscribe to a new topic - * @param event Event to subscribe to - * @returns - */ - public async subscribe(event: EventName) { - // If nats is not connected, we simply request to subscribe to it at startup - if (!this.nats) { - console.log('Requesting event ' + event); - this.events.add(event); - return; - } - - // Since the event names used by this library are camelCase'd we need to - // re-transform it to the UPPER_CASE used by nova. - const dashed = event.replace(/[A-Z]/g, (m) => '_' + m.toLowerCase()); - // Construct the topic name used by nova. - // This **is going to change** as we implement the caching component. - const topic = `nova.cache.dispatch.${dashed.toUpperCase()}`; - - // To avoid having multiple subscriptions covering this event - // we check if each of our subscriptions covers this scope. - const isAlreadyPresent = [...this.subscriptions.keys()].reduce( - (previous, current) => { - if (previous) { - return previous; - } - - const regex = globRegex(current); - - return regex.test(topic); - }, - false, - ); - - // We abord the subscriptions if it's already covered. - if (isAlreadyPresent) { - console.warn('nats subscription already covered.'); - return; - } - - // We remove all the subscriptions that are covered by out current subsciptions. - const regex = globRegex(topic); - for (const key of this.subscriptions.keys()) { - if (regex.test(key)) { - const subsciption = this.subscriptions.get(key); - if (!subsciption) { - continue; - } - - console.log(`unsubscribing from ${key}`); - subsciption.unsubscribe(); - } - } - - void this._subscriptionTask(topic); - } - - // Task that monitors the subscription - // It also listens for a subscription end. - private async _subscriptionTask(topic: string) { - if (!this.nats) { - throw new Error('nats connection is not started'); - } - - console.log(`subscribing to ${topic}`); - // Create the nats subscription - const subscription = this.nats.subscribe(topic, { - queue: this.config.queue, - }); - this.subscriptions.set(topic, subscription); - // Handle each event in the subscription stream. - for await (const publish of subscription) { - try { - // Decode the payload - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - const event: GatewayDispatchPayload = JSON.parse( - Buffer.from(publish.data).toString('utf8'), - ); - // Transform the event name to a camclCased name - const camelCasedName = event.t - .toLowerCase() - .replace(/_([a-z])/g, (g) => g[1].toUpperCase()) as CamelCase< - typeof event.t - >; - - // Since an interaction need a reponse, - // we need to handle the case where nova is not configured - // with a webhook endpoint, hence we need to use a post request - // against webhook execute endpoint with the interaction data. - if (event.t === 'INTERACTION_CREATE') { - const interaction = event.d; - const respond = async (respond: APIInteractionResponse) => { - if (publish.reply) { - publish.respond(Buffer.from(JSON.stringify(respond), 'utf8')); - } else { - await this.emitter.rest.post( - Routes.interactionCallback(interaction.id, interaction.token), - { - body: respond, - }, - ); - } - }; - - // Emit the - this.emitter.emit( - camelCasedName, - {...event.d, client: this.emitter}, - respond, - ); - } else { - // Typescript refuses to infer this, whyyy - this.emitter.emit(camelCasedName, { - ...event.d, - client: this.emitter, - } as any); - } - } catch {} - } - } + // Nats connection + private nats: NatsConnection | undefined = null; + // Current subscriptions + private readonly subscriptions = new Map(); + // Current subscribed events + private readonly events = new Set(); + + // Creats a new Transport instance. + constructor( + private readonly emitter: Client, + private readonly config: Partial + ) {} + + /** + * Starts a new nats client. + */ + public async start() { + this.nats = await connect(this.config?.nats); + + await Promise.all( + [...this.events].map(async (eventName) => this.subscribe(eventName)) + ); + + if (this.config.additionalEvents) { + await Promise.all( + this.config.additionalEvents.map(async (eventName) => + this.subscribe(eventName) + ) + ); + } + } + + /** + * Subscribe to a new topic + * @param event Event to subscribe to + * @returns + */ + public async subscribe(event: EventName) { + // If nats is not connected, we simply request to subscribe to it at startup + if (!this.nats) { + console.log("Requesting event " + event); + this.events.add(event); + return; + } + + // Since the event names used by this library are camelCase'd we need to + // re-transform it to the UPPER_CASE used by nova. + const dashed = event.replace(/[A-Z]/g, (m) => "_" + m.toLowerCase()); + // Construct the topic name used by nova. + // This **is going to change** as we implement the caching component. + const topic = `nova.cache.dispatch.${dashed.toUpperCase()}`; + + // To avoid having multiple subscriptions covering this event + // we check if each of our subscriptions covers this scope. + const isAlreadyPresent = [...this.subscriptions.keys()].reduce( + (previous, current) => { + if (previous) { + return previous; + } + + const regex = globRegex(current); + + return regex.test(topic); + }, + false + ); + + // We abord the subscriptions if it's already covered. + if (isAlreadyPresent) { + console.warn("nats subscription already covered."); + return; + } + + // We remove all the subscriptions that are covered by out current subsciptions. + const regex = globRegex(topic); + for (const key of this.subscriptions.keys()) { + if (regex.test(key)) { + const subsciption = this.subscriptions.get(key); + if (!subsciption) { + continue; + } + + console.log(`unsubscribing from ${key}`); + subsciption.unsubscribe(); + } + } + + void this._subscriptionTask(topic); + } + + // Task that monitors the subscription + // It also listens for a subscription end. + private async _subscriptionTask(topic: string) { + if (!this.nats) { + throw new Error("nats connection is not started"); + } + + console.log(`subscribing to ${topic}`); + // Create the nats subscription + const subscription = this.nats.subscribe(topic, { + queue: this.config.queue || "nova_consumer", + }); + this.subscriptions.set(topic, subscription); + // Handle each event in the subscription stream. + for await (const publish of subscription) { + try { + // Decode the payload + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + const event: GatewayDispatchPayload = JSON.parse( + Buffer.from(publish.data).toString("utf8") + ); + // Transform the event name to a camclCased name + const camelCasedName = event.t + .toLowerCase() + .replace(/_([a-z])/g, (g) => g[1].toUpperCase()) as CamelCase< + typeof event.t + >; + + // Since an interaction need a reponse, + // we need to handle the case where nova is not configured + // with a webhook endpoint, hence we need to use a post request + // against webhook execute endpoint with the interaction data. + if (event.t === "INTERACTION_CREATE") { + const interaction = event.d; + const respond = async (respond: APIInteractionResponse) => { + if (publish.reply) { + publish.respond(Buffer.from(JSON.stringify(respond), "utf8")); + } else { + await this.emitter.rest.post( + Routes.interactionCallback(interaction.id, interaction.token), + { + body: respond, + } + ); + } + }; + + // Emit the + this.emitter.emit( + camelCasedName, + { ...event.d, client: this.emitter }, + respond + ); + } else { + // Typescript refuses to infer this, whyyy + this.emitter.emit(camelCasedName, { + ...event.d, + client: this.emitter, + } as any); + } + } catch {} + } + } } -- 2.39.5