import { inject, Injectable } from "@angular/core";
import { AsyncSubject, BehaviorSubject, Observable } from "rxjs";
import {
  AudioCompressionService,
  CompressionOptions,
  CompressionResult,
  CompressionStatus,
  CompressionSuccess
} from "./audio-compression-service";
import { FfmpegService } from "../ffmpeg-service";

/**
 * A queued request to determine the duration of one audio file.
 */
interface DurationJob {
  /** File name used when writing the blob through the ffmpeg service and when passing it to `-i`. */
  name: string;
  /** Audio data to probe. */
  blob: Blob;
  /** Receives the duration (then completes) or the error raised while processing this job. */
  subject: AsyncSubject<number>;
}

/**
 * {@link AudioCompressionService} implementation that delegates the actual
 * work to the injected {@link FfmpegService}.
 *
 * Duration probes are serialized through an internal queue so that only one
 * probe is in flight at a time.
 */
@Injectable()
export class FfmpegAudioCompressor implements AudioCompressionService {
  /** Duration requests waiting to be processed, oldest first. */
  private readonly durationQueue: DurationJob[] = [];
  /** True while `processQueue` is draining `durationQueue`. */
  private processingDuration = false;
  private readonly ffmpeg = inject(FfmpegService);

  /** Values used for every compression option the caller does not supply. */
  private readonly defaultOptions: Required<CompressionOptions> = {
    channels: '1',
    bitrate: '16k',
    sampleRate: '24000',
    format: 'mp3'
  };

  /**
   * Initializes the underlying ffmpeg service.
   *
   * @returns Whatever the ffmpeg service reports from its own `init()`.
   */
  async init(): Promise<boolean> {
    return this.ffmpeg.init();
  }

  /**
   * Compresses `file` and reports progress along the way.
   *
   * The returned observable starts with a `compressing` result at 0 progress,
   * forwards every progress event of the ffmpeg service, and ends with exactly
   * one terminal result (`done` with the compressed blob, or `error` with the
   * failure as `detail`) followed by completion.
   *
   * @param filename Name under which the input is written for ffmpeg.
   * @param file Audio data to compress.
   * @param options Overrides for the default compression options.
   * @returns A stream of compression results that completes after the terminal result.
   */
  compressWithProgress(filename: string, file: Blob, options: CompressionOptions): Observable<CompressionResult> {
    const effectiveOptions = { ...this.defaultOptions, ...options };
    const output = `compressed_${filename}.${effectiveOptions.format}`;

    const subject = new BehaviorSubject<CompressionResult>({ status: CompressionStatus.compressing, progress: 0, elapsedTime: 0 });

    // Subscribe before starting so that no progress event is missed.
    const progressSub =
      this.ffmpeg
        .getProgress()
        .subscribe({
          next: (event) => subject.next({
            status: CompressionStatus.compressing,
            elapsedTime: event.time,
            progress: event.progress * 100
          })
        });

    // Emits the terminal result. Progress forwarding is stopped first so that
    // no `compressing` update can follow the terminal one, and the subject is
    // completed on both the success and the failure path.
    const finish = (result: CompressionResult): void => {
      progressSub.unsubscribe();
      subject.next(result);
      subject.complete();
    };

    void this.ffmpeg
      .writeFile(filename, file)
      .then(() => this.executeCompression(filename, output, effectiveOptions))
      .then(() => this.ffmpeg.readFile(output))
      .then((data) => finish(compressionSuccess(effectiveOptions.format, data)))
      .catch(error => finish({ status: CompressionStatus.error, detail: error }))
      .finally(() => this.ffmpeg.deleteFiles([filename, output]))
      // Cleanup is best-effort: the result has already been delivered, so a
      // failed delete is only logged instead of surfacing as an unhandled rejection.
      .catch(cleanupError => console.warn(`Could not delete temporary files for ${filename}`, cleanupError));

    return subject.asObservable();
  }

  /**
   * Queues a duration probe for `file`.
   *
   * @param filename Name under which the input is written for ffmpeg.
   * @param file Audio data to probe.
   * @returns An observable that emits the duration once and completes, or
   *          errors if the probe fails or ffmpeg reports no duration.
   */
  getDuration(filename: string, file: Blob): Observable<number> {
    const subject = new AsyncSubject<number>();

    this.durationQueue.push({
      name: filename,
      blob: file,
      subject
    });

    if (!this.processingDuration) {
      void this.processQueue();
    }

    return subject.asObservable();
  }

  /**
   * Drains the duration queue one job at a time. Jobs enqueued while the
   * drain is running are picked up by the same loop.
   */
  private async processQueue(): Promise<void> {
    this.processingDuration = true;
    try {
      for (let job = this.durationQueue.shift(); job !== undefined; job = this.durationQueue.shift()) {
        await this.processDurationJob(job);
      }
    } finally {
      // Reset synchronously after the last `shift()` so a later `getDuration`
      // call always restarts the drain.
      this.processingDuration = false;
    }
  }

  /**
   * Runs a single duration probe and settles `job.subject`. Never rejects:
   * every failure is routed to the job's subject.
   */
  private async processDurationJob(job: DurationJob): Promise<void> {
    let durationEmitted = false;
    let durationSub: { unsubscribe(): void } | undefined;

    try {
      await this.ffmpeg.writeFile(job.name, job.blob);

      durationSub = this.ffmpeg.getDurationFromStdout().subscribe({
        next: (duration) => {
          if (durationEmitted) {
            return;
          }
          durationEmitted = true;
          // Emit the duration as soon as it is available.
          job.subject.next(duration);
          job.subject.complete();
        }
      });

      await this.ffmpeg.execCommand(['-i', job.name]);

      if (!durationEmitted) {
        // Without this the subject would stay open forever, or pick up the
        // duration of a later job through a still-active subscription.
        job.subject.error(new Error(`No duration reported for ${job.name}`));
      }

      console.log(`Done with ${job.name}`)
    } catch (error) {
      // No effect if the duration was already delivered and the subject completed.
      job.subject.error(error);
    } finally {
      // Stop listening so the next job's output cannot reach this job.
      durationSub?.unsubscribe();
      try {
        await this.ffmpeg.deleteFile(job.name);
      } catch {
        // Best-effort cleanup: the file may never have been written.
      }
    }
  }

  /**
   * Runs ffmpeg to transcode `filename` into `output` with the given options.
   * The output format is not passed explicitly; only the output file name is.
   *
   * @returns The promise returned by the ffmpeg service's `execCommand`.
   */
  private executeCompression(
    filename: string,
    output: string,
    options: Required<CompressionOptions>
  ): Promise<number> {
    return this.ffmpeg.execCommand([
      "-i", filename,
      "-ac", options.channels,
      "-b:a", options.bitrate,
      "-ar", options.sampleRate,
      output
    ]);
  }
}

/**
 * Builds the terminal `done` result for a finished compression.
 *
 * @param format Audio format name; becomes the subtype of the blob's `audio/...` MIME type.
 * @param data Encoded audio bytes read back from ffmpeg.
 * @returns A result with status `done` carrying the bytes as a typed `Blob`.
 */
function compressionSuccess(format: string, data: Uint8Array): CompressionSuccess {
  const mimeType = `audio/${format}`;
  const result = new Blob([data], { type: mimeType });
  return { status: CompressionStatus.done, result };
}
