import { Injectable } from '@angular/core';
import { AbstractAuthentication, IMessageJson, MessageFactory, NeutrinoConnection } from 'flux-connection';
import { CommandService, Logger } from 'flux-core';
import { AbstractSubscriptionService } from 'flux-subscription';
import { defer, empty, forkJoin, Observable } from 'rxjs';
import { switchMap, take } from 'rxjs/operators';
import { CommentSub } from './comment.sub';

/**
 * CommentSubscriptionService
 * CommentSubscriptionService is the subscription service which handles comments.
 */
@Injectable()
export class CommentSubscriptionService extends AbstractSubscriptionService {
    /**
     * subscriptions
     * subscriptions is a map of diagramId => subscription instances.
     */
    protected subscriptions: { [diagramId: string]: any };

    /**
     * constructor
     * constructor initializes subscription maps and starts listening to
     * connection events. It will call handlers when connection status changes.
     */
    constructor(
        protected logger: Logger,
        protected messageFactory: MessageFactory,
        protected authentication: AbstractAuthentication,
        protected commandService: CommandService,
        protected neutrinoConnection: NeutrinoConnection,
    ) {
        super( logger );
    }

    /**
     * startSubscriptions starts a registered subscriptions. It will send
     * sub.start messages to Neutrino for all subscriptions.
     */
    protected startSubscription( sub: CommentSub ): Observable<unknown> {
        this.logger.debug( `SubscriptionService: starting subscription: ${sub.resourceId}` );
        const subStart = defer(() => sub.getLatestSyncedTime().pipe( take( 1 ))).pipe(
            switchMap( timestamp => this.sendSubStartRequest( sub, timestamp )),
            switchMap( message => this.handleSubStartResponse( sub, message )),
        );
        // TODO: implement syncing comments which are added while the user is offline.
        return subStart;
    }

    /**
     * closeSubscription closes a subscriptions. It will send a sub.end message.
     */
    protected closeSubscription( sub: CommentSub ): Observable<any> {
        this.logger.debug( `SubscriptionService: closing subscription: ${sub.resourceId}` );
        return this.sendSubEndRequest( sub );
    }

    /**
     * sendSubStartRequest sends the sub.start message to Neutrino for model type subscriptions.
     */
    protected sendSubStartRequest( sub: CommentSub, timestamp?: number ): Observable<IMessageJson> {
        const message = this.messageFactory.createSubscriptionMessage( 'start', sub.subscription, sub.resourceId );
        message.authToken = this.authentication.token;
        if ( timestamp ) {
            message.payload = { latestCommentTimestamp: timestamp };
        } else {
            message.payload = {};
        }
        return this.neutrinoConnection.send( message );
    }

    /**
     * sendSubEndRequest sends the sub.end message to Neutrino for model type subscriptions.
     */
    protected sendSubEndRequest( sub: CommentSub ): Observable<unknown> {
        const message = this.messageFactory.createSubscriptionMessage( 'end', sub.subscription, sub.resourceId );
        message.authToken = this.authentication.token;
        message.payload = {};
        return this.neutrinoConnection.send( message );
    }

    /**
     * handleSubStartResponse handles sub.start response messages.
     */
    protected handleSubStartResponse( sub: CommentSub, msg: IMessageJson ): Observable<unknown> {
        const payload: any = msg.ppld;
        const observables: Observable<any>[] = [];
        if ( payload.comments ) {
            const resultData = { type: sub.modelType, data: { model: payload.comments }};
            observables.push( this.commandService.collab( 'SubStoreModels', msg.rid, null, resultData ));
        }
        if ( !observables.length ) {
            return empty();
        }
        return forkJoin( observables );
    }
}
