import { Observable, Subject, Subscription, of, interval } from 'rxjs';
import { map, catchError, startWith, switchMap } from 'rxjs/operators';
import * as moment from "moment";

import { Injectable, OnDestroy } from '@angular/core';
import { HttpParams } from '@angular/common/http';

import { MqttService, IMqttMessage, IOnConnectEvent } from 'ngx-mqtt';

import { S2AHttpClient } from '@s2a/ng-common';
import { AuthService } from '@s2a/ng-auth';
import { Equipment, MachineEquipment } from '@s2a/ng-equipment';

import { MachineData } from '../model/machine-data';
import { CreatedIncidentData, CreatedIncidentResult } from '../model/created-incident-data';
import { MachineReport } from '../model/machine-report';
import { MachineHistory } from '../model/machine-history';

/**
 * Application-wide service for the performance views.
 *
 * It combines two data sources:
 * - REST calls through `S2AHttpClient` (equipment tree, KPI history and reports),
 * - live updates through `MqttService` (machine KPIs and newly created incidents).
 */
@Injectable({
  providedIn: 'root'
})
export class PerformanceService implements OnDestroy {
  /** Subscription feeding the subject most recently returned by `getLiveMachineData$()`. */
  private liveMachineDataSubscription: Subscription;
  /** Subscription feeding the subject most recently returned by `getLiveCreatedIncidentResultData$()`. */
  private liveCreatedIncidentResultSubscription: Subscription;
  /** Pending request for the MQTT WebSocket URL started by `mqttConnect()`. */
  private mqttAuthSubscription: Subscription;
  /** Collects the connection-event logging subscriptions so they can be released together. */
  private readonly mqttEventLogSubscriptions: Subscription = new Subscription();
  /** Emits whenever the MQTT client reports a connection. */
  private mqttConnected$: Observable<IOnConnectEvent>;

  constructor(
    private s2aHttpClient: S2AHttpClient,
    private authService: AuthService,
    private mqttService: MqttService
  ) {
    this.mqttConnected$ = this.mqttService.onConnect;

    // Log the MQTT connection life cycle; the subscriptions are tracked so ngOnDestroy can release them.
    this.mqttEventLogSubscriptions.add(this.mqttConnected$.subscribe(() => console.log('MQTT connected')));
    this.mqttEventLogSubscriptions.add(this.mqttService.onError.subscribe(() => console.log('MQTT error')));
    this.mqttEventLogSubscriptions.add(this.mqttService.onClose.subscribe(() => console.log('MQTT close')));
    this.mqttEventLogSubscriptions.add(this.mqttService.onReconnect.subscribe(() => console.log('MQTT reconnect')));
  }

  /** Releases every subscription held by the service and closes the MQTT connection. */
  ngOnDestroy(): void {
    this.unsubscribeMqttObservables();
    if (this.mqttAuthSubscription) {
      this.mqttAuthSubscription.unsubscribe();
    }
    this.mqttEventLogSubscriptions.unsubscribe();
    this.disconnectMqttQuietly();
  }

  /**
   * Requests the equipments with `level=line` and `has_ready_kit=true`.
   *
   * @returns the equipments delivered by the backend
   */
  getLines(): Observable<Equipment[]> {
    const options = { params: new HttpParams().set('level', 'line').set('has_ready_kit', 'true') };
    return this.s2aHttpClient.get<Equipment[]>('equipments', options);
  }

  /**
   * Loads the children of a line, keeps those with level `machine` and orders them by
   * their `sequenceNumber` property.
   *
   * Each returned machine is augmented with a `sequenceNumber` field; machines without
   * such a property (or without any properties) get `0`.
   *
   * @param lineId id of the line whose machines are requested
   * @returns the machines in ascending sequence order
   */
  getMachines(lineId: string): Observable<MachineEquipment[]> {
    return this.s2aHttpClient.get<MachineEquipment[]>(`equipments/${lineId}/children`).pipe(
      map((equipments: MachineEquipment[]) => equipments
        .filter((eq: MachineEquipment) => eq.level === 'machine')
        .map((eq: MachineEquipment) => {
          let sequenceNumber = 0;
          // Tolerate equipments that come without a property list instead of throwing.
          (eq.properties || []).forEach((property: object) => {
            // No early exit: if the property occurs several times the last occurrence wins.
            if (property['propertyName'] === 'sequenceNumber') {
              sequenceNumber = property['propertyValue'];
            }
          });
          eq['sequenceNumber'] = sequenceNumber;
          return eq;
        }
        )
        // filter() already produced a fresh array, so sorting in place does not touch the response array.
        .sort((a: MachineEquipment, b: MachineEquipment) => (a['sequenceNumber'] - b['sequenceNumber']))
      )
    );
  }

