import { Inject, Injectable } from '@angular/core';

import * as SockJS from 'sockjs-client';
import { StompConfig } from '@stomp/stompjs';

import { Subscription } from 'rxjs';
import { filter, map, pairwise, switchMap, withLatestFrom } from 'rxjs/operators';

import { BaseHttpService } from './base-http.service';
import { PropertiesService } from './properties.service';
import { SessionService } from '../../modules/shared/services/session.service';

import { WebsocketPropertyData } from '../models/properties/websocket-properties';
/**
 * Owns the lifecycle of the STOMP connection (tunnelled through SockJS) and of
 * the `/sfs/user-session` watch that is opened on top of it.
 *
 * A connection is only attempted when the websocket feature is enabled in the
 * properties configuration, a JWT access token is available, the session
 * service confirms that token, and the STOMP client is not connected already.
 * After `MAX_CONNECTION_ISSUES` consecutive failed attempts the service stops
 * retrying until `initAndConnect()` / `resetConnection()` is called again.
 */
@Injectable({ providedIn: 'root' })
export class ActivityWsService {
  /** Consecutive failed connection attempts after which the service gives up. */
  private static readonly MAX_CONNECTION_ISSUES = 3;

  // Numeric values emitted by `connectionState$`. They are assumed to follow
  // rx-stomp's `RxStompState` enum (CONNECTING = 0, OPEN = 1, CLOSING = 2,
  // CLOSED = 3) — confirm against the service bound to 'Rx_STOMP_SERVICE_TOKEN'.
  private static readonly STATE_CONNECTING = 0;
  private static readonly STATE_OPEN = 1;
  private static readonly STATE_CLOSED = 3;

  private connectionIssueCounter = 0;

  /** Start-up sequence (properties lookup + token check) that is still in flight. */
  private initSubscription?: Subscription;
  /** Watch on the user-session destination. */
  private sessionSubscription?: Subscription;
  /** Observer of connection state transitions, used to count failed attempts. */
  private connectionStateSubscription?: Subscription;

  constructor(
    private propertiesService: PropertiesService,
    private sessionService: SessionService,
    private baseHttpService: BaseHttpService,
    @Inject('Rx_STOMP_SERVICE_TOKEN') private _rxStompService,
  ) {}

  /**
   * Starts the connection sequence. Nothing happens when the websocket is
   * disabled, no access token is available, the token is rejected, or the
   * client is already connected.
   */
  public initAndConnect(): void {
    // A newer request supersedes a start-up sequence that has not finished yet.
    this.cancelPendingInit();

    this.initSubscription = this.findEnabledWebsocket()
      .pipe(
        // Read the token lazily, once the properties are known, so that no
        // validity request is issued without a token.
        map(properties => ({ properties, token: this.baseHttpService.getJwtAccessToken() })),
        filter(({ token }) => !!token),
        // switchMap waits for the validity answer. withLatestFrom would drop the
        // properties emission whenever it arrives before that answer.
        switchMap(pending =>
          this.sessionService.checkTokenValidity({ body: { idToken: pending.token } }).pipe(
            filter(isTokenValid => !!isTokenValid && !this._rxStompService.connected()),
            map(() => pending),
          ),
        ),
      )
      .subscribe(({ properties, token }) => this.connect(properties, token));
  }

  /**
   * Stops everything this service started: the pending start-up sequence, the
   * state observer, the user-session watch and the STOMP client itself.
   * Safe to call in any connection state.
   */
  public disconnect(): void {
    this.cancelPendingInit();

    const stompClient = this._rxStompService.stompClient;
    if (stompClient) {
      // A delay of 0 disables the client's automatic reconnection.
      stompClient.reconnectDelay = 0;
    }

    // Release the observers BEFORE deactivating: deactivation may emit further
    // state transitions, and this method can itself be running inside the state
    // observer (see observeConnectionFailures), which would re-enter it.
    this.releaseConnectionSubscriptions();
    this.connectionIssueCounter = 0;

    // Also deactivate a client that is active but not connected (e.g. after
    // repeated failures); otherwise it stays active and a later activate()
    // may be ignored.
    if (this._rxStompService.connected() || (stompClient && stompClient.active)) {
      this._rxStompService.deactivate();
    }
  }

