diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/commands/ping.ts | 2 | ||||
| -rw-r--r-- | src/events/client.ts | 34 | ||||
| -rw-r--r-- | src/events/event-emitter.ts | 56 | ||||
| -rw-r--r-- | src/events/transport.ts | 127 | ||||
| -rw-r--r-- | src/index.ts | 39 | ||||
| -rw-r--r-- | src/register.ts | 11 | ||||
| -rw-r--r-- | src/rest.ts | 11 | ||||
| -rw-r--r-- | src/sys/events/client.ts | 142 | ||||
| -rw-r--r-- | src/sys/events/index.ts (renamed from src/events/index.ts) | 0 | ||||
| -rw-r--r-- | src/sys/events/transport.ts | 175 | ||||
| -rw-r--r-- | src/sys/handler/builder.ts (renamed from src/handler/builder.ts) | 0 | ||||
| -rw-r--r-- | src/sys/handler/index.ts (renamed from src/handler/index.ts) | 15 |
12 files changed, 360 insertions, 252 deletions
diff --git a/src/commands/ping.ts b/src/commands/ping.ts index 9bf6bc0..a39e25a 100644 --- a/src/commands/ping.ts +++ b/src/commands/ping.ts @@ -5,7 +5,7 @@ import { ApplicationCommandType, InteractionResponseType, } from "discord-api-types/v10"; -import { CommandBuilder, HandlerFn } from "../handler"; +import { CommandBuilder, HandlerFn } from "../sys/handler"; import { promise } from "ping"; type Messages = { diff --git a/src/events/client.ts b/src/events/client.ts deleted file mode 100644 index 06c7903..0000000 --- a/src/events/client.ts +++ /dev/null @@ -1,34 +0,0 @@ -import { GatewayDispatchPayload } from "discord-api-types/v10"; -import { BaseEventEmitter, HandlerFunction } from "./event-emitter"; -import { Transport, TransportOptions } from "./transport"; -import { CamelCase } from "type-fest"; -import { REST } from "@discordjs/rest"; - -type ExtractEvent<O extends GatewayDispatchPayload, U extends O["t"]> = Extract< - O & { t: Exclude<O["t"], Exclude<O["t"], U>> }, - { t: U } ->; - -export type Events = { - [P in GatewayDispatchPayload["t"] as `${CamelCase<P>}`]: [ - ExtractEvent<GatewayDispatchPayload, P>["d"] - ]; -}; - -export class EventClient extends BaseEventEmitter { - public transport: Transport; - // constructs - constructor(private rest: REST) { - super(); - this.transport = new Transport(this, rest); - } - - public async start(options: TransportOptions) { - await this.transport.start(options); - } - - on<K extends keyof Events>(name: K, fn: HandlerFunction<Events[K]>): this { - this.transport.subscribe(name); - return super.on(name, fn); - } -} diff --git a/src/events/event-emitter.ts b/src/events/event-emitter.ts deleted file mode 100644 index ad64cd5..0000000 --- a/src/events/event-emitter.ts +++ /dev/null @@ -1,56 +0,0 @@ -import { EventEmitter } from "events"; -import { PascalCase } from "type-fest"; -import { Events } from "."; -import { APIInteractionResponse } from "discord-api-types/v10"; - -export type HandlerFunction<Args extends unknown[]> = ( - ...args: [...Args, ...[resolve?: (data: APIInteractionResponse) => void]] -) => unknown | Promise<unknown>; - -export type EventsFunctions = { - [P in keyof Events as P extends string ? `on${PascalCase<P>}` : never]: ( - fn: HandlerFunction<Events[P]> - ) => BaseEventEmitter; -}; - -// Typings for the EventClient -export interface BaseEventEmitter extends EventEmitter { - addListener<K extends keyof Events>( - name: K, - fn: HandlerFunction<Events[K]> - ): this; - - on<K extends keyof Events>(name: K, fn: HandlerFunction<Events[K]>): this; - - once<K extends keyof Events>(name: K, fn: HandlerFunction<Events[K]>): this; - - off<K extends keyof Events>(name: K, fn: HandlerFunction<Events[K]>): this; - - prependListener<K extends keyof Events>( - name: K, - fn: HandlerFunction<Events[K]> - ): this; - - prependOnceListener<K extends keyof Events>( - name: K, - fn: HandlerFunction<Events[K]> - ): this; - - removeAllListeners(eventName: keyof Events | undefined): this; - removeListener(eventName: keyof Events): this; - - emit<T extends keyof Events>( - name: T, - respond: (data: APIInteractionResponse) => void, - ...args: Events[T] - ): boolean; - listenerCount(event: keyof Events): number; - listeners<T extends keyof Events>(event: T): HandlerFunction<Events[T]>[]; - rawListeners: this["listeners"]; -} - -export class BaseEventEmitter extends EventEmitter implements BaseEventEmitter { - constructor() { - super(); - } -} diff --git a/src/events/transport.ts b/src/events/transport.ts deleted file mode 100644 index 882ce41..0000000 --- a/src/events/transport.ts +++ /dev/null @@ -1,127 +0,0 @@ -import { connect, ConnectionOptions, NatsConnection } from "nats"; -import { EventClient, Events } from "."; -import globRegex from "glob-regex"; -import { REST } from "@discordjs/rest"; -import { - APIInteractionResponse, - GatewayDispatchPayload, - GatewayInteractionCreateDispatch, - Routes, -} from "discord-api-types/v10"; -import { CamelCase } from "type-fest"; - -export type TransportOptions = { - additionalEvents?: (keyof Events)[]; - nats?: ConnectionOptions; - queue: string; -}; -export class Transport { - private nats: NatsConnection | null = null; - private subscription: Map<string, Function> = new Map(); - private queue?: string; - private events: Set<string> = new Set(); - - constructor(private emitter: EventClient, private rest: REST) {} - - public async start(options: TransportOptions) { - this.nats = await connect(options?.nats); - this.queue = options.queue; - if (options.additionalEvents) { - options.additionalEvents.forEach((a) => this.events.add(a)); - } - - let initial_events = [...this.events]; - - for (let subscription of initial_events) { - await this.subscribe(subscription); - } - } - - public async subscribe(event: string) { - if (!this.nats) { - console.log("Requesting event " + event); - this.events.add(event); - return; - } - let dashed = event.replace(/[A-Z]/g, (m) => "_" + m.toLowerCase()); - event = `nova.cache.dispatch.${dashed.toUpperCase()}`; - - let isAlreadyPresent = [...this.subscription.keys()].reduce( - (previous, current) => { - if (previous) return previous; - let regex = globRegex(current); - - return regex.test(event); - }, - false - ); - - if (isAlreadyPresent) { - console.warn("nats subscription already covered."); - return; - } - - let regex = globRegex(event); - [...this.subscription.keys()].map((key) => { - if (regex.test(key)) { - let v = this.subscription.get(key); - if (!v) { - return; - } - - console.log(`unsubscribing from ${key}`); - v(); - } - }); - - this._subTask(event, this.queue || "default_queue"); - } - - private async _subTask(event: string, queue: string) { - if (!this.nats) { - throw new Error("nats transporter is not started."); - } - - console.log(`subscribing to ${event}`); - let resolve: Function = () => {}; - let task = new Promise((r) => { - resolve = r; - }); - let sub = this.nats.subscribe(event, { queue: "" }); - - const fn = async () => { - for await (let data of sub) { - let string = Buffer.from(data.data).toString("utf-8"); - let d: GatewayDispatchPayload = JSON.parse(string); - let respond: (repond: APIInteractionResponse) => void | null = null; - const camelCased = d.t.toLowerCase().replace(/_([a-z])/g, function (g) { - return g[1].toUpperCase(); - }) as CamelCase<`${typeof d.t}`>; - - if (camelCased === "integrationCreate") { - let interaction = d.d as GatewayInteractionCreateDispatch["d"]; - respond = (respond: APIInteractionResponse) => { - if (data.reply) { - data.respond(Buffer.from(JSON.stringify(respond), "utf-8")); - } else { - this.rest.post( - Routes.webhook(interaction.channel_id, interaction.token), - { body: respond } - ); - } - }; - console.log("expecting reply."); - } - - this.emitter.emit(camelCased, respond, d.d as any); - } - }; - this.subscription.set(event, resolve); - - await Promise.race([task, fn()]); - - console.log(`finished task for ${event}`); - sub.unsubscribe(); - this.subscription.delete(event); - } -} diff --git a/src/index.ts b/src/index.ts index 62cfdcb..c95e664 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,23 +1,34 @@ -import { EventClient } from "./events/index"; -import { buildHandler } from "./handler"; +import { Client } from "./sys/events"; +import { buildHandler } from "./sys/handler"; import { commands } from "./commands"; -import { rest } from "./rest"; /** - * We instanciate our nova broken client. + * We instanciate our nova broker client. */ -const emitter = new EventClient(rest); - -// We register our slash command handler. -emitter.on("interactionCreate", buildHandler(commands)); - -// We connect ourselves to the nova nats broker. -emitter - .start({ +const emitter = new Client({ + transport: { additionalEvents: [], nats: { servers: ["localhost:4222"], }, queue: "nova-worker-common", - }) - .catch(console.log); + }, + rest: { + api: "http://localhost:8090/api", + }, +}); + +// We register our slash command handler. +emitter.on("interactionCreate", buildHandler(commands)); + +// Simple message handler +emitter.on("messageCreate", (message) => { + if (message.content === "~ping") { + message.client.channels.createMessage(message.channel_id, { + content: `Bonjour! <@${message.author.id}>`, + }); + } +}); + +// We connect ourselves to the nova nats broker. +emitter.start().catch(console.log); diff --git a/src/register.ts b/src/register.ts index af01e76..92ce36a 100644 --- a/src/register.ts +++ b/src/register.ts @@ -1,7 +1,14 @@ +import { REST } from "@discordjs/rest"; import { commands } from "./commands"; -import { registerCommands } from "./handler"; -import { rest } from "./rest"; +import { registerCommands } from "./sys/handler"; +const rest = new REST({ + version: "10", + headers: { Authorization: "" }, + api: "http://localhost:8090/api", + }).setToken("_"); + + /** * We register the commands with discord */ diff --git a/src/rest.ts b/src/rest.ts deleted file mode 100644 index ab9ff54..0000000 --- a/src/rest.ts +++ /dev/null @@ -1,11 +0,0 @@ -require('source-map-support').install(); -import { REST } from "@discordjs/rest"; - -/** - * Rest client used to communicate with discord - */ -export const rest = new REST({ - version: "10", - headers: { Authorization: "" }, - api: "http://localhost:8090/api", -}).setToken("_"); diff --git a/src/sys/events/client.ts b/src/sys/events/client.ts new file mode 100644 index 0000000..90d7447 --- /dev/null +++ b/src/sys/events/client.ts @@ -0,0 +1,142 @@ +import { + APIApplicationCommandPermissionsConstant, + APIInteractionResponse, + APIInteractionResponseCallbackData, + GatewayDispatchPayload, + GatewayInteractionCreateDispatchData, +} from "discord-api-types/v10"; +import { Transport, TransportOptions } from "./transport"; +import { CamelCase, PascalCase } from "type-fest"; +import { REST, RESTOptions } from "@discordjs/rest"; +import { EventEmitter } from "stream"; +import TypedEmitter from "typed-emitter"; + +import { API } from "@discordjs/core"; + +/** + * Maps an event name (O['t']) and a Union O and extracts alla the union members that have a matching O['t'] + * Example: + * type Variant1 = { t: 'type1', myProperty: 1 }; + * type Variant2 = { t: 'type2', anotherProperty: 2 }; + * type ExampleUnion = Variant1 | Variant2; + * + * let variant1: ExtractVariant<ExampleUnion, 'type1'>; // Type of variant1 is Variant1 + * let variant2: ExtractVariant<ExampleUnion, 'type2'>; // Type of variant2 is Variant2 + * + */ +type ExtractVariant<O extends { t: string }, U extends O["t"]> = Extract< + O & { t: Exclude<O["t"], Exclude<O["t"], U>> }, + { t: U } +>; + +/** + * Add intrisics properties to the event, such as `client` and `rest` + */ +export type WithIntrisics<T> = T & { client: Client }; +export type EventName = keyof EventsHandlerArguments; +/** + * Reprends a handler function with one argument + */ +export type HandlerFunction<Arg extends unknown[]> = ( + ...args: Arg +) => unknown | Promise<unknown>; + +export type EventTypes = { + [P in GatewayDispatchPayload["t"]]: WithIntrisics< + ExtractVariant<GatewayDispatchPayload, P>["d"] + >; +}; + +/** + * Maps all events from GatewayDispatchPayload['t'] (GatewayDispatchEvents) and maps them to a camelcase event name + * Also reteives the type of the event using ExtractEvent + */ +export type EventsHandlerArguments = { + [P in keyof EventTypes as `${CamelCase<P>}`]: HandlerFunction< + [EventTypes[P]] + >; +} & { + interactionCreate: HandlerFunction< + [ + WithIntrisics<GatewayInteractionCreateDispatchData>, + (interactionCreate: APIInteractionResponseCallbackData) => void + ] + >; +}; + +/** + * Defines all the 'on...' functions on the client + * This is implemented by a Proxy + */ +export type EventsFunctions = { + [P in keyof EventsHandlerArguments as P extends string + ? `on${PascalCase<P>}` + : never]: (fn: EventsHandlerArguments[P]) => Client; +}; + +/** + * Defines all the methods known to be implemented + */ +interface ClientFunctions + extends EventsFunctions, + TypedEmitter<EventsHandlerArguments>, + API {} + +/** + * The real extended class is an EventEmitter. + */ +const UndefinedClient: { new (): ClientFunctions } = EventEmitter as any; + +/** + * nova.js client + * + * Used to interact with nova, emits events from nova + * Example: + * client.on('messageCreate', (message) => { console.log('Message received', message.content) }); + * client.on('interactionCreate', (message) => { }); + */ +export class Client extends UndefinedClient { + private readonly transport: Transport; + private readonly api: API; + public readonly rest: REST; + + constructor(options: { + rest?: Partial<RESTOptions>; + transport: TransportOptions; + }) { + super(); + this.rest = new REST(options.rest); + this.transport = new Transport(this, options.transport); + this.api = new API(this.rest); + + // This is safe because this event is emitted by the EventEmitter itself. + this.on("newListener" as any, (event: EventName) => { + this.transport.subscribe(event); + }); + + // Using a proxy to provide the 'on...' functionality + return new Proxy(this, { + get(self, symbol) { + let name = symbol.toString(); + if (name.startsWith("on") && name.length > 2) { + // Get the event name + let eventName = [name[2].toLowerCase(), name.slice(3)].join( + "" + ) as EventName; + return (fn: EventsHandlerArguments[typeof eventName]) => + self.on(eventName, fn); + } + + if (self.api[symbol] && self[symbol]) { + return self.api[symbol]; + } + + return self[symbol]; + }, + }); + } + + public start() { + return this.transport.start(); + } +} diff --git a/src/events/index.ts b/src/sys/events/index.ts index 5ec7692..5ec7692 100644 --- a/src/events/index.ts +++ b/src/sys/events/index.ts diff --git a/src/sys/events/transport.ts b/src/sys/events/transport.ts new file mode 100644 index 0000000..28be5cf --- /dev/null +++ b/src/sys/events/transport.ts @@ -0,0 +1,175 @@ +import { connect, ConnectionOptions, NatsConnection, Subscription } from "nats"; +import { + Client, + EventName, + EventTypes, + EventsHandlerArguments, + WithIntrisics, +} from "."; +import globRegex from "glob-regex"; +import { + APIInteraction, + APIInteractionResponse, + APIInteractionResponseCallbackData, + GatewayDispatchPayload, + GatewayInteractionCreateDispatch, + Routes, +} from "discord-api-types/v10"; +import { CamelCase } from "type-fest"; + +/** + * Options for the nats transport layer + */ +export type TransportOptions = { + additionalEvents?: (keyof EventsHandlerArguments)[]; + nats?: ConnectionOptions; + queue: string; +}; + +/** + * Transport implements all the communication to Nova using Nats + */ +export class Transport { + // Nats connection + private nats: NatsConnection | null = null; + // Current subscriptions + private subscriptions: Map<string, Subscription> = new Map(); + // Current subscribed events + private events: Set<EventName> = new Set(); + + // Creats a new Transport instance. + constructor( + private emitter: Client, + private config: Partial<TransportOptions> + ) {} + + /** + * Starts a new nats client. + */ + public async start() { + this.nats = await connect(this.config?.nats); + + for (let eventName of this.events) { + await this.subscribe(eventName); + } + if (this.config.additionalEvents) { + for (let eventName of this.config.additionalEvents) { + await this.subscribe(eventName); + } + } + } + + /** + * Subscribe to a new topic + * @param event Event to subscribe to + * @returns + */ + public async subscribe(event: EventName) { + // If nats is not connected, we simply request to subscribe to it at startup + if (!this.nats) { + console.log("Requesting event " + event); + this.events.add(event); + return; + } + + // Since the event names used by this library are camelCase'd we need to + // re-transform it to the UPPER_CASE used by nova. + let dashed = event.replace(/[A-Z]/g, (m) => "_" + m.toLowerCase()); + // Construct the topic name used by nova. + // This **is going to change** as we implement the caching component. + let topic = `nova.cache.dispatch.${dashed.toUpperCase()}`; + + // To avoid having multiple subscriptions covering this event + // we check if each of our subscriptions covers this scope. + let isAlreadyPresent = [...this.subscriptions.keys()].reduce( + (previous, current) => { + if (previous) return previous; + let regex = globRegex(current); + + return regex.test(topic); + }, + false + ); + + // We abord the subscriptions if it's already covered. + if (isAlreadyPresent) { + console.warn("nats subscription already covered."); + return; + } + + // We remove all the subscriptions that are covered by out current subsciptions. + let regex = globRegex(topic); + [...this.subscriptions.keys()].map((key) => { + if (regex.test(key)) { + let subsciption = this.subscriptions.get(key); + if (!subsciption) { + return; + } + + console.log(`unsubscribing from ${key}`); + subsciption.unsubscribe(); + } + }); + + this._subscriptionTask(topic); + } + + // Task that monitors the subscription + // It also listens for a subscription end. + private async _subscriptionTask(topic: string) { + if (!this.nats) { + throw new Error("nats connection is not started"); + } + + console.log(`subscribing to ${topic}`); + // Create the nats subscription + let subscription = this.nats.subscribe(topic, { queue: this.config.queue }); + this.subscriptions.set(topic, subscription); + // Handle each event in the subscription stream. + for await (let publish of subscription) { + try { + // Decode the payload + let event: GatewayDispatchPayload = JSON.parse( + Buffer.from(publish.data).toString("utf-8") + ); + // Transform the event name to a camclCased name + const camelCasedName = event.t + .toLowerCase() + .replace(/_([a-z])/g, function (g) { + return g[1].toUpperCase(); + }) as CamelCase<typeof event.t>; + + // Since an interaction need a reponse, + // we need to handle the case where nova is not configured + // with a webhook endpoint, hence we need to use a post request + // against webhook execute endpoint with the interaction data. + if (event.t === "INTERACTION_CREATE") { + let interaction = event.d; + let respond = async (respond: APIInteractionResponseCallbackData) => { + if (publish.reply) { + publish.respond(Buffer.from(JSON.stringify(respond), "utf-8")); + } else { + await this.emitter.interactions.reply( + interaction.id, + interaction.token, + respond + ); + } + }; + // Emit the + this.emitter.emit( + camelCasedName, + { ...event.d, client: this.emitter }, + respond + ); + } else { + // Typescript refuses to infer this, whyyy + this.emitter.emit(camelCasedName, { + ...event.d, + client: this.emitter, + } as any); + } + } catch (e) {} + } + } +} diff --git a/src/handler/builder.ts b/src/sys/handler/builder.ts index feb5780..feb5780 100644 --- a/src/handler/builder.ts +++ b/src/sys/handler/builder.ts diff --git a/src/handler/index.ts b/src/sys/handler/index.ts index 9852d60..c9a5343 100644 --- a/src/handler/index.ts +++ b/src/sys/handler/index.ts @@ -3,6 +3,7 @@ import { APIApplicationCommandInteraction, APIInteraction, APIInteractionResponse, + APIInteractionResponseCallbackData, InteractionType, RESTPostAPIApplicationCommandsJSONBody, RESTPostAPIChatInputApplicationCommandsJSONBody, @@ -17,7 +18,7 @@ export type PromiseLike<T> = T | Promise<T>; */ export type HandlerFn = ( data: APIApplicationCommandInteraction -) => PromiseLike<APIInteractionResponse>; +) => PromiseLike<APIInteractionResponseCallbackData>; export type Command = { json: RESTPostAPIChatInputApplicationCommandsJSONBody; @@ -35,11 +36,11 @@ export const registerCommands = async ( rest: REST, applicationId: string ) => { - for (const command of commands) { - await rest.post(Routes.applicationCommands(applicationId), { - body: command.json as RESTPostAPIApplicationCommandsJSONBody, - }); - } + await rest.post(Routes.applicationCommands(applicationId), { + body: [...commands].map( + (x) => x.json + ) as RESTPostAPIApplicationCommandsJSONBody[], + }); }; /** @@ -55,7 +56,7 @@ export const buildHandler = (commands: Iterable<Command>) => { return async ( event: APIInteraction, - reply?: (data: APIInteractionResponse) => void + reply?: (data: APIInteractionResponseCallbackData) => void ) => { console.log("executing: ", event.data); if (event.type === InteractionType.ApplicationCommand) { |
