import { QueueJobStatus } from "@prisma/client";
import { isNumber, omit } from "lodash";
import packageJson from "~/../package.json";
import { BusinessLogicError } from "~/lib/api";
import { cloneContext } from "~/lib/cloning";
import { type AppContext } from "~/lib/context";
import tracer from "~/lib/datadog/tracing";
import { initContext } from "~/lib/init-context";
import { logError, logInfo, logWarn } from "~/lib/logger";
import { type BaseJobData } from "~/lib/queue/base-job-data-schema";
import { type JobPayload, type WorkerParams } from "~/lib/queue/get-workers";
import { updateQueueJobStatus } from "~/lib/queue/update-queue-job-status";
import { getServiceAccountUserId, sleep } from "~/lib/utils";
import { AuthenticatedUserIncludes } from "~/services/auth/authenticated-user-includes";
import { fetchRequiredAuthenticatedUser } from "~/services/auth/fetch-authenticated-user";

/**
 * Builds the handler invoked for each job of the worker described by `params`.
 *
 * For every job the returned handler:
 *  1. clones `baseCtx` and initialises the clone with the service-account user
 *     and the company referenced by `job.data.companyId`;
 *  2. marks the queue job `IN_PROGRESS` and runs `params.service` inside a
 *     Datadog trace span;
 *  3. marks the queue job `COMPLETED` on success, or `FAILED` once the retry
 *     limit has been reached.
 *
 * @param baseCtx - Context that is cloned for each job run.
 * @param params - Worker definition; `jobName` and `service` are used here.
 * @returns An async job handler. It rejects with the original error whenever
 *   processing fails, so the queue can apply its own failure/retry handling.
 * @throws BusinessLogicError when `job.data.companyId` is not a number.
 */
export const makeHandler =
  <T extends BaseJobData>(baseCtx: AppContext, params: WorkerParams<T>) =>
  async (job: JobPayload<T>) => {
    const ctx = cloneContext(baseCtx);

    // "rows" is stripped from everything that gets logged or attached to the trace
    // (presumably because it can be large — confirm against the job schemas).
    const sanitizedPayload = omit(job.data, "rows");
    const jobForLogging = { ...job, data: sanitizedPayload };

    const baseQueueJobParams = {
      name: params.jobName,
      singletonKey: job.singletonkey as string,
      externalJobId: job.id,
    };

    logInfo(ctx, "[worker] Starting job", { job: jobForLogging, name: params.jobName });

    // NOTE(review): this check throws before the try block below, so a job without a
    // companyId is neither logged as an error nor recorded through updateQueueJobStatus.
    if (!isNumber(job.data.companyId)) {
      throw new BusinessLogicError(`Missing companyId for job ${params.jobName}`);
    }

    try {
      const serviceAccount = await fetchRequiredAuthenticatedUser(ctx, { userId: getServiceAccountUserId() });

      // Run the job as the service account, scoped to the company the job belongs to.
      await initContext(ctx, {
        ...serviceAccount,
        companyId: job.data.companyId,
        company: await ctx.prisma.company.findUniqueOrThrow({
          where: { id: job.data.companyId },
          include: AuthenticatedUserIncludes["company"]["include"],
        }),
      });

      const tags = {
        "job.name": params.jobName,
        "job.payload": sanitizedPayload,
        "span.kind": "server",
        "user.id": serviceAccount.id,
      };

      await tracer.trace(
        "worker.tracing",
        { tags, service: `${packageJson.name}.worker`, resource: params.jobName },
        async () => {
          await updateQueueJobStatus(ctx, { ...baseQueueJobParams, status: QueueJobStatus.IN_PROGRESS });

          await params.service(ctx, job.data);
        }
      );

      // This allows datadog to flush traces before process exits (maybe)
      await sleep(1000);

      logInfo(ctx, "[worker] Job completed successfully", { job: jobForLogging });

      await updateQueueJobStatus(ctx, { ...baseQueueJobParams, status: QueueJobStatus.COMPLETED });
    } catch (error) {
      logError(ctx, "[worker] Unexpected error while processing job", { error, job: jobForLogging });

      if (job.retrycount < job.retrylimit) {
        logInfo(ctx, "[worker] Job failed but a retry will occur", { job: jobForLogging });
      } else {
        logWarn(ctx, "[worker] Job failed and the retry limit was reached", { job: jobForLogging });

        // Persisting the FAILED status is best-effort: if it fails too (e.g. the database
        // is the reason the job failed), the original error must still be the one rethrown.
        try {
          await updateQueueJobStatus(ctx, { ...baseQueueJobParams, status: QueueJobStatus.FAILED });
        } catch (statusError) {
          logError(ctx, "[worker] Could not mark job as failed", { error: statusError, job: jobForLogging });
        }
      }

      // We need to raise an error so that PgBoss correctly updates the status of a job (i.e. to "failed").
      throw error;
    }
  };
