import { QueueJobStatus } from "@prisma/client";
import { mapSeries } from "bluebird";
import { randomUUID } from "crypto";
import packageJson from "~/../package.json";
import { cloneContext } from "~/lib/cloning";
import { type AppContext } from "~/lib/context";
import tracer from "~/lib/datadog/tracing";
import { BusinessLogicError } from "~/lib/errors/businessLogicError";
import { initContext } from "~/lib/initContext";
import { isNumber, omit } from "~/lib/lodash";
import { CORRELATION_ID_KEY, logError, logInfo, logWarn } from "~/lib/logger";
import { type BaseJobData } from "~/lib/queue/baseJobDataSchema";
import { type JobPayload, type WorkerParams } from "~/lib/queue/getWorkers";
import { updateQueueJobStatus } from "~/lib/queue/updateQueueJobStatus";
import { getServiceAccountUserId, sleep } from "~/lib/utils";
import { AuthenticatedUserIncludes } from "~/services/auth/authenticatedUserIncludes";
import { fetchRequiredAuthenticatedUser } from "~/services/auth/fetchAuthenticatedUser";

/**
 * Builds the batch handler executed by a queue worker.
 *
 * The returned function processes the received jobs one after the other (`mapSeries`). For each job it:
 * - clones the base context and stores a correlation id in its metadata (taken from the job data,
 *   else from the context metadata, else a freshly generated UUID);
 * - initialises the context as the service account, scoped to the job's company;
 * - runs `params.service` inside a tracer span, marking the queue job IN_PROGRESS and then COMPLETED;
 * - on failure, logs the error, marks the queue job FAILED only when the retry limit is reached,
 *   and rethrows the error.
 *
 * @param baseCtx - Context that is cloned for every job.
 * @param params - Worker definition; `jobName` and `service` are read here.
 * @returns An async function that takes the job payloads and processes them sequentially.
 * @throws BusinessLogicError when a job's `companyId` is not a number.
 * @throws Rethrows any error raised while processing a job.
 */
export const makeHandler =
  <T extends BaseJobData>(baseCtx: AppContext, params: WorkerParams<T>) =>
  async (jobs: JobPayload<T>[]) => {
    type JobDataWithCorrelationId = T & { correlationId?: string };

    return mapSeries(jobs, async (job) => {
      const ctx = cloneContext(baseCtx);

      // The cast is required because the type inferred from the yup schema does not expose `correlationId`
      const jobDatawithMetadataIncluded = job.data as JobDataWithCorrelationId;
      // The correlation id is metadata: strip it so the service only receives its own payload
      const jobData = omit(job.data, "correlationId") as T;

      const correlationId =
        jobDatawithMetadataIncluded.correlationId ?? ctx.metadata.get(CORRELATION_ID_KEY) ?? randomUUID();
      ctx.metadata.set(CORRELATION_ID_KEY, correlationId);

      // `rows` and `values` are left out of everything that gets logged
      const jobForLogging = { ...job, data: omit(jobData, ["rows", "values"]) };

      const baseQueueJobParams = {
        name: params.jobName,
        singletonKey: job.singletonKey as string,
        externalJobId: job.id,
      };

      logInfo(ctx, "[worker] Starting job", { job: jobForLogging, name: params.jobName });

      if (!isNumber(jobData.companyId)) {
        throw new BusinessLogicError(`Missing companyId for job ${params.jobName}`);
      }

      try {
        const serviceAccount = await fetchRequiredAuthenticatedUser(ctx, { userId: getServiceAccountUserId() });

        await initContext(ctx, {
          ...serviceAccount,
          companyId: jobData.companyId,
          company: await ctx.prisma.company.findUniqueOrThrow({
            where: { id: jobData.companyId },
            include: AuthenticatedUserIncludes["company"]["include"],
          }),
        });

        const tags = {
          "correlation_id": correlationId,
          "job.name": params.jobName,
          "job.payload": omit(jobData, ["rows", "values"]),
          "span.kind": "server",
          "user.id": serviceAccount.id,
        };

        await tracer.trace(
          "worker.tracing",
          { tags, service: `${packageJson.name}.worker`, resource: params.jobName },
          async () => {
            await updateQueueJobStatus(ctx, { ...baseQueueJobParams, status: QueueJobStatus.IN_PROGRESS });

            await params.service(ctx, jobData);
          }
        );

        // This allows datadog to flush traces before process exits
        await sleep(1000);

        logInfo(ctx, "[worker] Job completed successfully", { job: jobForLogging });

        await updateQueueJobStatus(ctx, {
          ...baseQueueJobParams,
          status: QueueJobStatus.COMPLETED,
          output: job.output,
        });
      } catch (error) {
        logError(ctx, "[worker] Unexpected error while processing job", { error, job: jobForLogging });

        if (job.retryCount < job.retryLimit) {
          logInfo(ctx, "[worker] Job failed but a retry will occur", { job: jobForLogging });
        } else {
          logWarn(ctx, "[worker] Job failed and the retry limit was reached", { job: jobForLogging });
          await updateQueueJobStatus(ctx, {
            ...baseQueueJobParams,
            status: QueueJobStatus.FAILED,
            output: job.output,
          });
        }

        // We need to raise an error so that PgBoss correctly updates the status of a job (i.e. to "failed").
        throw error;
      } finally {
        ctx.metadata.delete(CORRELATION_ID_KEY);

        // Never `return` from this block: a `return` inside `finally` overrides the error rethrown by the
        // `catch` above, which would make a failed job look successful to the caller.
        if (global.gc) {
          global.gc();
          logInfo(ctx, "[worker] Garbage collected");
        } else {
          logWarn(
            ctx,
            "[worker] Garbage collection unavailable, pass --expose-gc when launching node to enable forced garbage collection"
          );
        }
      }
    });
  };
