import { Observable, concat, defer, from } from 'rxjs';
import { Injectable } from '@angular/core';
import { Selector, Database, Collection } from '@creately/rxdata';
import { AbstractModel, ModelChange } from 'flux-core';
import { clone, uniq } from 'lodash';
import { DataStore } from './data-store.svc';
import { map, take, ignoreElements } from 'rxjs/operators';

/**
 * DataSync
 * DataSync service is responsible for performing change related
 * operations on models. Model changes will be handled by this service.
 */
@Injectable()
export class DataSync {
    constructor( protected database: Database, protected dataStore: DataStore ) {
        // ...
    }

    /**
     * findLatestChangeId
     * findLatestChangeId subscribes to the latest change id of the model.
     */
    public findLatestChangeId( type: typeof AbstractModel, id: string ): Observable<string> {
        return this.dataStore.findOneRaw( type, { id }).pipe(
            map( mdl => mdl.lastChangeId ),
        );
    }

    /**
     * findLatestChangeIds
     * findLatestChangeIds subscribes to a map of model ids => latest change ids
     * for models which match the selector. The observable will re-emit when the models change.
     */
    public findLatestChangeIds( type: typeof AbstractModel, selector: Selector ): Observable<{ [id: string]: string }> {
        return this.dataStore.findRaw( type, selector ).pipe(
            map( models => {
                const objectMap: { [id: string]: string } = {};
                for ( let i = 0; i < models.length; i++ ) {
                    const mdl = models[i];
                    objectMap[mdl.id] = mdl.lastChangeId;
                }
                return objectMap;
            }),
        );
    }

    /**
     * findChanges
     * findChanges subscribes to all matching changes for a model type.
     */
    public findChanges( type: typeof AbstractModel, selector: Selector ): Observable<ModelChange[]> {
        const collection = this.getChangeCollection( type );
        return collection.find( selector );
    }

    /**
     * findSavedChanges
     * findSavedChanges subscribes to all matching changes for a model type which
     * are already saved on the server. These changes have a savedTime field.
     */
    public findSavedChanges( type: typeof AbstractModel, selector: Selector ): Observable<ModelChange[]> {
        const selectSaved = { $and: [ selector, { status: 'saved' }]};
        return this.findChanges( type, selectSaved );
    }

    /**
     * findUnsavedChanges
     * findUnsavedChanges subscribes to all matching changes for a model type which
     * are not saved on the server (or the client did not get the notification).
     */
    public findUnsavedChanges( type: typeof AbstractModel, selector: Selector ): Observable<ModelChange[]> {
        const selectUnsaved = { $and: [ selector, { status: 'unsaved' }]};
        return this.findChanges( type, selectUnsaved );
    }

    /**
     * storeChanges
     * storeChanges stores change objects in a collection associated with the given model type.
     */
    public storeChanges( type: typeof AbstractModel, changes: ModelChange[]): Observable<void> {
        const store = this.dataStore.getModelStore( type );
        const promise = store.insertChange( changes );
        return this.emptyObservable( promise );
    }

    /**
     * applyChanges
     * applyChanges stores change objects in a collection associated with the given model type.
     * It will then attempt to apply all synced changes wich are ready to be applied for the model.
     */
    public applyChanges( type: typeof AbstractModel, changes: ModelChange[]): Observable<void> {
        return concat(
            this.storeChanges( type, changes ),
            defer(() => {
                const ids = uniq( changes.map( change => change.modelId ));
                const promise = Promise.all( ids.map( id => this.applyPendingChanges( type, id )));
                return this.emptyObservable( promise );
            }),
        );
    }

    /**
     * removeChanges
     * removeChanges removes given changes from local cache. If a change fails to get applied
     * on the server, it should also be removed from the local cache.
     */
    public removeChanges( type: typeof AbstractModel, changeIds: string[]): Observable<void> {
        const collection = this.getChangeCollection( type );
        const promise = collection.remove({ id: { $in: changeIds }});
        return this.emptyObservable( promise );
    }

    /**
     * apply a change to the model directly.
     * this method can be used for the models which doesn't have a change collection.
     */
    public applyDirectChange( type: typeof AbstractModel, change: ModelChange ): Observable<void> {
        const selector = {
            id: change.modelId,
        };
        const promise = this.dataStore.update( type, selector, change.modifier ).toPromise();
        return this.emptyObservable( promise );
    }

    /**
     * applyPendingChanges
     * applyPendingChanges applies all pending (synced) changes for the model with given id.
     */
    protected async applyPendingChanges( type: typeof AbstractModel, modelId: string ): Promise<string> {
        const model = await this.dataStore.findOneRaw( type, { id: modelId }).pipe( take( 1 )).toPromise();
        if ( !model ) {
            return null;
        }
        const applyNext = async ( changeId: string ) => {
            const nextChangeId = await this.applyNextChange( type, modelId, changeId );
            if ( !nextChangeId ) {
                return changeId;
            }
            return await applyNext( nextChangeId );
        };
        return await applyNext( model.lastChangeId );
    }

    /**
     * applyNextChange
     * applyNextChange applies the next change for the model with given id. It will take
     * the change for the changeId, apply if it's available and return the next change id.
     */
    protected async applyNextChange( type: typeof AbstractModel, modelId: string, changeId: string ): Promise<string> {
        const collection = this.getChangeCollection( type );
        // The modelId field is not required for below query because changeId is unique.
        const selectNextChange = { modelId, previousId: changeId, status: 'saved' };
        const change = await collection.findOne( selectNextChange ).pipe( take( 1 )).toPromise();
        if ( !change?.modifier ) {
            return null;
        }
        const selectModel = { id: modelId, lastChangeId: changeId };
        const updateModel = clone( change.modifier );
        updateModel.$set = updateModel.$set || {};
        updateModel.$set.lastChangeId = change.id;
        updateModel.$set.lastUpdated = change.serverTime;
        updateModel.$set.updatedBy = change.userId;
        await this.dataStore.update( type, selectModel, updateModel ).toPromise();
        return change.id;
    }

    /**
     * getChangeCollection
     * getChangeCollection returns the database collection which contains model changes
     * for a particular model type. Appends '.changes' to model store collection name.
     */
    protected getChangeCollection( type: typeof AbstractModel ): Collection<any> {
        const store = this.dataStore.getModelStore( type );
        return store.changeCollection;
    }

    /**
     * emptyObservable
     * emptyObservable creates an empty observable with a promise.
     * The observable will complete when the promise resolves.
     */
    protected emptyObservable( promise: Promise<any> ): Observable<void> {
        return from( promise ).pipe( ignoreElements());
    }
}