  /** Drops the current connection (if any) and runs the connection sequence again. */
  public resetConnection(): void {
    this.disconnect();
    this.initAndConnect();
  }

  /**
   * Configures and activates the STOMP client, then opens the user-session watch.
   *
   * @param properties websocket settings taken from the properties configuration
   * @param token access token that has just been validated
   */
  private connect(properties: WebsocketPropertyData, token: string): void {
    // Never stack observers from an earlier attempt, and start counting failures afresh.
    this.releaseConnectionSubscriptions();
    this.connectionIssueCounter = 0;

    // Released again in disconnect().
    this.connectionStateSubscription = this._rxStompService.connectionState$
      .pipe(pairwise())
      .subscribe(([oldState, newState]) => this.observeConnectionFailures(oldState, newState));
    this._rxStompService.stompClient.configure(this.stompConfig(properties, token));
    this._rxStompService.activate();
    this.initUserConnection(properties);
  }

  /** Cancels a start-up sequence that has not reached `connect()` yet. */
  private cancelPendingInit(): void {
    if (this.initSubscription) {
      this.initSubscription.unsubscribe();
      this.initSubscription = undefined;
    }
  }

  /** Unsubscribes the user-session watch and the connection state observer. */
  private releaseConnectionSubscriptions(): void {
    if (this.sessionSubscription) {
      this.sessionSubscription.unsubscribe();
      this.sessionSubscription = undefined;
    }
    if (this.connectionStateSubscription) {
      this.connectionStateSubscription.unsubscribe();
      this.connectionStateSubscription = undefined;
    }
  }

  /** Builds `<page protocol>//<wsDomain>` from the websocket properties. */
  private getRootUrl(properties: WebsocketPropertyData) {
    return document.location.protocol + '//' + properties.wsDomain;
  }

  /**
   * Builds the STOMP client configuration.
   *
   * The SockJS socket is supplied through `webSocketFactory` so that a fresh
   * socket is created for every (re)connection attempt; `brokerURL` only
   * accepts a URL string.
   *
   * @param properties websocket settings providing the target domain
   * @param token access token sent as the `access_token` query parameter
   */
  private stompConfig(properties: WebsocketPropertyData, token: string): StompConfig {
    const options = { transports: ['websocket', 'xhr-streaming', 'xdr-streaming', 'eventsource', 'xhr-polling', 'xdr-polling', 'jsonp-polling'] };
    const endpoint = `${this.getRootUrl(properties)}/api/1.0/sfs-ws/?access_token=${token}`;

    return {
      webSocketFactory: () => new SockJS(endpoint, undefined, options),
      heartbeatIncoming: 0, // Typical value 0 - disabled
      heartbeatOutgoing: 20000, // Typical value 20000 - every 20 seconds
      reconnectDelay: 5000,
      connectHeaders: {},
    };
  }

  /**
   * Counts connection attempts that end without ever opening (CONNECTING ->
   * CLOSED), clears the count once a connection opens, and disconnects for good
   * when the limit is reached.
   *
   * @param oldState previous connection state
   * @param newState current connection state
   */
  private observeConnectionFailures(oldState: number, newState: number): void {
    if (oldState === ActivityWsService.STATE_CONNECTING && newState === ActivityWsService.STATE_CLOSED) {
      this.connectionIssueCounter += 1;
    } else if (newState === ActivityWsService.STATE_OPEN) {
      this.connectionIssueCounter = 0;
    }

    if (this.connectionIssueCounter >= ActivityWsService.MAX_CONNECTION_ISSUES) {
      this.disconnect();
    }
  }

  /** Emits the websocket section of the properties configuration, only when it is enabled. */
  private findEnabledWebsocket() {
    return this.propertiesService.findPropertiesConfiguration().pipe(
      map(properties => properties.websocket),
      filter(websocket => websocket.enabled),
    );
  }

  /** Opens the watch on the user-session destination; incoming messages are not processed here. */
  private initUserConnection(properties: WebsocketPropertyData): void {
    this.sessionSubscription = this._rxStompService.watch(this.getRootUrl(properties) + '/sfs/user-session').subscribe();
  }
}
