import { QueueJobName, SpreadsheetImportRowStatus } from "@prisma/client";
import { map } from "bluebird";
import { array, number, object } from "yup";
import { type AppContext } from "~/lib/context";
import { makeSingletonKey } from "~/lib/makeSingletonKey";
import { BaseJobDataSchema } from "~/lib/queue/baseJobDataSchema";
import { sendJob } from "~/lib/queue/sendJob";
import { getId, type YupOutputType } from "~/lib/utils";
import { findImportRows } from "~/services/external-employee/import/findImportRows";
import { findNextImportRowIdsToImport } from "~/services/external-employee/import/findNextImportRowIdsToImport";
import { importInitialRows } from "~/services/external-employee/import/process/initial/importInitialRows";
import { importPartialRows } from "~/services/external-employee/import/process/partial/importPartialRows";
import { updateImportRow } from "~/services/external-employee/import/updateImportRow";
import { sendPostImportSpreadsheetJob } from "~/workers/import/postImportSpreadsheet";

export const IMPORT_SPREADSHEET_ROW_BATCH_SIZE = 50;

const ImportInitialExternalEmployeeSpreadsheetRowJobDataSchema = BaseJobDataSchema.concat(
  object({
    companyId: number().required(),
    importId: number().required(),
    rowIds: array().of(number().required()).required(),
  })
);

export type ImportInitialExternalEmployeeSpreadsheetRowJobData = YupOutputType<
  typeof ImportInitialExternalEmployeeSpreadsheetRowJobDataSchema
>;

export const importSpreadsheetRowWorkerService = async (
  ctx: AppContext,
  data: ImportInitialExternalEmployeeSpreadsheetRowJobData
) => {
  const { companyId, importId, rowIds } = data;

  const externalEmployeeImport = await ctx.prisma.externalEmployeeSpreadsheetImport.findFirstOrThrow({
    where: { id: data.importId },
  });

  const importRows = await findImportRows(ctx, {
    importId: importId,
    rowIds: rowIds,
    statuses: [SpreadsheetImportRowStatus.INIT],
  });

  if (externalEmployeeImport.spreadsheetTemplate === "INITIAL") {
    const { rowErrors, mapSuccessRowIdToExternalEmployeeId } = await importInitialRows(ctx, {
      rows: importRows,
      companyId,
      importId,
    });

    await map(
      rowErrors,
      ({ rowId, error }) =>
        markRowAsError(ctx, {
          rowId,
          errorMessage: error,
        }),
      { concurrency: 2 }
    );

    await map(
      mapSuccessRowIdToExternalEmployeeId,
      async ([rowId, externalEmployeeId]) => {
        await updateImportRow(ctx, {
          rowId: rowId,
          status: SpreadsheetImportRowStatus.IMPORTED,
          externalEmployeeId: externalEmployeeId,
        });
      },
      { concurrency: 2 }
    );
  }

  if (externalEmployeeImport.spreadsheetTemplate === "PARTIAL") {
    const { rowWarnings } = await importPartialRows(ctx, {
      rows: importRows,
      companyId,
      importId,
    });

    await map(
      rowWarnings,
      ({ rowId, warning }) =>
        markRowAsWarning(ctx, {
          rowId,
          warningMessage: warning,
        }),
      { concurrency: 2 }
    );

    await markRowsAsImported(ctx, importRows.map(getId));
  }

  const nextRowsIds = await findNextImportRowIdsToImport(ctx, {
    importId: importId,
    take: IMPORT_SPREADSHEET_ROW_BATCH_SIZE,
  });

  if (nextRowsIds.length > 0) {
    await sendImportSpreadsheetRowJob(ctx, {
      companyId: companyId,
      importId: importId,
      rowIds: nextRowsIds,
    });
  } else {
    await sendPostImportSpreadsheetJob(ctx, {
      companyId: companyId,
      importId: importId,
    });
  }
};

export const sendImportSpreadsheetRowJob = async (
  ctx: AppContext,
  data: ImportInitialExternalEmployeeSpreadsheetRowJobData
) => {
  await sendJob(ctx, {
    jobName: QueueJobName.IMPORT_SPREADSHEET_ROW,
    data,
    options: {
      singletonKey: makeSingletonKey(
        {
          companyId: data.companyId,
          importId: data.importId,
          rowIds: data.rowIds.join("|"),
        },
        { unique: false }
      ),
    },
  });
};

const markRowsAsImported = async (ctx: AppContext, rowIds: number[]) => {
  await map(
    rowIds,
    async (rowId) => {
      await updateImportRow(ctx, {
        rowId: rowId,
        status: SpreadsheetImportRowStatus.IMPORTED,
      });
    },
    { concurrency: 2 }
  );
};

const markRowAsError = async (ctx: AppContext, params: { rowId: number; errorMessage: string }) => {
  return await updateImportRow(ctx, {
    rowId: params.rowId,
    status: SpreadsheetImportRowStatus.ERROR,
    message: params.errorMessage,
  });
};

const markRowAsWarning = async (ctx: AppContext, params: { rowId: number; warningMessage: string }) => {
  return await updateImportRow(ctx, {
    rowId: params.rowId,
    status: SpreadsheetImportRowStatus.WARNING,
    message: params.warningMessage,
  });
};
