import { Injectable } from '@angular/core';
import { RxStompState } from '@stomp/rx-stomp';
import { IFrame } from '@stomp/stompjs';
import moment, { Moment } from 'moment-mini-ts';
import { BehaviorSubject, Observable, of, Subject } from 'rxjs';
import { debounceTime, filter, finalize, map, mergeMap, pairwise, share, take, takeUntil, tap } from 'rxjs/operators';
import { NgUnsubscribe } from '@util/base-class/ng-unsubscribe.class';
import isNil from 'lodash-es/isNil';
import { TimeService } from '@common/time/service/time.service';
import { StringUtils } from '@util/util/string.utils';
import { RestHttpClientService } from '@api/rest/rest-http-client.service';
import { RxStompService } from '@shared/websocket/service/rx-stomp.service';
import { NgZoneUtilService } from '@util/zone/service/ng-zone-util.service';
import { ArrayUtils } from '@util/util/array.utils';
import { AuthCacheService } from '@shared/authentication/service/auth-cache.service';
import { ObjectUtils } from '@util/util/object.utils';
import { StompHeadersExtendedModel } from '@shared/websocket/model/stomp-headers-extended.model';
import { WsConnectionState } from '@shared/websocket/model/ws-connection-state.model';
import { Nil } from '@util/helper-types/nil';
import { DateUtils } from '@util/util/date.utils';
import { AukError } from '@common/error/domain/auk-error';

interface WatcherInfo<T> {
  watcher: Observable<T>;
  /**
   * Count of subscriptions to the given topic.
   */
  count: number;
  /**
   * This fn clears all subscriptions from created inside {@link watcher} Observable
   *
   * It should be called, when we want to destroy watcher subscription, so it will cancel all inner subscriptions
   * inside {@link RxStomp#watch} fn
   */
  destroyFn: () => void;
}

/**
 * Interval after that is connection considered established.
 */
const CONNECTION_ESTABLISHED_INTERVAL_MILLIS: number = 200;
/**
 * Interval to wait between WS connection attempts
 */
const RECONNECT_DELAY_MILLIS: number = 500;
/**
 * Formula for incremental increase of interval between WS connection attempts. Power of 2 * delay.
 */
const RECONNECT_DELAY_INCREMENT_FORMULA = (delay: number, attempts: number): number => ((2 ** attempts) * delay);

@Injectable({ providedIn: 'root' })
export class WebsocketsService<T> extends NgUnsubscribe {

  private static lastConnectionTimestamp: moment.Moment | Nil;

  /**
   * Delay, after which the WS connection will be closed, if within that timeframe there won't be new topic subscription
   */
  private readonly closeWs$DebounceTime: number = DateUtils.convertSecondsToMilliseconds(2);

  private errorLogged: boolean = false;

  /**
   * Connection State
   *
   * It is a BehaviorSubject and will emit current status immediately. This will typically get
   * used to show current status to the end user.
   */
  public connectionState$: BehaviorSubject<WsConnectionState> = this.stompService.connectionState$;

  /**
   * Count of connections attempts since last successfully opened ws connection
   */
  private connectAttempts = 0;

  /**
   * Provides headers from most recent connection to the server as returned by the CONNECTED frame.
   * If the STOMP connection has already been established it will trigger immediately.
   * It will trigger for each reconnection.
   */
  private serverHeaders$: Observable<StompHeadersExtendedModel> = this.stompService.serverHeaders$;

  /**
   * Saved observables of all watched topics.
   */
  private readonly watchers: Record<string, WatcherInfo<T>> = {};

  private readonly closeWs$: Subject<void> = new Subject<void>();

  constructor(
    private readonly stompService: RxStompService,
    private readonly timeService: TimeService,
    private readonly ngZoneUtilService: NgZoneUtilService,
    private readonly authCacheService: AuthCacheService,
  ) {
    super();
    this.initConnectServerTimeDiff();
    this.initWsConnectionChangeListener();
    this.handleConnectionOpened();
    this.handleWsCloseConnection();
  }

