import isNil from 'lodash-es/isNil';
import { concat, defer, EMPTY, isObservable, merge, Observable, of, retry, Subject, throwError } from 'rxjs';
import { catchError, delay, filter, map, mergeMap, pairwise, repeat, share, skip, startWith, switchMap, take, takeUntil, tap } from 'rxjs/operators';
import { RxStompState } from '@stomp/rx-stomp';
import { AsyncUpdateConfigModel, AsyncUpdatePollingConfigModel, AsyncUpdateWebsocketConfigModel, WsBackupPoolingConfig, WsPollingConfig } from './model/async-update-config.model';
import { AsyncUpdateEventModel, ErrorAsyncUpdateEventModel, StatusAsyncUpdateEventModel, SuccessAsyncUpdateEventModel } from './model/async-update-event.model';
import { AsyncUpdateErrorType } from './model/async-update-error.type';
import { ConfiguratorCacheService } from '@shared/services/configurator-cache/configurator-cache.service';
import { CookieService } from '@common/cookie/service/cookie.service';
import { BaseDestroy } from '@util/base-class/base-destroy.class';
import { AsyncUpdateStatusType } from '@shared/async-data-update/model/async-update-status.type';
import { NgZoneUtilService } from '@util/zone/service/ng-zone-util.service';
import { WsConnectionState } from '@shared/websocket/model/ws-connection-state.model';
import { BaseWebsocketTopicService } from '@shared/websocket/service/base-websocket-topic.service';
import { WsDestinationModel } from '@shared/websocket/model/ws-destination.model';
import { Nil } from '@util/helper-types/nil';
import { PlatformCommonService } from '@common/platform/service/platform-common.service';
import { AukError } from '@common/error/domain/auk-error';
import { LoggerService } from '@common/logger/service/logger.service';

/** Name of the cookie whose value, when present, overrides the WEBSOCKETS_ALLOWED_DEFAULT system param. */
const WEBSOCKETS_ALLOWED_COOKIE_NAME: string = 'websockets-allowed';

/**
 * Event passed through the internal part of the async update stream.
 * Carries either a data payload (type 'DATA') or a connection/lifecycle notification.
 */
interface InternalEvent<T> {
  // Payload; filled in for events created as 'DATA'
  data?: T;
  // Source error; optionally attached to connection events
  error?: Error;
  // Discriminator deciding how the event is handled
  type: InternalEventType;
}

/**
 * Kinds of internal events produced inside the async update stream.
 */
type InternalEventType =
  // Data received from websocket or from a poll request
  'DATA' |
  // Prepended to every newly created data source
  'INIT' |
  // Ws connection state is CONNECTING
  'WS_CONNECTING' |
  // Ws connection state is OPEN
  'WS_OPENED' |
  // Ws connection state changed and the previous state was OPEN
  'WS_DROP' |
  // Ws connection was not opened within the configured timeout, or ws data retrieval retries were exhausted
  'WS_TIMEOUT' |
  // Poll request or ws data stream failed
  'DATA_RETRIEVAL_ERROR';

/** Retry count passed to `retry` when watching the ws data stream for data retrieval errors. */
const WS_DATA_RETRIEVAL_ERROR_RETRY_ATTEMPTS = 3;

export abstract class AsyncDataUpdateService<T, D extends object = object> extends BaseDestroy {

  /**
   * Websocket data stream shared between subscribers. Assigned in the constructor as a deferred call of
   * websocketDataProvider$() piped through share().
   */
  public wsDataProviderShared$: Observable<T>;

  /** Config used by watchForDataUpdate$ when the caller does not provide one. */
  protected abstract readonly defaultConfig: AsyncUpdateConfigModel | Observable<AsyncUpdateConfigModel>;

  /** Websocket connection state stream from which all ws connection events of this class are derived. */
  protected abstract readonly wsState$: Observable<WsConnectionState>;

  /** Notified each time the initial poll started on WS_OPENED (WS_POLLING_BACKUP mode) emits an event. */
  private _initPollAfterWsOpened$: Subject<void> = new Subject<void>();

