import { finalize, map, mergeMap, pairwise, share, startWith, takeUntil } from 'rxjs/operators';
import { BehaviorSubject, defer, EMPTY, NEVER, Observable, Subject } from 'rxjs';
import { WebsocketsService } from './websockets.service';
import { StompHeaders } from '@stomp/stompjs';
import { WsTopicPropertiesModel } from '@shared/websocket/model/ws-topic-properties.model';
import { AuthCacheService } from '@shared/authentication/service/auth-cache.service';
import { BaseDestroy } from '@util/base-class/base-destroy.class';
import { PlatformCommonService } from '@common/platform/service/platform-common.service';
import { StringUtils } from '@util/util/string.utils';
import { ArrayUtils } from '@util/util/array.utils';
import { combineLatestWithStart$ } from '@util/rxjs-operators/combine-latest-with-start';
import { Nil } from '@util/helper-types/nil';
import isNil from 'lodash-es/isNil';
import { WsConnectionState } from '@shared/websocket/model/ws-connection-state.model';
import { isNotNil } from '@util/helper-functions/is-not-nil';
import { WsDestinationModel } from '@shared/websocket/model/ws-destination.model';
import differenceBy from 'lodash-es/differenceBy';
import uniqBy from 'lodash-es/uniqBy';
import { WsTopicDataModel } from '@shared/websocket/model/ws-topic-data.model';
import { LoggerService } from '@common/logger/service/logger.service';

/**
 * Base class for services that expose messages coming from one or more websocket destinations.
 *
 * Subclasses provide:
 *  - `wsProperties` - topic properties (e.g. whether the topic requires authorization),
 *  - `isMultiple` - whether several destinations are watched at the same time,
 *  - `wsDestinations$()` - stream of destination(s) that should be watched.
 *
 * On every emission of `wsDestinations$()` the previous and current destinations are compared by their
 * `destination` value: destinations which disappeared are unwatched, destinations which appeared are watched.
 *
 * @typeParam T type parameter forwarded to `WebsocketsService<T>`; when it is an array type,
 *   `wsMessage$()` is typed as a stream of arrays
 * @typeParam D type of `additionalData` carried by a destination model
 */
