import { Observable, Observer, Subject, timer } from 'rxjs';
import { isString, isEmpty } from 'lodash';
import { Logger, AppConfig, StateService, OTelManager } from 'flux-core';
import { ConnectionStatus } from './connection-status';
import { MessageSubscriberList } from '../message/message-subscriber-list';
import { Message } from '../message/message.mdl';
import { IMessageJson } from '../message/message-json';
import { MessageStatus } from '../message/message-status';
import { MessageResultError, InvalidMessageError } from '../message/message-error';
import { MessageType } from '../message/message-type';
import * as pako from 'pako';

/**
 * Expected heartbeat data from the server.
 */
export const HEART_BEAT: string = 'X';

/**
 * Enum for Internal Angular 2 Websocket Connection Status Codes
 * These codes are used by the angular2-websocket
 * for handling connection status.
 *
 * @export
 * @enum {number}
 */
export enum ConnectionStatusInternal {

    /**
     * Connection to the neutrino is not established
     */
    CONNECTION_NULL = -1,

    /**
     * Connecting to the neutrino server
     */
    CONNECTING = 0,

    /**
     * Connection to the neutrino is successful and is active
     */
    OPEN = 1,

    /**
     * Disconnecting from the neutrino server.
     */
    CLOSING = 2,

    /**
     * Disconnected from neutrino server
     */
    CLOSED = 3,

    /**
     * Reconnection to neutrino server aborted
     */
    RECONNECT_ABORTED = 4,
}

/**
 * Abstract Connection
 *
 * Abstract Connection manages all functionalities related to connection with neutrino server.
 * It establishes connection, disconnect, reconnect, and handles errors in connection,
 * also provides the application connectivity status
 *
 * @author  mehdhi
 * @since   2016-04-05
 *
 */
export class AbstractConnection {

    /**
     * This holds all the observers for the sent messages to respond once results arrive
     *
     * @protected
     * @type {MessageSubscriberList}
     */
    protected _messageSubscriberList: MessageSubscriberList;

    /**
     *  This holds the Websocket instance
     */
    protected webSocket: WebSocket;

    /**
     * This holds the messages that need to be sent to
     * server.
     */
    protected sendQueue: Array<Message>;

    /**
     * The keep alive message that will be sent from the client to the server.
     */
    protected keepAliveMessage: string = 'K';

    /**
     * The minimum time interval between keep alive messages.
     * This value is in seconds.
     */
    protected keepAliveInterval: number = 300;

    /**
     * The time since the last message was attempted to be
     * sent by the connection.
     * This is a unix timestamp.
     */
    protected lastMessageTime: number = 0;

    /**
     * Maximun size of uncompressed message string to send over the websocket.
     * Any message longer than this value should be compressed before sending
     * through websocket.
     */
    protected maxUncompressedMessageSize: number = 262144;

    /**
     * Uncompressed max message size of the message to be compressed. This size
     * is used to split the message initially. If the compressed message is
     * bigger than the expcted max compressed size then we need to split the
     * message again.
     */
    protected maxUncompressedMessageSizeToCompress = 2097152;

    /**
     * Maximum compressed message size to send over websocket. If the compressed
     * message size is greater than this size then we have to split the message
     * again and compress it.
     */
    protected maxCompressedMsgSize = 262144;

    protected reconnectAttempts: number = 0;

    /**
     * This holds the stream of messages that are received through the connection that can
     * be utilized by any service to make use of any specific type of messages.
     * The messages pushed on this stream are ones that are not responses the client is
     * waiting for. In other words, push messages from the server.
     */
    protected _inStream: Subject<IMessageJson>;

    /**********************************************************\
    *                                                          *
    *   Connection connectivity api functionality              *
    *                                                          *
    \**********************************************************/

    /**
     * This holds the subscriber created when {@Link connect} is called,
     * and is used to update the connection status
     *
     * @private
     * @type {Observer<ConnectionStatus>}
     */
    private connectObserver: Observer<ConnectionStatus>;

    /**
     * Creates an instance of AbstractConnection.
     *
     * @param {$WebSocket} connection (Angular2 Websocket)
     * @param {Logger} log
     */
    constructor( protected log: Logger, protected state: StateService<typeof ConnectionStatus, ConnectionStatus> ) {
        this.log.debug( 'Initiating connection to Neutrino server: ', AppConfig.apiBaseURL );
        this._messageSubscriberList = new MessageSubscriberList( log );
        this._inStream = new Subject<IMessageJson>();
        this.sendQueue = [];
    }

