import { type QueueJobName } from "@prisma/client";
import { mapSeries } from "bluebird";
import PgBoss from "pg-boss";
import { config } from "~/config";
import { type AppContext } from "~/lib/context";
import makeDatabaseUrl from "~/lib/context/makeDatabaseUrl";
import { traceBusinessService } from "~/lib/datadog/tracing";
import { difference } from "~/lib/lodash";
import { logError, logInfo, logWarn } from "~/lib/logger";
import { type BaseJobData } from "~/lib/queue/baseJobDataSchema";
import { type QueueDriver, type SendOptions } from "~/lib/queue/drivers/queueDriver";
import { getWorkers, type WorkerParams } from "~/lib/queue/getWorkers";
import { makeHandler } from "~/lib/queue/makeHandler";

/** Exit code passed to `process.exit` once the worker has shut down gracefully. */
export const EXIT_SUCCESS = 0;

// Worker tuning values are read once from the application config.
const { usePriority, batchSize, pollingIntervalSeconds } = config.worker;

/**
 * Options handed to `boss.work` for every worker registered by the driver.
 * `includeMetadata` is always enabled; the remaining values come from `config.worker`.
 */
export const DEFAULT_WORKER_OPTIONS = {
  includeMetadata: true,
  priority: usePriority,
  batchSize,
  pollingIntervalSeconds,
} as const;

/**
 * Queue driver backed by pg-boss.
 *
 * The underlying pg-boss instance is created eagerly in the constructor but only
 * started lazily, the first time a method that needs it is called. Every public
 * method that talks to pg-boss goes through {@link PgBossDriver.start} first.
 */
export class PgBossDriver implements QueueDriver {
  /** Value passed as `timeout` (in ms) to `boss.stop()` when a termination signal is received. */
  private static readonly GRACEFUL_STOP_TIMEOUT_MS = 120_000;

  private boss: PgBoss;

  /** True once `boss.start()` has resolved successfully. */
  private started = false;

  /** In-flight start, shared by concurrent callers so pg-boss is only started once. */
  private startPromise: Promise<void> | undefined;

  /** Guards against stacking several "error" listeners when a failed start is retried. */
  private errorListenerAttached = false;

  /** Guards against registering the signal / "stopped" listeners more than once. */
  private processStopHandled = false;

  constructor() {
    this.boss = new PgBoss({
      connectionString: makeDatabaseUrl(config.database.url, { forWorker: true }),
      schema: config.worker.pgbossSchema,
    });
  }

  /**
   * Starts pg-boss if it is not started yet.
   *
   * Safe to call repeatedly and concurrently: callers arriving while a start is
   * in progress await the same promise. If the start fails, the error is propagated
   * to every waiting caller and the next call tries again.
   *
   * @param ctx Application context, used for logging.
   */
  async start(ctx: AppContext): Promise<void> {
    if (this.started) return;

    if (!this.startPromise) {
      this.startPromise = this.startOnce(ctx).finally(() => {
        // Cleared on both success and failure; `started` short-circuits later calls on success.
        this.startPromise = undefined;
      });
    }

    await this.startPromise;
  }

  /**
   * Creates every queue required by the registered workers that pg-boss does not know yet.
   * Queues are created one after the other.
   *
   * @param ctx Application context, used for logging.
   * @returns The results of the `createQueue` calls, in creation order.
   */
  async initQueues(ctx: AppContext) {
    await this.start(ctx);

    const existingQueues = (await this.boss.getQueues()).map((queue) => queue.name);
    const wantedQueues = getWorkers().map((worker) => worker.jobName);
    const missingQueues = difference(wantedQueues, existingQueues);

    return mapSeries(missingQueues, (queue) => this.boss.createQueue(queue));
  }

  /**
   * Boots the worker side of the driver: installs the process termination handling,
   * starts pg-boss, creates the missing queues and registers a worker for each entry
   * returned by `getWorkers()`.
   *
   * @param ctx Application context, used for logging and forwarded to job handlers.
   */
  async initWorkers(ctx: AppContext) {
    this.handleProcessStop(ctx);

    await this.start(ctx);

    await this.initQueues(ctx);

    await Promise.all(getWorkers().map((worker) => this.addWorker(ctx, worker)));
  }

  /**
   * Enqueues a job.
   *
   * @param ctx Application context, used for logging.
   * @param jobName Name of the job, which is also the name of the queue it is sent to.
   * @param data Job payload.
   * @param options Options forwarded to `boss.send`.
   * @returns Whatever `boss.send` resolves to.
   */
  async sendJob<T extends BaseJobData>(ctx: AppContext, jobName: QueueJobName, data: T, options: SendOptions) {
    await this.start(ctx);

    return this.boss.send(jobName, data, options);
  }

