// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.

use super::TestEvent;
use super::TestStdioStream;
use deno_core::futures::future::poll_fn;
use deno_core::parking_lot;
use deno_core::parking_lot::lock_api::RawMutex;
use deno_core::parking_lot::lock_api::RawMutexTimed;
use deno_runtime::deno_io::pipe;
use deno_runtime::deno_io::AsyncPipeRead;
use deno_runtime::deno_io::PipeRead;
use deno_runtime::deno_io::PipeWrite;
use memmem::Searcher;
use std::fmt::Display;
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::task::ready;
use std::task::Poll;
use std::time::Duration;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::io::ReadBuf;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::WeakUnboundedSender;

/// 8-byte sync marker that is unlikely to appear in normal output. Equivalent
/// to the string `"\u{200B}\0\u{200B}\0"`.
const SYNC_MARKER: &[u8; 8] = &[226, 128, 139, 0, 226, 128, 139, 0];
const HALF_SYNC_MARKER: &[u8; 4] = &[226, 128, 139, 0];

const BUFFER_SIZE: usize = 4096;

/// The test channel has been closed and cannot be used to send further messages.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct ChannelClosedError;

impl std::error::Error for ChannelClosedError {}

impl Display for ChannelClosedError {
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    f.write_str("Test channel closed")
  }
}

impl<T> From<SendError<T>> for ChannelClosedError {
  fn from(_: SendError<T>) -> Self {
    Self
  }
}

#[repr(transparent)]
struct SendMutex(*const parking_lot::RawMutex);
impl Drop for SendMutex {
  fn drop(&mut self) {
    // SAFETY: We know this was locked by the sender
    unsafe {
      (*self.0).unlock();
    }
  }
}

// SAFETY: This is a mutex, so it's safe to send a pointer to it
unsafe impl Send for SendMutex {}

/// Create a [`TestEventSenderFactory`] and [`TestEventReceiver`] pair. The [`TestEventSenderFactory`] may be
/// used to create [`TestEventSender`]s and stdio streams for multiple workers in the system. The [`TestEventReceiver`]
/// will be kept alive until the final [`TestEventSender`] is dropped.
pub fn create_test_event_channel() -> (TestEventSenderFactory, TestEventReceiver)
{
  let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
  (
    TestEventSenderFactory {
      sender,
      worker_id: Default::default(),
    },
    TestEventReceiver { receiver },
  )
}

/// Create a [`TestEventWorkerSender`] and [`TestEventReceiver`] pair.The [`TestEventReceiver`]
/// will be kept alive until the [`TestEventSender`] is dropped.
pub fn create_single_test_event_channel(
) -> (TestEventWorkerSender, TestEventReceiver) {
  let (factory, receiver) = create_test_event_channel();
  (factory.worker(), receiver)
}

/// Polls for the next [`TestEvent`] from any worker. Events from multiple worker
/// streams may be interleaved.
pub struct TestEventReceiver {
  receiver: UnboundedReceiver<(usize, TestEvent)>,
}

impl TestEventReceiver {
  /// Receive a single test event, or `None` if no workers are alive.
  pub async fn recv(&mut self) -> Option<(usize, TestEvent)> {
    self.receiver.recv().await
  }
}

struct TestStream {
  id: usize,
  which: TestStdioStream,
  read_opt: Option<AsyncPipeRead>,
  sender: UnboundedSender<(usize, TestEvent)>,
}

impl TestStream {
  fn new(
    id: usize,
    which: TestStdioStream,
    pipe_reader: PipeRead,
    sender: UnboundedSender<(usize, TestEvent)>,
  ) -> std::io::Result<Self> {
    // This may fail if the tokio runtime is shutting down
    let read_opt = Some(pipe_reader.into_async()?);
    Ok(Self {
      id,
      which,
      read_opt,
      sender,
    })
  }

