import { Injectable } from '@angular/core';
import { ConfiguratorCacheService } from '@shared/services/configurator-cache/configurator-cache.service';
import { UserMessageAsyncDataUpdateService } from './user-message-async-data-update.service';
import { distinctUntilChanged, map, startWith, switchMap, takeUntil, tap } from 'rxjs/operators';
import { AsyncUpdateEventModel } from '@shared/async-data-update/model/async-update-event.model';
import { MessageWebsocketDto } from '@shared/model/message-websocket-dto.model';
import { AuthenticationService } from '@shared/authentication/service/authentication.service';
import { combineLatest, EMPTY, Observable, Subject } from 'rxjs';
import { BaseDestroy } from '@util/base-class/base-destroy.class';
import { PlatformCommonService } from '@common/platform/service/platform-common.service';

@Injectable({ providedIn: 'root' })
export class UserMessageAsyncDataGlobalAdapterService extends BaseDestroy {

  private _userMessagePayloadData$: Subject<MessageWebsocketDto> = new Subject<MessageWebsocketDto>();

  private _connectionEnabled = new Subject<boolean>();

  constructor(
    private readonly platformCommonService: PlatformCommonService,
    private readonly authenticationService: AuthenticationService,
    private readonly configuratorCacheService: ConfiguratorCacheService,
    private readonly userMessageAsyncDataUpdateService: UserMessageAsyncDataUpdateService,
  ) {
    super();
  }

  public get userMessageData$(): Observable<MessageWebsocketDto> {
    return this._userMessagePayloadData$.asObservable();
  }

  public init(): void {
    // don't init WS on server
    if (this.platformCommonService.isServer) {
      return;
    }

    this.initAsyncDataListener();
    this.initAppStateListener();
  }

  private initAppStateListener(): void {
    combineLatest([
      this.configuratorCacheService.getFeSystemParam<boolean>('ENABLE_GLOBAL_USER_MESSAGES_WEBSOCKET', 'BOOLEAN'),
      this.authenticationService.getLoginStatusChange()
        .pipe(
          startWith(this.authenticationService.isLoggedIn()),
        ),
    ])
      .pipe(
        map((([globalWsEnabled, userLoggedIn]: [boolean, boolean]) => globalWsEnabled && userLoggedIn)),
        distinctUntilChanged(),
        tap((connectionToggle) => this.toggleConnection(connectionToggle)),
        takeUntil(this.destroy$),
      )
      .subscribe();
  }

  private initAsyncDataListener(): void {
    this._connectionEnabled
      .pipe(
        switchMap((connectionEnabled) => !connectionEnabled
          ? EMPTY
          : this.userMessageAsyncDataUpdateService.watchForDataUpdate$()
            .pipe(
              tap((asyncData) =>
                this.processAsyncData(asyncData)),
            ),
        ),
        takeUntil(this.destroy$),
      )
      .subscribe();
  }

  private processAsyncData(asyncData: AsyncUpdateEventModel<MessageWebsocketDto>): void {
    if (asyncData.type === 'SUCCESS') {
      this._userMessagePayloadData$.next(asyncData.payload);
    }
  }

  private toggleConnection(enabled: boolean): void {
    this._connectionEnabled.next(enabled);
  }

}
