import { Subject, BehaviorSubject } from 'rxjs';
import * as socketClusterClient from 'socketcluster-client';

/**
 * Type of the object emitted to state change listeners, i.e. to the
 * subjects handed out by PhotonClient.getPeerState().
 */
export interface IStateChangeEvent {
    /** Peer identifier: the sender's client id and the client's topic joined by a space. */
    peerId: string;
    /** Client id of the peer that published the change. */
    userId: string;
    /** The `name` field of the received message. */
    topic: string;
    /** The published value. */
    value: string;
}


/**
 * The states the connection to the photon server can be in.
 * The numeric values are spelled out so that they stay stable if
 * members are ever reordered.
 */
export enum PhotonConnectionStatus {

    /**
     * Disconnected from the server. This is the initial status and
     * the status set by PhotonClient.reset().
     */
    OFFLINE = 0,

    /**
     * PhotonClient.connect() has been accepted, but the topic channel
     * is not subscribed yet.
     */
    CONNECTED = 1,

    /**
     * The topic channel is subscribed. Broadcasts are only sent while
     * the client is in this state.
     */
    ONLINE = 2,

}

/**
 * Photon publish events.
 * These events are expected by the photon server. They are sent as the
 * value of a broadcast; a received message carrying one of them is
 * treated as a peer joining or leaving instead of a state change.
 */
export enum PhotonPublishEvent {
    /** Announces that a client subscribed to the topic. */
    SUBSCRIBE = 'subscribe',
    /** Announces that a client unsubscribed from the topic. */
    UNSUBSCRIBE = 'unsubscribe',
}

/**
 * This uses realtime infrastructure from socketcluster to communicate
 * with other users on the same channel.
 *
 * The client publishes this user's state on a single channel (the topic)
 * and re-emits the state published by other users of that channel.
 */
export class PhotonClient {

    /**
     * The realtime state key for viewport changes.
     * A received message with this key is expected to carry a JSON encoded
     * map of <state key> to <value>; it is fanned out into one state
     * change per entry.
     */
    public static KEY_STATES = 'diagramStates';

    /**
     * A delimiter between <userID> <topic> to generate a peerId.
     */
    private static readonly PEER_DELIMITER = ' ';

    /**
     * BehaviorSubject which emits the status of the photon connection
     * as it changes.
     */
    public connectionStatus: BehaviorSubject<PhotonConnectionStatus>;

    /**
     * A subject to notify a peer has been added.
     * The added peer's ID is emitted.
     */
    public addedPeer: Subject<string>;

    /**
     * A subject to notify a peer has been removed.
     * The removed peer's ID is emitted.
     */
    public removedPeer: Subject<string>;

    /**
     * This contains the socket cluster client socket.
     * It is undefined until initialize() has been called.
     */
    private photon: socketClusterClient.AGClientSocket | undefined;

    /**
     * Topics that this session is maintaining.
     * Emits on a new message received.
     */
    private topicSubjects: {
        [key: string]: Subject<IStateChangeEvent>,
    };

    /**
     * Current states by state key and user id.
     */
    private states: {
        [key: string]: {
            [userId: string]: any;
        },
    };

    /**
     * This holds the gravity token name stored in
     * cookie or local storage.
     */
    private readonly gravityCookieName = 'gravity_token';

    /**
     * Creates the client. No connection is opened here; call initialize()
     * and then connect().
     * @param photonUrl - object whose HOST_NAME and PORT are used to reach the server
     * @param userId - id this client publishes under; messages carrying it are ignored on receipt
     * @param topic - name of the channel to subscribe and publish to
     */
    constructor( protected photonUrl: any, protected userId: string, protected topic: string ) {
        // TODO: Add auth token.
        this.connectionStatus = new BehaviorSubject<PhotonConnectionStatus>( PhotonConnectionStatus.OFFLINE );
        // Both maps are indexed by keys taken from peer messages. They are
        // created without a prototype so that keys such as "__proto__" or
        // "constructor" are plain entries and can never reach Object.prototype.
        this.topicSubjects = Object.create( null );
        this.states = Object.create( null );
        this.addedPeer = new Subject();
        this.removedPeer = new Subject();
    }