  /**
   * Requests the live KPI history of a machine, including losses and speeds,
   * with `timeformat=ms`.
   *
   * @param machineId id of the machine
   */
  getMachineHistory(machineId: string): Observable<MachineHistory> {
    const options = { params: new HttpParams().set('losses', 'true').set('speeds', 'true').set('timeformat', 'ms') };
    return this.s2aHttpClient.get<MachineHistory>(`readykit/kpis/${machineId}/live`, options);
  }

  /**
   * Polls `getMachineHistory()` immediately on subscription and then every two minutes.
   *
   * A request that is still pending when the next tick arrives is cancelled (`switchMap`).
   * Request errors are not caught here and therefore end the polling stream.
   *
   * @param machineId id of the machine
   */
  getMachineHistoryInInterval$(machineId: string): Observable<MachineHistory> {
    const twoMinutesInMilliseconds: number = 2 * 60 * 1000;
    return interval(twoMinutesInMilliseconds).pipe(
      startWith(0),
      switchMap(() => this.getMachineHistory(machineId))
    );
  }

  /**
   * (Re-)establishes the MQTT connection for a line.
   *
   * Existing live-data subscriptions are dropped and the current connection is closed,
   * so the `getLive…$()` methods have to be called again afterwards. The WebSocket URL
   * is fetched from `performance/{lineId}/mqtt`; if that request fails the error is
   * logged and no connection is opened.
   *
   * @param lineId id of the line to connect for
   */
  mqttConnect(lineId: string): void {
    this.unsubscribeMqttObservables();
    // Cancel a still-pending auth request so an outdated URL can never win the race.
    if (this.mqttAuthSubscription) {
      this.mqttAuthSubscription.unsubscribe();
    }
    this.disconnectMqttQuietly();

    const mqttAuthUrl = `performance/${lineId}/mqtt`;
    console.log('MQTT auth to ' + mqttAuthUrl);
    this.mqttAuthSubscription = this.s2aHttpClient.get<string>(mqttAuthUrl)
      .pipe(
        catchError((error: any) => {
          console.log(error);
          // Complete without a value: the subscriber below is not invoked on failure.
          return of();
        })
      )
      .subscribe((signedUrl: string) => {
        this.mqttService.connect({ transformWsUrl: () => signedUrl });
      });
  }

  /**
   * Provides the live machine data of a line.
   *
   * The topic `/scheduler/kpi-calc-live/{account}/{lineId}/#` is observed as soon as the
   * MQTT client reports a connection. Every further connect event replaces the previous
   * topic observation instead of stacking an additional one, so each message is
   * forwarded once. Calling this method again detaches the previously returned stream.
   *
   * @param lineId id of the line
   * @returns a stream of parsed payloads; malformed payloads are logged and skipped
   */
  getLiveMachineData$(lineId: string): Observable<MachineData> {
    if (this.liveMachineDataSubscription) {
      this.liveMachineDataSubscription.unsubscribe();
    }

    const liveMachineData$: Subject<MachineData> = new Subject<MachineData>();
    this.liveMachineDataSubscription = this.mqttConnected$
      .pipe(
        // switchMap ties the topic observation to this one subscription and tears the old one down on reconnect.
        switchMap(() => {
          // The account is read at connect time, exactly when the topic is needed.
          const liveMachineDataTopic = `/scheduler/kpi-calc-live/${this.authService.user.account}/${lineId}/#`;
          console.log('observing ' + liveMachineDataTopic);
          return this.mqttService.observe(liveMachineDataTopic);
        })
      )
      .subscribe((message: IMqttMessage) => {
        const machineData = this.parsePayload<MachineData>(message);
        if (machineData !== undefined) {
          liveMachineData$.next(machineData);
        }
      });
    return liveMachineData$;
  }

  /**
   * Provides the results of incidents that are newly created for a line.
   *
   * Works like `getLiveMachineData$()`, but observes
   * `/incident-reporting/incident-reporting-rule/new-incident/{account}/{lineId}/#`
   * and forwards the `result` field of each parsed payload.
   *
   * @param lineId id of the line
   * @returns a stream of incident results; malformed payloads are logged and skipped
   */
  getLiveCreatedIncidentResultData$(lineId: string): Observable<CreatedIncidentResult> {
    if (this.liveCreatedIncidentResultSubscription) {
      this.liveCreatedIncidentResultSubscription.unsubscribe();
    }

    const liveCreatedIncidentResultData$: Subject<CreatedIncidentResult> = new Subject<CreatedIncidentResult>();
    this.liveCreatedIncidentResultSubscription = this.mqttConnected$
      .pipe(
        // See getLiveMachineData$: one subscription owns both the connect events and the topic observation.
        switchMap(() => {
          const liveCreatedIncidentResultDataTopic =
            `/incident-reporting/incident-reporting-rule/new-incident/${this.authService.user.account}/${lineId}/#`;
          console.log('observing ' + liveCreatedIncidentResultDataTopic);
          return this.mqttService.observe(liveCreatedIncidentResultDataTopic);
        })
      )
      .subscribe((message: IMqttMessage) => {
        const createdIncidentData = this.parsePayload<CreatedIncidentData>(message);
        if (createdIncidentData !== undefined) {
          liveCreatedIncidentResultData$.next(createdIncidentData.result);
        }
      });
    return liveCreatedIncidentResultData$;
  }