  /// Send a buffer to the test event channel. If the channel no longer exists, shut down the stream
  /// because we can't do anything.
  #[must_use = "If this returns false, don't keep reading because we cannot send"]
  fn send(&mut self, buffer: Vec<u8>) -> bool {
    if buffer.is_empty() {
      true
    } else if self
      .sender
      .send((self.id, TestEvent::Output(self.which, buffer)))
      .is_err()
    {
      self.read_opt.take();
      false
    } else {
      true
    }
  }

  fn is_alive(&self) -> bool {
    self.read_opt.is_some()
  }

  /// Cancellation-safe.
  #[inline]
  fn pipe(&mut self) -> impl Future<Output = ()> + '_ {
    poll_fn(|cx| self.poll_pipe(cx))
  }

  /// Attempt to read from a given stream, pushing all of the data in it into the given
  /// [`UnboundedSender`] before returning.
  fn poll_pipe(&mut self, cx: &mut std::task::Context) -> Poll<()> {
    let mut buffer = [0_u8; BUFFER_SIZE];
    let mut buf = ReadBuf::new(&mut buffer);
    let res = {
      // No more stream, we shouldn't hit this case.
      let Some(stream) = &mut self.read_opt else {
        unreachable!();
      };
      ready!(Pin::new(&mut *stream).poll_read(cx, &mut buf))
    };
    match res {
      Ok(_) => {
        let buf = buf.filled().to_vec();
        if buf.is_empty() {
          // The buffer may return empty in EOF conditions and never return an error,
          // so we need to treat this as EOF
          self.read_opt.take();
        } else {
          // Attempt to send the buffer, marking as not alive if the channel is closed
          _ = self.send(buf);
        }
      }
      Err(_) => {
        // Stream errored, so just return and mark this stream as not alive.
        _ = self.send(buf.filled().to_vec());
        self.read_opt.take();
      }
    }
    Poll::Ready(())
  }

  /// Read and "block" until the sync markers have been read.
  async fn read_until_sync_marker(&mut self) {
    let Some(file) = &mut self.read_opt else {
      return;
    };
    let mut flush = Vec::with_capacity(BUFFER_SIZE);
    loop {
      let mut buffer = [0_u8; BUFFER_SIZE];
      match file.read(&mut buffer).await {
        Err(_) | Ok(0) => {
          // EOF or error, just return. We make no guarantees about unflushed data at shutdown.
          self.read_opt.take();
          return;
        }
        Ok(read) => {
          flush.extend(&buffer[0..read]);

          // "ends_with" is cheaper, so check that first
          if flush.ends_with(HALF_SYNC_MARKER) {
            // We might have read the full sync marker.
            if flush.ends_with(SYNC_MARKER) {
              flush.truncate(flush.len() - SYNC_MARKER.len());
            } else {
              flush.truncate(flush.len() - HALF_SYNC_MARKER.len());
            }
            // Try to send our flushed buffer. If the channel is closed, this stream will
            // be marked as not alive.
            _ = self.send(flush);
            return;
          }

          // If we don't end with the marker, then we need to search the bytes we read plus four bytes
          // from before. There's still a possibility that the marker could be split because of a pipe
          // buffer that fills up, forcing the flush to be written across two writes and interleaving
          // data between, but that's a risk we take with this sync marker approach.
          let searcher = memmem::TwoWaySearcher::new(HALF_SYNC_MARKER);
          let start =
            (flush.len() - read).saturating_sub(HALF_SYNC_MARKER.len());
          if let Some(offset) = searcher.search_in(&flush[start..]) {
            flush.truncate(offset);
            // Try to send our flushed buffer. If the channel is closed, this stream will
            // be marked as not alive.
            _ = self.send(flush);
            return;
          }
        }
      }
    }
  }
}

/// A factory for creating [`TestEventSender`]s. This factory must be dropped
/// before the [`TestEventReceiver`] will complete.
pub struct TestEventSenderFactory {
  sender: UnboundedSender<(usize, TestEvent)>,
  worker_id: AtomicUsize,
}

