// TODO: Write unit tests for this class
import LZString from 'lz-string';
import utf8 from 'utf8';
import ICommunicationProvider from './communication-provider';
import ReconnectingWebSocket from 'reconnecting-websocket';
import MessageOrderPreserver from './message-order-preserver';
import WebSocketMessageSizeExceededError from '@/errors/WebSocketMessageSizeExceededError';
import logger from '@/services/logger';

// Units used to express the tuning values below
const KIBI = 1024;
const SECOND_IN_MS = 1000;
const MINUTE_IN_MS = 60 * SECOND_IN_MS;

// How long to wait for a missing message before reporting the session as out of sync
const OUT_OF_ORDER_MESSAGES_TIMEOUT = 3 * SECOND_IN_MS;
// Serialized payloads of at least this many characters are compressed before being sent
const MIN_COMPRESSION_LENGTH = 5 * KIBI;
// Encoded messages must be shorter than this; 32kb is the max frame size of WebSocket messages
const MAX_FRAME_SIZE = 32 * KIBI - 1;
// A keepalive is sent over the websocket every 3 minutes
const KEEP_ALIVES_INTERVAL = 3 * MINUTE_IN_MS;
// After 60 minutes without socket activity, keepalives are no longer sent
const MAX_SOCKET_IDLE_TIME = 60 * MINUTE_IN_MS;

/**
 * A communication provider that uses WebSockets over our AWS API Gateway WebSocket API.
 * Due to the complex infrastructure of AWS API Gateway, the order of messages is not preserved (WebSockets
 * normally preserve order due to their TCP nature), so this class implements custom logic to make sure messages
 * are handled in the same order that they were sent by the sender.
 *
 * Three independent listener registries are exposed:
 *  - board events (`onBoardEvent`): messages sent by other clients, plus the local 'out-of-sync' event
 *  - session signals (`onSessionSignal`): 'session-updated' and 'clear-all'
 *  - socket events (`onSocketEvent`): 'open', 'reconnect' and 'close'
 */
export default class AwsWebsocketCommunicationProvider extends ICommunicationProvider {
  /**
   * Creates the provider and immediately opens a reconnecting socket to `process.env.VUE_APP_WEBSOCKET_URL`.
   *
   * @param {*} sessionId - Identifier sent to the server in the 'joinSession' request
   * @param {*} accessToken - Token sent to the server in the 'joinSession' request
   */
  constructor(sessionId, accessToken) {
    super();
    this.sessionId = sessionId;
    this.accessToken = accessToken;

    // Listener registries: event name -> array of callbacks
    this.boardEventListeners = new Map();
    this.sessionSignalListeners = new Map();
    this.socketEventListeners = new Map();

    // Outgoing side: board events are queued until the server approves the join request
    this.isConnectedToSession = false;
    this.pendingOutgoingMessages = [];
    this.outgoingMessageCounter = 0;
    this.socketDisconnected = false;

    // Incoming side: ordering of messages and the pending "out of sync" timer
    this.incomingMessagesOrderPreserver = new MessageOrderPreserver();
    this.syncMessagesTimeoutId = null;

    // Messages from other clients are queued while incoming board events are disabled
    this.incomingBoardEventsEnabled = false;
    this.incomingEventsQueue = [];

    this.keepAlivesInterval = null;
    this.lastSocketMessageTime = 0;

    this.socket = new ReconnectingWebSocket(process.env.VUE_APP_WEBSOCKET_URL, null, { maxRetries : 20 });

    // Note: All callbacks are async as a trick to avoid errors from causing events to stop firing
    this.socket.addEventListener('open', async () => {
      this._handleSocketOpened();
    });
    this.socket.addEventListener('message', async event => {
      this._handleIncomingMessage(event);
    });
    this.socket.addEventListener('error', async error => {
      this._handleSocketError(error);
    });
    this.socket.addEventListener('close', async event => {
      this._handleSocketClose(event);
    });
  }

  /**
   * Registers a callback for a session signal. Callbacks are invoked without arguments.
   *
   * @param {string} eventName - Signal name (this class emits 'session-updated' and 'clear-all')
   * @param {Function} callback - Listener to add; multiple listeners per name are supported
   */
  onSessionSignal(eventName, callback) {
    if (!this.sessionSignalListeners.has(eventName)) {
      this.sessionSignalListeners.set(eventName, []);
    }

    this.sessionSignalListeners.get(eventName).push(callback);
  }

