import { Injectable, NgZone } from '@angular/core';
import { merge, Observable, Subject } from 'rxjs';
import { filter } from 'rxjs/operators';
import { ConfigService } from '../config/config.service';
import { LoggingService } from '../logging/logging.service';
import { SignalR } from './signal-r';
import { IMessage } from './symbols';

/**
 * Application-wide message stream.
 *
 * Combines messages pushed locally through `broadcastLocally()` with the
 * `received$` stream of every `SignalR` instance created by `startup()`, and
 * offers helpers that narrow the combined stream by a single message field.
 */
@Injectable({
  providedIn: 'root',
})
export class MessageHubService {
  /** `SignalR` instances created by `startup()`, keyed by their server name in the configuration. */
  hubs: { [name: string]: SignalR } = {};

  /** Sink for messages injected through `broadcastLocally()`. */
  private _received$ = new Subject<IMessage>();

  /**
   * Every message from `broadcastLocally()` plus every message from the hubs
   * registered in `hubs`.
   *
   * The sources are merged lazily, at subscription time, so this property is
   * always defined and it is safe to build pipelines on it before `startup()`
   * has resolved. Note that the hub list is read when a subscription is made:
   * a subscriber that subscribes before `startup()` resolves only receives
   * local broadcasts for the lifetime of that subscription.
   */
  received$: Observable<IMessage> = new Observable<IMessage>(subscriber => {
    const sources: Observable<IMessage>[] = [
      this._received$,
      ...Object.values(this.hubs).map(hub => hub.received$),
    ];
    // Returning the inner subscription lets unsubscribing tear down the merge.
    return merge(...sources).subscribe(subscriber);
  });

  constructor(
    private _logger: LoggingService,
    private _configService: ConfigService,
    private _ngZone: NgZone,
  ) {}

  /**
   * Creates and starts a `SignalR` instance for every configured server that
   * has a `SignalR` entry.
   *
   * @returns A promise that resolves once every hub's `startup()` has
   * resolved, and rejects if any of them rejects.
   */
  startup(): Promise<void> {
    return this.buildHubs().then(() => undefined);
  }

  /**
   * Emits `message` on `received$` without going through any hub.
   *
   * @param message The message to deliver to current local subscribers.
   */
  broadcastLocally(message: IMessage): void {
    this._received$.next(message);
  }

  /**
   * @param key Value the message's `Key` must strictly equal.
   * @returns The messages of `received$` whose `Key` matches, typed as `T & IMessage`.
   */
  filterByKey<T = IMessage>(key: string): Observable<T & IMessage> {
    return this.filterBy<T>(message => message.Key === key);
  }

  /**
   * @param transactionId Value the message's `TransactionId` must strictly equal.
   * @returns The messages of `received$` whose `TransactionId` matches, typed as `T & IMessage`.
   */
  filterByTransactionId<T = IMessage>(transactionId: string): Observable<T & IMessage> {
    return this.filterBy<T>(message => message.TransactionId === transactionId);
  }

  /**
   * @param clientId Value the message's `ClientId` must strictly equal.
   * @returns The messages of `received$` whose `ClientId` matches, typed as `T & IMessage`.
   */
  filterByClientId<T = IMessage>(clientId: string): Observable<T & IMessage> {
    return this.filterBy<T>(message => message.ClientId === clientId);
  }

  /**
   * @param deviceId Value the message's `DeviceId` must strictly equal.
   * @returns The messages of `received$` whose `DeviceId` matches, typed as `T & IMessage`.
   */
  filterByDeviceId<T = IMessage>(deviceId: string): Observable<T & IMessage> {
    return this.filterBy<T>(message => message.DeviceId === deviceId);
  }

  /**
   * Shared implementation of the `filterBy*` helpers: views `received$` as a
   * stream of `T & IMessage` and keeps the messages accepted by `predicate`.
   * The cast is unchecked; callers choose `T` for the payload they expect.
   */
  private filterBy<T>(predicate: (message: T & IMessage) => boolean): Observable<T & IMessage> {
    return (this.received$ as Observable<T & IMessage>).pipe(filter(predicate));
  }

  /**
   * Instantiates a `SignalR` for each configured server that has a truthy
   * `SignalR` entry, stores it in `hubs` under the server name, and starts it.
   *
   * NOTE(review): an existing entry in `hubs` with the same name is replaced
   * without being stopped, so calling `startup()` more than once may leave the
   * previous instances running — confirm whether that can happen in practice.
   *
   * @returns A promise that settles once all started hubs have settled.
   */
  private buildHubs(): Promise<unknown> {
    const servers = this._configService.config.Servers;
    const startups: Array<ReturnType<SignalR['startup']>> = [];

    for (const name of Object.keys(servers)) {
      const signalRConfig = servers[name].SignalR;
      if (!signalRConfig) {
        continue;
      }
      const hub = new SignalR(this._ngZone, this._logger, signalRConfig);
      this.hubs[name] = hub;
      startups.push(hub.startup());
    }

    return Promise.all(startups);
  }
}