  /**
   * Parses the payload of an MQTT message as JSON.
   *
   * @returns the parsed value, or `undefined` when the payload is not valid JSON or is
   *          the JSON literal `null`; failing here must not terminate the live stream
   */
  private parsePayload<T>(message: IMqttMessage): T | undefined {
    try {
      const parsed = JSON.parse(message.payload.toString());
      return parsed === null ? undefined : <T>parsed;
    } catch (e) {
      console.log('MQTT payload is not valid JSON', e);
      return undefined;
    }
  }

  /**
   * Closes the MQTT connection on a best-effort basis.
   *
   * NOTE(review): the original code already guarded this call in mqttConnect(), presumably
   * because disconnect() throws when no connection exists yet — confirm against ngx-mqtt.
   */
  private disconnectMqttQuietly(): void {
    try {
      this.mqttService.disconnect();
    } catch (e) {
      // Deliberately ignored: there is nothing to clean up if disconnecting fails.
    }
  }

  /** Stops feeding the subjects handed out by the `getLive…$()` methods. */
  private unsubscribeMqttObservables(): void {
    if (this.liveMachineDataSubscription) {
      this.liveMachineDataSubscription.unsubscribe();
    }
    if (this.liveCreatedIncidentResultSubscription) {
      this.liveCreatedIncidentResultSubscription.unsubscribe();
    }
  }

  /** Requests the ancestors of an equipment from `equipments/{equipmentId}/ancestors`. */
  private getAncestors(equipmentId: string): Observable<Equipment[]> {
    return this.s2aHttpClient.get<Equipment[]>(`equipments/${equipmentId}/ancestors`);
  }

  /**
   * Builds a label from the descriptions of a machine's ancestors.
   *
   * The descriptions of the levels enterprise, subsidiary, plant, area and line are
   * joined with ` - ` in that order; levels without a description are left out.
   * If several ancestors share a level, the last one in the response wins.
   *
   * @param machineId id of the machine
   */
  getPlantAndLineInfos(machineId: string): Observable<string> {
    return this.getAncestors(machineId).pipe(
      map(
        (equipments: Equipment[]) => equipments.reduce((infos: object, currentEquipment: Equipment) => {
          infos[currentEquipment.level] = currentEquipment.description;
          return infos;
        }, {})
      ),
      map(
        (plantAndLineInfosTree: object) => [
          plantAndLineInfosTree['enterprise'],
          plantAndLineInfosTree['subsidiary'],
          plantAndLineInfosTree['plant'],
          plantAndLineInfosTree['area'],
          plantAndLineInfosTree['line']
        ]
          .filter((element: string) => !!element)
          .join(' - ')
      )
    );
  }

  /**
   * Determines the timezone of an equipment from the `timezone` property of its
   * plant-level ancestors.
   *
   * @param equipmentId id of the equipment
   * @param defaultTimezone fallback used when no plant defines a non-empty timezone
   * @returns the plant timezone; otherwise `defaultTimezone`; otherwise the timezone
   *          guessed by moment (`moment.tz.guess()`)
   */
  getTimezone(equipmentId: string, defaultTimezone?: string): Observable<string> {
    return this.getAncestors(equipmentId).pipe(
      map(
        (equipments: Equipment[]) => equipments
          .filter((equipment: Equipment) => equipment.level === 'plant')
          .reduce((timezone: string, equipment: Equipment) => {
            // Tolerate plants that come without a property list instead of throwing.
            return (equipment.properties || []).reduce((tz: string, property: object) => {
              if (property['propertyName'] === 'timezone' && property['propertyValue']) {
                tz = property['propertyValue'];
              }
              return tz;
            }, timezone);
          }, defaultTimezone || moment.tz.guess())
      ),
    );
  }

  /**
   * Requests a single machine from `equipments/{machineId}`.
   *
   * @param machineId id of the machine
   */
  getMachine(machineId: string): Observable<MachineEquipment> {
    return this.s2aHttpClient.get<MachineEquipment>(`equipments/${machineId}`);
  }

  /**
   * Requests the KPI report of a machine, including losses and KPIs, with `timeformat=ms`.
   *
   * @param machineId id of the machine
   * @param shiftBegin value placed in the request path after the machine id
   *                   (NOTE(review): presumably the shift start as a millisecond timestamp — confirm)
   */
  getMachineReport(machineId: string, shiftBegin: number): Observable<MachineReport> {
    const options = { params: new HttpParams().set('losses', 'true').set('kpis', 'true').set('timeformat', 'ms') };
    return this.s2aHttpClient.get<MachineReport>(`readykit/kpis/${machineId}/${shiftBegin}`, options);
  }

}
