import { QueueJobStatus } from "@prisma/client";
import packageJson from "~/../package.json";
import { type AppContext } from "~/lib/context";
import tracer from "~/lib/datadog/tracing";
import { initContext } from "~/lib/init-context";
import { logError, logInfo, logWarn } from "~/lib/logger";
import { type BaseJobData } from "~/lib/queue/base-job-data-schema";
import { type JobPayload, type WorkerParams } from "~/lib/queue/get-workers";
import { updateQueueJobStatus } from "~/lib/queue/update-queue-job-status";
import { AuthenticatedUserIncludes } from "~/services/auth/authenticated-user-includes";
import { fetchRequiredAuthenticatedUser } from "~/services/auth/fetch-authenticated-user";

/**
 * Metadata carried under the `meta` key of every job payload handled by `makeHandler`.
 * It identifies on whose behalf the job is executed.
 */
export type JobMetadata = {
  /** Id used by the handler to load the authenticated user the job runs as. */
  userId: number;
  /** Id of the company the handler loads and attaches to that user before running the job. */
  companyId: number;
};

/**
 * Builds the handler invoked for each job of the worker described by `params`.
 *
 * For every job the handler:
 * 1. loads the user referenced by `job.data.meta.userId` and overrides its company
 *    with the one referenced by `job.data.meta.companyId`;
 * 2. initialises a user-scoped context from it;
 * 3. marks the queue job as `IN_PROGRESS` and runs `params.service` inside a trace span;
 * 4. marks the queue job as `COMPLETED` on success, or as `FAILED` once the retry
 *    limit has been reached.
 *
 * @param ctx - Application context used for logging, database access and status updates.
 * @param params - Worker definition; `jobName` and `service` are read from it.
 * @returns An async handler taking the job payload. It rejects with the original error
 *   whenever processing fails, so the queue can record the failure.
 */
export const makeHandler =
  <T extends BaseJobData>(ctx: AppContext, params: WorkerParams<T>) =>
  async (job: JobPayload<T & { meta: JobMetadata }>) => {
    const { jobName, service } = params;
    // Fields shared by every status update issued for this job.
    const baseQueueJobParams = {
      name: jobName,
      singletonKey: job.singletonkey as string,
      externalJobId: job.id,
    };

    logInfo(ctx, "[worker] Starting job", { job, name: params.jobName });

    try {
      const user = await fetchRequiredAuthenticatedUser(ctx, { userId: job.data.meta.userId });

      // Run the job in the company it was enqueued for, regardless of the company
      // currently attached to the fetched user.
      Object.assign(user, {
        companyId: job.data.meta.companyId,
        company: await ctx.prisma.company.findUniqueOrThrow({
          where: { id: job.data.meta.companyId },
          include: AuthenticatedUserIncludes["company"]["include"],
        }),
      });

      const context = await initContext(ctx, user);

      const tags = { "job.name": jobName, "job.payload": job.data, "span.kind": "server", "user.id": user.id };
      await tracer.trace(
        "worker.tracing",
        { tags, service: `${packageJson.name}.worker`, resource: jobName },
        async () => {
          await updateQueueJobStatus(ctx, { ...baseQueueJobParams, status: QueueJobStatus.IN_PROGRESS });

          await service(context, job.data);
        }
      );

      logInfo(ctx, "[worker] Job completed successfully", { job });

      await updateQueueJobStatus(ctx, { ...baseQueueJobParams, status: QueueJobStatus.COMPLETED });
    } catch (error) {
      logError(ctx, "[worker] Unexpected error while processing job", { error, job });

      if (job.retrycount < job.retrylimit) {
        logInfo(ctx, "[worker] Job failed but a retry will occur", { job });
      } else {
        logWarn(ctx, "[worker] Job failed and the retry limit was reached", { job });

        try {
          await updateQueueJobStatus(ctx, { ...baseQueueJobParams, status: QueueJobStatus.FAILED });
        } catch (statusError) {
          // A failure to record the FAILED status must not replace the original error,
          // otherwise the real cause of the job failure would be lost to the caller.
          logError(ctx, "[worker] Unable to mark job as failed", { error: statusError, job });
        }
      }

      // We need to raise the original error so that PgBoss correctly updates the status of the job (i.e. to "failed").
      throw error;
    }
  };
