import { defer, merge, Observable, of, Subject, throwError } from 'rxjs';
import { catchError, concatMap, debounce, filter, finalize, mergeMap, scan, take, takeUntil, takeWhile, tap } from 'rxjs/operators';
import { GuidHelper } from '../../../../helpers/guidHelper';
import { IDestroyable } from "../../lifecycle/models/IDestroyable";
import { afterSubscribe } from '../../rxjs/operators/after-subscribe';

/** Représente les paramètres requis pour lancer une exécution. */
interface IExecutionParams<T extends IDestroyable, U extends any[], V extends Observable<any>> {
	/** Méthode à exécuter */
	originalMethod: (...paArgs: U) => V;
	/** Objet source depuis lequel est exécuté l'observable. */
	target: T;
	/** Identifiant de la file. */
	id: string;
	/** Arguments passé lors de l'exécution de la méthode. */
	arguments: U;
	/** Identifiant unique de l'exécution. */
	guid: string;
}

/** Représente le résultat d'une exécution. */
interface IResult {
	/** Identifiant unique de l'exécution. */
	guid: string;
	result: any;
}

/** Représente l'erreur sur une exécution. */
interface IError {
	/** Identifiant unique de l'exécution. */
	guid: string;
	error: any;
}

/** Représente tous les sujets nécessaires à une file. */
interface IQueueSubjects<T extends IDestroyable, U extends any[], V extends Observable<any>> {
	executionSubject: Subject<IExecutionParams<T, U, V>>;
	resultSubject: Subject<IResult>;
	errorSubject: Subject<IError>;
	finalizeSubject: Subject<string>;
}

/** Paramétrage de la file. */
interface IQueueParams<T extends IDestroyable, U extends any[]> {
	/** Crée un identifiant de file pour ventiler les exécutions. */
	idBuilder?: (poThis: T, ...paArgs: U) => any;
	/** Permet de concaténer les paramètres en attente d'exécution.
	 * Si il est renseigné, on ne va pas mettre en attente toutes les demandes d'exécution, on va les fusionner au fur et à mesure.
	*/
	paramsReducer?: (paOldArgs: U, paArgs: U) => U;
	/** Condition de fermeture de la file d'attente. */
	shouldFinishQueue?: (poThis: T, ...paArgs: U) => boolean;
	/** Callback appelée lors de la fermeture de la file d'attente. */
	onQueueFinished?: (poThis: T) => void;
}

/** Permet de chaîner les appels à une méthode observable. Utiliser le snippet `queue`.
 * @param poParams
 */
export function Queue<S extends IDestroyable, T extends any[], U extends Observable<any>>(poParams?: IQueueParams<S, T>):
	(poTarget: S,
		psMethodName: string | symbol,
		poDescriptor: TypedPropertyDescriptor<(...paArgs: any[]) => U>
	) => TypedPropertyDescriptor<(...paArgs: any[]) => U> {

	return function (poTarget: S,
		psMethodName: string | symbol,
		poDescriptor: TypedPropertyDescriptor<(...paArgs: any[]) => U>
	): TypedPropertyDescriptor<(...paArgs: any[]) => U> {
		const lfOriginalMethod: (...paArgs: any[]) => U = poDescriptor.value; // On sauvegarde l'ancienne implémentation de la méthode.
		const loSubjectsByTarget = new Map<S, Map<string, IQueueSubjects<S, T, U>>>();

		poDescriptor.value = function (): U {
			const loArgument: T = arguments as any as T;
			const loTarget: S = this;
			const lsId: string = poParams?.idBuilder ? poParams?.idBuilder(poTarget, ...loArgument) : psMethodName; // On construit l'identifiant de la file d'attente.
			const lsGuid: string = GuidHelper.newGuid(); // On génère un Guid qui servira à identifier l'éxecution.
			let loSubjectsById: Map<string, IQueueSubjects<S, T, U>>;

			if (loSubjectsByTarget.has(loTarget))
				loSubjectsById = loSubjectsByTarget.get(loTarget);
			else
				loSubjectsByTarget.set(loTarget, loSubjectsById = new Map<string, IQueueSubjects<S, T, U>>());

			const loSubjects: IQueueSubjects<S, T, U> = getSubjectsForId(loTarget, lsId, loSubjectsById, poParams); // On récupère les sujets de la file d'attente.

			return defer(() => {
				return merge( // On se met en écoute des changements retournés par notre file d'éxecution.
					loSubjects.resultSubject.asObservable().pipe(filter((poResult: IResult) => poResult.guid === lsGuid)),
					loSubjects.errorSubject.asObservable().pipe(
						filter((poError: IError) => poError.guid === lsGuid), mergeMap((poError: IError) => throwError(poError.error))
					)
				).pipe(
					// On lance l'éxecution
					afterSubscribe(() => loSubjects.executionSubject.next({ target: loTarget, id: lsId, originalMethod: lfOriginalMethod, arguments: loArgument, guid: lsGuid })),
					takeUntil(loSubjects.finalizeSubject.asObservable().pipe(filter((psGuid: string) => psGuid === lsGuid), take(1))));
			}) as U;
		};

		return poDescriptor;
	};
}

