import { logger } from '@/logger';

import { EventEmitter, Listener, Subscription } from '../event-emitter';

import { AsyncLock } from './async-lock';
import {
  InternalQueueItem,
  PartitionQueue,
  PartitionQueueEventMap,
  PartitionQueueRepository,
  PullConfig,
  PullTimeoutError,
  QueueItem,
  StringKey,
} from './partition-queue.types';

export const HIGH_PRIORITY = 0;
export const MEDIUM_PRIORITY = 5;
export const LOW_PRIORITY = 10;

interface PulledQueueItem<T, P extends string = string> extends InternalQueueItem<T, P> {
  commitTimeoutId: ReturnType<typeof setTimeout> | null;
}

export class DefaultPartitionQueue<T extends Record<string, unknown>> implements PartitionQueue<T> {
  private static readonly DEFAULT_CONCURRENT_PULLS = 1;

  private sessionId: string | null = null;

  private pulledItems: PulledQueueItem<unknown>[] = [];

  // All the operations in the queue are asynchronous, but we can only run one at a time to avoid inconsistencies.
  // Use this lock in all operations.
  private readonly operationLock = new AsyncLock();

  private readonly events = new EventEmitter<PartitionQueueEventMap<StringKey<T>>>();

  constructor(
    private readonly repository: PartitionQueueRepository<T>,
    private readonly concurrentPulls = DefaultPartitionQueue.DEFAULT_CONCURRENT_PULLS,
  ) {}

  initialize(sessionId: string): void {
    if (this.sessionId !== null && this.sessionId === sessionId) {
      throw new Error('The PartitionQueue has already been initialized with another session id');
    }

    this.sessionId = sessionId;
  }

  async push<P extends StringKey<T>>(partition: P, value: QueueItem<T[P]>, priority = 5): Promise<void> {
    logger.info('QUEUE.PUSH', { partition, value, priority, sessionId: this.sessionId });

    if (!this.sessionId) {
      throw new Error('The PartitionQueue has not been initialized yet');
    }

    // Validates the priority
    if (!Number.isInteger(priority)) {
      throw new Error(`Priority must be an integer, received ${priority}`);
    }
    if (priority < HIGH_PRIORITY || priority > LOW_PRIORITY) {
      throw new Error(`Priority must be between ${HIGH_PRIORITY} and ${LOW_PRIORITY}, received ${priority}`);
    }

    // Adds to the queue
    try {
      await this.operationLock.acquire();
      await this.repository.addQueueItem(this.sessionId, {
        ...value,
        partition,
        priority,
      });
      this.events.emit('push', { partition, sequenceId: value.sequenceId });
    } finally {
      this.operationLock.release();
    }
  }

  private async performCommitTimeout<P extends StringKey<T>>(item: InternalQueueItem<T[P], P>): Promise<void> {
    try {
      await this.operationLock.acquire();

      // Check if item is really still pending a commit
      const pendingItem = this.pulledItems.find(
        (i) => i.partition === item.partition && i.sequenceId === item.sequenceId,
      );
      if (!pendingItem) {
        // If it's not, we can safely return
        return;
      }

      // Otherwise, put the item back in the queue
      this.pulledItems = this.pulledItems.filter(
        (i) => i.partition !== item.partition || i.sequenceId !== item.sequenceId,
      );

      // Emit the timeout event
      this.events.emit('commitTimeout', { partition: item.partition, sequenceId: item.sequenceId });
    } finally {
      this.operationLock.release();
    }
  }

  private async findAndPullNextItem<P extends StringKey<T>>(
    partition: P,
    commitTimeout: number | undefined,
  ): Promise<QueueItem<T[P]> | null> {
    if (!this.sessionId) {
      throw new Error('The PartitionQueue has not been initialized yet');
    }

    try {
      await this.operationLock.acquire();

      const itemsInFlight = this.pulledItems.length;
      if (itemsInFlight >= this.concurrentPulls) {
        return null;
      }

      const nextItem = await this.repository.getNextQueueItem<P>(this.sessionId, itemsInFlight);
      if (nextItem === null || nextItem.partition !== partition) {
        return null;
      }

      let commitTimeoutId: ReturnType<typeof setTimeout> | null = null;
      if (commitTimeout !== undefined && commitTimeout > 0) {
        commitTimeoutId = setTimeout(() => {
          this.performCommitTimeout(nextItem);
        }, commitTimeout);
      }

      this.pulledItems.push({ ...nextItem, commitTimeoutId });

      return nextItem;
    } finally {
      this.operationLock.release();
    }
  }

  private async waitAndPullNextItem<P extends StringKey<T>>(
    partition: P,
    pullConfig?: PullConfig,
  ): Promise<QueueItem<T[P]>> {
    return new Promise<QueueItem<T[P]>>((resolve, reject) => {
      const eventsToListen = ['reset', 'push', 'commit', 'commitTimeout'] as const;

      // More than one event can be triggered at the same time, and we need to make sure
      // we don't try to pull two items. If we already pulled one, we should ignore the other events.
      const eventLock = new AsyncLock();
      let pullTimeoutId: ReturnType<typeof window.setTimeout> | null = null;
      let found = false;

      const listener = async () => {
        await eventLock.acquire();

        try {
          if (found) {
            return;
          }

          const nextItem = await this.findAndPullNextItem(partition, pullConfig?.commitTimeout);
          if (!nextItem) {
            return;
          }

          if (pullTimeoutId !== null) {
            clearTimeout(pullTimeoutId);
          }

          found = true;
          this.events.removeListeners(eventsToListen, listener);
          resolve(nextItem);
        } catch (error) {
          this.events.removeListeners(eventsToListen, listener);
          reject(error);
        } finally {
          eventLock.release();
        }
      };

      if (pullConfig?.pullTimeout) {
        pullTimeoutId = setTimeout(async () => {
          await eventLock.acquire();

          try {
            if (found === false) {
              this.events.removeListeners(eventsToListen, listener);
              reject(new PullTimeoutError());
            }
          } finally {
            eventLock.release();
          }
        }, pullConfig.pullTimeout);
      }

      this.events.addListeners(eventsToListen, listener);
    });
  }

