// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
const { listen, copy, toAsyncIterator } = Deno;
type Conn = Deno.Conn;
type Reader = Deno.Reader;
type Writer = Deno.Writer;
import { BufReader, BufState, BufWriter } from "../io/bufio.ts";
import { TextProtoReader } from "../textproto/mod.ts";
import { STATUS_TEXT } from "./http_status.ts";
import { assert } from "../testing/asserts.ts";

interface Deferred {
  promise: Promise<{}>;
  resolve: () => void;
  reject: () => void;
}

function deferred(isResolved = false): Deferred {
  let resolve, reject;
  const promise = new Promise(
    (res, rej): void => {
      resolve = res;
      reject = rej;
    }
  );
  if (isResolved) {
    resolve();
  }
  return {
    promise,
    resolve,
    reject
  };
}

interface HttpConn extends Conn {
  // When read by a newly created request B, lastId is the id pointing to a previous
  // request A, such that we must wait for responses to A to complete before
  // writing B's response.
  lastPipelineId: number;
  pendingDeferredMap: Map<number, Deferred>;
}

function createHttpConn(c: Conn): HttpConn {
  const httpConn = Object.assign(c, {
    lastPipelineId: 0,
    pendingDeferredMap: new Map()
  });

  const resolvedDeferred = deferred(true);
  httpConn.pendingDeferredMap.set(0, resolvedDeferred);
  return httpConn;
}

function bufWriter(w: Writer): BufWriter {
  if (w instanceof BufWriter) {
    return w;
  } else {
    return new BufWriter(w);
  }
}
export function setContentLength(r: Response): void {
  if (!r.headers) {
    r.headers = new Headers();
  }

  if (r.body) {
    if (!r.headers.has("content-length")) {
      if (r.body instanceof Uint8Array) {
        const bodyLength = r.body.byteLength;
        r.headers.append("Content-Length", bodyLength.toString());
      } else {
        r.headers.append("Transfer-Encoding", "chunked");
      }
    }
  }
}
async function writeChunkedBody(w: Writer, r: Reader): Promise<void> {
  const writer = bufWriter(w);
  const encoder = new TextEncoder();

  for await (const chunk of toAsyncIterator(r)) {
    if (chunk.byteLength <= 0) continue;
    const start = encoder.encode(`${chunk.byteLength.toString(16)}\r\n`);
    const end = encoder.encode("\r\n");
    await writer.write(start);
    await writer.write(chunk);
    await writer.write(end);
  }

  const endChunk = encoder.encode("0\r\n\r\n");
  await writer.write(endChunk);
}
export async function writeResponse(w: Writer, r: Response): Promise<void> {
  const protoMajor = 1;
  const protoMinor = 1;
  const statusCode = r.status || 200;
  const statusText = STATUS_TEXT.get(statusCode);
  const writer = bufWriter(w);
  if (!statusText) {
    throw Error("bad status code");
  }

  let out = `HTTP/${protoMajor}.${protoMinor} ${statusCode} ${statusText}\r\n`;

  setContentLength(r);

  if (r.headers) {
    for (const [key, value] of r.headers) {
      out += `${key}: ${value}\r\n`;
    }
  }
  out += "\r\n";

  const header = new TextEncoder().encode(out);
  let n = await writer.write(header);
  assert(header.byteLength == n);

  if (r.body) {
    if (r.body instanceof Uint8Array) {
      n = await writer.write(r.body);
      assert(r.body.byteLength == n);
    } else {
      if (r.headers.has("content-length")) {
        const bodyLength = parseInt(r.headers.get("content-length"));
        const n = await copy(writer, r.body);
        assert(n == bodyLength);
      } else {
        await writeChunkedBody(writer, r.body);
      }
    }
  }
  await writer.flush();
}
async function readAllIterator(
  it: AsyncIterableIterator<Uint8Array>
): Promise<Uint8Array> {
  const chunks = [];
  let len = 0;
  for await (const chunk of it) {
    chunks.push(chunk);
    len += chunk.length;
  }
  if (chunks.length === 0) {
    // No need for copy
    return chunks[0];
  }
  const collected = new Uint8Array(len);
  let offset = 0;
  for (let chunk of chunks) {
    collected.set(chunk, offset);
    offset += chunk.length;
  }
  return collected;
}

export class ServerRequest {
  pipelineId: number;
  url: string;
  method: string;
  proto: string;
  headers: Headers;
  conn: HttpConn;
  r: BufReader;
  w: BufWriter;