function getSubjectsForId<T extends IDestroyable, U extends any[], V extends Observable<any>>(
	poTarget: T,
	psId: string,
	loSubjectsByTargetRef: Map<string, IQueueSubjects<T, U, V>>,
	poParams: IQueueParams<T, U>
): IQueueSubjects<T, U, V> {
	if (loSubjectsByTargetRef.has(psId))
		return loSubjectsByTargetRef.get(psId);

	poTarget.destroyed$.subscribe(() => {
		const loSubjects: IQueueSubjects<T, U, V> = getSubjectsForId(poTarget, psId, loSubjectsByTargetRef, poParams);

		loSubjects.resultSubject.complete();
		loSubjects.errorSubject.complete();
		loSubjects.executionSubject.complete();
		loSubjects.finalizeSubject.complete();

		loSubjectsByTargetRef.delete(psId);
	});

	const loSubjects: IQueueSubjects<T, U, V> = {
		executionSubject: new Subject<IExecutionParams<T, U, V>>(),
		resultSubject: new Subject<IResult>(),
		errorSubject: new Subject<IError>(),
		finalizeSubject: new Subject<string>()
	};

	initExecution(poTarget, loSubjects, poParams);

	loSubjectsByTargetRef.set(psId, loSubjects);

	return loSubjects;
}

function initExecution<T extends IDestroyable, U extends any[], V extends Observable<any>>(
	poTarget: T,
	poSubjects: IQueueSubjects<T, U, V>,
	poParams?: IQueueParams<T, U>
): void {
	let loPrevious: IExecutionParams<T, U, V>;
	let lbRunning = false;
	let lsRunningGuid: string;

	poSubjects.executionSubject.asObservable() // On initialise notre file d'éxecution.
		.pipe(
			takeWhile((poExecParams: IExecutionParams<T, U, V>) => // Tant qu'on n'a pas de condition nous demandant d'arrêter l'éxecution de cette file, on continue.
				poParams?.shouldFinishQueue ? !poParams?.shouldFinishQueue(poExecParams.target, ...poExecParams.arguments) : true
			),
			scan((poAccParams: IExecutionParams<T, U, V>, poNewParams: IExecutionParams<T, U, V>) => {
				if (poParams?.paramsReducer)
					poNewParams.arguments = poParams.paramsReducer(poAccParams.arguments, poNewParams.arguments);

				return poNewParams;
			}),
			debounce((poExecParams: IExecutionParams<T, U, V>) => {
				if (!lbRunning || !poParams?.paramsReducer) // Si pas déjà en cours d'exécution ou si pas de reducer de paramètres on passe, car sinon on aura de la perte de données.
					return of(lbRunning = true);

				if (loPrevious)
					poSubjects.finalizeSubject.next(loPrevious.guid);

				loPrevious = poExecParams;
				return poSubjects.finalizeSubject.asObservable().pipe(filter((psGuid: string) => psGuid === lsRunningGuid), take(1));// Sinon on attend la fin de l'action précédente.
			}),
			concatMap((poExecParams: IExecutionParams<T, U, V>) => {
				lsRunningGuid = poExecParams.guid;

				return execOriginalMethod<T, U, V>(poExecParams, poSubjects)
					.pipe(finalize(() => {
						lbRunning = false;
						poSubjects.finalizeSubject.next(poExecParams.guid);
					}));
			}
			),
			finalize(() => {
				if (poParams?.onQueueFinished)
					poParams.onQueueFinished(poTarget);
			})
		).subscribe();
}

function execOriginalMethod<T extends IDestroyable, U extends any[], V extends Observable<any>>(
	poExecParams: IExecutionParams<T, U, V>,
	poSubjects: IQueueSubjects<T, U, V>): V {
	return poExecParams.originalMethod.apply(poExecParams.target, poExecParams.arguments)
		.pipe(
			tap((poResult: any) => poSubjects.resultSubject.next({ guid: poExecParams.guid, result: poResult })),
			catchError((poError: any) => {
				poSubjects.errorSubject.next({ guid: poExecParams.guid, error: poError });
				return of(null);
			})
		);
}