    /**
     * Initializes the connection to the photon service and starts the
     * authentication. The authentication is not awaited; when it fails
     * the client is reset.
     */
    public async initialize(): Promise<void> {
        this.photon = socketClusterClient.create({
            path: '/socketcluster/',
            hostname: this.photonUrl.HOST_NAME,
            port: this.photonUrl.PORT,
        });
        this.authenticateClient();
    }

    /**
     * Connect to photon when there is a collab session starting.
     * Does nothing unless the client is currently OFFLINE.
     * @throws Error when initialize() has not been called before
     */
    public connect(): void {
        if ( this.connectionStatus.value !== PhotonConnectionStatus.OFFLINE ) {
            return;
        }
        if ( !this.photon ) {
            throw new Error( 'Photon is not initalized before connecting' );
        }
        this.connectionStatus.next( PhotonConnectionStatus.CONNECTED );
        this.initChannels();
    }

    /**
     * Disconnects when it's only a single user collaborating.
     * Only the socket is disconnected; connectionStatus is not changed
     * by this method.
     */
    public disconnect(): void {
        if ( this.connectionStatus.value === PhotonConnectionStatus.OFFLINE ) {
            return;
        }
        if ( this.photon ) {
            this.photon.disconnect();
        }
    }

    /**
     * Gracefully disconnects from previous connections and
     * clears the slate for a new peering session.
     *
     * Completes the peer subjects and every topic subject handed out so
     * far. Topic subjects requested after the reset are created fresh;
     * addedPeer and removedPeer stay completed. Does nothing when the
     * client has not been initialized.
     */
    public async reset(): Promise<void> {
        const photon = this.photon;
        if ( !photon ) {
            return;
        }

        // Complete peer emission.
        this.addedPeer.complete();
        this.removedPeer.complete();

        // Broadcast the unsubscription to the server. This has to happen
        // before the status changes, broadcast() only sends while ONLINE.
        this.broadcast( this.topic, PhotonPublishEvent.UNSUBSCRIBE );

        // Update the connection status as offline.
        this.connectionStatus.next( PhotonConnectionStatus.OFFLINE );

        // Unsubscribe from the channel.
        photon.unsubscribe( this.topic );

        // Close the socket connection.
        photon.disconnect();

        // Complete every topic subject, then forget them so a later
        // getPeerState() call does not return an already completed subject.
        for ( const key of Object.keys( this.topicSubjects )) {
            this.topicSubjects[key].complete();
        }
        this.topicSubjects = Object.create( null );
    }

    /**
     * Get peer state by key. A Key is like a sub-topic.
     * The subject is created on first request.
     * @param key - state key to listen to
     * @returns subject which emits every state change received for the key
     */
    public getPeerState( key: string ): Subject<IStateChangeEvent> {
        let subject = this.topicSubjects[key];
        if ( !subject ) {
            subject = new Subject();
            this.topicSubjects[key] = subject;
        }
        return subject;
    }

    /**
     * Get the last received values of a state key, by user id.
     * @param key - state key to look up
     * @returns map of user id to value; undefined when nothing has been
     * received for the key yet
     */
    public getPeerCurrentState( key: string ): {[userId: string]: any} {
        return this.states[key];
    }

    /**
     * Broadcast the payload to all the users on this topic.
     * Only broadcast if the client is online.
     * @param key - state key the value belongs to
     * @param value - value to publish
     */
    public broadcast( key: string, value: string ): void {
        if ( this.connectionStatus.value !== PhotonConnectionStatus.ONLINE || !this.photon ) {
            return;
        }
        const payload = { data: { key: key, value: value }, clientId: this.userId, name: this.topic };
        this.photon.transmitPublish( this.topic, payload );
    }

    /**
     * Get the socket used to talk to the photon server.
     * Undefined until initialize() has been called.
     */
    protected getClient() {
        return this.photon;
    }

    /**
     * This function will invoke the token exchange and
     * verify with the server. If the signed auth token
     * is NOT returned from the server, the photon client
     * will be disconnected and reset. The same happens when
     * no gravity token is available or the exchange fails.
     */
    private authenticateClient(): void {
        const photon = this.photon;
        const gravityToken = this.getGravityToken();
        if ( !photon || !gravityToken ) {
            this.reset();
            return;
        }
        Promise.all([
            photon.invoke( 'login', { token: gravityToken }),
            photon.listener( 'authenticate' ).once(),
        ]).then( status => {
            // status[1] is the payload of the 'authenticate' event.
            if ( status[1] && !status[1].signedAuthToken ) {
                this.reset();
            }
        }).catch(() => {
            this.reset();
        });
    }