  public async *bodyStream(): AsyncIterableIterator<Uint8Array> {
    if (this.headers.has("content-length")) {
      const len = +this.headers.get("content-length");
      if (Number.isNaN(len)) {
        return new Uint8Array(0);
      }
      let buf = new Uint8Array(1024);
      let rr = await this.r.read(buf);
      let nread = rr.nread;
      while (!rr.eof && nread < len) {
        yield buf.subarray(0, rr.nread);
        buf = new Uint8Array(1024);
        rr = await this.r.read(buf);
        nread += rr.nread;
      }
      yield buf.subarray(0, rr.nread);
    } else {
      if (this.headers.has("transfer-encoding")) {
        const transferEncodings = this.headers
          .get("transfer-encoding")
          .split(",")
          .map((e): string => e.trim().toLowerCase());
        if (transferEncodings.includes("chunked")) {
          // Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
          const tp = new TextProtoReader(this.r);
          let [line] = await tp.readLine();
          // TODO: handle chunk extension
          let [chunkSizeString] = line.split(";");
          let chunkSize = parseInt(chunkSizeString, 16);
          if (Number.isNaN(chunkSize) || chunkSize < 0) {
            throw new Error("Invalid chunk size");
          }
          while (chunkSize > 0) {
            let data = new Uint8Array(chunkSize);
            let [nread] = await this.r.readFull(data);
            if (nread !== chunkSize) {
              throw new Error("Chunk data does not match size");
            }
            yield data;
            await this.r.readLine(); // Consume \r\n
            [line] = await tp.readLine();
            chunkSize = parseInt(line, 16);
          }
          const [entityHeaders, err] = await tp.readMIMEHeader();
          if (!err) {
            for (let [k, v] of entityHeaders) {
              this.headers.set(k, v);
            }
          }
          /* Pseudo code from https://tools.ietf.org/html/rfc2616#section-19.4.6
          length := 0
          read chunk-size, chunk-extension (if any) and CRLF
          while (chunk-size > 0) {
            read chunk-data and CRLF
            append chunk-data to entity-body
            length := length + chunk-size
            read chunk-size and CRLF
          }
          read entity-header
          while (entity-header not empty) {
            append entity-header to existing header fields
            read entity-header
          }
          Content-Length := length
          Remove "chunked" from Transfer-Encoding
          */
          return; // Must return here to avoid fall through
        }
        // TODO: handle other transfer-encoding types
      }
      // Otherwise...
      yield new Uint8Array(0);
    }
  }

  // Read the body of the request into a single Uint8Array
  public async body(): Promise<Uint8Array> {
    return readAllIterator(this.bodyStream());
  }

  async respond(r: Response): Promise<void> {
    // Check and wait if the previous request is done responding.
    const lastPipelineId = this.pipelineId - 1;
    const lastPipelineDeferred = this.conn.pendingDeferredMap.get(
      lastPipelineId
    );
    assert(!!lastPipelineDeferred);
    await lastPipelineDeferred.promise;
    // If yes, delete old deferred and proceed with writing.
    this.conn.pendingDeferredMap.delete(lastPipelineId);
    // Write our response!
    await writeResponse(this.w, r);
    // Signal the next pending request that it can start writing.
    const currPipelineDeferred = this.conn.pendingDeferredMap.get(
      this.pipelineId
    );
    assert(!!currPipelineDeferred);
    currPipelineDeferred.resolve();
  }
}

interface ServeEnv {
  reqQueue: ServerRequest[];
  serveDeferred: Deferred;
}

/** Continuously read more requests from conn until EOF
 * Calls maybeHandleReq.
 * bufr is empty on a fresh TCP connection.
 * Would be passed around and reused for later request on same conn
 * TODO: make them async function after this change is done
 * https://github.com/tc39/ecma262/pull/1250
 * See https://v8.dev/blog/fast-async
 */
async function readRequest(
  c: HttpConn,
  bufr?: BufReader
): Promise<[ServerRequest, BufState]> {
  if (!bufr) {
    bufr = new BufReader(c);
  }
  const bufw = new BufWriter(c);
  const req = new ServerRequest();

  // Set and incr pipeline id;
  req.pipelineId = ++c.lastPipelineId;
  // Set a new pipeline deferred associated with this request
  // for future requests to wait for.
  c.pendingDeferredMap.set(req.pipelineId, deferred());

  req.conn = c;
  req.r = bufr!;
  req.w = bufw;
  const tp = new TextProtoReader(bufr!);

  let s: string;
  let err: BufState;

  // First line: GET /index.html HTTP/1.0
  [s, err] = await tp.readLine();
  if (err) {
    return [null, err];
  }
  [req.method, req.url, req.proto] = s.split(" ", 3);

  [req.headers, err] = await tp.readMIMEHeader();

  return [req, err];
}

function maybeHandleReq(
  env: ServeEnv,
  conn: Conn,
  maybeReq: [ServerRequest, BufState]
): void {
  const [req, _err] = maybeReq;
  if (_err) {
    conn.close(); // assume EOF for now...
    return;
  }
  env.reqQueue.push(req); // push req to queue
  env.serveDeferred.resolve(); // signal while loop to process it
}

function serveConn(env: ServeEnv, conn: HttpConn, bufr?: BufReader): void {
  readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn));
}

export async function* serve(
  addr: string
): AsyncIterableIterator<ServerRequest> {
  const listener = listen("tcp", addr);
  const env: ServeEnv = {
    reqQueue: [], // in case multiple promises are ready
    serveDeferred: deferred()
  };

  // Routine that keeps calling accept
  let handleConn = (_conn: Conn): void => {};
  let scheduleAccept = (): void => {};
  const acceptRoutine = (): void => {
    scheduleAccept = (): void => {
      listener.accept().then(handleConn);
    };
    handleConn = (conn: Conn): void => {
      const httpConn = createHttpConn(conn);
      serveConn(env, httpConn); // don't block
      scheduleAccept(); // schedule next accept
    };

    scheduleAccept();
  };

  acceptRoutine();

  // Loop hack to allow yield (yield won't work in callbacks)
  while (true) {
    await env.serveDeferred.promise;
    env.serveDeferred = deferred(); // use a new deferred
    let queueToProcess = env.reqQueue;
    env.reqQueue = [];
    for (const result of queueToProcess) {
      yield result;
      // Continue read more from conn when user is done with the current req
      // Moving this here makes it easier to manage
      serveConn(env, result.conn, result.r);
    }
  }
  listener.close();
}

export async function listenAndServe(
  addr: string,
  handler: (req: ServerRequest) => void
): Promise<void> {
  const server = serve(addr);

  for await (const request of server) {
    await handler(request);
  }
}

export interface Response {
  status?: number;
  headers?: Headers;
  body?: Uint8Array | Reader;
}