  /**
   * Stores the injected collaborators and prepares the shared websocket data stream.
   */
  protected constructor(
    protected readonly configuratorCacheService: ConfiguratorCacheService,
    protected readonly cookieService: CookieService,
    protected readonly platformCommonService: PlatformCommonService,
    protected readonly ngZoneUtilService: NgZoneUtilService,
    protected readonly baseWebsocketTopicService: BaseWebsocketTopicService<T, D>,
    private readonly loggerService: LoggerService,
  ) {
    super();

    // The provider is called lazily (on subscription), its result is multicast to all current subscribers
    const lazyWsData$ = defer(() => this.websocketDataProvider$());
    this.wsDataProviderShared$ = lazyWsData$.pipe(share());
  }

  /**
   * Resolves whether websockets are allowed on FE.
   *
   * The base value is the WEBSOCKETS_ALLOWED_DEFAULT feature flag from CE (only its first value is used).
   * For dev purposes the flag can be overridden by a special cookie: when the cookie is present, it is parsed
   * as a base-10 integer and a non-zero number means "allowed" (zero or an unparsable value means "not allowed").
   */
  private get websocketsFeatureAllowed(): Observable<boolean> {
    const allowedByParam$ = this.configuratorCacheService.getFeSystemParam<boolean>('WEBSOCKETS_ALLOWED_DEFAULT', 'BOOLEAN');

    return allowedByParam$.pipe(
      take(1),
      map((allowedByParam) => {
        const cookieOverride = this.cookieService.get(WEBSOCKETS_ALLOWED_COOKIE_NAME);

        if (isNil(cookieOverride)) {
          return allowedByParam;
        }
        return !!parseInt(cookieOverride, 10);
      }),
    );
  }

  /**
   * Whether the platform service reports the current request as a Google Store Bot request.
   */
  private get isStoreBotRequest(): boolean {
    return this.platformCommonService.isGoogleStoreBot;
  }

  /**
   * Websocket data as a stream of internal events.
   *
   * Every message of the shared ws data stream is wrapped into a DATA event. When the stream fails,
   * a DATA_RETRIEVAL_ERROR event is emitted and the stream is subscribed again. Everything stops once
   * either the ws timeout or the ws data retrieval error watcher emits.
   */
  private wsData$(config: AsyncUpdateWebsocketConfigModel): Observable<InternalEvent<T>> {
    const stop$ = merge(
      this.wsTimeout$(config),
      this.wsDataRetrievalError$(config),
    );

    return this.wsDataProviderShared$.pipe(
      map((message: T) => this.dataEvent<T>(message)),
      catchError((failure: Error, caught$) => {
        // Emit informative error first, then resubscribe to the failed stream
        const failureEvent$ = of(this.createConnectionEvent('DATA_RETRIEVAL_ERROR', failure));
        return concat(failureEvent$, caught$);
      }),
      takeUntil(stop$),
    );
  }

  /**
   * Polls data for newly watched ws destinations.
   *
   * Waits for the first "initial poll after ws opened" notification, then performs one poll request for every
   * emission of newly watched destinations. Stops once either the ws timeout or the ws data retrieval error
   * watcher emits.
   */
  private pollDataOnNewlyWatchedDestinations$(config: AsyncUpdateWebsocketConfigModel): Observable<InternalEvent<T>> {
    const stop$ = merge(
      this.wsTimeout$(config),
      this.wsDataRetrievalError$(config),
    );

    const newDestinationsAfterInitPoll$ = this.initPollAfterWsOpenedDone$().pipe(
      take(1),
      mergeMap(() => this.baseWebsocketTopicService.newlyWatchedDestinations$),
    );

    return newDestinationsAfterInitPoll$.pipe(
      mergeMap((destinations) => this.pollData$(destinations)),
      takeUntil(stop$),
    );
  }

  /**
   * Backup polling for the time the ws connection is interrupted.
   * Polling is (re)started on every ws connection drop or ws timeout and runs until the ws connection is opened.
   */
  private backupPollingDataOnWsInterrupt$(config: WsBackupPoolingConfig): Observable<InternalEvent<T>> {
    const wsInterrupted$ = merge(
      // ws connection lost - polling starts immediately
      this.wsConnectionDrop$(),
      // ws not opened within timeout (e.g. on async stream init) - polling starts after the timeout
      this.wsTimeout$(config),
    );

    return wsInterrupted$.pipe(
      switchMap(() => {
        const backupPolling$ = this.pollingData$(config);

        // Backup polling is closed when the ws connection gets opened
        return backupPolling$.pipe(takeUntil(this.wsConnectionOpened$()));
      }),
    );
  }

