// Copyright 2021 Storro B.V.
// All rights reserved.
// Dit werk is auteursrechtelijk beschermd.
//
// ContentLevelChunker.ts implements a
// content level chunker which is a simple version of
// Zooid/Source/Serialization/ContentLevelChunker.h
//
// This version is simpler in the sense that we only
// support linear writing, no mid-stream writes.
// This simplifies things a lot.
//
import { concat } from '../Util/Concat';
import { Ecas } from './Ecas';
import { EcasKey } from './EcasKey';
import { EcasValue } from './EcasValue';
import { MerkleTreeWriter } from './MerkleTreeWriter';
import { StreamChunker } from './StreamChunker';
import { StreamKey } from './StreamKey';

// This is the maximum chunk size we can maintain without failures.
// const maximumChunkSize = 102368; (100 kb - 32 bytes)
// Anything higher will give PUT failures with Zooid's ValueServer
// claiming the body is empty.
const minimumChunkSize = 262144;
const maximumChunkSize = 2097152;

export class ContentLevelChunker {
  contentBuffer: Uint8Array;
  streamChunker: StreamChunker;
  merkleTreeWriter: MerkleTreeWriter;
  canceled: boolean;
  bytesPut: number;
  putCallBuffer: Array<Promise<EcasKey>>;

  constructor(
    private ecas: Ecas,
    private cancelCallback: () => Promise<boolean>,
    private uploadCallback: ((uploadedSize: number) => Promise<void>) | undefined = undefined,
  ) {
    this.contentBuffer = new Uint8Array();
    this.bytesPut = 0;
    // Currently set to small chunks 32768 for testing purposes.
    this.streamChunker = new StreamChunker(minimumChunkSize, maximumChunkSize, 1); // Large chunks value 2097152
    // The merkle tree writer needs its own Stream Chunker that
    // has a unit size of Chunk.chunkMetadataSize().
    this.merkleTreeWriter = new MerkleTreeWriter(ecas, 1);
    this.putCallBuffer = [];
    this.canceled = false;
  }

  public async addContent(incoming: Uint8Array): Promise<void> {
    // Add the incoming bytes to the existing content buffer.
    this.contentBuffer = concat([this.contentBuffer, incoming]);

    // While there is enough in the content buffer, keep taking chunks out.
    //
    // TODO Instead of resizing the buffer every time, keep offsets and only
    // resize once at the end.
    while (this.contentBuffer.byteLength >= this.streamChunker.maximumChunkSize()) {
      await this.takeChunk();
    }
  }

  public async read(offset: number, length: number): Promise<Uint8Array> {
    if (length === 0) throw 'Should not call read() with zero length';
    const merkleTreeWriterSize = this.merkleTreeWriter.size();
    // The write can fall (partially) in the merkle tree writer's domain.
    if (offset + length <= merkleTreeWriterSize) {
      // The read() falls completely inside the merkle tree writer domain.
      return this.merkleTreeWriter.read(offset, length);
    }
    if (offset < merkleTreeWriterSize) {
      // The read() falls partially inside of the merkle tree writer.
      // Read the merkle tree writer part.
      const firstLength = merkleTreeWriterSize - offset;
      const firstBytes = await this.merkleTreeWriter.read(offset, firstLength);
      // Then read the content buffer part
      const cbLength = offset + length - merkleTreeWriterSize;
      const secondBytes = this.contentBuffer.slice(0, cbLength);
      return concat([firstBytes, secondBytes]);
    }
    // The read falls past the merkle tree writer.
    offset -= merkleTreeWriterSize;
    return this.contentBuffer.slice(offset, offset + length);
  }

  private async takeChunk(): Promise<void> {
    if (await this.cancelCallback()) throw Error('Aborted');

    // Wait for space in the put queue.
    while (this.putCallBuffer.length > 10) {
      await this.putCallBuffer[0];
      this.putCallBuffer.shift();
    }

    // Keep chunking while we have enough content to create a chunk.
    const offset = this.merkleTreeWriter.size();
    const newChunkEnd = await this.streamChunker.chunkEnd(offset, this.contentBuffer);
    if (await this.cancelCallback()) throw Error('Aborted');
    const newChunk = this.contentBuffer.slice(0, newChunkEnd);
    // Prefix the content identifier
    const contentSuffix = new Uint8Array([67, 79, 78, 84]); // Spells 'CONT'
    const versionByte = new Uint8Array(1); // A zero byte.
    const contentMarker = new Uint8Array(1); // Associated data byte marking this content.

    const ecasValue = new EcasValue(concat([newChunk, versionByte, contentSuffix]), contentMarker);
    const putPromise = this.ecas.putValue(ecasValue);
    this.putCallBuffer.push(putPromise);

    // We need to convert the EcasKey to a StreamKey asynchronously.
    const streamKeyPromise = new Promise<StreamKey>((resolve, reject) => {
      putPromise
        .then(async ecasKey => {
          // if the user abort the promise, throw an error
          if (await this.cancelCallback()) throw Error('Aborted');

          // Notify the caller about bytes put progress:
          this.bytesPut += newChunkEnd;
          if (this.uploadCallback) {
            // The ContentLevelChunker will quite often be called inside a WebWorker using ComLink.
            // That means we need to proxy the uploadCallback object which makes all functions async.
            // That is why we need to await this seemingly boolean returning function.
            await this.uploadCallback(this.bytesPut);
          }

          const s = new StreamKey(ecasKey.getLocator(), ecasKey.getValueHash());
          resolve(s);
        })
        .catch(reason => reject(reason));
    });
    // The StreamKey needs to be added to the merkle tree writer directly (no delay),
    // otherwise we get async out of order effects between chunks.
    await this.merkleTreeWriter.appendChunk(streamKeyPromise, newChunkEnd, newChunkEnd);
    this.contentBuffer = this.contentBuffer.slice(newChunkEnd);
  }

  public size(): number {
    return this.contentBuffer.byteLength + this.merkleTreeWriter.size();
  }

  public async rootKey(): Promise<StreamKey | undefined> {
    while (this.contentBuffer.byteLength > 0) {
      await this.takeChunk();
    }
    return this.merkleTreeWriter.rootKey();
  }

  public destroy(): void {
    if (this.streamChunker) {
      this.streamChunker.destroy();
    }
    if (this.merkleTreeWriter) {
      this.merkleTreeWriter.destroy();
    }
  }
}