  /**
   * Registers a callback for a board event. Callbacks receive the (decompressed) event data.
   *
   * @param {string} eventName - Event name as sent by the other client, or 'out-of-sync'
   * @param {Function} callback - Listener to add; multiple listeners per name are supported
   */
  onBoardEvent(eventName, callback) {
    if (!this.boardEventListeners.has(eventName)) {
      this.boardEventListeners.set(eventName, []);
    }

    this.boardEventListeners.get(eventName).push(callback);
  }

  /**
   * Registers a callback for a socket lifecycle event. Callbacks are invoked without arguments.
   *
   * @param {string} eventName - 'open', 'reconnect' or 'close'
   * @param {Function} callback - Listener to add; multiple listeners per name are supported
   */
  onSocketEvent(eventName, callback) {
    if (!this.socketEventListeners.has(eventName)) {
      this.socketEventListeners.set(eventName, []);
    }

    this.socketEventListeners.get(eventName).push(callback);
  }

  /**
   * Wraps outgoing data in a payload envelope, compressing it when its JSON form is long enough.
   *
   * @param {*} data - JSON-serializable event data
   * @returns {{data: *, isCompressed: boolean}} The raw data, or its LZ-compressed base64 JSON string
   */
  _createPayloadFromData(data) {
    const dataAsJson = JSON.stringify(data);
    const isCompressed = dataAsJson.length >= MIN_COMPRESSION_LENGTH;
    if (!isCompressed) {
      return { data, isCompressed };
    }

    const compressedData = LZString.compressToBase64(dataAsJson);
    logger.info('compress-websocket-payload', {
      originalPayloadLength: dataAsJson.length,
      compressedPayloadLength: compressedData.length
    });

    return {
      data: compressedData,
      isCompressed
    };
  }

  /**
   * Reverses `_createPayloadFromData`.
   *
   * @param {?{data: *, isCompressed: boolean}} payload - Payload envelope received from a sender
   * @returns {*} The original data, or null when no payload was given
   */
  _getDataFromPayload(payload) {
    if (!payload) {
      return null;
    }

    if (!payload.isCompressed) {
      return payload.data;
    }

    return JSON.parse(LZString.decompressFromBase64(payload.data));
  }

  /**
   * Sends a bare `{ action }` message to the server. Unlike `sendBoardEvent`, the message is not
   * queued while the session is not joined, and is not counted as socket activity.
   *
   * @param {string} action - Action name understood by the server
   */
  sendOwnerSignal(action) {
    this.socket.send(JSON.stringify({ action }));
  }

  /**
   * Sends a board event to the session, tagging it with a sequential counter so that receivers
   * can restore the sending order.
   *
   * @param {string} eventName - Name of the board event
   * @param {*} data - JSON-serializable event data
   * @throws {WebSocketMessageSizeExceededError} When the encoded message reaches MAX_FRAME_SIZE
   */
  sendBoardEvent(eventName, data) {
    // Maintain a queue of pending messages until the client is connected to the session
    if (!this.isConnectedToSession) {
      this.pendingOutgoingMessages.push({ eventName, data });
      return;
    }

    const payload = this._createPayloadFromData(data);
    const message = utf8.encode(
      JSON.stringify({
        action: 'sendMessage',
        message: {
          eventName,
          payload,
          messageCounter: this.outgoingMessageCounter
        }
      })
    );

    if (message.length >= MAX_FRAME_SIZE) {
      throw new WebSocketMessageSizeExceededError(
        message.length,
        MAX_FRAME_SIZE
      );
    }

    this.socket.send(message);
    this.lastSocketMessageTime = Date.now();
    // The counter only advances for messages that were actually handed to the socket
    this.outgoingMessageCounter++;
  }

  /**
   * Starts delivering messages from other clients to board event listeners, after first replaying
   * the messages that were queued while delivery was disabled.
   *
   * @param {number} [fromTimestamp=0] - Queued messages with an older `timestamp` are discarded
   */
  enableIncomingBoardEvents(fromTimestamp = 0) {
    // Detach the queue and enable delivery BEFORE replaying: the replay goes through
    // _handleIncomingMessageData, which would otherwise put every message straight back in the
    // queue, and the messages would then be lost
    const queuedEvents = this.incomingEventsQueue;
    this.incomingEventsQueue = [];
    this.incomingBoardEventsEnabled = true;

    queuedEvents
      .filter(eventData => eventData.timestamp >= fromTimestamp)
      .forEach(eventData => {
        this._handleIncomingMessageData(eventData);
      });
  }