  async pull<P extends StringKey<T>>(partition: P, config?: PullConfig): Promise<QueueItem<T[P]>> {
    logger.info('QUEUE.PULL', { partition, config, sessionId: this.sessionId });

    // Check if there's a pending node in the priority queue
    // If it exists, return it.
    const nextNode = await this.findAndPullNextItem(partition, config?.commitTimeout);
    if (nextNode) {
      return nextNode;
    }

    // Otherwise, listen to to the reset, push and commit events and repeat
    // the previous logic until we find a node
    return await this.waitAndPullNextItem(partition, config);
  }

  async commit<P extends StringKey<T>>(partition: P, sequenceId: number): Promise<QueueItem<T[P]>[]> {
    logger.info('QUEUE.COMMIT', { partition, sequenceId, sessionId: this.sessionId });

    if (!this.sessionId) {
      throw new Error('The PartitionQueue has not been initialized yet');
    }

    try {
      await this.operationLock.acquire();

      // Check if the sequence is pending
      const pendingItems = this.pulledItems.filter((i) => i.partition === partition && i.sequenceId <= sequenceId);
      if (pendingItems.length === 0) {
        return [];
      }

      const sessionId = this.sessionId;
      await Promise.all(
        pendingItems.map(async (pendingItem) => {
          // Remove the commit timeout, if any
          if (pendingItem.commitTimeoutId) {
            clearTimeout(pendingItem.commitTimeoutId);
          }

          // Delete from the queue
          await this.repository.removeQueueItem(sessionId, partition, pendingItem.sequenceId);
        }),
      );

      // Update the pulled items array
      this.pulledItems = this.pulledItems.filter((i) => i.partition !== partition || i.sequenceId > sequenceId);
      // Emit the commit event
      this.events.emit('commit', { partition, sequenceIds: pendingItems.map((i) => i.sequenceId) });

      logger.info('QUEUE.COMMITTED', {
        partition,
        sequenceIds: pendingItems.map((i) => i.sequenceId),
        sessionId: this.sessionId,
      });

      return pendingItems as QueueItem<T[P]>[];
    } finally {
      this.operationLock.release();
    }
  }

  async reset<P extends StringKey<T>>(partition: P): Promise<void> {
    logger.info('QUEUE.RESET', { partition, sessionId: this.sessionId });

    try {
      await this.operationLock.acquire();

      // Clear all in-flight items, removing the commit timeout
      this.pulledItems = this.pulledItems.filter((item) => {
        if (item.partition !== partition) {
          return true;
        }
        if (item.commitTimeoutId) {
          clearTimeout(item.commitTimeoutId);
        }
        return false;
      });

      // Emit the reset event
      this.events.emit('reset', { partition });
    } finally {
      this.operationLock.release();
    }
  }

  async clear(): Promise<void> {
    logger.info('QUEUE.CLEAR', { sessionId: this.sessionId });

    if (!this.sessionId) {
      throw new Error('The PartitionQueue has not been initialized yet');
    }

    try {
      await this.operationLock.acquire();

      for (const pulledItem of this.pulledItems) {
        if (pulledItem.commitTimeoutId) {
          clearTimeout(pulledItem.commitTimeoutId);
        }
      }

      await this.repository.clear(this.sessionId);

      // Emit the reset event
      this.events.emit('clear', null);
    } finally {
      this.operationLock.release();
    }
  }

  getLatestSequenceId<P extends StringKey<T>>(partition: P): number | null {
    if (!this.sessionId) {
      throw new Error('The PartitionQueue has not been initialized yet');
    }

    return this.repository.getLatestSequenceId(this.sessionId, partition);
  }

  async size<P extends StringKey<T>>(partition?: P, excludePendingItems?: boolean): Promise<number> {
    if (!this.sessionId) {
      throw new Error('The PartitionQueue has not been initialized yet');
    }

    try {
      await this.operationLock.acquire();
      const size = await this.repository.size(this.sessionId, partition);
      if (excludePendingItems === true) {
        return size - this.pulledItems.length;
      }
      return size;
    } finally {
      this.operationLock.release();
    }
  }

  async isEmpty<P extends StringKey<T>>(partition?: P, excludePendingItems?: boolean): Promise<boolean> {
    const size = await this.size(partition, excludePendingItems);
    return size === 0;
  }

  addListener<P extends StringKey<T>, E extends keyof PartitionQueueEventMap<P>>(
    event: E,
    listener: Listener<E, PartitionQueueEventMap<P>[E]>,
  ): Subscription {
    // TODO - why is this typing breaking?
    return this.events.addListener(event, listener as never);
  }
}
