import { Observable, Subscription } from 'rxjs';
import { Logger, CommandService } from 'flux-core';
import { NeutrinoConnection } from '../connection/neutrino-connection.svc';
import { Injectable } from '@angular/core';
import { filter, tap } from 'rxjs/operators';

/**
 * NeutrinoRealtime listens to the incoming stream of the NeutrinoConnection
 * and hands every message of type 'cmd' to the CommandService as a collab
 * command.
 */
@Injectable()
export class NeutrinoRealtime {
    /**
     * messageSubscription is the subscription which listens to collab commands.
     * It is unset until start() is called for the first time.
     */
    protected messageSubscription: Subscription;

    constructor(
        protected logger: Logger,
        protected conn: NeutrinoConnection,
        protected commands: CommandService,
    ) { }

    /**
     * start starts listening to collab messages.
     *
     * Calling it while a live subscription already exists is a no-op, so the
     * messages are never processed twice. If the previous subscription has
     * closed (the in stream completed or errored, or the subscription was
     * unsubscribed), a fresh subscription is created so that listening can
     * be resumed.
     */
    public start(): void {
        // A closed subscription no longer receives anything; treating it as
        // "already started" would make every later start() silently useless.
        if ( this.messageSubscription && !this.messageSubscription.closed ) {
            return;
        }
        this.messageSubscription = this.listenToCollabMessages().subscribe();
    }

    /**
     * listenToCollabMessages listens to command messages on in stream and triggers
     * the collaboration scenario on the command service.
     *
     * Only messages whose type is 'cmd' are considered. For each of them
     * commands.collab is called with the message's msg, rid, pld and ppld
     * fields and the result is subscribed to immediately. The returned
     * observable emits the filtered messages themselves, not the results of
     * the commands.
     *
     * NOTE(review): the inner subscribe() has no error handler and is not
     * tracked by messageSubscription, so unsubscribing the outer stream does
     * not cancel commands that were already started. Consider flattening with
     * mergeMap plus explicit error handling - confirm the desired behaviour
     * with the CommandService owners before changing this.
     */
    protected listenToCollabMessages(): Observable<any> {
        return this.conn.inStream.pipe(
            filter( msg => msg.type === 'cmd' ),
            tap( msg => this.commands.collab( msg.msg, msg.rid, msg.pld, msg.ppld ).subscribe()),
        );
    }
}