impl TestEventSenderFactory {
  /// Create a [`TestEventWorkerSender`], along with a stdout/stderr stream.
  pub fn worker(&self) -> TestEventWorkerSender {
    let id = self.worker_id.fetch_add(1, Ordering::AcqRel);
    let (stdout_reader, stdout_writer) = pipe().unwrap();
    let (stderr_reader, stderr_writer) = pipe().unwrap();
    let (sync_sender, mut sync_receiver) =
      tokio::sync::mpsc::unbounded_channel::<(SendMutex, SendMutex)>();
    let stdout = stdout_writer.try_clone().unwrap();
    let stderr = stderr_writer.try_clone().unwrap();
    let sender = self.sender.clone();

    // Each worker spawns its own output monitoring and serialization task. This task will
    // poll the stdout/stderr streams and interleave that data with `TestEvents` generated
    // by the test runner worker.
    //
    // Note that this _must_ be a separate thread! Flushing requires locking coördination
    // on two threads and if we're blocking-locked on the mutex we've sent down the sync_receiver,
    // there's no way for us to process the actual flush operation here.
    //
    // Creating a mini-runtime to flush the stdout/stderr is the easiest way to do this, but
    // there's no reason we couldn't do it with non-blocking I/O, other than the difficulty
    // of setting up an I/O reactor in Windows.
    std::thread::spawn(move || {
      let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_io()
        .build()
        .unwrap();
      runtime.block_on(tokio::task::unconstrained(async move {
        let mut test_stdout = TestStream::new(
          id,
          TestStdioStream::Stdout,
          stdout_reader,
          sender.clone(),
        )?;
        let mut test_stderr =
          TestStream::new(id, TestStdioStream::Stderr, stderr_reader, sender)?;

        // This ensures that the stdout and stderr streams in the select! loop below cannot starve each
        // other.
        let mut alternate_stream_priority = false;

        // This function will be woken whenever a stream or the receiver is ready
        loop {
          alternate_stream_priority = !alternate_stream_priority;
          let (a, b) = if alternate_stream_priority {
            (&mut test_stdout, &mut test_stderr)
          } else {
            (&mut test_stderr, &mut test_stdout)
          };

          tokio::select! {
            biased; // We actually want to poll the channel first
            recv = sync_receiver.recv() => {
              match recv {
                // If the channel closed, we assume that all important data from the streams was synced,
                // so we just end this task immediately.
                None => { break },
                Some((mutex1, mutex2)) => {
                  // Two phase lock: mutex1 indicates that we are done our general read phase and are ready for
                  // the sync phase. mutex2 indicates that we have completed the sync phase. This prevents deadlock
                  // when the pipe is too full to accept the sync marker.
                  drop(mutex1);
                  for stream in [&mut test_stdout, &mut test_stderr] {
                    if stream.is_alive() {
                      stream.read_until_sync_marker().await;
                    }
                  }
                  drop(mutex2);
                }
              }
            }
            // Poll stdout first if `alternate_stream_priority` is true, otherwise poll stderr first.
            // This is necessary because of the `biased` flag above to avoid starvation.
            _ = a.pipe(), if a.is_alive() => {},
            _ = b.pipe(), if b.is_alive() => {},
          }
        }

        Ok::<_, std::io::Error>(())
      }))?;

      Ok::<_, std::io::Error>(())
    });

    let sender = TestEventSender {
      id,
      sender: self.sender.clone(),
      sync_sender,
      stdout_writer,
      stderr_writer,
    };

    TestEventWorkerSender {
      sender,
      stdout,
      stderr,
    }
  }