  /**
   * Stops delivering messages from other clients; they are queued until delivery is enabled again.
   */
  disableIncomingBoardEvents() {
    this.incomingBoardEventsEnabled = false;
  }

  /**
   * Invoked by the browser's WebSocket as soon as the socket is connected to the remote server
   */
  _handleSocketOpened() {
    this._emitSocketEvent('open');
    if (this.socketDisconnected) {
      // Socket reconnected
      this._emitSocketEvent('reconnect');
      this.socketDisconnected = false;
    }
    // Ask the WS server to join the session, identifying ourselves with the access token
    this.socket.send(
      JSON.stringify({
        action: 'joinSession',
        sessionId: this.sessionId,
        accessToken: this.accessToken
      })
    );
  }

  /**
   * Invoked by the browser's WebSocket when the server sends any packet to this client
   *
   * @param {{data: string}} event - The socket's message event
   */
  _handleIncomingMessage(event) {
    // All WebSocket messages are expected to be UTF-8 encoded JSON strings
    const eventData = JSON.parse(utf8.decode(event.data));
    this._handleIncomingMessageData(eventData);

    this.lastSocketMessageTime = Date.now();
  }

  /**
   * Dispatches a decoded server message according to its `type`.
   *
   * @param {Object} eventData - Decoded message; `type` selects the handling, other fields depend on it
   */
  _handleIncomingMessageData(eventData) {
    const messageType = eventData.type;

    if (messageType === 'connection-closed-purposely') {
      this.socket.close();
      return;
    }

    if (messageType === 'join-session-response') {
      if (eventData.code === 200) {
        this._handleSessionJoined();
      } else if (eventData.code === 401) {
        this.socket.close();
      } else {
        logger.error('websocket-join-session-failed', { eventData });
      }
    } else if (messageType === 'message-from-other-client') {
      // Keep incoming events from other clients in a queue until incoming events are enabled
      if (!this.incomingBoardEventsEnabled) {
        // We don't need to keep the cursor events
        // TODO: Filter every event we don't need (non hard-coded way to exclude events by name)
        if (eventData.message.eventName !== 'cursor-position-update') {
          this.incomingEventsQueue.push(eventData);
        }
        return;
      }
      this._handleMessageFromOtherClient(
        eventData.senderConnectionId,
        eventData.message
      );
    } else if (['session-updated', 'clear-all'].includes(messageType)) {
      this._emitSessionSignal(messageType);
    } else if (
      messageType === 'send-message-response' &&
      eventData.code === 403
    ) {
      // Forbidden - participant is not allowed to send messages in this session
      // Probably due to sync issues
      this._emitSessionSignal('session-updated');
    } else {
      logger.warning('websocket-unhandled-event', { eventData });
    }
  }

  /**
   * Invoked by our custom logic after the remote server approves the client's connection to the session
   */
  _handleSessionJoined() {
    this.isConnectedToSession = true;

    // Flush all the pending messages
    // NOTE(review): if sendBoardEvent throws here (oversized message), the rest of the queue is
    // neither sent nor cleared and the keepalive timer below is not started - confirm this is acceptable
    this.pendingOutgoingMessages.forEach(pendingMessage => {
      this.sendBoardEvent(pendingMessage.eventName, pendingMessage.data);
    });
    this.pendingOutgoingMessages = [];

    // Restart the keepalive timer so that a re-join never leaves two timers running
    if (this.keepAlivesInterval) {
      clearInterval(this.keepAlivesInterval);
    }
    this.keepAlivesInterval = setInterval(
      this.sendKeepAlive.bind(this),
      KEEP_ALIVES_INTERVAL
    );
  }