  /**
   * Backup polling for the initial ws connection phase.
   * Polling starts right on subscription and is closed when the ws connection is opened or when the ws timeout
   * fires (at that point polling on ws interrupt takes over).
   */
  private backupPollingDataUntilWsConnect$(config: WsBackupPoolingConfig): Observable<InternalEvent<T>> {
    const wsOpenedOrTimedOut$ = merge(
      this.wsConnectionOpened$(),
      this.wsTimeout$(config),
    );

    // Polling stream is created per subscription
    return defer(() => this.pollingData$(config)).pipe(
      takeUntil(wsOpenedOrTimedOut$),
    );
  }

  /**
   * Initiates data stream for asynchronous data update.
   *
   * For every emitted config a new data source is created; its internal events are translated to listener events.
   * The resulting stream is observed via the ng zone util operator and shared between subscribers.
   *
   * @param config Default config can be overridden providing this param (plain value or observable of configs)
   * @returns Stream of async update events
   */
  public watchForDataUpdate$(
    config: AsyncUpdateConfigModel | Observable<AsyncUpdateConfigModel> = this.defaultConfig,
  ): Observable<AsyncUpdateEventModel<T>> {
    const configChanged$ = new Subject<void>();
    const config$ = isObservable(config) ? config : of(config);

    // The first emit is skipped, as it would immediately stop the current source; the source should be stopped
    // by the emit which follows the next config
    const supersededByNextConfig$ = configChanged$.pipe(skip(1));

    // Previous source is destroyed after some delay, so the same WS topic is not unnecessarily
    // unsubscribed/resubscribed when the config changes
    const scheduleConfigChangedEmit = (): void => {
      this.ngZoneUtilService.simpleTimerOut$(
        () => {
          configChanged$.next();
        },
        this.destroy$,
        250,
      );
    };

    // Data stream source (actual data + connection events) translated to listener events
    const listenerEvents$ = (cfg: AsyncUpdateConfigModel): Observable<AsyncUpdateEventModel<T>> =>
      this.sourceByAsyncType$(cfg).pipe(
        startWith(this.createConnectionEvent('INIT')),
        mergeMap((event: InternalEvent<T>) => {
          if (event.type === 'DATA') {
            return this.handleData(event.data);
          }
          return this.handleInternalEvent(cfg, event);
        }),
        takeUntil(supersededByNextConfig$),
      );

    return config$.pipe(
      tap(() => scheduleConfigChangedEmit()),
      mergeMap((cfg) => listenerEvents$(cfg)),
      this.ngZoneUtilService.observeOnNgZone(),
      share(),
    );
  }

  /**
   * Builds the data stream source for the given async mode. The source consists of actual data and
   * connection events.
   *
   * POLLING_ONLY mode polls right away. For the other modes the source depends on whether websockets
   * are usable (allowed by feature flag/cookie and the request is not a Google Store Bot request).
   */
  private sourceByAsyncType$(config: AsyncUpdateConfigModel): Observable<InternalEvent<T>> {
    if (config.mode === 'POLLING_ONLY') {
      return this.pollingData$(config);
    }

    const websocketsUsable$ = this.websocketsFeatureAllowed.pipe(
      // Websocket is not opened for Google Store Bot as it cannot handle websocket connection
      map((websocketsAllowed) => websocketsAllowed && !this.isStoreBotRequest),
    );

    return websocketsUsable$.pipe(
      switchMap((websocketsUsable) => {
        if (!websocketsUsable) {
          // Websockets are disabled via configuration (config element) or by special cookie
          switch (config.mode) {
            case 'WS_ONLY':
              // Async channel cannot be constructed in websocket only mode - raise error, except for Store Bot
              return this.isStoreBotRequest
                ? EMPTY
                : throwError(() => new Error('Cannot initialize async stream - websockets disabled and no pooling backup'));
            // Both polling-capable websocket modes fall back to plain polling
            case 'WS_POLLING_BACKUP':
            case 'WS_POLLING':
              return this.pollingData$(config);
          }
        }

        // Websockets are enabled: ws data and ws state events are always part of the source
        switch (config.mode) {
          case 'WS_ONLY':
            return merge(
              this.wsData$(config),
              this.wsStateEvents$(config),
            );
          // Polling is started/stopped only when needed
          case 'WS_POLLING_BACKUP':
            return merge(
              this.wsData$(config),
              this.pollDataOnNewlyWatchedDestinations$(config),
              this.wsStateEvents$(config),
              this.backupPollingDataOnWsInterrupt$(config),
              this.backupPollingDataUntilWsConnect$(config),
            );
          // Polling runs simultaneously with websockets
          case 'WS_POLLING':
            return merge(
              this.wsData$(config),
              this.wsStateEvents$(config),
              this.pollingData$(config),
            );
        }
      }),
    );
  }

