// Copyright 2021 Storro B.V.
// All rights reserved.
// Dit werk is auteursrechtelijk beschermd.
//
import { logger } from '../Logger';
import { concat } from '../Util/Concat';
import { Ecas } from './Ecas';
import { EcasValue } from './EcasValue';
import { extractContent, isContentChunk, isMetadataChunk, parseMetaChunk } from './MerkleTree';
import { StreamKey } from './StreamKey';

// A fancy typed pair.
class SubReader {
  constructor(
    public offset: number,
    public contentSize: number,
    public reader: MerkleTreeReader,
  ) {}
}

// SubReadResult provides the expected length of the result and the result.
class SubReadResult {
  constructor(
    private result: Promise<Uint8Array>,
    private expectedLength: number,
  ) {}

  // Only return the result when it has the correct length.
  async get(): Promise<Uint8Array> {
    const myResult = await this.result;
    if (myResult.length !== this.expectedLength) {
      logger.debug(`SubReadResult get(): ${myResult.length} ${this.expectedLength}`);
      throw new Error(`read() result was of unexpected length (${myResult.length} !== ${this.expectedLength})`);
    }
    return myResult;
  }
}

export class MerkleTreeReader {
  // valuePromise gets set with the result of the GET of the key.
  // It signals to other callers that we already performed a get.
  // The first caller will make sure to parse the GET result and
  // set either the contentBuffer or subreaders.
  //
  // Once this is set the first caller sets the expanded Promise.
  // Later callers must check expanded for a valid Promise which could be set.
  // They must not perform any GET calls or value expansion and parsing,
  // just wait for the expanded Promise.
  private valuePromise: Promise<EcasValue> | null;
  private contentBuffer: Uint8Array | null;
  private subreaders: Array<SubReader>;
  private expanded: Promise<void> | undefined;

  constructor(
    private ecas: Ecas,
    private key: StreamKey,
  ) {}

  // Make a copy of this MerkleTreeReader (without current cache)
  public clone(): MerkleTreeReader {
    return new MerkleTreeReader(this.ecas, this.key);
  }

  // Returns true when the MerkleTreeReader has a filled cache. The cache of
  // a MerkleTreeReader can be significant (1MB+).
  public get hasCache(): boolean {
    return this.valuePromise || this.contentBuffer || this.subreaders.length > 0 || this.expanded ? true : false;
  }

  async subRead(offset: number, length: number): Promise<Uint8Array> {
    // This needs a cached non-empty subreader map in this.subreaders.
    if (!this.subreaders) throw new Error('Can not have undefined subreaders');
    if (this.subreaders.length === 0) throw new Error('Can not have empty subreaders');
    let i = 0;
    const promises = new Array<SubReadResult>();

    while (length > 0) {
      while (i < this.subreaders.length && offset >= this.subreaders[i].contentSize) {
        // These subchunks all lie completely before the start of our read() offset.
        offset -= this.subreaders[i].contentSize;
        i++;
      }
      if (i >= this.subreaders.length) {
        // We passed the end of the chunk map. Return what we have.
        break;
      }
      // We are not yet at the end of the chunk map.
      // This current chunk falls in our (offset, length) range.
      const thisLength = Math.min(length, this.subreaders[i].contentSize - offset);
      const subPromise = this.subreaders[i].reader.readInternal(offset, thisLength);
      promises.push(new SubReadResult(subPromise, thisLength));
      length -= thisLength;
      i++;
      offset = 0;
    }

    // After requesting all the read() calls, wait for the results here.
    const result = new Array<Uint8Array>();
    for (const promise of promises) {
      result.push(await promise.get());
    }
    return concat(result);
  }

  private async expand(): Promise<void> {
    // If we already parsed the value before, ignore the call.
    if (this.contentBuffer) return;
    if (this.subreaders) return;
    // Check if we are in the process of fetching the value right now.
    if (!this.valuePromise) {
      // If the value has not been fetched, fetch it once.
      this.valuePromise = this.ecas.getValue(this.key.toEcasKey());
    }
    // Wait for the result of the fetch.
    const ecasValue = await this.valuePromise;
    const value = ecasValue.getValue();

    // Parse the value and return the read result.
    if (isContentChunk(value)) {
      // This is a content chunk. Strip the header and cache it.
      this.contentBuffer = extractContent(value);
      // Release the value promise asap to minimize memory usage.
      this.valuePromise = null;
      return;
    }
    if (isMetadataChunk(value)) {
      // This chunk describes chunks below it in the tree.
      // Parse it and construct a subreader for every child.
      const subReaders = new Array<SubReader>();
      const chunks = parseMetaChunk(this.ecas.convEnc().realmSalt, value, this.key.getContentKey());
      let myOffset = 0;
      // Ordering is important in this for loop.
      for (const chunk of chunks) {
        const r = new MerkleTreeReader(this.ecas, await chunk.streamKey);
        subReaders.push(new SubReader(myOffset, chunk.contentSize, r));
        myOffset += chunk.contentSize;
      }
      // Only assign the value when everything worked without exception.
      this.subreaders = subReaders;
      // Release the value promise asap to minimize memory usage.
      this.valuePromise = null;
      return;
    }
    // This is an invalid state.
    throw new Error('Fetched chunk is neither content nor metadata');
  }