    /**
     * This streams all messages that are received through the connection that can
     * be utilized by any service to make use of any specific type of messages.
     * The messages pushed on this stream are ones that are not responses the client is
     * waiting for. In other words, push messages from the server.
     *
     * @return Subject<IMessageJson> Returns a rxjs Subject to which anyone can subscribe
     */
    public get inStream(): Subject<IMessageJson> {
        return this._inStream;
    }

    /**
     * Indicates if the websocket is open or connecting.
     * means no need to create a connection
     */
    protected get isWebSocketOpen(): boolean {
        return ( this.webSocket && ( this.webSocket.readyState === ConnectionStatusInternal.OPEN
                || this.webSocket.readyState === ConnectionStatusInternal.CONNECTING ));
    }

    /**
     * Returns true if the app is offline.
     */
    private get isOffline(): boolean {
        return !window.navigator.onLine;
    }

    /**
     * This function creates connection to the neutrino server
     * and returns the status of connection
     *
     * @returns {Observable<ConnectionStatus>}
     */
    public connect(): Observable<ConnectionStatus> {
        if ( this.state.is( ConnectionStatus, ConnectionStatus.OFFLINE ) ||
                this.state.is( ConnectionStatus, ConnectionStatus.SLEEP )) {
             this.createConnection();
        }
        return Observable.create(
            observer => this.connectObserver = observer,
        );
    }

    /**
     * This function disconnect connection from the neutrino server
     */
    public disconnect() {
        // TODO: should be implemented according to servers response
        if ( this.isWebSocketOpen ) {
            this.webSocket.close();
        }
    }

    /**
     * This function sends messages to the neutrino server through created connection
     * and also track message sent, This creates an observable so that message can be
     * tracked and to deliver the right message to the owner.
     *
     * @param {Message} message Message to be sent.
     * @returns {Observable<IMessageJson>}
     */
    public send( message: Message ): Observable<IMessageJson> {
        this.log.debug( 'Sending message ' + MessageType[message.messageType] + '.' + message.message );
        return Observable.create(
            observer => this.createSendObserver( message, observer ),
        );
    }

    /******************************************************
     * Sending keep alive messages
     ******************************************************/

    /**
     * Sends a keep alive message to the server.
     *
     * The purpose of the keep alive message is to keep the connection
     * between a client and neutrino alive as long as it's needed.
     *
     * A connection timeout duration is defined in neutrino - if it does not receive
     * any messages from the client during this period, neutrino will
     * close the connection.
     *
     * Keep alive messages are sent during a given interval. When this function
     * is called, a keep alive message will only be sent if the last message sent
     * via the connection exceeds this interval.
     *
     * If the connection had already been closed by the time this message is
     * attempted to be sent, it will reopen the connection and send the message.
     *
     */
    public keepAlive() {
        if (( Date.now() - this.lastMessageTime ) / 1000 >= this.keepAliveInterval ) {
            this.log.debug( 'Sending keep alive' );
            this.sendMessage( this.keepAliveMessage as any );
        }
    }

    /**
     * This creates a Websocket connection and initializes the connection
     * handlers.
     */
    protected createConnection() {
        if ( !this.isOffline && !this.isWebSocketOpen ) {
            this.webSocket = this.createWebSocket();
            this.initConnectionHandlers();
        }
    }

    /**
     * This creates WebSocket connection.
     */
    protected createWebSocket(): WebSocket {
        return new WebSocket( AppConfig.apiBaseURL );
    }

    /**
     * This function re-create connection to the neutrino server
     */
    protected reconnect() {
        this.disconnect();
        const backoffDelay: number = this.getBackoffDelay();
        timer( backoffDelay ).subscribe(() => {
            this.createConnection();
        });
    }

    /**
     * This method will use to get the maximum back off
     * duration.
     * Exponential backoff formula.
     */
    protected getBackoffDelay(): number {
        const R = Math.random() + 1;
        const initialTimeout = 500;
        const F = 2;
        const N = ++this.reconnectAttempts;
        const maxTimeout = 3000;
        return Math.floor( Math.min( R * initialTimeout * Math.pow( F, N ), maxTimeout ));
    }

    /**********************************************************\
    *                                                          *
    *                 Connection Event Handlers                *
    *                                                          *
    \**********************************************************/

    /**
     * This function adds all connection handlers to connection
     */
    protected initConnectionHandlers() {
        this.webSocket.onopen = evt => this.onOpenHandler( evt );
        this.webSocket.onclose = evt => this.onCloseHandler( evt ) ;
        this.webSocket.onerror = evt => this.onErrorHandler( evt ) ;
        this.webSocket.onmessage = evt => this.onMessageHandler( evt ) ;
    }