  public static getWsConnectionDurationSeconds(): number {
    const currentMilliseconds = moment().milliseconds();
    const lastConnectionMilliseconds = WebsocketsService.lastConnectionTimestamp?.milliseconds() ?? currentMilliseconds;

    return Math.floor(Math.abs(currentMilliseconds - lastConnectionMilliseconds) / 1000);
  }

  public currentConnectionState(): WsConnectionState {
    return this.connectionState$.getValue();
  }

  private handleConnectionOpened(): void {
    this.connectionState$
      .pipe(
        // On successful connection opened
        filter((state: WsConnectionState) => state === RxStompState.OPEN),
        // Listen on STOMP errors for given interval
        mergeMap(() => this.stompErrorsBuffer$(CONNECTION_ESTABLISHED_INTERVAL_MILLIS)),
        takeUntil(this.ngUnsubscribe),
      )
      .subscribe((errors: IFrame[]) => {
        if (ArrayUtils.isEmpty(errors)) {
          this.connectAttempts = 0;
          WebsocketsService.lastConnectionTimestamp = moment();
        }
      });
  }

  private stompErrors$(): Observable<IFrame> {
    return this.stompService.stompErrors$.asObservable();
  }

  private stompErrorsBuffer$(interval: number): Observable<IFrame[]> {
    return this.stompErrors$()
      .pipe(
        this.ngZoneUtilService.bufferTimeOut(interval),
        take(1),
      );
  }

  /**
   * Watch messages from given topic. It also starts websocket connection to server if necessary.
   * @param destination Topic destination
   * @param headers Topic subscribe headers
   */
  public watch$(destination: string, headers?: StompHeadersExtendedModel): Observable<T> {
    if (StringUtils.isBlank(destination)) {
      throw new AukError('Websockets: No topic destination value.', false);
    }

    // return existing observable if topic already watched
    const watcherInfo = this.watchers[destination];
    if (watcherInfo) {
      watcherInfo.count++;
      return watcherInfo.watcher;
    }

    const watchDestroy$ = new Subject<void>();
    const newWatcherInfo: WatcherInfo<T> = {
      count: 1,
      watcher: this.connect$()
        .pipe(
          take(1),
          mergeMap(() =>
            this.stompService.watch(
              destination,
              {
                ...headers,
                id: destination,
              },
            ),
          ),
          tap((data) => this.timeService.syncServerDiff(StringUtils.parseDate(data.headers['server-time']))),
          map((data) => JSON.parse(data.body) as T),
          // Called after last listener unsub. Eventually disconnects ws connection.
          finalize(() => {
            this.unwatch(destination);
          }),
          share(),
          takeUntil(watchDestroy$),
        ),
      destroyFn: (): void => {
        watchDestroy$.next();
      },
    };

    this.watchers[destination] = newWatcherInfo;

    return newWatcherInfo.watcher;
  }

  /**
   * Unwatch given topic. Closes connection of when there is none left.
   */
  public unwatch(
    /**
     * Topic destination
     */
    destination: string,
  ): void {
    if (isNil(destination)) {
      return;
    }

    const watcherInfo = this.watchers[destination];

    if (isNil(watcherInfo)) {
      return;
    }

    watcherInfo.count--;

    if (watcherInfo.count === 0) {
      watcherInfo.destroyFn();
      delete this.watchers[destination];
    }

    if (ObjectUtils.isObjectEmpty(this.watchers)) {
      this.closeWs$.next();
    }
  }

  /** Close connection to websocket server. Active subscriptions are automatically unsubscribed. */
  private closeWsConnection(): void {
    void this.stompService.deactivate();
  }