  /**
   * Passes async data payload to the listener wrapped in a SUCCESS event.
   */
  private handleData(data: T): Observable<SuccessAsyncUpdateEventModel<T>> {
    const successEvent = this.successData(data);

    return of(successEvent);
  }

  /**
   * Handles internal communication events of async data stream.
   * Decides on which events are listeners notified about. Can execute additional logic based on internal events.
   *
   * The event is dispatched to the handler dedicated to the async mode of the given config.
   *
   * @param config Config of the data source which produced the event; its mode selects the handler
   * @param internalEvent Internal event to be handled
   * @returns Stream of events the listener should be notified about (possibly empty)
   */
  private handleInternalEvent(config: AsyncUpdateConfigModel, internalEvent: InternalEvent<T>): Observable<AsyncUpdateEventModel<T>> {
    switch (config.mode) {
      case 'POLLING_ONLY':
        return this.handleEventPolling(config, internalEvent);
      case 'WS_ONLY':
        return this.handleEventWs(internalEvent);
      case 'WS_POLLING_BACKUP':
        return this.handleEventWsBackupPolling(config, internalEvent);
      case 'WS_POLLING':
        return this.handleEventWsPolling(config, internalEvent);
    }
  }

  /**
   * Translates an internal event to listener events in WS_ONLY mode.
   * - WS_OPENED: STATUS event 'CONNECTED'
   * - WS_DROP: ERROR event of type 'CONNECTION_NOT_AVAILABLE'
   * - DATA_RETRIEVAL_ERROR: ERROR event of type 'DATA_RETRIEVAL_ERROR'
   * - WS_TIMEOUT: the returned stream fails with the error carried by the event
   * - any other event: nothing is emitted
   */
  private handleEventWs(internalEvent: InternalEvent<T>): Observable<AsyncUpdateEventModel<T>> {
    return defer(() => {
      const { type, error } = internalEvent;

      if (type === 'WS_TIMEOUT') {
        // Close stream on ws connection timeout
        throw error;
      }
      if (type === 'WS_OPENED') {
        return of(this.statusData('CONNECTED'));
      }
      if (type === 'WS_DROP') {
        // Inform listener on connection not available
        return of(this.errorData(error, 'CONNECTION_NOT_AVAILABLE'));
      }
      if (type === 'DATA_RETRIEVAL_ERROR') {
        return of(this.errorData(error, 'DATA_RETRIEVAL_ERROR'));
      }

      return EMPTY;
    });
  }

  /**
   * Translates an internal event to listener events in WS_POLLING_BACKUP mode.
   * - WS_OPENED: performs the initial poll and passes its events on; each passed event also notifies
   *   that the initial poll after ws open has been done
   * - WS_TIMEOUT: ERROR event of type 'CONNECTION_TIMEOUT'
   * - DATA_RETRIEVAL_ERROR: ERROR event of type 'DATA_RETRIEVAL_ERROR'
   * - any other event: nothing is emitted
   */
  private handleEventWsBackupPolling(
    config: AsyncUpdateConfigModel,
    internalEvent: InternalEvent<T>,
  ): Observable<AsyncUpdateEventModel<T>> {
    return defer(() => {
      const { type, error } = internalEvent;

      if (type === 'WS_OPENED') {
        // Perform initial poll when ws connection is established
        const initPoll$ = this.takeInitPoll(config);

        return initPoll$.pipe(
          tap(() => this._initPollAfterWsOpened$.next()),
        );
      }
      if (type === 'WS_TIMEOUT') {
        // Inform listener about ws termination
        return of(this.errorData(error, 'CONNECTION_TIMEOUT'));
      }
      if (type === 'DATA_RETRIEVAL_ERROR') {
        return of(this.errorData(error, 'DATA_RETRIEVAL_ERROR'));
      }

      return EMPTY;
    });
  }