  /**
   * Cancels a job, traced as the `pgBossDriverCancelJob` business service.
   *
   * @param ctx Application context, used for logging.
   * @param params Name and id of the job to cancel.
   * @returns Whatever the traced `boss.cancel` call resolves to.
   */
  async cancelJob(ctx: AppContext, params: { jobName: QueueJobName; jobId: string }) {
    const { jobName, jobId } = params;
    logInfo(ctx, "[worker] Cancelling job", { jobName, jobId });

    // Like `sendJob`, make sure pg-boss is started so cancelling does not depend on call order.
    await this.start(ctx);

    return traceBusinessService(
      {
        tags: { jobId },
        serviceName: "pgBossDriverCancelJob",
      },
      async () => this.boss.cancel(jobName, jobId)
    );
  }

  /**
   * Performs the actual start: attaches the error listener (once) and starts pg-boss.
   * Only meant to be called through {@link PgBossDriver.start}.
   */
  private async startOnce(ctx: AppContext): Promise<void> {
    // Attached here rather than in the constructor because logging needs the context.
    if (!this.errorListenerAttached) {
      this.boss.on("error", (error) => logError(ctx, "[worker] Queue error", { error }));
      this.errorListenerAttached = true;
    }

    await this.boss.start();
    this.started = true;

    logInfo(ctx, "[worker] pg-boss queue driver started");
  }

  /**
   * Registers a pg-boss worker for one job, using `DEFAULT_WORKER_OPTIONS` and the
   * handler built by `makeHandler`.
   */
  private async addWorker<T extends BaseJobData>(ctx: AppContext, params: WorkerParams<T>) {
    const { jobName } = params;

    logInfo(ctx, "[worker] Adding worker for job", { jobName, workerOptions: DEFAULT_WORKER_OPTIONS });

    const handler = makeHandler(ctx, params);

    await this.boss.work(jobName, DEFAULT_WORKER_OPTIONS, handler);

    logInfo(ctx, "[worker] Worker for job has been added", { jobName });
  }

  /**
   * Installs SIGINT / SIGTERM handling so the process stops pg-boss gracefully and
   * exits once pg-boss reports it has stopped.
   *
   * Does nothing (besides a warning) when `NEXT_MANUAL_SIG_HANDLE` is not set.
   * Listeners are registered at most once per driver instance.
   */
  private handleProcessStop(ctx: AppContext) {
    // Makes sure we don't forget this required ENV everywhere we need it
    // Note: in DEV this is not available
    if (!process.env.NEXT_MANUAL_SIG_HANDLE) {
      logWarn(ctx, "[worker] Manual SIG handling disabled");
      return;
    }

    // Calling `initWorkers` again must not stack duplicate listeners.
    if (this.processStopHandled) return;
    this.processStopHandled = true;

    logInfo(ctx, "[worker] Setting up manual SIG handling");

    // The promise returned by `boss.stop()` is resolved immediately.
    // We need to wait for all active jobs to have been completed before gracefully exiting the process.
    this.boss.on("stopped", () => {
      logInfo(ctx, `[worker] Gracefully handled termination, exiting in ${config.datadog.flushTimeInMs}ms`);

      // This allows datadog to flush traces before process exits
      setTimeout(() => process.exit(EXIT_SUCCESS), config.datadog.flushTimeInMs);
    });

    for (const signal of ["SIGINT", "SIGTERM"] as const) {
      process.on(signal, () => {
        void this.stopOnSignal(ctx, signal);
      });
    }
  }

  /**
   * Asks pg-boss to stop gracefully after a termination signal.
   *
   * A failing `boss.stop()` is logged instead of surfacing as an unhandled rejection.
   * The process is intentionally not force-exited here, so a graceful stop triggered
   * by an earlier signal can still complete and emit "stopped".
   */
  private async stopOnSignal(ctx: AppContext, signal: "SIGINT" | "SIGTERM"): Promise<void> {
    logInfo(ctx, `[worker] ${signal} received`);

    try {
      await this.boss.stop({
        graceful: true,
        timeout: PgBossDriver.GRACEFUL_STOP_TIMEOUT_MS,
        close: false,
      });
    } catch (error: unknown) {
      logError(ctx, "[worker] Failed to stop pg-boss", { error, signal });
    }
  }
}