  /**
   * Invoked by our custom logic when an incoming message arrives
   *
   * @param {*} senderConnectionId - Connection id of the client that sent the message
   * @param {{eventName: string, payload: Object, messageCounter: number}} message - The sender's message
   */
  _handleMessageFromOtherClient(senderConnectionId, message) {
    // Make sure the received message is sequential to the previous one that was received from the same sender
    if (
      !this.incomingMessagesOrderPreserver.shouldReceiveMessageFromOtherClient(
        senderConnectionId,
        message.messageCounter,
        message
      )
    ) {
      // Out-of-order messages are ignored; they will be processed after the preserver deems them ready
      if (!this.syncMessagesTimeoutId) {
        this.syncMessagesTimeoutId = setTimeout(() => {
          // _emitBoardEvent unwraps a payload envelope, so the details must be wrapped in one;
          // passing them bare would hand `undefined` to the listeners
          this._emitBoardEvent('out-of-sync', {
            data: { incomingMessageCounter: message.messageCounter },
            isCompressed: false
          });
          this.syncMessagesTimeoutId = null;
          this.incomingMessagesOrderPreserver.clear();
          this.disableIncomingBoardEvents();
        }, OUT_OF_ORDER_MESSAGES_TIMEOUT);
      }
      return;
    }

    // An in-order message arrived, so the pending "out of sync" report is cancelled
    if (this.syncMessagesTimeoutId) {
      clearTimeout(this.syncMessagesTimeoutId);
      this.syncMessagesTimeoutId = null;
    }
    // Invoke callbacks for the current message
    this._emitBoardEvent(message.eventName, message.payload);

    // Invoke callbacks for any previously received messages that were out of order, but are now relevant
    this.incomingMessagesOrderPreserver
      .dequeueFutureMessages(senderConnectionId)
      .forEach(futureMessage => {
        this._emitBoardEvent(futureMessage.eventName, futureMessage.payload);
      });
  }

  /**
   * Triggers all board event listeners for the given event name
   *
   * @param {string} eventName - Name of the board event
   * @param {?{data: *, isCompressed: boolean}} payload - Payload envelope; listeners receive its unwrapped data
   */
  _emitBoardEvent(eventName, payload) {
    if (!this.boardEventListeners.has(eventName)) {
      return;
    }

    const eventData = this._getDataFromPayload(payload);

    // Each callback runs inside an async wrapper so that a throwing listener does not stop the others
    this.boardEventListeners
      .get(eventName)
      .forEach(async callback => callback(eventData));
  }

  /**
   * Triggers all session signal listeners for the given event name
   *
   * @param {string} eventName - Name of the session signal
   */
  _emitSessionSignal(eventName) {
    if (!this.sessionSignalListeners.has(eventName)) {
      return;
    }

    this.sessionSignalListeners
      .get(eventName)
      .forEach(async callback => callback());
  }

  /**
   * Triggers all socket event listeners for the given event name
   *
   * @param {string} eventName - Name of the socket event
   */
  _emitSocketEvent(eventName) {
    if (!this.socketEventListeners.has(eventName)) {
      return;
    }

    this.socketEventListeners
      .get(eventName)
      .forEach(async callback => callback());
  }

  /**
   * Invoked by the browser's WebSocket when an error occurs at any moment in the lifetime of the socket
   *
   * @param {*} error - The socket's error event
   */
  _handleSocketError(error) {
    // TODO: We should detect specific errors for critical operations (e.g. joining a session)
    //       so that we can retry operations / show an error message in the UI
    // NOTE(review): this SENDS an 'error' board event to the session (or queues it for sending)
    //               rather than notifying local listeners - confirm that this is intended
    this.sendBoardEvent('error', error);
    logger.error('websocket-error', { error });
  }

  /**
   * Invoked by the browser's WebSocket when the connection is closed
   *
   * @param {*} event - The socket's close event
   */
  _handleSocketClose(event) {
    logger.warning('websocket-disconnected', { event });
    this.disableIncomingBoardEvents();
    // NOTE(review): isConnectedToSession is left untouched here, so board events sent while the
    //               socket is down are handed to the socket instead of being queued - confirm
    this.socketDisconnected = true;
    this._emitSocketEvent('close');
  }

  /**
   * Sends a keepalive message to the server. Runs periodically once the session is joined, and
   * cancels its own timer when the socket is disconnected or has been idle for too long.
   */
  sendKeepAlive() {
    if (this.socketDisconnected) {
      clearInterval(this.keepAlivesInterval);
      return;
    }

    // We only wish to send keepalives for a limited amount of time
    const timeSinceLastSocketActivity = Date.now() - this.lastSocketMessageTime;
    if (
      this.lastSocketMessageTime &&
      timeSinceLastSocketActivity >= MAX_SOCKET_IDLE_TIME
    ) {
      console.warn(
        'Stopped sending keepalives due to long period of inactivity'
      );
      logger.warning('keepalives-max-idle-time-exceeded', {
        timeSinceLastSocketActivity,
        MAX_SOCKET_IDLE_TIME
      });
      clearInterval(this.keepAlivesInterval);
      return;
    }

    this.socket.send(JSON.stringify({ action: 'keepalive' }));
  }
}
