import uniqid from "uniqid";
import { type AppContext } from "~/lib/context";
import { makeContext } from "~/lib/context/makeContext";
import { logError, logInfo } from "~/lib/logger";
import { type BaseJobData } from "~/lib/queue/baseJobDataSchema";
import { type QueueDriver, type SendOptions } from "~/lib/queue/drivers/queueDriver";
import { getWorkers, type JobPayload, type WorkerService } from "~/lib/queue/getWorkers";
import { makeHandler } from "~/lib/queue/makeHandler";
import { type QueueJobName } from "~/lib/queue/queueJobName";
import { fireAndForget, sleep } from "~/lib/utils";

/**
 * Queue driver that runs jobs inside the current process instead of handing
 * them to an external queue.
 *
 * `sendJob` looks up the worker registered for the job name, builds a handler
 * for it and invokes that handler directly, so there is nothing to start,
 * register or cancel on the driver side.
 */
export class AsyncDriver implements QueueDriver {
  /**
   * Application-level context used to run jobs. It is created lazily on the
   * first `sendJob` call and then reused for every subsequent job.
   */
  private newCtx?: AppContext;

  /**
   * Starts the driver.
   *
   * @param ctx Context used only to log that the driver has started.
   */
  async start(ctx: AppContext): Promise<void> {
    // The async driver does not need any particular action to start,
    // as jobs will be automatically picked up by the same process.

    logInfo(ctx, "[worker] async queue driver started");
  }

  /** No-op: kept to satisfy the `QueueDriver` contract. */
  async initWorkers(): Promise<void> {
    // The async driver does not need to register workers,
    // as jobs will be automatically picked up by the same process.
  }

  /**
   * Runs the worker registered for `jobName` with the given payload.
   *
   * @param _ctx Caller context. It is intentionally unused: the job runs with
   *   the driver's own context created via `makeContext({ forApp: true })`.
   * @param jobName Name used to look up the worker in `getWorkers()`.
   * @param data Payload passed to the worker as `job.data`.
   * @param options Send options; only `singletonKey` and `retryDelay`
   *   (in seconds) are read here.
   * @returns The generated job id, or `null` when no worker is registered for
   *   `jobName` (an error is logged in that case).
   */
  async sendJob<T extends BaseJobData>(
    _ctx: AppContext,
    jobName: QueueJobName,
    data: T,
    options: SendOptions,
  ): Promise<string | null> {
    // Create the shared job context once, on first use.
    const jobCtx = (this.newCtx ??= makeContext({ forApp: true }));

    const worker = getWorkers().find((candidate) => candidate.jobName === jobName);
    if (!worker) {
      logError(jobCtx, "[worker] Worker for job not found. Job will not be handled", { jobName });
      return null;
    }

    const handler = makeHandler(jobCtx, {
      jobName,
      service: worker.service as WorkerService<T>,
    });

    const job = {
      id: uniqid(),
      singletonKey: options.singletonKey ?? null,
      data,
      retryCount: 0,
      // NOTE(review): the limit is fixed at 0, so after the first failure
      // `retryCount` (1) is never below it and the error is rethrown without
      // retrying. The retry path below is therefore currently unreachable;
      // confirm whether the limit should come from the send options.
      retryLimit: 0,
      output: {
        message: "Job created",
      },
    } satisfies JobPayload<T>;

    // Runs the handler until it succeeds or the retry budget is exhausted,
    // in which case the most recent error is rethrown.
    const runWithRetries = async (payload: JobPayload<T>): Promise<void> => {
      for (;;) {
        try {
          await handler([payload]);
          return;
        } catch (error) {
          payload.retryCount++;

          if (payload.retryCount >= payload.retryLimit) {
            throw error;
          }

          if (options.retryDelay) {
            // `retryDelay` is multiplied by 1000 to get the sleep duration in ms.
            await sleep(options.retryDelay * 1000);
          }
        }
      }
    };

    await fireAndForget(runWithRetries(job));

    return job.id;
  }

  /** No-op: kept to satisfy the `QueueDriver` contract. */
  async cancelJob(): Promise<void> {
    // The async driver does not need to stop a job.
  }
}
