import { Injectable } from '@angular/core';
import PCancelable from 'p-cancelable';
import { fromEvent, merge, Observable, throwError } from 'rxjs';
import { finalize, mergeMap, scan, take, takeUntil, tap } from 'rxjs/operators';
import { ArrayHelper } from '../../../../helpers/arrayHelper';
import { NumberHelper } from '../../../../helpers/numberHelper';
import { StoreHelper } from '../../../../helpers/storeHelper';
import { StringHelper } from '../../../../helpers/stringHelper';
import { Database } from '../../../../model/store/Database';
import { EReplicationStyle } from '../../../../model/store/ereplication-style.enum';
import { IStoreDocument } from '../../../../model/store/IStoreDocument';
import { IStoreReplicationOptions } from '../../../../model/store/IStoreReplicationOptions';
import { IStoreReplicationResponse } from '../../../../model/store/IStoreReplicationResponse';
import { NetworkService } from '../../../../services/network.service';
import { PerformanceManager } from '../../../performance/PerformanceManager';
import { ISyncronizationEvent } from '../../model/isyncronization-event';
import { IOnProgressFunction } from '../models/ion-progress-function';
import { ReplicatorBase } from '../models/replicator-base';

@Injectable()
export class ClassicReplicatorService extends ReplicatorBase {

	//#region FIELDS

	/** Valeur par défaut du timeout de réplication, appliqué uniquement aux réplications non live. */
	private readonly C_DEFAULT_REPLICATION_TIMEOUT_MILLISECONDS = 120000;
	private static readonly C_LOG_ID = "CLASSICREPLIC.S::";

	//#endregion

	//#region METHODS

	constructor(psvcNetwork: NetworkService) {
		super(psvcNetwork);
	}

	/** @override */
	public replicateAsync(
		poDatabase: Database,
		poSource: PouchDB.Database<{}>,
		poTarget: PouchDB.Database<{}>,
		poOptions?: IStoreReplicationOptions,
		pfOnProgress?: IOnProgressFunction
	): PCancelable<IStoreReplicationResponse> {
		return new PCancelable(async (pfResolve, pfReject, pfOnCancel) => {
			try {
				if (poOptions)
					poOptions.live = false;

				await this.handleNetworkAsync(poDatabase);

				pfResolve(await this.innerReplicateAsync(poDatabase, poOptions, poSource, poTarget, pfOnCancel, pfOnProgress));
			}
			catch (poError) {
				pfReject(poError);
			}
		});
	}

	/** @override */
	public replicateLive$(
		poDatabase: Database,
		poSource: PouchDB.Database<{}>,
		poTarget: PouchDB.Database<{}>,
		poOptions: IStoreReplicationOptions
	): Observable<IStoreReplicationResponse> {
		poOptions.live = true;

		return this.handleNetwork$(poDatabase).pipe(
			mergeMap(() => this.innerReplicate$(poOptions, poSource, poTarget))
		);
	}

	private innerReplicate$(
		poReplicateOptions: IStoreReplicationOptions,
		poSourceInstance: PouchDB.Database<{}>,
		poTargetInstance: PouchDB.Database<{}>
	): Observable<IStoreReplicationResponse> {
		const lbIsSyncToRemote: boolean = StoreHelper.isRemoteDatabase(poTargetInstance.name);

		poReplicateOptions = this.prepareReplicateOptions(poReplicateOptions, false, lbIsSyncToRemote);

		const loReplication: PouchDB.Replication.Replication<any> = poTargetInstance.replicate.from(poSourceInstance, poReplicateOptions);
		const loCompleteEvent$: Observable<PouchDB.Replication.ReplicationResultComplete<any>> = fromEvent(loReplication, "complete");
		const loComplete$: Observable<PouchDB.Replication.ReplicationResultComplete<any>> = loCompleteEvent$.pipe(take(1));

		const loError$: Observable<never> = this.replicateOnError(loReplication, loCompleteEvent$);
		const loChange$: Observable<PouchDB.Replication.ReplicationResult<any>> =
			fromEvent(loReplication, "change").pipe(takeUntil(loCompleteEvent$)) as Observable<PouchDB.Replication.ReplicationResult<any>>;

		return merge(loChange$, loComplete$, loError$).pipe(
			scan((poAccumulator: IStoreReplicationResponse, poCurrent: PouchDB.Replication.ReplicationResult<any>) =>
				this.innerReplicate_reduceResults(poAccumulator, poCurrent)
			),
			tap(
				() => { },
				poError => this.onReplicationError(poSourceInstance, poTargetInstance, poError)
			),
			finalize(() => loReplication.cancel())
		);
	}

	private replicateOnError(
		poReplication: PouchDB.Replication.Replication<any>,
		poCompleteEvent$: Observable<PouchDB.Replication.ReplicationResultComplete<any>>
	): Observable<never> {
		return fromEvent(poReplication, "error")
			.pipe(
				mergeMap(poError => throwError(poError)),
				takeUntil(poCompleteEvent$)
			);
	}