  /// A [`TestEventWeakSender`] has a unique ID, but will not keep the [`TestEventReceiver`] alive.
  /// This may be useful to add a `SIGINT` or other break handler to tests that isn't part of a
  /// specific test, but handles the overall orchestration of running tests:
  ///
  /// ```nocompile
  /// let mut cancel_sender = test_event_sender_factory.weak_sender();
  /// let sigint_handler_handle = spawn(async move {
  ///   signal::ctrl_c().await.unwrap();
  ///   cancel_sender.send(TestEvent::Sigint).ok();
  /// });
  /// ```
  pub fn weak_sender(&self) -> TestEventWeakSender {
    TestEventWeakSender {
      id: self.worker_id.fetch_add(1, Ordering::AcqRel),
      sender: self.sender.downgrade(),
    }
  }
}

pub struct TestEventWeakSender {
  pub id: usize,
  sender: WeakUnboundedSender<(usize, TestEvent)>,
}

impl TestEventWeakSender {
  pub fn send(&mut self, message: TestEvent) -> Result<(), ChannelClosedError> {
    Ok(
      self
        .sender
        .upgrade()
        .ok_or(ChannelClosedError)?
        .send((self.id, message))?,
    )
  }
}

pub struct TestEventWorkerSender {
  pub sender: TestEventSender,
  pub stdout: PipeWrite,
  pub stderr: PipeWrite,
}

/// Sends messages from a given worker into the test stream. If multiple clones of
/// this sender are kept alive, the worker is kept alive.
///
/// Any unflushed bytes in the stdout or stderr stream associated with this sender
/// are not guaranteed to be sent on drop unless flush is explicitly called.
pub struct TestEventSender {
  pub id: usize,
  sender: UnboundedSender<(usize, TestEvent)>,
  sync_sender: UnboundedSender<(SendMutex, SendMutex)>,
  stdout_writer: PipeWrite,
  stderr_writer: PipeWrite,
}

impl TestEventSender {
  pub fn send(&mut self, message: TestEvent) -> Result<(), ChannelClosedError> {
    // Certain messages require us to ensure that all output has been drained to ensure proper
    // interleaving of messages.
    if message.requires_stdio_sync() {
      self.flush()?;
    }
    Ok(self.sender.send((self.id, message))?)
  }

  /// Ensure that all output has been fully flushed by writing a sync marker into the
  /// stdout and stderr streams and waiting for it on the other side.
  pub fn flush(&mut self) -> Result<(), ChannelClosedError> {
    // Two phase lock: mutex1 indicates that we are done our general read phase and are ready for
    // the sync phase. mutex2 indicates that we have completed the sync phase. This prevents deadlock
    // when the pipe is too full to accept the sync marker.
    let mutex1 = parking_lot::RawMutex::INIT;
    mutex1.lock();
    let mutex2 = parking_lot::RawMutex::INIT;
    mutex2.lock();
    self
      .sync_sender
      .send((SendMutex(&mutex1 as _), SendMutex(&mutex2 as _)))?;
    if !mutex1.try_lock_for(Duration::from_secs(30)) {
      panic!(
        "Test flush deadlock 1, sender closed = {}",
        self.sync_sender.is_closed()
      );
    }
    _ = self.stdout_writer.write_all(SYNC_MARKER);
    _ = self.stderr_writer.write_all(SYNC_MARKER);
    if !mutex2.try_lock_for(Duration::from_secs(30)) {
      panic!(
        "Test flush deadlock 2, sender closed = {}",
        self.sync_sender.is_closed()
      );
    }
    Ok(())
  }
}

#[cfg(test)]
mod tests {
  use super::*;
  use crate::tools::test::TestResult;
  use deno_core::unsync::spawn;
  use deno_core::unsync::spawn_blocking;