  /**
   * Translates an internal event to listener events in WS_POLLING mode.
   * - INIT: performs the initial poll, but only when `performPollOnInit` is set in the config
   * - WS_TIMEOUT: ERROR event of type 'CONNECTION_TIMEOUT'
   * - DATA_RETRIEVAL_ERROR: ERROR event of type 'DATA_RETRIEVAL_ERROR'
   * - any other event: nothing is emitted
   */
  private handleEventWsPolling(config: WsPollingConfig, internalEvent: InternalEvent<T>): Observable<AsyncUpdateEventModel<T>> {
    return defer(() => {
      const { type, error } = internalEvent;

      if (type === 'INIT') {
        // Initial poll on stream init is optional in this mode
        if (config.performPollOnInit) {
          return this.takeInitPoll(config);
        }
        return EMPTY;
      }
      if (type === 'WS_TIMEOUT') {
        // Inform listener about ws termination
        return of(this.errorData(error, 'CONNECTION_TIMEOUT'));
      }
      if (type === 'DATA_RETRIEVAL_ERROR') {
        return of(this.errorData(error, 'DATA_RETRIEVAL_ERROR'));
      }

      return EMPTY;
    });
  }

  /**
   * Translates an internal event to listener events in POLLING_ONLY mode.
   * - INIT: performs the initial poll
   * - DATA_RETRIEVAL_ERROR: ERROR event of type 'DATA_RETRIEVAL_ERROR'
   * - any other event: nothing is emitted
   */
  private handleEventPolling(config: AsyncUpdateConfigModel, internalEvent: InternalEvent<T>):
    Observable<AsyncUpdateEventModel<T>> {
    return defer(() => {
      const { type, error } = internalEvent;

      if (type === 'INIT') {
        // Perform initial poll on stream init
        return this.takeInitPoll(config);
      }
      if (type === 'DATA_RETRIEVAL_ERROR') {
        return of(this.errorData(error, 'DATA_RETRIEVAL_ERROR'));
      }

      return EMPTY;
    });
  }

  /**
   * Performs the initial poll request (for all destinations).
   * Polled data is passed on as a SUCCESS event; any other result of the poll (e.g. its failure) goes
   * through the internal event handling of the given config.
   */
  private takeInitPoll(config: AsyncUpdateConfigModel): Observable<AsyncUpdateEventModel<T>> {
    const pollResult$ = this.pollData$();

    return pollResult$.pipe(
      mergeMap((event) => {
        if (event.type !== 'DATA') {
          return this.handleInternalEvent(config, event);
        }
        return of(this.successData(event.data));
      }),
    );
  }

  /**
   * Performs a single poll request.
   * Only the first value of the polling data provider is taken and wrapped into a DATA event. An error of
   * the provider is caught and emitted as a DATA_RETRIEVAL_ERROR event instead.
   *
   * @param destinations Destinations passed to the polling data provider (nil = no restriction is passed)
   */
  private pollData$(
    destinations?: WsDestinationModel<D>[] | Nil,
  ): Observable<InternalEvent<T>> {
    const firstResponse$ = this.pollingDataProvider$(destinations).pipe(take(1));

    return firstResponse$.pipe(
      map((response: T) => this.dataEvent<T>(response)),
      catchError((failure: Error) => {
        const failureEvent = this.createConnectionEvent('DATA_RETRIEVAL_ERROR', failure);
        return of(failureEvent);
      }),
    );
  }

  /**
   * Polls backend repeatedly using provided configuration.
   *
   * Each round asks the configured rate resolver (given the 1-based round number) for the waiting time,
   * waits for it, performs one poll request and then starts the next round.
   *
   * TODO: [PDEV-14930] - Optimize polling for more datasources with the same polling data provider
   */
  private pollingData$(config: AsyncUpdatePollingConfigModel): Observable<InternalEvent<T>> {
    let pollRound = 0;

    // Resolved again on every repeat, so the rate can change with the growing round number
    const rate$ = defer(() => {
      pollRound += 1;

      const resolvedRate = config.polling.rateResolver(pollRound);

      if (isObservable(resolvedRate)) {
        return resolvedRate;
      }
      return of(resolvedRate);
    });

    return rate$.pipe(
      mergeMap((rate) => this.ngZoneUtilService.timerOut$(rate)),
      switchMap(() => this.pollData$()),
      repeat(),
    );
  }

