/** @format */

import { Injectable } from '@angular/core';
import { ProviderOptions } from '@aws-amplify/pubsub/lib-esm/types';
import { PubSub } from 'aws-amplify';
import { get } from 'lodash-es';
import { first, map, Observable } from 'rxjs';
import { of } from 'rxjs/internal/observable/of';
import { Subscriber } from 'rxjs/internal/Subscriber';
import { v4 } from 'uuid';
import { SesioIotProvider } from '../../_classes/SesioIoTProvider.class';
import { AppLogger, MethodLogger } from '../logger.service';

@Injectable({
  providedIn: 'root',
})
export class MqttService {
  protected uuid = v4();

  private logger = new AppLogger('MqttService');

  @MethodLogger()
  public getTopicSubscription<T = any>(topic: string | null | undefined): Observable<T> {
    if (!topic) return of();
    const run = (observer: Subscriber<any>) => {
      if (SesioIotProvider.state.value === 'Connected') {
        PubSub.subscribe(topic).subscribe(observer);
      } else {
        this.retry(() => PubSub.subscribe(topic).subscribe(observer));
      }
    };
    return new Observable<any>(run).pipe(map((data) => get(data, 'value')));
  }

  @MethodLogger()
  public publishToTopic(topic: string | string[] | null | undefined, msg: any, options?: ProviderOptions): void {
    if (!topic) return;
    if (SesioIotProvider.state.value === 'Connected') {
      PubSub.publish(topic, msg);
    } else {
      this.retry(() => PubSub.publish(topic, msg));
    }
  }

  protected retry(run: () => any): void {
    if (SesioIotProvider.state.value === 'Disconnected') {
      return this.logger.warn(`discard retry due to diconnected state`);
    }
    SesioIotProvider.state.pipe(first()).subscribe((state) => {
      if (state === 'Disconnected') {
        return this.logger.warn(`discard retry due to diconnected state`);
      }
      if (state === 'Connected') return run();
      return this.retry(run);
    });
  }
}