export abstract class BaseWebsocketTopicService<T, D extends object = never>
  extends BaseDestroy {

  /**
   * Emits the list of currently watched destinations.
   * Backed by a BehaviorSubject, so every subscriber immediately receives the current list.
   */
  public currentlyWatchedDestinations$: Observable<WsDestinationModel<D>[]>;
  /**
   * Emits the list of destinations which have just started to be watched
   * (and an empty list when the message stream is finalized).
   */
  public newlyWatchedDestinations$: Observable<WsDestinationModel<D>[]>;

  protected abstract readonly wsProperties: WsTopicPropertiesModel;

  /**
   * Whether destinations are multiple
   */
  protected abstract readonly isMultiple: boolean;

  /**
   * Holds currently watched WS destinations, so we can unwatch them on destroy
   */
  protected _currentlyWatchedDestinations$: BehaviorSubject<WsDestinationModel<D>[]> = new BehaviorSubject<WsDestinationModel<D>[]>([]);
  /**
   * Emits destinations which have just started to be watched
   */
  protected _newlyWatchedDestinations$: Subject<WsDestinationModel<D>[]> = new Subject<WsDestinationModel<D>[]>();

  /**
   * Shared message stream, built once in `init()`.
   */
  private _message$:
    T extends unknown[]
      ? Observable<(WsTopicDataModel<T[0], D>)[] | Nil>
      : Observable<(WsTopicDataModel<T, D>) | Nil>;

  protected constructor(
    protected readonly authCacheService: AuthCacheService,
    protected readonly websocketsService: WebsocketsService<T>,
    protected readonly platformCommonService: PlatformCommonService,
    protected readonly loggerService: LoggerService,
  ) {
    super();

    // No share() here: subjects are already multicast. Piping the BehaviorSubject through share()
    // would route additional subscribers through share's internal plain Subject, so a subscriber
    // joining while another one is active would NOT receive the current value until the next emission.
    this.currentlyWatchedDestinations$ = this._currentlyWatchedDestinations$.asObservable();
    this.newlyWatchedDestinations$ = this._newlyWatchedDestinations$.asObservable();

    this.init();
  }

  /**
   * Stream of destination(s) to watch. May emit a single destination, a list of destinations or nil.
   * It is subscribed lazily (through `defer`), i.e. not before `wsMessage$()` gets its first subscriber.
   */
  protected abstract wsDestinations$(): Observable<WsDestinationModel<D> | WsDestinationModel<D>[] | Nil>;

  /**
   * Connection state of the underlying websocket service, completed when this service is destroyed.
   * Note: a new observable is created on every access.
   */
  public get wsConnectionState$(): Observable<WsConnectionState> {
    return this.websocketsService.connectionState$.asObservable()
      .pipe(
        takeUntil(this.destroy$),
      );
  }

  /**
   * @returns the shared message stream; on the server platform it is an empty stream
   */
  public wsMessage$(): typeof this._message$ {
    return this._message$;
  }

  /**
   * Starts watching a single destination with headers derived from the topic properties.
   *
   * @param destination destination to watch
   * @param wsProperties topic properties used to build the headers
   * @returns stream returned by `WebsocketsService.watch$` for the destination
   */
  private watch$(destination: string, wsProperties: WsTopicPropertiesModel): Observable<T> {
    const stompHeaders = this.constructHeaders(wsProperties);
    this.validateHeaders(wsProperties, stompHeaders);

    return this.websocketsService.watch$(destination, stompHeaders);
  }

  /**
   * Builds headers for a topic: an `authorization` header for authorized topics, no headers otherwise.
   */
  private constructHeaders(wsProperties: WsTopicPropertiesModel): StompHeaders {
    if (wsProperties.authorized) {
      return { authorization: this.authCacheService.authToken };
    }
    return {};
  }

  /**
   * Logs an error when an authorized topic is about to be watched without an authorization token.
   * It only reports the problem; the watch itself is not prevented.
   */
  private validateHeaders(wsProperties: WsTopicPropertiesModel, stompHeaders: StompHeaders): void {
    if (wsProperties.authorized && StringUtils.isBlank(stompHeaders.authorization)) {
      this.loggerService.logMessage(
        'BaseWebsocketTopicService :: Missing authorization token for authorized WS topic',
        'error',
      );
    }
  }

  /**
   * Builds the shared message stream.
   *
   * On the server platform the stream is empty. Otherwise each emission of `wsDestinations$()` is paired
   * with the previous one and the difference decides which destinations get unwatched and watched.
   */
  private init(): void {
    if (this.platformCommonService.isServer) {
      this._message$ = EMPTY as T extends unknown[] ? Observable<never> : Observable<never>;
      return;
    }

    this.setCurrentlyWatchedDestinations([]);
    // Defer used because of calling abstract method in abstract parent constructor.
    // Dependencies of child classes are not available at this point.
    this._message$ = defer(() => this.wsDestinations$())
      .pipe(
        // startWith needed, to fill the buffer in pairwise operator
        startWith(null),
        pairwise(),
        mergeMap(([prevDestinations, currDestinations]) => {
          // nothing was watched and nothing should be watched - emit nothing
          if (isNil(prevDestinations) && isNil(currDestinations)) {
            return NEVER;
          }

          // normalize "single destination | list | nil" to a list (a nil value becomes a one-item list
          // holding nil, which is filtered out below)
          const prevDestinationsUnified = ArrayUtils.isArray(prevDestinations) ? prevDestinations : [prevDestinations];

          const currDestinationsUnified = ArrayUtils.isArray(currDestinations) ? currDestinations : [currDestinations];

          // present before, missing now
          const destinationsToUnwatch: WsDestinationModel<D>[] =
            uniqBy(
              differenceBy(
                prevDestinationsUnified,
                currDestinationsUnified,
                (val) => val?.destination,
              )
                .filter((dest) => this.isNotDestinationNil(dest)),
              (val) => val?.destination,
            );

          // missing before, present now
          const destinationsToWatch: WsDestinationModel<D>[] =
            uniqBy(
              differenceBy(
                currDestinationsUnified,
                prevDestinationsUnified,
                (val) => val?.destination,
              )
                .filter((dest) => this.isNotDestinationNil(dest)),
              (val) => val?.destination,
            );

          // store currently watched destinations, so we can unwatch them in finalize operator
          this.setCurrentlyWatchedDestinations(
            uniqBy(
              currDestinationsUnified,
              (val) => val?.destination,
            )
              .filter((dest) => this.isNotDestinationNil(dest)),
          );
          this.setNewlyWatchedDestinations(destinationsToWatch);

          destinationsToUnwatch
            .forEach(({ destination }) => this.websocketsService.unwatch(destination));

          // Only the newly added destinations are watched here. Because of mergeMap, inner streams
          // created for earlier emissions stay subscribed alongside the new one.
          return this.isMultiple
            ? combineLatestWithStart$(
              destinationsToWatch
                .map(({ destination, additionalData }) =>
                  this.watch$(destination, this.wsProperties)
                    .pipe(
                      map((data) => ({
                        data,
                        additionalData,
                      })),
                    ),
                ),
            )
            : isNil(destinationsToWatch[0])
              ? NEVER
              : this.watch$(destinationsToWatch[0].destination, this.wsProperties)
                .pipe(
                  map((data) => ({
                    data,
                    additionalData: destinationsToWatch[0].additionalData,
                  })),
                );
        }),
        finalize(() => {
          // on finalize, we need to unwatch all destinations, that we have previously started watching
          this.currentlyWatchedDestinations.forEach(({ destination }) =>
            this.websocketsService.unwatch(destination));
          // cleanup
          this.setNewlyWatchedDestinations([]);
          this.setCurrentlyWatchedDestinations([]);
        }),
        // one upstream subscription (and one set of watches) for all subscribers of wsMessage$()
        share(),
        // type assertion needed, due to TS reporting error for T | T[] case
      ) as typeof this._message$;
  }

  /**
   * Snapshot of the currently watched destinations.
   */
  protected get currentlyWatchedDestinations(): WsDestinationModel<D>[] {
    return this._currentlyWatchedDestinations$.value;
  }

  /**
   * Publishes a new list of currently watched destinations.
   */
  private setCurrentlyWatchedDestinations(destinations: WsDestinationModel<D>[]): void {
    this._currentlyWatchedDestinations$.next(destinations);
  }

  /**
   * Publishes the list of destinations which have just started to be watched.
   */
  private setNewlyWatchedDestinations(destinations: WsDestinationModel<D>[]): void {
    this._newlyWatchedDestinations$.next(destinations);
  }

  /**
   * @param destModel destination model, possibly nil
   * @returns `true` when the model exists and its `destination` is not nil
   */
  private isNotDestinationNil(destModel: WsDestinationModel<D> | Nil): boolean {
    return isNotNil(destModel?.destination);
  }

}