  // Fetches the value and parses it to a content buffer or sub-readers.
  // After this call either contentBuffer or subreaders should be present.
  async fetchAndParseValue(): Promise<void> {
    // Any non-first caller will wait for the first caller to sort things out.
    if (this.expanded) {
      await this.expanded;
      return;
    }

    // The first caller stops any other callers from coming here.
    this.expanded = this.expand();

    return this.expanded;
  }

  // There can be only one read() call active at the same time.
  // Please await the read() before calling again.
  // This is because the internal content buffers might be cleared while read()ing.
  async read(offset: number, length: number): Promise<Uint8Array> {
    // We prefetch 8 MB on every read() because
    // in the current setup we always do linear reading.
    const prefetchSize = 8388608;

    // We do not wait for prefetch because it should only issue the requests
    // without us waiting on the results.
    // Important: We do need to connect something to the Promise because
    // a failing unconnected Promise will destroy the JVM.
    this.prefetch(offset, length + prefetchSize).catch(e => {
      logger.info('Prefetch failed: ', e);
    });

    return this.readInternal(offset, length);
  }

  // Prefetch issues getValue() requests to the ecas before
  // we get a read() call that needs those keys.
  // If you want to be sure all keys have arrived it is possible to wait
  // on the prefetch() result but that is not the intended use.
  async prefetch(offset: number, length: number): Promise<void> {
    if (this.expanded) {
      // If we already fetch and parsed before, just wait for it to complete.
      await this.expanded;
      return;
    }
    // If we did not fetch and parse before, do so, wait for the result,
    // and if necessary fetch and parse the blocks below.
    await this.fetchAndParseValue();

    // If we prefetched content we're done.
    if (this.contentBuffer) return;
    // If we are at a metadata level, then prefetch the relevant children below too.
    if (this.subreaders) {
      for (let i = 0; i < this.subreaders.length; i++) {
        // Skip all subreaders which end before offset.
        while (i < this.subreaders.length && this.subreaders[i].offset + this.subreaders[i].contentSize <= offset) {
          i++;
        }
        // Prefetch all subreaders which start before the prefetch end (fall inside of range).
        const promises = new Array<Promise<void>>();
        while (i < this.subreaders.length && this.subreaders[i].offset < offset + length) {
          const reloffset = offset - this.subreaders[i].offset;
          if (reloffset >= 0) {
            // Our offset lies beyond the start of the subreader.
            promises.push(this.subreaders[i].reader.prefetch(reloffset, length));
          } else {
            // Our precache offset begins before the subreader.
            promises.push(this.subreaders[i].reader.prefetch(0, length + reloffset));
          }
          i++;
        }
        await Promise.all(promises);
      }
    }
  }

  async readInternal(offset: number, length: number): Promise<Uint8Array> {
    // Wait for the value to be present and parsed.
    await this.fetchAndParseValue();

    if (this.contentBuffer) {
      // We have previously fetched this chunk and cached it as a content chunk.
      const result = this.contentBuffer.slice(offset, offset + length);
      // Clear the content buffer as soon as we read until the end
      // to minimize memory usage.
      if (offset + length >= this.contentBuffer.length) {
        this.expanded = undefined;
        this.contentBuffer = null;
      }
      return result;
    }
    if (this.subreaders) {
      // Cached the parsed chunk as smaller merkle trees.
      return this.subRead(offset, length);
    }
    throw new Error('Value in invalid state');
  }

  rootKey(): StreamKey {
    return this.key;
  }

  async size(): Promise<number> {
    if (this.contentBuffer) {
      // We have previously fetched this chunk and cached it as a content chunk.
      return this.contentBuffer.length;
    }
    if (this.subreaders) {
      if (this.subreaders.length === 0) throw new Error('Invalid length for sub readers array');
      // Cached the parsed chunk as smaller merkle trees.
      const lastReader = this.subreaders[this.subreaders.length - 1];
      return lastReader.offset + lastReader.contentSize;
    }
    // Parse the value and retry size().
    await this.fetchAndParseValue();
    return this.size();
  }
}
