import { Injectable, Inject } from '@angular/core';
import { Observable, empty, forkJoin, of, Subscription } from 'rxjs';
import { Logger, CommandService, ISavedModelChange, IUnsavedModelChange, StateService, EventManagerService } from 'flux-core';
import { NeutrinoConnection } from 'flux-connection';
import { MessageFactory } from 'flux-connection';
import { IMessageJson } from 'flux-connection';
import { AbstractAuthentication } from 'flux-connection';
import { AbstractSubscriptionService } from '../framework/abstract-subscription.svc';
import { IModelSubscription, SubscriptionStatus } from '../framework/subscription-type';
import { DataSync } from 'flux-store';
import { defer, concat, from } from 'rxjs';
import { take, switchMap, mergeMap, filter, tap } from 'rxjs/operators';
import { chunkArray } from '@creately/array-chunk-by-size';

/**
 * ModelSubscriptionService
 * ModelSubscriptionService is the subscription service which handles model
 * type subscriptions.
 */
@Injectable()
export class ModelSubscriptionService extends AbstractSubscriptionService {
    /**
     * The size to chunk changes in bytes
     */
    protected chunkSize: number = 32 * 1024;

    /**
     * subscriptions
     * subscriptions is a map of topic => subscription instances.
     */
    protected subscriptions: { [topic: string]: IModelSubscription };

    /**
     * This holds all the subscriptions
     */
    protected subs: Subscription[] = [];

    /**
     * constructor
     * constructor initializes subscription maps and starts listening
     * to connection events. It will call handlers when connection status
     * changes.
     */
    constructor(
        @Inject( Logger ) protected logger: Logger,
        @Inject( NeutrinoConnection ) protected conn: NeutrinoConnection,
        @Inject( CommandService ) protected commands: CommandService,
        @Inject( AbstractAuthentication ) protected authentication: AbstractAuthentication,
        @Inject( MessageFactory ) protected messages: MessageFactory,
        @Inject( DataSync ) protected dataSync: DataSync,
        @Inject( EventManagerService ) protected eventManagerService: EventManagerService,
        @Inject( StateService ) protected stateSvc: StateService<any, any>,
    ) {
        super( logger );
    }

    /**
     * startSubscriptions starts a registered subscriptions. It will send
     * sub.start messages to Neutrino for all subscriptions.
     */
    protected startSubscription( sub: IModelSubscription, pld?: {[key: string]: any }): Observable<any> {
        this.logger.debug( `SubscriptionService: starting subscription: ${sub.topic}` );
        const subStart = defer(() => sub.getLatestChangeId().pipe( take( 1 ))).pipe(
            switchMap( changeId => this.sendSubStartRequest( sub, changeId, pld )),
            switchMap( message => this.handleSubStartResponse( sub, message )),
        );
        const subSync = defer(() => sub.getUnsavedChanges().pipe( take( 1 ))).pipe(
            filter( changes => changes.length > 0 ),
            switchMap( input => from( chunkArray({ input, bytesSize: this.chunkSize }))),
            mergeMap( changes => this.sendSubSyncRequest( sub, changes as IUnsavedModelChange[])),
            mergeMap( message => this.handleSubSyncResponse( sub, message )),
        );
        this.listenToStreamMessages( sub );
        return concat( subStart, subSync );
    }

    /**
     * closeSubscription closes a subscriptions. It will send a sub.end message.
     */
    protected closeSubscription( sub: IModelSubscription, sendMessage: boolean ): Observable<any> {
        this.logger.debug( `SubscriptionService: closing subscription: ${sub.topic}` );
        this.removeSubscriptions();
        if ( sendMessage ) {
            return this.sendSubEndRequest( sub ).pipe(
                tap({
                    complete: () => sub.updateStatus( SubscriptionStatus.completed ),
                }));
        } else {
            sub.updateStatus( SubscriptionStatus.completed );
            return empty();
        }
    }

    /**
     * sendSubStartRequest sends the sub.start message to Neutrino for model type subscriptions.
     */
    protected sendSubStartRequest(
        sub: IModelSubscription, changeId: string, pld?: {[key: string]: any}): Observable<IMessageJson> {

        const currentUserId: string = this.stateSvc.get( 'CurrentUser' );
        const trackEvent:  boolean = sub.subscription === 'Diagram' &&
                this.eventManagerService.shouldTrackSubscribedEvent( sub.subscription, currentUserId, sub.resourceId );
        const message = this.messages.createSubscriptionMessage( 'start', sub.subscription, sub.resourceId );
        message.authToken = this.authentication.token;
        if ( changeId ) {
            message.payload = { changeId: changeId };
        } else {
            message.payload = {};
        }

        // If pld is passed through the args, Merge it
        if ( pld ) {
            message.payload = { ...message.payload, ...pld };
        }

        if ( trackEvent ) {
            ( message.payload as any ).track = true;
            this.eventManagerService.setLastLoggedSubscription( sub.subscription, currentUserId, sub.resourceId );
        }
        return this.conn.send( message );
    }