  /**
   * Creates a SUCCESS async update stream event carrying the given data as its payload.
   */
  private successData(data: T): SuccessAsyncUpdateEventModel<T> {
    const event: SuccessAsyncUpdateEventModel<T> = { payload: data, type: 'SUCCESS' };

    return event;
  }

  /**
   * Creates an ERROR async update stream event.
   *
   * @param err Source error stored in the event payload
   * @param errorType Error classification stored in the event payload, 'UNSPECIFIED' when omitted
   */
  private errorData(err: Error, errorType: AsyncUpdateErrorType = 'UNSPECIFIED'): ErrorAsyncUpdateEventModel {
    const event: ErrorAsyncUpdateEventModel = {
      payload: { sourceError: err, errorType },
      type: 'ERROR',
    };

    return event;
  }

  /**
   * Creates a STATUS async update stream event with the given status type in its payload.
   */
  private statusData(status: AsyncUpdateStatusType): StatusAsyncUpdateEventModel {
    const event: StatusAsyncUpdateEventModel = {
      payload: { statusType: status },
      type: 'STATUS',
    };

    return event;
  }

  /**
   * Wraps the given data into an internal DATA event.
   */
  private dataEvent<DATA>(data: DATA): InternalEvent<DATA> {
    const event: InternalEvent<DATA> = { data, type: 'DATA' };

    return event;
  }

  /**
   * Creates an internal event of the given type without data, optionally carrying an error.
   */
  private createConnectionEvent(type: InternalEventType, error?: Error): InternalEvent<never> {
    const event: InternalEvent<never> = { type, error };

    return event;
  }

  /**
   * Ws connection state stream (the abstract `wsState$` member) used by all ws connection listeners below.
   */
  private wsConnectionState$(): Observable<WsConnectionState> {
    return this.wsState$;
  }

  /**
   * Listens on ws connection state.
   * Emits a WS_CONNECTING event whenever the state is CONNECTING (ws connection establish attempt).
   */
  private wsConnecting$(): Observable<InternalEvent<T>> {
    const connectingState$ = this.wsConnectionState$().pipe(
      filter((state) => state === RxStompState.CONNECTING),
    );

    return connectingState$.pipe(
      map(() => this.createConnectionEvent('WS_CONNECTING')),
    );
  }

  /**
   * Observable view of the `_initPollAfterWsOpened$` subject (notified by the WS_OPENED handling of
   * WS_POLLING_BACKUP mode).
   */
  private initPollAfterWsOpenedDone$(): Observable<void> {
    return this._initPollAfterWsOpened$.asObservable();
  }

  /**
   * Listens on ws connection state.
   * Emits a WS_OPENED event whenever the state is OPEN.
   */
  private wsConnectionOpened$(): Observable<InternalEvent<T>> {
    const openState$ = this.wsConnectionState$().pipe(
      filter((state) => state === RxStompState.OPEN),
    );

    return openState$.pipe(
      map(() => this.createConnectionEvent('WS_OPENED')),
    );
  }

  /**
   * Listens on ws connection state.
   * Emits a WS_DROP event on every state emission which directly follows the OPEN state,
   * i.e. when the connection was open and has just been dropped.
   */
  private wsConnectionDrop$(): Observable<InternalEvent<T>> {
    const stateTransitions$ = this.wsConnectionState$().pipe(pairwise());

    return stateTransitions$.pipe(
      // Only the previous state matters
      filter(([previousState]) => previousState === RxStompState.OPEN),
      map(() => this.createConnectionEvent('WS_DROP')),
    );
  }

