import { Injectable } from '@angular/core';
import { Observable, Subject } from 'rxjs';
import { filter } from 'rxjs/operators';
import { get } from 'lodash';

import { LoggerService } from '../util/logger.service';
import dataConstants from './constants';
import { EnoFactory } from './EnoFactory';
import { Batch, Sid, Tip } from './models/types';
import { Eno } from './models/Eno';
import { EnsrvService } from './ensrv.service';
import { IVars } from './vars.service';
import { ESessionMessageDataType, ISessionMessageData, PubSubService } from './pub-sub.service';
import { OpPullService } from './op-pull.service';

/**
 * A single notification emitted to the observers of `ProcessService#start`.
 */
export interface IProcessResponse {
  /**
   * Value of the `response/process/finished` field of the process response.
   * When `true`, the observable returned by `ProcessService#start` completes right after this notification.
   */
  finished: boolean;
  /**
   * Variables parsed from the `response/process/inline-vars` field of the process response;
   * an empty object when that field yields no value.
   */
  vars: IVars;
}

/**
 * Payload of a session message of type `ESessionMessageDataType.processResponse`,
 * as consumed by `ProcessService`.
 */
export interface IProcessResponseSessionMessageData extends ISessionMessageData {
  /** Discriminator: always the process-response message type. */
  type: ESessionMessageDataType.processResponse;
  /** Tip of the process operation the message refers to; `ProcessService` matches it against the operations it started. */
  op: Tip;
  /** `ProcessService` ignores messages where this is missing or empty, and pulls only the first tip. */
  tips: Tip[]; // response/process ENO tips
}

/**
 * Starts processes on the server and streams their responses back to the caller.
 *
 * A process is started by sending an `op/process` ENO. Its `response/process` ENOs are received through
 * `EnsrvService#getEnoReceiver`; PubSub session messages announcing a process response trigger a pull of the
 * announced ENO so that it reaches the same receiver.
 */
@Injectable({
  providedIn: 'root'
})
export class ProcessService {
  private _processOpFactory: EnoFactory = new EnoFactory('op/process', dataConstants.SECURITY.OP);

  // Processes that have been started and have neither finished nor failed yet, keyed by process operation tip.
  //
  // `handledSids` stores the Sids of the process responses that have already been handled for that process.
  // Right now, for any asynchronous process, the front-end receives the initial process response from both
  // the XHR response and a PubSub message, and we want to avoid notifying the observers of #start twice for
  // the same process response.
  //
  // An entry is removed as soon as its process finishes or fails, so neither the subjects nor the Sids
  // accumulate over the lifetime of the application.
  private _activeProcesses = new Map<string, { subject: Subject<IProcessResponse>; handledSids: Set<Sid> }>();

  constructor(
    private _ensrvService: EnsrvService,
    private _pubSubService: PubSubService,
    private _opPullService: OpPullService,
    private loggerService: LoggerService
  ) {
    // Only react to messages that belong to a process that is still active and that carry at least one tip.
    this._pubSubService.receiveSessionMessage(ESessionMessageDataType.processResponse).pipe(
      filter((data: IProcessResponseSessionMessageData) => this._activeProcesses.has(data.op)
        && !!data.tips && data.tips.length > 0)
    ).subscribe(
      (data: IProcessResponseSessionMessageData) => this._handleSessionMessageData(data)
    );

    this._ensrvService.getEnoReceiver('response/process').subscribe(
      (processResponseEno: Eno) => this._handleResponse(processResponseEno)
    );
  }

  /**
   * Starts the given process.
   *
   * @param processTip Tip of the process to execute.
   * @param processVars Variables handed to the process; serialized as JSON into the operation.
   * @returns An observable that emits every response of the process and completes after a response flagged
   *   as finished. It errors when sending the operation fails or when the returned batch contains ENOs whose
   *   `source.type` is `'error'` (the error value is then the array of those ENOs).
   */
  start(processTip: Tip, processVars: IVars = {}): Observable<IProcessResponse> {
    const processSubject = new Subject<IProcessResponse>();
    const processOpEno = this._processOpFactory
      .setFields([
        { tip: 'op/process/process', value: [processTip] },
        { tip: 'op/process/inline-vars', value: [JSON.stringify(processVars)] }
      ])
      .makeEno();

    // Register before sending so that a response can never arrive for an unknown operation.
    this._activeProcesses.set(processOpEno.tip, { subject: processSubject, handledSids: new Set<Sid>() });

    this.loggerService.debug(`[ProcessService] Started process "${processTip}"`
      + `(process operation "${processOpEno.tip}" with vars "${JSON.stringify(processVars)}").`);

    this._ensrvService.send([processOpEno]).subscribe(
      (enos: Batch) => {
        const errors = enos.filter((eno) => get(eno, 'source.type', null) === 'error');
        if (errors.length > 0) {
          this.logProcessFailure({ processTip, processOpEno, processVars, error: errors });
          // The subject is terminal after error(); drop the registration so it can be garbage collected.
          this._activeProcesses.delete(processOpEno.tip);
          processSubject.error(errors);
        }
      },
      (error) => {
        this.logProcessFailure({ processTip, processOpEno, processVars, error });
        this._activeProcesses.delete(processOpEno.tip);
        processSubject.error(error);
      }
    );

    return processSubject.asObservable();
  }

  /** Logs a failed process execution together with the operation tip and the variables it was started with. */
  private logProcessFailure({ processTip, processOpEno, processVars, error }): void {
    this.loggerService.error(`[ProcessService] Failed to execute process "${processTip}"`
      + `(process operation "${processOpEno.tip}" with vars "${JSON.stringify(processVars)}").`, error);
  }

  /**
   * Forwards a `response/process` ENO to the observers of the process it belongs to.
   *
   * Responses for operations that are not (or no longer) active are ignored, as are responses whose Sid has
   * already been handled for that process.
   */
  private _handleResponse(processResponseEno: Eno): void {
    const processOpTip = processResponseEno.getFieldStringValue('response/process/op-tip');
    const activeProcess = this._activeProcesses.get(processOpTip);

    if (!activeProcess) {
      return;
    }

    if (activeProcess.handledSids.has(processResponseEno.sid)) {
      return;
    }

    activeProcess.handledSids.add(processResponseEno.sid);

    const vars = processResponseEno.getFieldJsonValue('response/process/inline-vars') || {};
    const finished = processResponseEno.getFieldBooleanValue('response/process/finished');

    if (finished) {
      // Release the registration first: once removed, duplicates of this final response find no active
      // process and are ignored, so the handled Sids do not need to be kept around either.
      this._activeProcesses.delete(processOpTip);
    }

    activeProcess.subject.next({ finished, vars });

    if (finished) {
      activeProcess.subject.complete();
    }
  }

  /**
   * Pulls the process response announced by a PubSub session message. The pull result itself is not
   * consumed here.
   */
  private _handleSessionMessageData(data: IProcessResponseSessionMessageData): void {
    // NOTE(review): only the first tip is pulled; confirm that a message never announces more than one response.
    const processResponseTip: Tip = data.tips[0];

    const opPullEno: Eno = this._opPullService.createOpPull({
      tip: [processResponseTip],
      watch: false
    });

    this._ensrvService.send([opPullEno]).subscribe({
      error: (error) => this.loggerService.error(
        `[ProcessService] Failed to pull process response "${processResponseTip}"`
          + `(process operation "${data.op}").`,
        error
      )
    });
  }
}