    /**
     * This function handler handles activities after establishing  a successful connection
     * to neutrino server
     *
     * @protected
     * @param {Event} event {@link https://developer.mozilla.org/en-US/docs/Web/API/Event}
     */
    protected onOpenHandler( event: Event ) {
        this.log.debug( 'Connected to the server ...' );
        this.updateConnectObserver(  );
        this.reconnectAttempts = 0;
        this.fireQueue();
    }

    /**
     * This function handler handles activities after successful disconnecting from neutrino server
     *
     * @protected
     * @param {CloseEvent} event {@link https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent}
     */
    protected onCloseHandler( event: CloseEvent ) {
        this.log.debug( 'The connection closed with the error code ' + event.code );
        this.updateConnectObserver( );
    }

    /**
     * This function handler handles all messages recieved during connection to the neutrino server.
     * It validates message and deliver it to right entity.
     *
     * Any heartbeat messages received are silently discarded when socket persistancy is not enabled.
     * If enabled, will reply with a heartbeat pong message.
     *
     * @protected
     * @param {MessageEvent} event {@link https://developer.mozilla.org/en-US/docs/Web/API/MessageEvent}
     */
    protected onMessageHandler( event: MessageEvent ) {
        if ( !event || !event.data ) {
            this.log.warning( 'Empty message was recieved from the server' );
            return;
        }

        if ( event.data === HEART_BEAT ) {
            this.log.debug( 'Connection heartbeat recieved' );
            return;
        }

        const message: IMessageJson = JSON.parse( event.data );

        let wsSpan;
        try {
            wsSpan = OTelManager.createSpanAndAttachContextFromMessage( 'RECEIVED NEUTRINO MSG', message );
            if ( this.isValidMessage( message )) {
                // TODO: beyond this point this should be a Message instance
                this.handleMessage( message );
            }
        } finally {
            if ( wsSpan ) {
                wsSpan.end();
            }
        }
    }

    /**
     * This function handles the messages recieved from the server. It delivers
     * the messages to requester if the message was requested and if the messages
     * are pushed by the server it pushes it to inStream. If errors are sent through
     * messages it will converted to an error and handled
     *
     * @protected
     * @param {IMessageJson} message Message recieved from the neutrino server
     */
    protected handleMessage( message: IMessageJson ) {
        this.log.debug( 'Received message ' + message.type + '.' + message.msg );

        // If the message is in error
        if ( message.stat === MessageStatus[ MessageStatus.err ]) {
            this.handleMessageError(
                new MessageResultError( message.errc, message.errm, message ),
                message.id,
            );
        } else {
            if ( this._messageSubscriberList.has( message.id )) {
                this._messageSubscriberList.resolve( message );
            } else {
                this._inStream.next( message );
            }
        }
    }

    /**
     * This function validates message logs error if message is invalid.
     *
     * @protected
     * @param {IMessageJson} message (message in the form of JSON)
     * @returns {boolean} Returns true if message is valid else false
     */
    protected isValidMessage( message: IMessageJson ): boolean {
        if ( message ) {
            if ( message.id && isString( message.id ) && !isEmpty( message.id )) {
                return true;
            } else {
                this.handleMessageError( new InvalidMessageError( 'Message recieved didnt contain a message ID' ));
            }
        } else {
            this.handleMessageError( new InvalidMessageError( 'Data recieved was not a valid message' ));
        }
        return false;
    }

    /**
     * This function handler handles all errors encountered during connection to the neutrino server
     *
     * @protected
     * @param {ErrorEvent} error {@link https://developer.mozilla.org/en-US/docs/Web/API/ErrorEvent}
     */
    protected onErrorHandler( error: Event ) {
        this.log.warning( 'Error occured on connection', error.type );
        this.log.debug( error );
    }

    /**
     * This function handles all errors, when a message error is passed it sends
     * the error through subscriber to owner and other errors are logged.
     *
     * @protected
     * @param {Error} error
     * @param {string} [messageId] messageId of the message
     */
    protected handleMessageError( error: Error, messageId?: string ) {
        if ( messageId && this._messageSubscriberList.has( messageId )) {
            this._messageSubscriberList.error( messageId, error );
        } else {
            this.log.error( 'Unhandled error occured on connection', error );
        }
        /* if ( error instanceof MessageResultError && error.code === MessageResultError.ERROR_CODE_TOKEN_REVOKED ) {
            window.gravity.client.logOut( false );
        } */
    }

