import { EventEmitter, Subscription } from '../event-emitter';
import { ObservableProgress, ObserveProgressListener, Progress } from '../progress/progress.types';
import { SubscriptionGroup } from '../subscription-group';

import { AsyncLock } from './async-lock';
import { PartitionQueue } from './partition-queue.types';

type PartitionQueueProgressObserverEventMap = {
  progress: Progress;
};

export class PartitionQueueProgressObserver<T extends Record<string, unknown>> implements ObservableProgress {
  private totalCount = 0;
  private processedCount = 0;
  private totalTimeSpent = 0;
  private lastCommitTimestamp = 0;
  private isStarted = false;

  private subscriptions = new SubscriptionGroup();
  private lock = new AsyncLock();

  private eventEmitter = new EventEmitter<PartitionQueueProgressObserverEventMap>();

  constructor(private queue: PartitionQueue<T>) {}

  private triggerObservers(): void {
    const progress = this.getProgress();
    this.eventEmitter.emit('progress', progress);
  }

  async startObserving(): Promise<void> {
    try {
      // This lock prevents the observer from being stopped before it has fully been started.
      // t is not used as a mechanism to prevent the observer from being started twice.
      await this.lock.acquire();

      // Prevent the observer from being started twice
      if (this.isStarted) {
        console.warn('The queue progress observer has already been started.');
        return;
      }
      this.isStarted = true;

      this.totalCount = 0;
      this.processedCount = 0;
      this.totalTimeSpent = 0;
      this.lastCommitTimestamp = Date.now();

      this.subscriptions.add(
        this.queue.addListener('push', () => {
          this.totalCount += 1;
          this.triggerObservers();
        }),
      );
      this.subscriptions.add(
        this.queue.addListener('commit', (e) => {
          this.processedCount += e.sequenceIds.length;
          this.totalTimeSpent += Date.now() - this.lastCommitTimestamp;
          this.lastCommitTimestamp = Date.now();
          this.triggerObservers();
        }),
      );
      this.subscriptions.add(
        this.queue.addListener('clear', () => {
          this.totalCount = 0;
          this.processedCount = 0;
          this.totalTimeSpent = 0;
          this.lastCommitTimestamp = Date.now();
          this.triggerObservers();
        }),
      );

      this.totalCount += await this.queue.size();

      this.triggerObservers();
    } finally {
      this.lock.release();
    }
  }

  async stopObserving(): Promise<void> {
    try {
      await this.lock.acquire();

      if (this.isStarted === false) {
        console.warn('The queue progress observer has already been stopped.');
        return;
      }
      this.isStarted = false;

      this.subscriptions.destroy();
    } finally {
      this.lock.release();
    }
  }

  getProgress(): Progress {
    let remaining: number | null = null;

    if (this.totalTimeSpent > 0) {
      const remainingItems = this.totalCount - this.processedCount;
      if (remainingItems > 0) {
        remaining = (this.totalTimeSpent / this.processedCount / 1000) * (this.totalCount - this.processedCount);
      }
    }

    return {
      total: this.totalCount,
      value: this.processedCount,
      percentage: this.totalCount > 0 ? Math.min(this.processedCount / this.totalCount, 1) : 0,
      remaining,
    };
  }

  addListener(observer: ObserveProgressListener): Subscription {
    return this.eventEmitter.addListener('progress', observer);
  }
}