  /// Test that output is correctly interleaved with messages.
  #[tokio::test]
  async fn spawn_worker() {
    test_util::timeout!(60);
    let (mut worker, mut receiver) = create_single_test_event_channel();

    let recv_handle = spawn(async move {
      let mut queue = vec![];
      while let Some((_, message)) = receiver.recv().await {
        let msg_str = format!("{message:?}");
        if msg_str.len() > 50 {
          eprintln!("message = {}...", &msg_str[..50]);
        } else {
          eprintln!("message = {}", msg_str);
        }
        queue.push(message);
      }
      eprintln!("done");
      queue
    });
    let send_handle = spawn_blocking(move || {
      worker.stdout.write_all(&[1; 100_000]).unwrap();
      eprintln!("Wrote bytes");
      worker.sender.send(TestEvent::StepWait(1)).unwrap();
      eprintln!("Sent");
      worker.stdout.write_all(&[2; 100_000]).unwrap();
      eprintln!("Wrote bytes 2");
      worker.sender.flush().unwrap();
      eprintln!("Done");
    });
    send_handle.await.unwrap();
    let messages = recv_handle.await.unwrap();

    let mut expected = 1;
    let mut count = 0;
    for message in messages {
      match message {
        TestEvent::Output(_, vec) => {
          assert_eq!(vec[0], expected);
          count += vec.len();
        }
        TestEvent::StepWait(_) => {
          assert_eq!(count, 100_000);
          count = 0;
          expected = 2;
        }
        _ => unreachable!(),
      }
    }
    assert_eq!(expected, 2);
    assert_eq!(count, 100_000);
  }

  /// Test that flushing a large number of times doesn't hang.
  #[tokio::test]
  async fn test_flush_lots() {
    test_util::timeout!(240);
    let (mut worker, mut receiver) = create_single_test_event_channel();
    let recv_handle = spawn(async move {
      let mut queue = vec![];
      while let Some((_, message)) = receiver.recv().await {
        assert!(!matches!(message, TestEvent::Output(..)));
        queue.push(message);
      }
      eprintln!("Receiver closed");
      queue
    });
    let send_handle = spawn_blocking(move || {
      for _ in 0..100000 {
        worker.sender.send(TestEvent::StepWait(1)).unwrap();
      }
      eprintln!("Sent all messages");
    });
    send_handle.await.unwrap();
    let messages = recv_handle.await.unwrap();
    assert_eq!(messages.len(), 100000);
  }

  /// Test that flushing a large number of times doesn't hang.
  #[tokio::test]
  async fn test_flush_large() {
    test_util::timeout!(240);
    let (mut worker, mut receiver) = create_single_test_event_channel();
    let recv_handle = spawn(async move {
      let mut queue = vec![];
      while let Some((_, message)) = receiver.recv().await {
        if let TestEvent::StepWait(..) = message {
          queue.push(());
        }
      }
      eprintln!("Receiver closed");
      queue
    });
    let send_handle = spawn_blocking(move || {
      for _ in 0..25000 {
        // Write one pipe buffer's worth of message here. We try a few different sizes of potentially
        // blocking writes.
        worker.stderr.write_all(&[0; 4 * 1024]).unwrap();
        worker.sender.send(TestEvent::StepWait(1)).unwrap();
        worker.stderr.write_all(&[0; 16 * 1024]).unwrap();
        worker.sender.send(TestEvent::StepWait(1)).unwrap();
        worker.stderr.write_all(&[0; 64 * 1024]).unwrap();
        worker.sender.send(TestEvent::StepWait(1)).unwrap();
        worker.stderr.write_all(&[0; 128 * 1024]).unwrap();
        worker.sender.send(TestEvent::StepWait(1)).unwrap();
      }
      eprintln!("Sent all messages");
    });
    send_handle.await.unwrap();
    let messages = recv_handle.await.unwrap();
    assert_eq!(messages.len(), 100000);
  }

