diff --git a/Cargo.lock b/Cargo.lock index 1ecf88c9e7..295371856b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -383,6 +383,12 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "base64-simd" version = "0.7.0" @@ -1070,7 +1076,7 @@ version = "1.43.5" dependencies = [ "async-trait", "base32", - "base64", + "base64 0.21.7", "bincode", "bytes", "cache_control", @@ -1137,6 +1143,7 @@ dependencies = [ "regex", "reqwest", "ring", + "runtimelib", "rustyline", "rustyline-derive", "serde", @@ -1171,7 +1178,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "584547d27786a734536fde7088f8429d355569c39410427be44695c300618408" dependencies = [ "anyhow", - "base64", + "base64 0.21.7", "deno_media_type", "deno_terminal", "dprint-swc-ext", @@ -1349,7 +1356,7 @@ dependencies = [ "aes", "aes-gcm", "aes-kw", - "base64", + "base64 0.21.7", "cbc", "const-oid", "ctr", @@ -1407,7 +1414,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80b80fef2bf1b6e14712633975f7f39a3b29b95a5769cafcb959ffa1a84b7680" dependencies = [ "anyhow", - "base64", + "base64 0.21.7", "deno_ast", "deno_graph", "escape8259", @@ -1503,7 +1510,7 @@ version = "0.150.0" dependencies = [ "async-compression", "async-trait", - "base64", + "base64 0.21.7", "bencher", "brotli 3.5.0", "bytes", @@ -1558,7 +1565,7 @@ version = "0.60.0" dependencies = [ "anyhow", "async-trait", - "base64", + "base64 0.21.7", "chrono", "deno_core", "deno_fetch", @@ -2155,6 +2162,27 @@ dependencies = [ "subtle", ] +[[package]] +name = "dirs" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225" +dependencies = [ + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c" +dependencies = [ + "libc", + "option-ext", + "redox_users", + "windows-sys 0.48.0", +] + [[package]] name = "displaydoc" version = "0.2.4" @@ -2512,7 +2540,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f606daca1ce18c69ccdabc59aa1c7e077356b8ffcd74e12c7646f545320a2fd" dependencies = [ "anyhow", - "base64", + "base64 0.21.7", "deno_ast", "deno_graph", "deno_npm", @@ -2569,7 +2597,7 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f63dd7b57f9b33b1741fa631c9522eb35d43e96dcca4a6a91d5e4ca7c93acdc1" dependencies = [ - "base64", + "base64 0.21.7", "http-body-util", "hyper 1.1.0", "hyper-util", @@ -3824,6 +3852,16 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "libredox" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" +dependencies = [ + "bitflags 2.5.0", + "libc", +] + [[package]] name = "libsqlite3-sys" version = "0.26.0" @@ -4381,6 +4419,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "option-ext" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" + [[package]] name = "ordered-float" version = "2.10.1" @@ -5121,6 +5165,17 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "redox_users" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd283d9651eeda4b2a83a43c1c91b266c40fd76ecd39a50a8c630ae69dc72891" +dependencies = [ + "getrandom", + "libredox", + "thiserror", +] + [[package]] name = "ref-cast" version = "1.0.22" @@ -5183,7 +5238,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" dependencies = [ "async-compression", - "base64", + "base64 0.21.7", "bytes", "encoding_rs", "futures-core", @@ -5269,7 +5324,7 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b91f7eff05f748767f183df4320a63d6936e9c6107d97c9e6bdd9784f4289c94" dependencies = [ - "base64", + "base64 0.21.7", "bitflags 2.5.0", "serde", "serde_derive", @@ -5295,6 +5350,29 @@ dependencies = [ "zeroize", ] +[[package]] +name = "runtimelib" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4300b46ab6f2970f81c176f4f2f7ff0a48809f52be7a8fd4ca5a32e9002f6e8f" +dependencies = [ + "anyhow", + "base64 0.22.1", + "bytes", + "chrono", + "data-encoding", + "dirs", + "glob", + "rand", + "ring", + "serde", + "serde_json", + "shellexpand", + "tokio", + "uuid", + "zeromq", +] + [[package]] name = "rusqlite" version = "0.29.0" @@ -5391,7 +5469,7 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" dependencies = [ - "base64", + "base64 0.21.7", ] [[package]] @@ -5705,6 +5783,12 @@ dependencies = [ "digest", ] +[[package]] +name = "sha1_smol" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" + [[package]] name = "sha2" version = "0.10.8" @@ -5722,6 +5806,15 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "45bb67a18fa91266cc7807181f62f9178a6873bfad7dc788c42e6430db40184f" +[[package]] +name = "shellexpand" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da03fa3b94cc19e3ebfc88c4229c49d8f08cdbd1228870a45f0ffdf84988e14b" +dependencies = [ + "dirs", +] + [[package]] name = "signal-hook" version = "0.3.17" @@ -6297,7 +6390,7 @@ version = "0.184.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "565a76c4ca47ce31d78301c0beab878e4c2cb4f624691254d834ec8c0e236755" dependencies = [ - "base64", + "base64 0.21.7", "dashmap", "indexmap", "once_cell", @@ -6560,7 +6653,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-stream", - "base64", + "base64 0.21.7", "bytes", "console_static_text", "deno_unsync", @@ -7192,6 +7285,7 @@ checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" dependencies = [ "getrandom", "serde", + "sha1_smol", ] [[package]] diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 2bc2d4c17a..a7856b8d74 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -129,6 +129,7 @@ rand = { workspace = true, features = ["small_rng"] } regex.workspace = true reqwest.workspace = true ring.workspace = true +runtimelib = "=0.9.0" rustyline.workspace = true rustyline-derive = "=0.7.0" serde.workspace = true diff --git a/cli/ops/jupyter.rs b/cli/ops/jupyter.rs index 1c60bc2bc3..57ca93ff4d 100644 --- a/cli/ops/jupyter.rs +++ b/cli/ops/jupyter.rs @@ -4,9 +4,11 @@ use std::cell::RefCell; use std::rc::Rc; use std::sync::Arc; -use crate::tools::jupyter::jupyter_msg::Connection; -use crate::tools::jupyter::jupyter_msg::JupyterMessage; -use crate::tools::jupyter::server::StdioMsg; +use runtimelib::JupyterMessage; +use runtimelib::JupyterMessageContent; +use runtimelib::KernelIoPubConnection; +use runtimelib::StreamContent; + use deno_core::error::AnyError; use deno_core::op2; use deno_core::serde_json; @@ -19,7 +21,7 @@ deno_core::extension!(deno_jupyter, op_jupyter_broadcast, ], options = { - sender: mpsc::UnboundedSender, + sender: mpsc::UnboundedSender, }, middleware = |op| match op.name { "op_print" => op_print(), @@ -38,28 +40,40 @@ pub async fn op_jupyter_broadcast( #[serde] metadata: serde_json::Value, #[serde] buffers: Vec, ) -> Result<(), AnyError> { - let (iopub_socket, last_execution_request) = { + let (iopub_connection, last_execution_request) = { let s = state.borrow(); ( - s.borrow::>>>() - .clone(), + s.borrow::>>().clone(), s.borrow::>>>().clone(), ) }; let maybe_last_request = last_execution_request.borrow().clone(); if let Some(last_request) = maybe_last_request { - (*iopub_socket.lock().await) - .send( - &last_request - .new_message(&message_type) - .with_content(content) - .with_metadata(metadata) - .with_buffers( - buffers.into_iter().map(|b| b.to_vec().into()).collect(), - ), - ) + let content = JupyterMessageContent::from_type_and_content( + &message_type, + content.clone(), + ) + .map_err(|err| { + log::error!( + "Error deserializing content from jupyter.broadcast, message_type: {}:\n\n{}\n\n{}", + &message_type, + content, + err + ); + err + })?; + + let mut jupyter_message = JupyterMessage::new(content, Some(&last_request)); + + jupyter_message.metadata = metadata; + jupyter_message.buffers = + buffers.into_iter().map(|b| b.to_vec().into()).collect(); + jupyter_message.set_parent(last_request); + + (iopub_connection.lock().await) + .send(jupyter_message) .await?; } @@ -72,16 +86,16 @@ pub fn op_print( #[string] msg: &str, is_err: bool, ) -> Result<(), AnyError> { - let sender = state.borrow_mut::>(); + let sender = state.borrow_mut::>(); if is_err { - if let Err(err) = sender.send(StdioMsg::Stderr(msg.into())) { + if let Err(err) = sender.send(StreamContent::stderr(msg.into())) { log::error!("Failed to send stderr message: {}", err); } return Ok(()); } - if let Err(err) = sender.send(StdioMsg::Stdout(msg.into())) { + if let Err(err) = sender.send(StreamContent::stdout(msg.into())) { log::error!("Failed to send stdout message: {}", err); } Ok(()) diff --git a/cli/tools/jupyter/install.rs b/cli/tools/jupyter/install.rs index 69a75837e1..40f21d3c15 100644 --- a/cli/tools/jupyter/install.rs +++ b/cli/tools/jupyter/install.rs @@ -1,40 +1,31 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. -use deno_core::anyhow::bail; -use deno_core::anyhow::Context; use deno_core::error::AnyError; use deno_core::serde_json; use deno_core::serde_json::json; use std::env::current_exe; -use std::io::ErrorKind; use std::io::Write; use std::path::Path; -use tempfile::TempDir; + +use runtimelib::dirs::user_data_dir; const DENO_ICON_32: &[u8] = include_bytes!("./resources/deno-logo-32x32.png"); const DENO_ICON_64: &[u8] = include_bytes!("./resources/deno-logo-64x64.png"); const DENO_ICON_SVG: &[u8] = include_bytes!("./resources/deno-logo-svg.svg"); pub fn status() -> Result<(), AnyError> { - let output = std::process::Command::new("jupyter") - .args(["kernelspec", "list", "--json"]) - .output() - .context("Failed to get list of installed kernelspecs")?; - let json_output: serde_json::Value = - serde_json::from_slice(&output.stdout) - .context("Failed to parse JSON from kernelspec list")?; + let user_data_dir = user_data_dir()?; - if let Some(specs) = json_output.get("kernelspecs") { - if let Some(specs_obj) = specs.as_object() { - if specs_obj.contains_key("deno") { - log::info!("✅ Deno kernel already installed"); - return Ok(()); - } - } + let kernel_spec_dir_path = user_data_dir.join("kernels").join("deno"); + let kernel_spec_path = kernel_spec_dir_path.join("kernel.json"); + + if kernel_spec_path.exists() { + log::info!("✅ Deno kernel already installed"); + Ok(()) + } else { + log::warn!("ℹ️ Deno kernel is not yet installed, run `deno jupyter --install` to set it up"); + Ok(()) } - - log::warn!("ℹ️ Deno kernel is not yet installed, run `deno jupyter --install` to set it up"); - Ok(()) } fn install_icon( @@ -49,8 +40,12 @@ fn install_icon( } pub fn install() -> Result<(), AnyError> { - let temp_dir = TempDir::new().unwrap(); - let kernel_json_path = temp_dir.path().join("kernel.json"); + let user_data_dir = user_data_dir()?; + let kernel_dir = user_data_dir.join("kernels").join("deno"); + + std::fs::create_dir_all(&kernel_dir)?; + + let kernel_json_path = kernel_dir.join("kernel.json"); // TODO(bartlomieju): add remaining fields as per // https://jupyter-client.readthedocs.io/en/stable/kernels.html#kernel-specs @@ -63,51 +58,10 @@ pub fn install() -> Result<(), AnyError> { let f = std::fs::File::create(kernel_json_path)?; serde_json::to_writer_pretty(f, &json_data)?; - install_icon(temp_dir.path(), "logo-32x32.png", DENO_ICON_32)?; - install_icon(temp_dir.path(), "logo-64x64.png", DENO_ICON_64)?; - install_icon(temp_dir.path(), "logo-svg.svg", DENO_ICON_SVG)?; + install_icon(&user_data_dir, "logo-32x32.png", DENO_ICON_32)?; + install_icon(&user_data_dir, "logo-64x64.png", DENO_ICON_64)?; + install_icon(&user_data_dir, "logo-svg.svg", DENO_ICON_SVG)?; - let child_result = std::process::Command::new("jupyter") - .args([ - "kernelspec", - "install", - "--user", - "--name", - "deno", - &temp_dir.path().to_string_lossy(), - ]) - .spawn(); - let mut child = match child_result { - Ok(child) => child, - Err(err) - if matches!( - err.kind(), - ErrorKind::NotFound | ErrorKind::PermissionDenied - ) => - { - return Err(err).context(concat!( - "Failed to spawn 'jupyter' command. Is JupyterLab installed ", - "(https://jupyter.org/install) and available on the PATH?" - )); - } - Err(err) => { - return Err(err).context("Failed to spawn 'jupyter' command."); - } - }; - - let wait_result = child.wait(); - match wait_result { - Ok(status) => { - if !status.success() { - bail!("Failed to install kernelspec, try again."); - } - } - Err(err) => { - bail!("Failed to install kernelspec: {}", err); - } - } - - let _ = std::fs::remove_dir(temp_dir); log::info!("✅ Deno kernelspec installed successfully."); Ok(()) } diff --git a/cli/tools/jupyter/jupyter_msg.rs b/cli/tools/jupyter/jupyter_msg.rs deleted file mode 100644 index 233efcc8e4..0000000000 --- a/cli/tools/jupyter/jupyter_msg.rs +++ /dev/null @@ -1,305 +0,0 @@ -// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. - -// This file is forked/ported from -// Copyright 2020 The Evcxr Authors. MIT license. - -use bytes::Bytes; -use data_encoding::HEXLOWER; -use deno_core::anyhow::anyhow; -use deno_core::anyhow::bail; -use deno_core::error::AnyError; -use deno_core::serde_json; -use deno_core::serde_json::json; -use ring::hmac; -use std::fmt; -use uuid::Uuid; - -use crate::util::time::utc_now; - -pub struct Connection { - socket: S, - /// Will be None if our key was empty (digest authentication disabled). - mac: Option, -} - -impl Connection { - pub fn new(socket: S, key: &str) -> Self { - let mac = if key.is_empty() { - None - } else { - Some(hmac::Key::new(hmac::HMAC_SHA256, key.as_bytes())) - }; - Connection { socket, mac } - } -} - -impl Connection { - pub async fn single_heartbeat(&mut self) -> Result<(), AnyError> { - self.socket.recv().await?; - self - .socket - .send(zeromq::ZmqMessage::from(b"ping".to_vec())) - .await?; - Ok(()) - } -} - -impl Connection { - pub async fn read(&mut self) -> Result { - let multipart = self.socket.recv().await?; - let raw_message = RawMessage::from_multipart(multipart, self.mac.as_ref())?; - JupyterMessage::from_raw_message(raw_message) - } -} - -impl Connection { - pub async fn send( - &mut self, - message: &JupyterMessage, - ) -> Result<(), AnyError> { - // If performance is a concern, we can probably avoid the clone and to_vec calls with a bit - // of refactoring. - let mut jparts: Vec = vec![ - serde_json::to_string(&message.header) - .unwrap() - .as_bytes() - .to_vec() - .into(), - serde_json::to_string(&message.parent_header) - .unwrap() - .as_bytes() - .to_vec() - .into(), - serde_json::to_string(&message.metadata) - .unwrap() - .as_bytes() - .to_vec() - .into(), - serde_json::to_string(&message.content) - .unwrap() - .as_bytes() - .to_vec() - .into(), - ]; - jparts.extend_from_slice(&message.buffers); - let raw_message = RawMessage { - zmq_identities: message.zmq_identities.clone(), - jparts, - }; - self.send_raw(raw_message).await - } - - async fn send_raw( - &mut self, - raw_message: RawMessage, - ) -> Result<(), AnyError> { - let hmac = if let Some(key) = &self.mac { - let ctx = digest(key, &raw_message.jparts); - let tag = ctx.sign(); - HEXLOWER.encode(tag.as_ref()) - } else { - String::new() - }; - let mut parts: Vec = Vec::new(); - for part in &raw_message.zmq_identities { - parts.push(part.to_vec().into()); - } - parts.push(DELIMITER.into()); - parts.push(hmac.as_bytes().to_vec().into()); - for part in &raw_message.jparts { - parts.push(part.to_vec().into()); - } - // ZmqMessage::try_from only fails if parts is empty, which it never - // will be here. - let message = zeromq::ZmqMessage::try_from(parts).unwrap(); - self.socket.send(message).await?; - Ok(()) - } -} - -fn digest(mac: &hmac::Key, jparts: &[Bytes]) -> hmac::Context { - let mut hmac_ctx = hmac::Context::with_key(mac); - for part in jparts { - hmac_ctx.update(part); - } - hmac_ctx -} - -struct RawMessage { - zmq_identities: Vec, - jparts: Vec, -} - -impl RawMessage { - pub fn from_multipart( - multipart: zeromq::ZmqMessage, - mac: Option<&hmac::Key>, - ) -> Result { - let delimiter_index = multipart - .iter() - .position(|part| &part[..] == DELIMITER) - .ok_or_else(|| anyhow!("Missing delimiter"))?; - let mut parts = multipart.into_vec(); - let jparts: Vec<_> = parts.drain(delimiter_index + 2..).collect(); - let expected_hmac = parts.pop().unwrap(); - // Remove delimiter, so that what's left is just the identities. - parts.pop(); - let zmq_identities = parts; - - let raw_message = RawMessage { - zmq_identities, - jparts, - }; - - if let Some(key) = mac { - let sig = HEXLOWER.decode(&expected_hmac)?; - let mut msg = Vec::new(); - for part in &raw_message.jparts { - msg.extend(part); - } - - if let Err(err) = hmac::verify(key, msg.as_ref(), sig.as_ref()) { - bail!("{}", err); - } - } - - Ok(raw_message) - } -} - -#[derive(Clone)] -pub struct JupyterMessage { - zmq_identities: Vec, - header: serde_json::Value, - parent_header: serde_json::Value, - metadata: serde_json::Value, - content: serde_json::Value, - buffers: Vec, -} - -const DELIMITER: &[u8] = b""; - -impl JupyterMessage { - fn from_raw_message( - raw_message: RawMessage, - ) -> Result { - if raw_message.jparts.len() < 4 { - bail!("Insufficient message parts {}", raw_message.jparts.len()); - } - - Ok(JupyterMessage { - zmq_identities: raw_message.zmq_identities, - header: serde_json::from_slice(&raw_message.jparts[0])?, - parent_header: serde_json::from_slice(&raw_message.jparts[1])?, - metadata: serde_json::from_slice(&raw_message.jparts[2])?, - content: serde_json::from_slice(&raw_message.jparts[3])?, - buffers: if raw_message.jparts.len() > 4 { - raw_message.jparts[4..].to_vec() - } else { - vec![] - }, - }) - } - - pub fn message_type(&self) -> &str { - self.header["msg_type"].as_str().unwrap_or("") - } - - pub fn store_history(&self) -> bool { - self.content["store_history"].as_bool().unwrap_or(true) - } - - pub fn silent(&self) -> bool { - self.content["silent"].as_bool().unwrap_or(false) - } - - pub fn code(&self) -> &str { - self.content["code"].as_str().unwrap_or("") - } - - pub fn cursor_pos(&self) -> usize { - self.content["cursor_pos"].as_u64().unwrap_or(0) as usize - } - - pub fn comm_id(&self) -> &str { - self.content["comm_id"].as_str().unwrap_or("") - } - - // Creates a new child message of this message. ZMQ identities are not transferred. - pub fn new_message(&self, msg_type: &str) -> JupyterMessage { - let mut header = self.header.clone(); - header["msg_type"] = serde_json::Value::String(msg_type.to_owned()); - header["username"] = serde_json::Value::String("kernel".to_owned()); - header["msg_id"] = serde_json::Value::String(Uuid::new_v4().to_string()); - header["date"] = serde_json::Value::String(utc_now().to_rfc3339()); - - JupyterMessage { - zmq_identities: Vec::new(), - header, - parent_header: self.header.clone(), - metadata: json!({}), - content: json!({}), - buffers: vec![], - } - } - - // Creates a reply to this message. This is a child with the message type determined - // automatically by replacing "request" with "reply". ZMQ identities are transferred. - pub fn new_reply(&self) -> JupyterMessage { - let mut reply = - self.new_message(&self.message_type().replace("_request", "_reply")); - reply.zmq_identities = self.zmq_identities.clone(); - reply - } - - #[must_use = "Need to send this message for it to have any effect"] - pub fn comm_close_message(&self) -> JupyterMessage { - self.new_message("comm_close").with_content(json!({ - "comm_id": self.comm_id() - })) - } - - pub fn with_content(mut self, content: serde_json::Value) -> JupyterMessage { - self.content = content; - self - } - - pub fn with_metadata( - mut self, - metadata: serde_json::Value, - ) -> JupyterMessage { - self.metadata = metadata; - self - } - - pub fn with_buffers(mut self, buffers: Vec) -> JupyterMessage { - self.buffers = buffers; - self - } -} - -impl fmt::Debug for JupyterMessage { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - writeln!( - f, - "\nHeader: {}", - serde_json::to_string_pretty(&self.header).unwrap() - )?; - writeln!( - f, - "Parent header: {}", - serde_json::to_string_pretty(&self.parent_header).unwrap() - )?; - writeln!( - f, - "Metadata: {}", - serde_json::to_string_pretty(&self.metadata).unwrap() - )?; - writeln!( - f, - "Content: {}\n", - serde_json::to_string_pretty(&self.content).unwrap() - )?; - Ok(()) - } -} diff --git a/cli/tools/jupyter/mod.rs b/cli/tools/jupyter/mod.rs index da1c4bc4d8..a4d0bb27d9 100644 --- a/cli/tools/jupyter/mod.rs +++ b/cli/tools/jupyter/mod.rs @@ -3,7 +3,6 @@ use crate::args::Flags; use crate::args::JupyterFlags; use crate::ops; -use crate::tools::jupyter::server::StdioMsg; use crate::tools::repl; use crate::tools::test::create_single_test_event_channel; use crate::tools::test::reporters::PrettyTestReporter; @@ -15,7 +14,6 @@ use deno_core::error::generic_error; use deno_core::error::AnyError; use deno_core::located_script_name; use deno_core::resolve_url_or_path; -use deno_core::serde::Deserialize; use deno_core::serde_json; use deno_core::url::Url; use deno_runtime::deno_io::Stdio; @@ -24,11 +22,13 @@ use deno_runtime::permissions::Permissions; use deno_runtime::permissions::PermissionsContainer; use deno_runtime::WorkerExecutionMode; use deno_terminal::colors; + +use runtimelib::jupyter::ConnectionInfo; +use runtimelib::messaging::StreamContent; use tokio::sync::mpsc; use tokio::sync::mpsc::UnboundedSender; mod install; -pub mod jupyter_msg; pub mod server; pub async fn kernel( @@ -73,7 +73,7 @@ pub async fn kernel( std::fs::read_to_string(&connection_filepath).with_context(|| { format!("Couldn't read connection file: {:?}", connection_filepath) })?; - let spec: ConnectionSpec = + let spec: ConnectionInfo = serde_json::from_str(&conn_file).with_context(|| { format!( "Connection file is not a valid JSON: {:?}", @@ -119,12 +119,14 @@ pub async fn kernel( test_event_receiver, ) .await?; - struct TestWriter(UnboundedSender); + struct TestWriter(UnboundedSender); impl std::io::Write for TestWriter { fn write(&mut self, buf: &[u8]) -> std::io::Result { self .0 - .send(StdioMsg::Stdout(String::from_utf8_lossy(buf).into_owned())) + .send(StreamContent::stdout( + String::from_utf8_lossy(buf).into_owned(), + )) .ok(); Ok(buf.len()) } @@ -150,15 +152,3 @@ pub async fn kernel( Ok(()) } - -#[derive(Debug, Deserialize)] -pub struct ConnectionSpec { - ip: String, - transport: String, - control_port: u32, - shell_port: u32, - stdin_port: u32, - hb_port: u32, - iopub_port: u32, - key: String, -} diff --git a/cli/tools/jupyter/server.rs b/cli/tools/jupyter/server.rs index 4021cf6a31..3d273ee744 100644 --- a/cli/tools/jupyter/server.rs +++ b/cli/tools/jupyter/server.rs @@ -19,48 +19,54 @@ use deno_core::CancelHandle; use tokio::sync::mpsc; use tokio::sync::Mutex; -use super::jupyter_msg::Connection; -use super::jupyter_msg::JupyterMessage; -use super::ConnectionSpec; +use runtimelib::ConnectionInfo; +use runtimelib::KernelControlConnection; +use runtimelib::KernelHeartbeatConnection; +use runtimelib::KernelIoPubConnection; +use runtimelib::KernelShellConnection; -pub enum StdioMsg { - Stdout(String), - Stderr(String), -} +use runtimelib::messaging; +use runtimelib::AsChildOf; +use runtimelib::JupyterMessage; +use runtimelib::JupyterMessageContent; +use runtimelib::ReplyError; +use runtimelib::ReplyStatus; +use runtimelib::StreamContent; pub struct JupyterServer { execution_count: usize, last_execution_request: Rc>>, // This is Arc>, so we don't hold RefCell borrows across await // points. - iopub_socket: Arc>>, + iopub_connection: Arc>, repl_session: repl::ReplSession, } impl JupyterServer { pub async fn start( - spec: ConnectionSpec, - mut stdio_rx: mpsc::UnboundedReceiver, + connection_info: ConnectionInfo, + mut stdio_rx: mpsc::UnboundedReceiver, mut repl_session: repl::ReplSession, ) -> Result<(), AnyError> { let mut heartbeat = - bind_socket::(&spec, spec.hb_port).await?; - let shell_socket = - bind_socket::(&spec, spec.shell_port).await?; - let control_socket = - bind_socket::(&spec, spec.control_port).await?; - let _stdin_socket = - bind_socket::(&spec, spec.stdin_port).await?; - let iopub_socket = - bind_socket::(&spec, spec.iopub_port).await?; - let iopub_socket = Arc::new(Mutex::new(iopub_socket)); + connection_info.create_kernel_heartbeat_connection().await?; + let shell_connection = + connection_info.create_kernel_shell_connection().await?; + let control_connection = + connection_info.create_kernel_control_connection().await?; + let _stdin_connection = + connection_info.create_kernel_stdin_connection().await?; + let iopub_connection = + connection_info.create_kernel_iopub_connection().await?; + + let iopub_connection = Arc::new(Mutex::new(iopub_connection)); let last_execution_request = Rc::new(RefCell::new(None)); - // Store `iopub_socket` in the op state so it's accessible to the runtime API. + // Store `iopub_connection` in the op state so it's accessible to the runtime API. { let op_state_rc = repl_session.worker.js_runtime.op_state(); let mut op_state = op_state_rc.borrow_mut(); - op_state.put(iopub_socket.clone()); + op_state.put(iopub_connection.clone()); op_state.put(last_execution_request.clone()); } @@ -68,14 +74,18 @@ impl JupyterServer { let mut server = Self { execution_count: 0, - iopub_socket: iopub_socket.clone(), + iopub_connection: iopub_connection.clone(), last_execution_request: last_execution_request.clone(), repl_session, }; let handle1 = deno_core::unsync::spawn(async move { if let Err(err) = Self::handle_heartbeat(&mut heartbeat).await { - log::error!("Heartbeat error: {}", err); + log::error!( + "Heartbeat error: {}\nBacktrace:\n{}", + err, + err.backtrace() + ); } }); @@ -83,23 +93,27 @@ impl JupyterServer { let cancel_handle = cancel_handle.clone(); async move { if let Err(err) = - Self::handle_control(control_socket, cancel_handle).await + Self::handle_control(control_connection, cancel_handle).await { - log::error!("Control error: {}", err); + log::error!( + "Control error: {}\nBacktrace:\n{}", + err, + err.backtrace() + ); } } }); let handle3 = deno_core::unsync::spawn(async move { - if let Err(err) = server.handle_shell(shell_socket).await { - log::error!("Shell error: {}", err); + if let Err(err) = server.handle_shell(shell_connection).await { + log::error!("Shell error: {}\nBacktrace:\n{}", err, err.backtrace()); } }); let handle4 = deno_core::unsync::spawn(async move { while let Some(stdio_msg) = stdio_rx.recv().await { Self::handle_stdio_msg( - iopub_socket.clone(), + iopub_connection.clone(), last_execution_request.clone(), stdio_msg, ) @@ -117,33 +131,25 @@ impl JupyterServer { Ok(()) } - async fn handle_stdio_msg( - iopub_socket: Arc>>, + async fn handle_stdio_msg( + iopub_connection: Arc>, last_execution_request: Rc>>, - stdio_msg: StdioMsg, + stdio_msg: StreamContent, ) { let maybe_exec_result = last_execution_request.borrow().clone(); if let Some(exec_request) = maybe_exec_result { - let (name, text) = match stdio_msg { - StdioMsg::Stdout(text) => ("stdout", text), - StdioMsg::Stderr(text) => ("stderr", text), - }; - - let result = (*iopub_socket.lock().await) - .send(&exec_request.new_message("stream").with_content(json!({ - "name": name, - "text": text - }))) + let result = (iopub_connection.lock().await) + .send(stdio_msg.as_child_of(&exec_request)) .await; if let Err(err) = result { - log::error!("Output {} error: {}", name, err); + log::error!("Output error: {}", err); } } } async fn handle_heartbeat( - connection: &mut Connection, + connection: &mut KernelHeartbeatConnection, ) -> Result<(), AnyError> { loop { connection.single_heartbeat().await?; @@ -151,23 +157,30 @@ impl JupyterServer { } async fn handle_control( - mut connection: Connection, + mut connection: KernelControlConnection, cancel_handle: Rc, ) -> Result<(), AnyError> { loop { let msg = connection.read().await?; - match msg.message_type() { - "kernel_info_request" => { - connection - .send(&msg.new_reply().with_content(kernel_info())) - .await?; + + match msg.content { + JupyterMessageContent::KernelInfoRequest(_) => { + // normally kernel info is sent from the shell channel + // however, some frontends will send it on the control channel + // and it's no harm to send a kernel info reply on control + connection.send(kernel_info().as_child_of(&msg)).await?; } - "shutdown_request" => { + JupyterMessageContent::ShutdownRequest(_) => { cancel_handle.cancel(); } - "interrupt_request" => { + JupyterMessageContent::InterruptRequest(_) => { log::error!("Interrupt request currently not supported"); } + JupyterMessageContent::DebugRequest(_) => { + log::error!("Debug request currently not supported"); + // See https://jupyter-client.readthedocs.io/en/latest/messaging.html#debug-request + // and https://microsoft.github.io/debug-adapter-protocol/ + } _ => { log::error!( "Unrecognized control message type: {}", @@ -180,7 +193,7 @@ impl JupyterServer { async fn handle_shell( &mut self, - mut connection: Connection, + mut connection: KernelShellConnection, ) -> Result<(), AnyError> { loop { let msg = connection.read().await?; @@ -191,43 +204,28 @@ impl JupyterServer { async fn handle_shell_message( &mut self, msg: JupyterMessage, - connection: &mut Connection, + connection: &mut KernelShellConnection, ) -> Result<(), AnyError> { + let parent = &msg.clone(); + self - .send_iopub( - &msg - .new_message("status") - .with_content(json!({"execution_state": "busy"})), - ) + .send_iopub(messaging::Status::busy().as_child_of(parent)) .await?; - match msg.message_type() { - "kernel_info_request" => { - connection - .send(&msg.new_reply().with_content(kernel_info())) - .await?; - } - "is_complete_request" => { - connection - .send(&msg.new_reply().with_content(json!({"status": "complete"}))) - .await?; - } - "execute_request" => { + match msg.content { + JupyterMessageContent::ExecuteRequest(execute_request) => { self - .handle_execution_request(msg.clone(), connection) + .handle_execution_request(execute_request, parent, connection) .await?; } - "comm_open" => { - self.send_iopub(&msg.comm_close_message()).await?; - } - "complete_request" => { - let user_code = msg.code(); - let cursor_pos = msg.cursor_pos(); + JupyterMessageContent::CompleteRequest(req) => { + let user_code = req.code; + let cursor_pos = req.cursor_pos; let lsp_completions = self .repl_session .language_server - .completions(user_code, cursor_pos) + .completions(&user_code, cursor_pos) .await; if !lsp_completions.is_empty() { @@ -247,16 +245,20 @@ impl JupyterServer { .unwrap_or(cursor_pos); connection - .send(&msg.new_reply().with_content(json!({ - "status": "ok", - "matches": matches, - "cursor_start": cursor_start, - "cursor_end": cursor_end, - "metadata": {}, - }))) + .send( + messaging::CompleteReply { + matches, + cursor_start, + cursor_end, + metadata: Default::default(), + status: ReplyStatus::Ok, + error: None, + } + .as_child_of(parent), + ) .await?; } else { - let expr = get_expr_from_line_at_pos(user_code, cursor_pos); + let expr = get_expr_from_line_at_pos(&user_code, cursor_pos); // check if the expression is in the form `obj.prop` let (completions, cursor_start) = if let Some(index) = expr.rfind('.') { @@ -292,72 +294,173 @@ impl JupyterServer { (candidates, cursor_pos - expr.len()) }; + connection - .send(&msg.new_reply().with_content(json!({ - "status": "ok", - "matches": completions, - "cursor_start": cursor_start, - "cursor_end": cursor_pos, - "metadata": {}, - }))) + .send( + messaging::CompleteReply { + matches: completions, + cursor_start, + cursor_end: cursor_pos, + metadata: Default::default(), + status: ReplyStatus::Ok, + error: None, + } + .as_child_of(parent), + ) .await?; } } - "comm_msg" | "comm_info_request" | "history_request" => { - // We don't handle these messages + + JupyterMessageContent::InspectRequest(_req) => { + // TODO(bartlomieju?): implement introspection request + // The inspect request is used to get information about an object at cursor position. + // There are two detail levels: 0 is typically documentation, 1 is typically source code + + // The response includes a MimeBundle to render the object: + // { + // "status": "ok", + // "found": true, + // "data": { + // "text/plain": "Plain documentation here", + // "text/html": "
Rich documentation here
", + // "application/json": { + // "key1": "value1", + // "key2": "value2" + // } + // }, + // } + + connection + .send( + messaging::InspectReply { + status: ReplyStatus::Ok, + found: false, + data: Default::default(), + metadata: Default::default(), + error: None, + } + .as_child_of(parent), + ) + .await?; } + + JupyterMessageContent::IsCompleteRequest(_) => { + connection + .send(messaging::IsCompleteReply::complete().as_child_of(parent)) + .await?; + } + JupyterMessageContent::KernelInfoRequest(_) => { + connection.send(kernel_info().as_child_of(parent)).await?; + } + JupyterMessageContent::CommOpen(comm) => { + connection + .send( + messaging::CommClose { + comm_id: comm.comm_id, + data: Default::default(), + } + .as_child_of(parent), + ) + .await?; + } + JupyterMessageContent::HistoryRequest(_req) => { + connection + .send( + messaging::HistoryReply { + history: vec![], + error: None, + status: ReplyStatus::Ok, + } + .as_child_of(parent), + ) + .await?; + } + JupyterMessageContent::InputReply(_rep) => { + // TODO(@zph): implement input reply from https://github.com/denoland/deno/pull/23592 + // NOTE: This will belong on the stdin channel, not the shell channel + } + JupyterMessageContent::CommInfoRequest(_req) => { + connection + .send( + messaging::CommInfoReply { + comms: Default::default(), + status: ReplyStatus::Ok, + error: None, + } + .as_child_of(parent), + ) + .await?; + } + JupyterMessageContent::CommMsg(_) + | JupyterMessageContent::CommClose(_) => { + // Do nothing with regular comm messages + } + // Any unknown message type is ignored _ => { - log::error!("Unrecognized shell message type: {}", msg.message_type()); + log::error!( + "Unrecognized shell message type: {}", + msg.content.message_type() + ); } } self - .send_iopub( - &msg - .new_message("status") - .with_content(json!({"execution_state": "idle"})), - ) + .send_iopub(messaging::Status::idle().as_child_of(parent)) .await?; + Ok(()) } async fn handle_execution_request( &mut self, - msg: JupyterMessage, - connection: &mut Connection, + execute_request: messaging::ExecuteRequest, + parent_message: &JupyterMessage, + connection: &mut KernelShellConnection, ) -> Result<(), AnyError> { - if !msg.silent() && msg.store_history() { + if !execute_request.silent && execute_request.store_history { self.execution_count += 1; } - *self.last_execution_request.borrow_mut() = Some(msg.clone()); + *self.last_execution_request.borrow_mut() = Some(parent_message.clone()); self - .send_iopub(&msg.new_message("execute_input").with_content(json!({ - "execution_count": self.execution_count, - "code": msg.code() - }))) + .send_iopub( + messaging::ExecuteInput { + execution_count: self.execution_count, + code: execute_request.code.clone(), + } + .as_child_of(parent_message), + ) .await?; let result = self .repl_session - .evaluate_line_with_object_wrapping(msg.code()) + .evaluate_line_with_object_wrapping(&execute_request.code) .await; let evaluate_response = match result { Ok(eval_response) => eval_response, Err(err) => { self - .send_iopub(&msg.new_message("error").with_content(json!({ - "ename": err.to_string(), - "evalue": " ", // Fake value, otherwise old Jupyter frontends don't show the error - "traceback": [], - }))) + .send_iopub( + messaging::ErrorOutput { + ename: err.to_string(), + evalue: err.to_string(), + traceback: vec![], + } + .as_child_of(parent_message), + ) .await?; connection - .send(&msg.new_reply().with_content(json!({ - "status": "error", - "execution_count": self.execution_count, - }))) + .send( + messaging::ExecuteReply { + execution_count: self.execution_count, + status: ReplyStatus::Error, + payload: None, + user_expressions: None, + error: None, + } + .as_child_of(parent_message), + ) .await?; return Ok(()); } @@ -373,11 +476,16 @@ impl JupyterServer { .await?; connection - .send(&msg.new_reply().with_content(json!({ - "status": "ok", - "execution_count": self.execution_count, - // FIXME: also include user_expressions - }))) + .send( + messaging::ExecuteReply { + execution_count: self.execution_count, + status: ReplyStatus::Ok, + user_expressions: None, + payload: None, + error: None, + } + .as_child_of(parent_message), + ) .await?; // Let's sleep here for a few ms, so we give a chance to the task that is // handling stdout and stderr streams to receive and flush the content. @@ -458,17 +566,30 @@ impl JupyterServer { }; self - .send_iopub(&msg.new_message("error").with_content(json!({ - "ename": ename, - "evalue": evalue, - "traceback": traceback, - }))) + .send_iopub( + messaging::ErrorOutput { + ename: ename.clone(), + evalue: evalue.clone(), + traceback: traceback.clone(), + } + .as_child_of(parent_message), + ) .await?; connection - .send(&msg.new_reply().with_content(json!({ - "status": "error", - "execution_count": self.execution_count, - }))) + .send( + messaging::ExecuteReply { + execution_count: self.execution_count, + status: ReplyStatus::Error, + error: Some(ReplyError { + ename, + evalue, + traceback, + }), + user_expressions: None, + payload: None, + } + .as_child_of(parent_message), + ) .await?; } @@ -477,42 +598,35 @@ impl JupyterServer { async fn send_iopub( &mut self, - message: &JupyterMessage, + message: JupyterMessage, ) -> Result<(), AnyError> { - self.iopub_socket.lock().await.send(message).await + self.iopub_connection.lock().await.send(message).await } } -async fn bind_socket( - config: &ConnectionSpec, - port: u32, -) -> Result, AnyError> { - let endpoint = format!("{}://{}:{}", config.transport, config.ip, port); - let mut socket = S::new(); - socket.bind(&endpoint).await?; - Ok(Connection::new(socket, &config.key)) -} - -fn kernel_info() -> serde_json::Value { - json!({ - "status": "ok", - "protocol_version": "5.3", - "implementation_version": crate::version::deno(), - "implementation": "Deno kernel", - "language_info": { - "name": "typescript", - "version": crate::version::TYPESCRIPT, - "mimetype": "text/x.typescript", - "file_extension": ".ts", - "pygments_lexer": "typescript", - "nb_converter": "script" +fn kernel_info() -> messaging::KernelInfoReply { + messaging::KernelInfoReply { + status: ReplyStatus::Ok, + protocol_version: "5.3".to_string(), + implementation: "Deno kernel".to_string(), + implementation_version: crate::version::deno().to_string(), + language_info: messaging::LanguageInfo { + name: "typescript".to_string(), + version: crate::version::TYPESCRIPT.to_string(), + mimetype: "text/x.typescript".to_string(), + file_extension: ".ts".to_string(), + pygments_lexer: "typescript".to_string(), + codemirror_mode: messaging::CodeMirrorMode::typescript(), + nbconvert_exporter: "script".to_string(), }, - "help_links": [{ - "text": "Visit Deno manual", - "url": "https://deno.land/manual" + banner: "Welcome to Deno kernel".to_string(), + help_links: vec![messaging::HelpLink { + text: "Visit Deno manual".to_string(), + url: "https://deno.land/manual".to_string(), }], - "banner": "Welcome to Deno kernel", - }) + debugger: false, + error: None, + } } async fn publish_result( diff --git a/tests/integration/jupyter_tests.rs b/tests/integration/jupyter_tests.rs index 3c4efbdacb..c7b2712e88 100644 --- a/tests/integration/jupyter_tests.rs +++ b/tests/integration/jupyter_tests.rs @@ -10,6 +10,8 @@ use test_util::DenoChild; use test_util::TestContext; use test_util::TestContextBuilder; +use chrono::DateTime; +use chrono::Utc; use deno_core::anyhow::Result; use deno_core::serde_json; use deno_core::serde_json::json; @@ -119,7 +121,7 @@ impl Default for JupyterMsg { struct MsgHeader { msg_id: Uuid, session: Uuid, - date: String, + date: DateTime, username: String, msg_type: String, version: String, @@ -136,7 +138,7 @@ impl Default for MsgHeader { Self { msg_id: Uuid::new_v4(), session: Uuid::new_v4(), - date: utc_now().to_rfc3339(), + date: utc_now(), username: "test".into(), msg_type: "kernel_info_request".into(), version: "5.3".into(), @@ -517,7 +519,7 @@ async fn jupyter_kernel_info() -> Result<()> { "mimetype": "text/x.typescript", "file_extension": ".ts", "pygments_lexer": "typescript", - "nb_converter": "script" + "nbconvert_exporter": "script" }, }), ); @@ -612,7 +614,7 @@ async fn jupyter_store_history_false() -> Result<()> { json!({ "silent": false, "store_history": false, - "code": "console.log(\"asdf\")" + "code": "console.log(\"asdf\")", }), ) .await?; diff --git a/tests/specs/jupyter/install_command_not_exists/__test__.jsonc b/tests/specs/jupyter/install_command/__test__.jsonc similarity index 51% rename from tests/specs/jupyter/install_command_not_exists/__test__.jsonc rename to tests/specs/jupyter/install_command/__test__.jsonc index 9552157bd4..df60c3b867 100644 --- a/tests/specs/jupyter/install_command_not_exists/__test__.jsonc +++ b/tests/specs/jupyter/install_command/__test__.jsonc @@ -1,8 +1,8 @@ { "args": "jupyter --install", - "output": "install_command_not_exists.out", + "output": "install_command.out", "envs": { "PATH": "" }, - "exitCode": 1 + "exitCode": 0 } diff --git a/tests/specs/jupyter/install_command/install_command.out b/tests/specs/jupyter/install_command/install_command.out new file mode 100644 index 0000000000..62875d9cfc --- /dev/null +++ b/tests/specs/jupyter/install_command/install_command.out @@ -0,0 +1,2 @@ +Warning "deno jupyter" is unstable and might change in the future. +✅ Deno kernelspec installed successfully. diff --git a/tests/specs/jupyter/install_command_not_exists/install_command_not_exists.out b/tests/specs/jupyter/install_command_not_exists/install_command_not_exists.out deleted file mode 100644 index 1bb176e202..0000000000 --- a/tests/specs/jupyter/install_command_not_exists/install_command_not_exists.out +++ /dev/null @@ -1,5 +0,0 @@ -Warning "deno jupyter" is unstable and might change in the future. -error: Failed to spawn 'jupyter' command. Is JupyterLab installed (https://jupyter.org/install) and available on the PATH? - -Caused by: -[WILDCARD]