/** @format */

import { Injectable } from '@angular/core';
import { ProviderOptions } from '@aws-amplify/pubsub/lib-esm/types';
import { PubSub } from 'aws-amplify';
import { get } from 'lodash-es';
import { first, map, Observable } from 'rxjs';
import { of } from 'rxjs/internal/observable/of';
import { v4 } from 'uuid';
import { AuthService } from '../auth.service';
import { MethodLogger } from '../logger.service';
import { Subscriber } from 'rxjs/internal/Subscriber';

@Injectable({
  providedIn: 'root',
})
export class MqttService {
  protected uuid = v4();

  constructor(protected authService: AuthService) {}

  @MethodLogger()
  public getTopicSubscription<T = any>(topic: string | null | undefined): Observable<T> {
    if (!topic) return of();
    const run = (observer: Subscriber<any>) => {
      if (this.authService.iotProviderState.value === 'Connected') {
        PubSub.subscribe(topic).subscribe(observer);
      } else setTimeout(() => run(observer), 300);
    };
    return new Observable<any>(run).pipe(map((data) => get(data, 'value')));

    // return new Observable<any>((observer) => {
    //   return PubSub.subscribe(topic).subscribe(observer);
    // }).pipe(map((data) => get(data, 'value')));
  }

  @MethodLogger()
  public async publishToTopic(
    topic: string | string[] | null | undefined,
    msg: any,
    options?: ProviderOptions,
  ): Promise<void> {
    if (!topic) return;
    if (this.authService.iotProviderState.value !== 'Connected') {
      return new Promise((resolve, reject) => {
        setTimeout(() => {
          this.authService.iotProviderState
            .pipe(first())
            .subscribe(() => this.publishToTopic(topic, msg, options).then(resolve).catch(reject));
        }, 300);
      });
    }
    await PubSub.publish(topic, msg);
  }
}