  /// Test that flushing a large number of times doesn't hang.
  #[tokio::test]
  async fn test_flush_with_close() {
    test_util::timeout!(240);
    let (worker, mut receiver) = create_single_test_event_channel();
    let TestEventWorkerSender {
      mut sender,
      stderr,
      stdout,
    } = worker;
    let recv_handle = spawn(async move {
      let mut queue = vec![];
      while let Some((_, _)) = receiver.recv().await {
        queue.push(());
      }
      eprintln!("Receiver closed");
      queue
    });
    let send_handle = spawn_blocking(move || {
      let mut stdout = Some(stdout);
      let mut stderr = Some(stderr);
      for i in 0..100000 {
        if i == 20000 {
          stdout.take();
        }
        if i == 40000 {
          stderr.take();
        }
        if i % 2 == 0 {
          if let Some(stdout) = &mut stdout {
            stdout.write_all(b"message").unwrap();
          }
        } else if let Some(stderr) = &mut stderr {
          stderr.write_all(b"message").unwrap();
        }
        sender.send(TestEvent::StepWait(1)).unwrap();
      }
      eprintln!("Sent all messages");
    });
    send_handle.await.unwrap();
    let messages = recv_handle.await.unwrap();
    assert_eq!(messages.len(), 130000);
  }

  /// Test that large numbers of interleaved steps are routed properly.
  #[tokio::test]
  async fn test_interleave() {
    test_util::timeout!(60);
    const MESSAGE_COUNT: usize = 10_000;
    let (mut worker, mut receiver) = create_single_test_event_channel();
    let recv_handle = spawn(async move {
      let mut i = 0;
      while let Some((_, message)) = receiver.recv().await {
        if i % 2 == 0 {
          let expected_text = format!("{:08x}", i / 2).into_bytes();
          let TestEvent::Output(TestStdioStream::Stderr, text) = message else {
            panic!("Incorrect message: {message:?}");
          };
          assert_eq!(text, expected_text);
        } else {
          let TestEvent::Result(index, TestResult::Ok, 0) = message else {
            panic!("Incorrect message: {message:?}");
          };
          assert_eq!(index, i / 2);
        }
        i += 1;
      }
      eprintln!("Receiver closed");
      i
    });
    let send_handle: deno_core::unsync::JoinHandle<()> =
      spawn_blocking(move || {
        for i in 0..MESSAGE_COUNT {
          worker
            .stderr
            .write_all(format!("{i:08x}").as_str().as_bytes())
            .unwrap();
          worker
            .sender
            .send(TestEvent::Result(i, TestResult::Ok, 0))
            .unwrap();
        }
        eprintln!("Sent all messages");
      });
    send_handle.await.unwrap();
    let messages = recv_handle.await.unwrap();
    assert_eq!(messages, MESSAGE_COUNT * 2);
  }

  #[tokio::test]
  async fn test_sender_shutdown_before_receive() {
    test_util::timeout!(60);
    for _ in 0..10 {
      let (mut worker, mut receiver) = create_single_test_event_channel();
      worker.stderr.write_all(b"hello").unwrap();
      worker
        .sender
        .send(TestEvent::Result(0, TestResult::Ok, 0))
        .unwrap();
      drop(worker);
      let (_, message) = receiver.recv().await.unwrap();
      let TestEvent::Output(TestStdioStream::Stderr, text) = message else {
        panic!("Incorrect message: {message:?}");
      };
      assert_eq!(text.as_slice(), b"hello");
      let (_, message) = receiver.recv().await.unwrap();
      let TestEvent::Result(..) = message else {
        panic!("Incorrect message: {message:?}");
      };
      assert!(receiver.recv().await.is_none());
    }
  }

  /// Ensure nothing panics if we're racing the runtime shutdown.
  #[test]
  fn test_runtime_shutdown() {
    test_util::timeout!(60);
    let runtime = tokio::runtime::Builder::new_current_thread()
      .enable_all()
      .build()
      .unwrap();
    runtime.block_on(async {
      let (mut worker, mut receiver) = create_single_test_event_channel();
      tokio::task::spawn(async move {
        loop {
          if receiver.recv().await.is_none() {
            break;
          }
        }
      });
      tokio::task::spawn(async move {
        _ = worker.sender.send(TestEvent::Sigint);
      });
    });
  }
}