	private async innerReplicateAsync(
		poDatabase: Database,
		poReplicateOptions: IStoreReplicationOptions,
		poSourceInstance: PouchDB.Database,
		poTargetInstance: PouchDB.Database,
		pfOnCancel: (pfCallback: () => void) => void,
		pfOnProgress?: IOnProgressFunction,
	): Promise<IStoreReplicationResponse> {
		let lnOriginSeqNumber: number;
		const loPerformanceManager = new PerformanceManager;

		const lbIsSyncFromRemote: boolean = StoreHelper.isRemoteDatabase(poSourceInstance.name);
		const lbIsSyncToRemote: boolean = StoreHelper.isRemoteDatabase(poTargetInstance.name);

		poReplicateOptions = this.prepareReplicateOptions(poReplicateOptions, false, lbIsSyncToRemote);
		const loReplication: PouchDB.Replication.Replication<any> = poTargetInstance.replicate.from(poSourceInstance, poReplicateOptions);

		pfOnCancel(() => loReplication.cancel());

		let loReplicationResponse: IStoreReplicationResponse;
		loPerformanceManager.markStart();
		const lnTargetTotalSeqNumber: number = StoreHelper.getSequenceNumber((await poSourceInstance.info()).update_seq);
		if (lbIsSyncFromRemote)
			lnOriginSeqNumber = StoreHelper.getSequenceNumber(poDatabase.syncMarker?.remoteSequenceNumber ?? "0");
		else
			lnOriginSeqNumber = poDatabase.syncMarker?.localSequenceNumber ?? 0;

		return new Promise<IStoreReplicationResponse>((pfResolve: (poResponse: IStoreReplicationResponse) => void, pfReject: (poError: any) => void) => {
			loReplication.on("complete", (poReplicationResult: PouchDB.Replication.ReplicationResultComplete<IStoreDocument>) => {
				if (lbIsSyncFromRemote)
					poDatabase.isSynchronizingFromServer = undefined;
				else if (lbIsSyncToRemote)
					poDatabase.isSynchronizingToServer = undefined;

				console.debug(`${ClassicReplicatorService.C_LOG_ID}Réplication de la base ${poSourceInstance.name} vers ${poTargetInstance.name} terminée en ${loPerformanceManager.markEnd().measure()}ms.`);
				pfResolve(this.innerReplicate_reduceResults(loReplicationResponse, poReplicationResult));
			});

			loReplication.on("change", async (poReplicationResult: PouchDB.Replication.ReplicationResult<IStoreDocument>) => {
				const loISyncronizationEvent: ISyncronizationEvent = {
					loaded: StoreHelper.getSequenceNumber(poReplicationResult.last_seq) - lnOriginSeqNumber,
					total: lnTargetTotalSeqNumber - lnOriginSeqNumber
				};

				if (ArrayHelper.hasElements(poReplicationResult.docs)) {
					if (lbIsSyncFromRemote)
						poDatabase.isSynchronizingFromServer = loISyncronizationEvent;
					else if (lbIsSyncToRemote)
						poDatabase.isSynchronizingToServer = loISyncronizationEvent;
				}

				loReplicationResponse = this.innerReplicate_reduceResults(loReplicationResponse, poReplicationResult);

				if (pfOnProgress)
					await pfOnProgress(loISyncronizationEvent, loReplicationResponse);
			});

			loReplication.on("error", (poError: any) => {
				this.onReplicationError(poSourceInstance, poTargetInstance, poError);
				pfReject(poError);
			});
		});
	}

	private onReplicationError(poSourceInstance: PouchDB.Database<{}>, poTargetInstance: PouchDB.Database<{}>, poError: any): void {
		return console.error(`${ClassicReplicatorService.C_LOG_ID}Réplication de la base ${poSourceInstance.name} vers ${poTargetInstance.name} échouée.`, poError);
	}

	private innerReplicate_reduceResults(poAccumulator: IStoreReplicationResponse, poCurrent: PouchDB.Replication.ReplicationResult<any>)
		: IStoreReplicationResponse {
		const ldNow = new Date;

		if (!poAccumulator) {
			poAccumulator = {
				...poCurrent,
				start_time: new Date(poCurrent.start_time).toISOString(),
				end_time: ldNow.toISOString(),
				status: "ok"
			};
		}
		else {
			if (poCurrent.docs)
				poAccumulator.docs.push(...poCurrent.docs);

			poAccumulator.end_time = ldNow.toISOString();
		}

		return poAccumulator;
	}

	/** Modifie/Crée et retourne l'objet des options de réplication en mettant certaines valeurs prédéfinies.
	 * @param poReplicateOptions Objet d'options de réplication, peut être non défini.
	 * @param pbIsLiveReplication Indique si les options de réplication sont pour une réplication live ou non.
	 * @param pbIsToRemote Indique si la réplication est vers le serveur.
	 */
	private prepareReplicateOptions(poReplicateOptions: IStoreReplicationOptions, pbIsLiveReplication: boolean, pbIsToRemote: boolean): IStoreReplicationOptions {
		if (!poReplicateOptions)
			poReplicateOptions = {};

		if (StringHelper.isBlank(poReplicateOptions.style) && !pbIsToRemote) // Si le style n'est pas indiqué et que la réplication n'est pas vers le serveur, alors on utilise le style main_only
			poReplicateOptions.style = EReplicationStyle.mainOnly;

		if (!pbIsLiveReplication && poReplicateOptions.live !== true) {
			poReplicateOptions.live = false;

			// Si le 'timeout' n'est pas un nombre valide ou n'est pas à 'false' (type number|false) alors on met le 'timeout' par défaut.
			if (!NumberHelper.isValid(poReplicateOptions.timeout as number) && poReplicateOptions.timeout !== false)
				poReplicateOptions.timeout = this.C_DEFAULT_REPLICATION_TIMEOUT_MILLISECONDS;
		}
		else {
			poReplicateOptions.live = true;
			poReplicateOptions.retry = true;
		}

		return poReplicateOptions;
	}

	//#endregion
}
