import { type QueueJobName } from "@prisma/client";
import PgBoss from "pg-boss";
import { log } from "winston";
import { config } from "~/config";
import { type AppContext } from "~/lib/context";
import makeDatabaseUrl from "~/lib/context/makeDatabaseUrl";
import { logError, logInfo, logWarn } from "~/lib/logger";
import { type BaseJobData } from "~/lib/queue/baseJobDataSchema";
import { getWorkers, type WorkerParams } from "~/lib/queue/getWorkers";
import { makeHandler } from "~/lib/queue/makeHandler";
import { type QueueDriver, type SendOptions } from "~/lib/queue/makeQueueDriver";

/** Process exit code used when the worker terminates after the queue has stopped. */
export const EXIT_SUCCESS = 0;

/**
 * Queue driver backed by pg-boss.
 *
 * The underlying pg-boss instance is created in the constructor and started
 * lazily: `initWorkers` and `sendJob` both call `start` first, and `start` is
 * safe to call repeatedly and concurrently.
 */
export class PgBossDriver implements QueueDriver {
  private boss: PgBoss;

  private started = false;

  /** In-flight `start()` call, shared so concurrent callers start pg-boss only once. */
  private startPromise: Promise<void> | undefined;

  /** Whether the context-aware "error" listener has already been attached. */
  private contextErrorListenerAttached = false;

  /** Set once a termination signal has triggered `boss.stop()`. */
  private stopping = false;

  constructor() {
    this.boss = new PgBoss(makeDatabaseUrl(config.database.url, { forWorker: true }));
    // Fallback listener: no AppContext is available yet, so queue errors are
    // logged through winston directly from construction time onwards.
    this.boss.on("error", (error) => log("error", `[worker] Queue error`, { error }));
  }

  /**
   * Starts the pg-boss instance if it has not been started yet.
   *
   * Concurrent callers share a single in-flight start, so `boss.start()` runs
   * once and the context-aware error listener is attached once. If starting
   * fails, the rejection is propagated to every waiting caller and a later
   * call will try again.
   *
   * @param ctx Application context used for logging.
   */
  async start(ctx: AppContext) {
    if (this.started) return;

    if (!this.startPromise) {
      // Clear the shared promise once settled: after a success `started`
      // short-circuits further calls, after a failure the next call retries.
      this.startPromise = this.startOnce(ctx).finally(() => {
        this.startPromise = undefined;
      });
    }

    await this.startPromise;
  }

  /**
   * Starts the driver, then registers every worker returned by `getWorkers()`.
   * Process signal handling is set up first (see `handleProcessStop`).
   *
   * @param ctx Application context passed to the workers and used for logging.
   */
  async initWorkers(ctx: AppContext) {
    this.handleProcessStop(ctx);

    await this.start(ctx);

    const promises = getWorkers().map((worker) => this.addWorker(ctx, worker));

    await Promise.all(promises);
  }

  /**
   * Makes sure the driver is started, then enqueues a job.
   *
   * @param ctx Application context used for logging while starting.
   * @param jobName Name of the queue the job is sent to.
   * @param data Job payload.
   * @param options Options forwarded to `boss.send`.
   * @returns Whatever `boss.send` resolves with.
   */
  async sendJob<T extends BaseJobData>(ctx: AppContext, jobName: QueueJobName, data: T, options: SendOptions) {
    await this.start(ctx);

    return this.boss.send(jobName, data, options);
  }

  /** Performs the actual start; callers must go through `start` so it is not run concurrently. */
  private async startOnce(ctx: AppContext) {
    // kind of duplicate with the constructor listener but we have the context here.
    // Guarded so a retry after a failed start does not stack another listener.
    if (!this.contextErrorListenerAttached) {
      this.boss.on("error", (error) => logError(ctx, "[worker] Queue error", { error }));
      this.contextErrorListenerAttached = true;
    }

    await this.boss.start();
    this.started = true;

    logInfo(ctx, "[worker] pg-boss queue driver started");
  }

  /** Builds the handler for one worker definition and subscribes it to its job queue. */
  private async addWorker<T extends BaseJobData>(ctx: AppContext, params: WorkerParams<T>) {
    const { jobName } = params;
    logInfo(ctx, "[worker] Adding worker for job", { jobName });

    const handler = makeHandler(ctx, params);

    await this.boss.work(jobName, { includeMetadata: true }, handler);

    logInfo(ctx, "[worker] Worker for job has been added", { jobName });
  }

  /**
   * Installs SIGINT/SIGTERM handlers that stop pg-boss gracefully and exit the
   * process once the queue reports it has stopped. Does nothing (besides a
   * warning) when `NEXT_MANUAL_SIG_HANDLE` is not set.
   */
  private handleProcessStop(ctx: AppContext) {
    // Makes sure we don't forget this required ENV everywhere we need it
    // Note: in DEV this is not available
    if (!process.env.NEXT_MANUAL_SIG_HANDLE) {
      logWarn(ctx, "[worker] Manual SIG handling disabled");
      return;
    }

    logInfo(ctx, "[worker] Setting up manual SIG handling");

    // The promise returned by `boss.stop()` is resolved immediately.
    // We need to wait for all active jobs to have been completed before gracefully exiting the process.
    this.boss.on("stopped", () => {
      logInfo(ctx, `[worker] Gracefully handled termination, exiting in ${config.datadog.flushTimeInMs}ms`);

      // This allows datadog to flush traces before process exits
      setTimeout(() => process.exit(EXIT_SUCCESS), config.datadog.flushTimeInMs);
    });

    for (const signal of ["SIGINT", "SIGTERM"] as const) {
      process.on(signal, () => {
        // `stopGracefully` handles its own errors, so the promise never rejects.
        void this.stopGracefully(ctx, signal);
      });
    }
  }

  /**
   * Asks pg-boss to stop in response to a termination signal.
   *
   * Only the first signal triggers the stop; later ones are logged and ignored
   * so an in-progress graceful stop is not re-entered. If `boss.stop()` fails,
   * the error is logged and the process exits with a failure code after the
   * datadog flush delay, since the "stopped" event cannot be relied on then.
   */
  private async stopGracefully(ctx: AppContext, signal: string) {
    logInfo(ctx, `[worker] ${signal} received`);

    if (this.stopping) {
      logInfo(ctx, `[worker] ${signal} ignored, queue is already stopping`);
      return;
    }
    this.stopping = true;

    try {
      // Adding inline documentation as the old one is really hard to find
      await this.boss.stop({
        // Default: `true`.
        // If `true`, the PgBoss instance will wait for any workers that are
        // currently processing jobs to finish, up to the specified timeout.
        // During this period, new jobs will not be processed, but active jobs
        // will be allowed to finish.
        graceful: true,
        // Default: 30000.
        // Maximum time (in milliseconds) to wait for workers to finish job
        // processing before shutting down the PgBoss instance.
        timeout: 120_000,
        // Default: `false`.
        // If `true` and the database connection is managed by pg-boss,
        // it will destroy the connection pool.
        destroy: false,
      });
    } catch (error: unknown) {
      const exitFailure = 1;

      logError(ctx, "[worker] Failed to stop the queue gracefully", { error });

      // Still give datadog time to flush traces before the process exits
      setTimeout(() => process.exit(exitFailure), config.datadog.flushTimeInMs);
    }
  }
}