    /**
     * This function will return the gravity token by checking in
     * the cookies first and then in local storage.
     * @returns the token, or null when it is stored in neither place
     */
    private getGravityToken(): string | null {
        // Prefixing with '; ' lets the first cookie match the same
        // '; <name>=' pattern as every other cookie.
        const parts = ( '; ' + document.cookie ).split( '; ' + this.gravityCookieName + '=' );
        if ( parts.length === 2 ) {
            const cookieValue = parts[1].split( ';' )[0];
            if ( cookieValue ) {
                return cookieValue;
            }
        }
        return localStorage.getItem( this.gravityCookieName );
    }

    /**
     * Subscribe to the topic and listen to messages from
     * server.
     */
    private initChannels(): void {
        const photon = this.photon;
        if ( !photon ) {
            return;
        }

        // Subscribe to the topic.
        const listener = photon.subscribe( this.topic );

        // Update the photon connection status. This must precede the
        // broadcast below: broadcast() drops everything unless ONLINE.
        this.connectionStatus.next( PhotonConnectionStatus.ONLINE );

        // Broadcast the subscription to the server.
        this.broadcast( this.topic, PhotonPublishEvent.SUBSCRIBE );

        // Listen to the messages transmitted from the server.
        ( async () => {
            for await ( const message of listener ) {
                this.recieveData( message );
            }
        })();
    }

    /**
     * When a message is received, it emits to the correct subject.
     * Messages without key or value and messages published by this
     * client itself are ignored.
     * @param message - message delivered on the topic channel
     */
    private recieveData( message: any ): void {
        if ( !message || !message.data || !message.data.key || !message.data.value ) {
            return;
        }
        if ( message.clientId === this.userId ) {
            return;
        }

        const value = message.data.value;
        if ( value === PhotonPublishEvent.SUBSCRIBE ) {
            // Add the peer/client if newly subscribed to the topic.
            this.addPeer( message.clientId );
            return;
        }
        if ( value === PhotonPublishEvent.UNSUBSCRIBE ) {
            // Remove the peer/client if unsubscribed to the topic.
            this.removePeer( message.clientId );
            return;
        }
        if ( message.data.key !== PhotonClient.KEY_STATES ) {
            this.updateTopicSubjects( message );
            return;
        }

        // Combined states: one JSON encoded map of <state key> to <value>.
        let userStates: any;
        try {
            userStates = JSON.parse( value );
        } catch ( err ) {
            // The payload comes from another client. Letting the error
            // escape would end the listener loop and with it all further
            // message delivery, so a malformed message is dropped instead.
            return;
        }
        if ( userStates === null || typeof userStates !== 'object' ) {
            return;
        }
        for ( const stateKey of Object.keys( userStates )) {
            const data = { ...message.data, key: stateKey, value: userStates[stateKey] };
            this.updateTopicSubjects({ ...message, data });
        }
    }

    /**
     * Records the value of a message as the current state of its sender
     * and emits it to the subject of the state key, when one was requested.
     * @param message - message with data.key, data.value, clientId and name
     */
    private updateTopicSubjects( message: any ): void {
        const key = message.data.key;
        const value = message.data.value;

        if ( !this.states[key]) {
            this.states[key] = {};
        }
        this.states[key][message.clientId] = value;

        const subject = this.topicSubjects[key];
        if ( subject ) {
            const evt: IStateChangeEvent = {
                // Peer id of the sender: <clientId> <topic>
                peerId: this.getPeerId( message.clientId ),
                userId: message.clientId,
                topic: message.name,
                value: value,
            };
            subject.next( evt );
        }
    }

    /**
     * Adds a peer, emits public
     * @param clientId - Peer Id
     */
    private addPeer( clientId: string ): void {
        this.addedPeer.next( clientId );
    }

    /**
     * Remove a peer, emits public
     * @param clientId - Peer Id
     */
    private removePeer( clientId: string ): void {
        this.removedPeer.next( clientId );
    }

    /**
     * Gets the peerID for this session.
     * It is a combination of <userId> <topic>
     * @param userId - user to build the id for, defaults to this client's user
     */
    private getPeerId( userId?: string ): string {
        userId = userId ? userId : this.userId;
        return userId + PhotonClient.PEER_DELIMITER + this.topic;
    }

}