    /**
     * This function sends messages to the neutrino server and tracks the owner of
     * message sent the by adding messageId and subscriber to message subscrier list
     * and if any occurs during the process its sends it through the subscriber.
     *
     * @protected
     * @param {Message} message Message to be sent
     * @param {Observer<IMessageJson>} observer Subscriber which is used repond with the response
     */
    protected createSendObserver( message: Message, observer: Observer<IMessageJson> ) {
        try {
            this._messageSubscriberList.add( message.messageId, observer );
            this.sendMessage( message );
        } catch ( error ) {
            observer.error( error );
        }
    }

    /**
     * This will send the message if socket is ready to receive.
     */
    protected sendMessage( message: Message ) {
        this.createConnection();
        this.sendQueue.push( message );
        this.fireQueue();
        this.lastMessageTime = Date.now();
    }

    /**
     * This will check whether socket is ready to send and if it's ready it'll
     * transmit data to the server over the Websocket connection.
     * If the message string size is more than AbstractConnection.MAX_UNCOMPRESSED_MESSAGE_SIZE
     * then the message is compressed and sent it over the websocket.
     */
    protected fireQueue() {
        while ( this.sendQueue.length > 0 && this.webSocket.readyState === ConnectionStatusInternal.OPEN ) {
            const message = this.sendQueue.shift();
            const serializedMsg = this.encodeMessage(  message );
            if ( serializedMsg.length < this.maxUncompressedMessageSize ) {
                this.webSocket.send( serializedMsg );
            } else {
                const compressedMsg = this.compressMessage( serializedMsg );
                if ( compressedMsg.length > this.maxCompressedMsgSize ) {
                    const chunks = this.createChunks( message.messageId, serializedMsg );
                    chunks.forEach( chunk => this.webSocket.send( chunk ));
                } else {
                    this.webSocket.send( compressedMsg );
                }
            }
        }
    }

    /**
     * Chunks and compress given message to fit in Neutrino web socket.
     * @param messageId Id of the message
     * @param data Message data.
     * @returns Returns an array of compressed binary array.
     */
    protected createChunks( messageId: string, data: string ): Uint8Array[] {
        const chunks: Uint8Array[] = [];
        this.chunkMessage( messageId, data, chunks, this.maxUncompressedMessageSizeToCompress );
        const chunkSizeMsg = {
            id: messageId,
            totalNumberOfChecks: chunks.length,
            type: 'chunk size',
        };
        chunks.unshift( this.compressMessage( this.encodeMessage( chunkSizeMsg )));
        return chunks;
    }

    /**
     * Chunks and create chunk message and compress the chunks and add it to given chunk array.
     * @param messageId
     * @param data
     * @param chunkedArray
     * @param splitLength
     */
    protected chunkMessage( messageId: string, data: string, chunkedArray: Uint8Array[], splitLength: number ) {
        splitLength = splitLength < data.length ? splitLength : Math.ceil(( data.length +  1 ) / 2 );
        const dataChunks = this.splitString( data, splitLength );
        dataChunks.forEach( dataChunk => {
            const msg = {
                id: messageId,
                index: chunkedArray.length,
                data: dataChunk,
                type: 'chunk',
            };
            const compressed = this.compressMessage( this.encodeMessage( msg ));
            if ( compressed.length > this.maxCompressedMsgSize ) {
                this.chunkMessage( messageId, dataChunk, chunkedArray, Math.ceil(( splitLength + 1 ) / 2 ));
            } else {
                chunkedArray.push( compressed );
            }
        });
    }

    /**
     * Split the string into smaller chunks.
     * @param text Text to split
     * @param lengthToSplit Max lent of the chunk.
     * @returns Returns an array of strings
     */
    protected splitString( text: string, lengthToSplit: number ): string[] {
        const arr = [];
        for ( let index = 0; index < text.length; index += lengthToSplit ) {
            arr.push( text.slice( index, index + lengthToSplit ));
        }
        return arr;
    }

    /**
     * This returns the passed message as string.
     * This will construct the message according to the data
     * the server needs.
     */
    protected encodeMessage( message: any ): string {
        if ( !isString( message )) {
            return JSON.stringify( message );
        }
        return message;
    }

    /**
     * Compress given string message usin gzip compression and returns
     * compressed data as Uint8Array.
     * @param message Message string.
     * @returns GZipped message in Uint8Array.
     */
    protected compressMessage( message: string ): Uint8Array {
        return pako.gzip( message );
    }

    /**
     * This method updates the {@Link connectObserver } with
     * connection status.
     *
     * @private
     * @param {ConnectionStatus} status (description)
     */
    private updateConnectObserver() {
        if ( this.connectObserver ) {
            this.connectObserver.next( this.state.get( ConnectionStatus ));
            this.connectObserver.complete();
            this.connectObserver = undefined;
        }
    }
}