  /** Open connection to websockets server. */
  private connect$(): Observable<void> {
    const currentConnectionState = this.stompService.connectionState$.getValue();

    switch (currentConnectionState) {
      // open state -> do nothing
      case RxStompState.OPEN:
        return of<void>(undefined);
      // closed state -> activate ws
      case RxStompState.CLOSED:
        // (PDEV-13506) - timer needed due to bug in 3rd lib (it emits closed state before it deletes current ws)
        return this.ngZoneUtilService.timerOut$(0)
          .pipe(
            tap(() => this.activate()),
            mergeMap(() => this.connect$()),
          );
      // closing state -> wait for close state, then try to connect again
      case RxStompState.CLOSING:
        return this.stompService.connectionState$
          .pipe(
            filter((state) => state === RxStompState.CLOSED),
            take(1),
            mergeMap(() => this.connect$()),
          );
      // connecting state -> wait for open state
      case RxStompState.CONNECTING:
        return this.stompService.connected$
          .pipe(
            take(1),
            mergeMap(() => this.connect$()),
          );
    }
  }

  private activate(): void {
    /**
     * heartbeatIncoming: We want to have as much as possible incoming/server checks
     * to make client quickly aware of the lost connection to the server
     *
     * heartbeatOutgoing: Outgoing/client checks can be in bigger intervals.
     * Only downside is, that the server may be sending data to users that are already disconnected
     * - when value was set to 1200, server didn't always get the response from client,
     *    and he was often disconnected by the server
     *
     * reconnectDelay: time after server disconnect to initiate reconnecting by the client
     */
    this.stompService.configure({
      brokerURL: RestHttpClientService.WS_URL,
      connectHeaders: {},
      heartbeatIncoming: 2_000,
      heartbeatOutgoing: 20_000,
      reconnectDelay: RECONNECT_DELAY_MILLIS,
      discardWebsocketOnCommFailure: true,
      beforeConnect: () => this.incrementReconnectDelay(),
    });

    this.ngZoneUtilService.runOut(() => {
      this.stompService.activate();
    });
  }

  /**
   * Incrementally increases reconnect retry delay.
   */
  private incrementReconnectDelay(): void {
    this.connectAttempts++;

    if (this.connectAttempts === 0) {
      return;
    }

    this.stompService.configure({ reconnectDelay: RECONNECT_DELAY_INCREMENT_FORMULA(RECONNECT_DELAY_MILLIS, this.connectAttempts) });
  }

  private initConnectServerTimeDiff(): void {
    this.serverHeaders$
      .pipe(
        map((headers) => {
          const serverTime: Moment = StringUtils.parseDate(headers['server-time']);
          if (!serverTime) {
            return null;
          }
          return serverTime;
        }),
        takeUntil(this.ngUnsubscribe),
      )
      .subscribe((serverTime) => this.timeService.syncServerDiff(serverTime));
  }

  private initWsConnectionChangeListener(): void {
    this.connectionState$
      .pipe(
        pairwise(),
        takeUntil(this.ngUnsubscribe),
      )
      .subscribe(([previousState, currentState]) => this.handleConnectionStateChange(previousState, currentState));
  }

  private handleConnectionStateChange(previousState: WsConnectionState, currentState: WsConnectionState): void {
    if (
      currentState !== RxStompState.CLOSED
      || previousState !== RxStompState.CONNECTING
      || this.errorLogged
    ) {
      return;
    }

    this.errorLogged = true;
  }

  /**
   * This closes WS connection always within specified delay
   * This helps cases, where multiple places are unsubscribing and immediately subscribing another topics,
   * so this won't close the WS connection
   */
  private handleWsCloseConnection(): void {
    this.closeWs$
      .pipe(
        debounceTime(this.closeWs$DebounceTime),
        takeUntil(this.ngUnsubscribe),
      )
      .subscribe(() => {
        if (ObjectUtils.isObjectEmpty(this.watchers)) {
          this.closeWsConnection();
        }
      });
  }

}