  /**
   * Listens on ws messages and watches them for data retrieval errors. Data itself is never emitted from here.
   *
   * After an error the shared ws data stream is subscribed again, at most WS_DATA_RETRIEVAL_ERROR_RETRY_ATTEMPTS
   * times in a row - every received message resets the counter. Once the limit is exceeded, the error is logged
   * and a single WS_TIMEOUT event carrying that error is emitted. Listening stops when the ws timeout fires.
   *
   * @param config Websocket config, used for the ws timeout which stops the listening
   * @returns Stream emitting at most one WS_TIMEOUT event
   */
  private wsDataRetrievalError$(config: AsyncUpdateWebsocketConfigModel): Observable<InternalEvent<T>> {
    return this.wsDataProviderShared$
      .pipe(
        // Retry on error for given number of attempts.
        // It has to be placed BEFORE the data are dropped: `resetOnSuccess` resets the counter only when a value
        // passes through `retry`, otherwise the errors would be counted in total instead of in a row
        retry({ count: WS_DATA_RETRIEVAL_ERROR_RETRY_ATTEMPTS, resetOnSuccess: true }),
        // Ignore data
        switchMap(() => EMPTY),
        // After maximum retries log error and emit ws timeout info
        catchError((error: Error) => {
          this.logWsDataRetrievalError(error);
          return of(this.createConnectionEvent('WS_TIMEOUT', error));
        }),
        // Close ws connection after WS timeout
        takeUntil(this.wsTimeout$(config)),
      );
  }

  /**
   * Listens on ws connection state and emits a WS_TIMEOUT event when:
   * - Connection has been dropped and was not reopened in the timeout interval.
   * - Connection has not been established in the timeout interval.
   *
   * Never emits when no connect timeout is configured.
   */
  private wsTimeout$(config: AsyncUpdateWebsocketConfigModel): Observable<InternalEvent<T>> {
    if (isNil(config.websocket.connectTimeout)) {
      return EMPTY;
    }

    // Each connection drop or connection attempt (re)starts the countdown
    const countdownStart$ = merge(this.wsConnectionDrop$(), this.wsConnecting$());

    // Timeout event delayed by the configured interval, cancelled when the connection gets opened meanwhile
    const delayedTimeoutEvent$ = (): Observable<InternalEvent<T>> => {
      const timeoutEvent = this.createConnectionEvent('WS_TIMEOUT', new AukError('Websocket connection timeout', false));

      return of(timeoutEvent).pipe(
        delay(config.websocket.connectTimeout),
        takeUntil(this.wsConnectionOpened$()),
      );
    };

    return countdownStart$.pipe(
      switchMap(() => delayedTimeoutEvent$()),
    );
  }

  /**
   * Listens on ws connection state and emits all connection related events in one stream:
   * connecting, opened, dropped, data retrieval retries exhausted and connection timeout.
   */
  private wsStateEvents$(config: AsyncUpdateWebsocketConfigModel): Observable<InternalEvent<T>> {
    const connecting$ = this.wsConnecting$();
    const opened$ = this.wsConnectionOpened$();
    const dropped$ = this.wsConnectionDrop$();
    const retrievalFailed$ = this.wsDataRetrievalError$(config);
    const timedOut$ = this.wsTimeout$(config);

    return merge(connecting$, opened$, dropped$, retrievalFailed$, timedOut$);
  }

  /**
   * Reports a ws data retrieval error: prints it to the console and passes it to the logger service
   * with a dedicated fingerprint.
   */
  private logWsDataRetrievalError(error: Error): void {
    console.error(error);

    this.loggerService.logException(error, {
      extra: { fingerprint: ['ASYNC_DATA_WEBSOCKETS_DATA_RETRIEVAL_EXCEPTION'] },
    });
  }

  /**
   * This is websocket data provider - source of the shared ws data stream (called lazily on its subscription).
   *
   * NOTE:
   *  - this default implementation provides no data (EMPTY); presumably meant to be overridden by subclasses
   */
  protected websocketDataProvider$(): Observable<T> {
    return EMPTY;
  }

  /**
   * This is polling data provider (called when we want to poll data by API instead of WS)
   *
   * NOTE:
   *  - by default (if destinations param is nil) we want to poll data for all destinations
   *  - if destinations param is not nil, we want to poll the data only for given destinations
   *  - This is an optimization: for example, we want to poll only newly watched destinations on WS and not existing ones
   *    (as we know we already have the latest data for them)
   *  - this default implementation ignores the param and provides no data (EMPTY); presumably meant to be
   *    overridden by subclasses
   *
   * @param destinations Destinations to poll the data for, nil for all destinations
   */
  protected pollingDataProvider$(
    destinations?: WsDestinationModel<D>[] | Nil,
  ): Observable<T> {
    return EMPTY;
  }

}