    /**
     * sendSubEndRequest sends the sub.end message to Neutrino for model type subscriptions.
     */
    protected sendSubEndRequest( sub: IModelSubscription ): Observable<IMessageJson> {
        const message = this.messages.createSubscriptionMessage( 'end', sub.subscription, sub.resourceId );
        message.authToken = this.authentication.token;
        message.payload = {};
        return this.conn.send( message );
    }

    /**
     * sendSubSyncRequest sends the sub.sync message to Neutrino for model type subscriptions.
     */
    protected sendSubSyncRequest( sub: IModelSubscription, changes: IUnsavedModelChange[]): Observable<IMessageJson> {
        const message = this.messages.createSubscriptionMessage( 'sync', sub.subscription, sub.resourceId );
        message.authToken = this.authentication.token;
        message.payload = { changes };
        return this.conn.send( message );
    }

    /**
     * handleSubStartResponse handles sub.start response messages.
     */
    protected handleSubStartResponse( sub: IModelSubscription, msg: IMessageJson ): Observable<any> {
        const payload: any = msg.ppld;
        let obs: Observable<any>;
        if ( payload.model || payload.modelStatic ) {
            /**
             * NOTE:
             * The command which creates the model with sub.modelType is not known to the subscription
             * service. Use a special 'SubStoreModels' command which accepts the model type from data.
             */
            const resultData = { type: sub.modelType, data: <any>{} };
            if ( payload.model ) {
                resultData.data.model = payload.model;
            }
            if ( payload.modelStatic ) {
                resultData.data.modelStatic = payload.modelStatic;
            }
            if ( payload.historyMeta ) {
                resultData.data.historyMeta = payload.historyMeta;
            }
            obs = this.commands.collab( 'SubStoreModels', msg.rid, null, resultData );
        } else {
            obs = of( null );
        }
        if ( payload.changes ) {
            obs = obs.pipe(
                switchMap(() => this.applyChanges( sub.modelType, msg.rid, payload.changes )),
            );
        }
        return obs;
    }

    /**
     * handleSubSyncResponse handles sub.sync response messages.
     */
    protected handleSubSyncResponse( sub: IModelSubscription, msg: IMessageJson ): Observable<any> {
        const payload: any = msg.ppld;
        const observables: Observable<any>[] = [];
        if ( payload.accepted ) {
            observables.push( this.applyChanges( sub.modelType, msg.rid, payload.accepted ));
        }
        if ( payload.rejected && payload.rejected.length ) {
            /**
             * NOTE:
             * The command which creates the model with sub.modelType is not known to the subscription
             * service. Use a special 'SubRemoveChanges' command which accepts the model type from data.
             */
            const resultData = { type: sub.modelType, changeIds: payload.rejected };
            const obs = this.commands.collab( 'SubRemoveChanges', msg.rid, null, resultData );
            observables.push( obs );
        }
        if ( !observables.length ) {
            return empty();
        }
        return forkJoin( observables );
    }

    /**
     * Applies changes received from the server to the local storage.
     * @param modelType - type of the model the change applies to
     * @param changes - an array of changes
     */
    protected applyChanges( modelType: any, modelId: string, changes: ISavedModelChange[]): Observable<any> {
        if ( changes && changes.length > 0 ) {
            // NOTE: When collaborating, there are cases where the changes comes as empty json object.
            // To avoid exception thrown by localforage as "IDBTransaction put without key" we filter
            // objects which has the key / id.
            changes = changes.filter( change => change.id );
            changes.map( change => {
                change.modelId = modelId;
                change.status = 'saved';
                return change;
            });
            return this.dataSync.applyChanges( modelType, changes );
        }
        return empty();
    }

    /**
     * This will receive the subscription message from neutrino and handles
     * the sync response.
     */
    private listenToStreamMessages( sub: IModelSubscription ) {
        this.subs.push( this.conn.inStream.pipe(
            filter( msg => msg.type === 'sub' && msg.msg === 'update' ),
            switchMap( message => this.handleSubSyncResponse( sub, message )),
        ).subscribe());
    }

    /**
     * Unsubscribe from all subscriptions.
     */
    private removeSubscriptions() {
        while ( this.subs.length > 0 ) {
            this.subs.pop().unsubscribe();
        }
    }
}
