1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-22 06:09:25 -05:00
denoland-deno/cli/tools/jupyter/server.rs
Bartek Iwańczuk bf07604113
feat: Add "deno jupyter" subcommand (#20337)
This commit adds "deno jupyter" subcommand which
provides a Deno kernel for Jupyter notebooks.

The implementation is mostly based on Deno's REPL and
reuses large parts of it (though there's some clean up that
needs to happen in follow up PRs). Not all functionality of
Jupyter kernel is implemented and some message types
are still not implemented (e.g. "inspect_request") but
the kernel is fully working and provides all the capabilities
that the Deno REPL has; including TypeScript transpilation
and npm packages support.

Closes https://github.com/denoland/deno/issues/13016

---------

Co-authored-by: Adam Powers <apowers@ato.ms>
Co-authored-by: Kyle Kelley <rgbkrk@gmail.com>
2023-09-16 02:42:09 +02:00

724 lines
20 KiB
Rust

// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// This file is forked/ported from <https://github.com/evcxr/evcxr>
// Copyright 2020 The Evcxr Authors. MIT license.
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
use std::sync::Arc;
use crate::tools::repl;
use crate::tools::repl::cdp;
use deno_core::error::AnyError;
use deno_core::futures;
use deno_core::futures::channel::mpsc;
use deno_core::futures::StreamExt;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use tokio::sync::Mutex;
use zeromq::SocketRecv;
use zeromq::SocketSend;
use super::jupyter_msg::Connection;
use super::jupyter_msg::JupyterMessage;
use super::ConnectionSpec;
/// A chunk of output text that the server forwards to the Jupyter frontend
/// as a `stream` message on the iopub channel.
pub enum StdioMsg {
/// Text published with the stream name "stdout".
Stdout(String),
/// Text published with the stream name "stderr".
Stderr(String),
}
/// State owned by the shell-channel handler of the Jupyter kernel.
pub struct JupyterServer {
/// Incremented once per `execute_request` and echoed back to the frontend
/// as `execution_count`.
execution_count: usize,
/// The most recent `execute_request`. Stdio output is published as a
/// message derived from it; shared with the stdio forwarding task.
last_execution_request: Rc<RefCell<Option<JupyterMessage>>>,
// This is Arc<Mutex<>>, so we don't hold RefCell borrows across await
// points.
iopub_socket: Arc<Mutex<Connection<zeromq::PubSocket>>>,
/// REPL session used to evaluate code and to compute completions.
repl_session: repl::ReplSession,
}
impl JupyterServer {
/// Binds every kernel socket described by `spec` and runs the server.
///
/// Four local tasks are spawned: heartbeat, control channel, shell channel
/// and forwarding of `stdio_rx` messages to the iopub socket. Errors returned
/// by the heartbeat, control and shell handlers are printed to stderr by
/// their tasks.
///
/// The returned future resolves when either all tasks have finished or the
/// control channel receives a `shutdown_request`.
///
/// # Errors
///
/// Returns an error if a socket cannot be bound or if joining one of the
/// spawned tasks fails.
pub async fn start(
  spec: ConnectionSpec,
  mut stdio_rx: mpsc::UnboundedReceiver<StdioMsg>,
  repl_session: repl::ReplSession,
) -> Result<(), AnyError> {
  let mut heartbeat =
    bind_socket::<zeromq::RepSocket>(&spec, spec.hb_port).await?;
  let shell_socket =
    bind_socket::<zeromq::RouterSocket>(&spec, spec.shell_port).await?;
  let control_socket =
    bind_socket::<zeromq::RouterSocket>(&spec, spec.control_port).await?;
  // The stdin channel is bound but not serviced.
  let _stdin_socket =
    bind_socket::<zeromq::RouterSocket>(&spec, spec.stdin_port).await?;
  let iopub_socket =
    bind_socket::<zeromq::PubSocket>(&spec, spec.iopub_port).await?;
  let iopub_socket = Arc::new(Mutex::new(iopub_socket));
  let last_execution_request = Rc::new(RefCell::new(None));

  let cancel_handle = CancelHandle::new_rc();
  // The control task must cancel the *same* handle that `or_cancel` below is
  // waiting on; a separately created handle would make `shutdown_request`
  // a no-op.
  let control_cancel_handle = cancel_handle.clone();

  let mut server = Self {
    execution_count: 0,
    iopub_socket: iopub_socket.clone(),
    last_execution_request: last_execution_request.clone(),
    repl_session,
  };

  let handle1 = deno_core::unsync::spawn(async move {
    if let Err(err) = Self::handle_heartbeat(&mut heartbeat).await {
      eprintln!("Heartbeat error: {}", err);
    }
  });

  let handle2 = deno_core::unsync::spawn(async move {
    if let Err(err) =
      Self::handle_control(control_socket, control_cancel_handle).await
    {
      eprintln!("Control error: {}", err);
    }
  });

  let handle3 = deno_core::unsync::spawn(async move {
    if let Err(err) = server.handle_shell(shell_socket).await {
      eprintln!("Shell error: {}", err);
    }
  });

  let handle4 = deno_core::unsync::spawn(async move {
    while let Some(stdio_msg) = stdio_rx.next().await {
      Self::handle_stdio_msg(
        iopub_socket.clone(),
        last_execution_request.clone(),
        stdio_msg,
      )
      .await;
    }
  });

  let join_fut =
    futures::future::try_join_all(vec![handle1, handle2, handle3, handle4]);

  // `Err` here means the join was cancelled by a shutdown request, which is
  // a normal way for the server to stop.
  if let Ok(result) = join_fut.or_cancel(cancel_handle).await {
    result?;
  }

  Ok(())
}
/// Publishes one stdio chunk on the iopub socket as a `stream` message
/// derived from the last execution request.
///
/// The chunk is dropped when no execution request has been recorded yet.
/// Send failures are reported on stderr and otherwise ignored.
async fn handle_stdio_msg<S: zeromq::SocketSend>(
  iopub_socket: Arc<Mutex<Connection<S>>>,
  last_execution_request: Rc<RefCell<Option<JupyterMessage>>>,
  stdio_msg: StdioMsg,
) {
  // Clone out of the cell in its own statement so the `RefCell` borrow is
  // released before anything is awaited.
  let pending_request = last_execution_request.borrow().clone();
  let Some(exec_request) = pending_request else {
    return;
  };

  let (name, text) = match stdio_msg {
    StdioMsg::Stdout(text) => ("stdout", text),
    StdioMsg::Stderr(text) => ("stderr", text),
  };

  let stream_msg = exec_request.new_message("stream").with_content(json!({
    "name": name,
    "text": text
  }));

  let send_result = stream_msg.send(&mut *iopub_socket.lock().await).await;
  if let Err(err) = send_result {
    eprintln!("Output {} error: {}", name, err);
  }
}
/// Serves the heartbeat channel: every received message is answered with
/// a `ping` payload.
///
/// Runs until receiving or sending fails, and then returns that error.
async fn handle_heartbeat(
  connection: &mut Connection<zeromq::RepSocket>,
) -> Result<(), AnyError> {
  loop {
    // The content of the incoming message is irrelevant.
    let _request = connection.socket.recv().await?;
    let reply = zeromq::ZmqMessage::from(b"ping".to_vec());
    connection.socket.send(reply).await?;
  }
}
/// Serves the control channel until reading or replying fails.
///
/// * `kernel_info_request` is answered with the kernel info.
/// * `shutdown_request` triggers `cancel_handle`.
/// * `interrupt_request` and unknown message types are only logged to
///   stderr.
async fn handle_control(
  mut connection: Connection<zeromq::RouterSocket>,
  cancel_handle: Rc<CancelHandle>,
) -> Result<(), AnyError> {
  loop {
    let request = JupyterMessage::read(&mut connection).await?;
    let kind = request.message_type();

    if kind == "kernel_info_request" {
      let reply = request.new_reply().with_content(kernel_info());
      reply.send(&mut connection).await?;
    } else if kind == "shutdown_request" {
      cancel_handle.cancel();
    } else if kind == "interrupt_request" {
      eprintln!("Interrupt request currently not supported");
    } else {
      eprintln!("Unrecognized control message type: {}", kind);
    }
  }
}
/// Serves the shell channel: reads messages one at a time and hands each
/// to `handle_shell_message`.
///
/// Returns the first error produced by reading or handling a message.
async fn handle_shell(
  &mut self,
  mut connection: Connection<zeromq::RouterSocket>,
) -> Result<(), AnyError> {
  loop {
    let request = JupyterMessage::read(&mut connection).await?;
    self.handle_shell_message(request, &mut connection).await?;
  }
}
/// Handles a single message received on the shell channel.
///
/// A `busy` status is published on iopub, the message is dispatched on its
/// type, and finally an `idle` status is published.
///
/// For `complete_request`, completions from the language server are used
/// when there are any. Otherwise candidates are taken from the properties
/// of the object left of the last `.` in the expression at the cursor, or,
/// when there is no `.`, from `globalThis` properties and global lexical
/// scope names.
///
/// # Errors
///
/// Returns the first error from sending a message or handling an execution
/// request; in that case the trailing `idle` status is not published.
async fn handle_shell_message(
  &mut self,
  msg: JupyterMessage,
  connection: &mut Connection<zeromq::RouterSocket>,
) -> Result<(), AnyError> {
  msg
    .new_message("status")
    .with_content(json!({"execution_state": "busy"}))
    .send(&mut *self.iopub_socket.lock().await)
    .await?;

  match msg.message_type() {
    "kernel_info_request" => {
      msg
        .new_reply()
        .with_content(kernel_info())
        .send(connection)
        .await?;
    }
    "is_complete_request" => {
      msg
        .new_reply()
        .with_content(json!({"status": "complete"}))
        .send(connection)
        .await?;
    }
    "execute_request" => {
      self
        .handle_execution_request(msg.clone(), connection)
        .await?;
    }
    "comm_open" => {
      // Comms are not supported, so every comm that gets opened is closed
      // right away.
      msg
        .comm_close_message()
        .send(&mut *self.iopub_socket.lock().await)
        .await?;
    }
    "complete_request" => {
      let user_code = msg.code();
      let cursor_pos = msg.cursor_pos();

      let lsp_completions = self
        .repl_session
        .language_server
        .completions(user_code, cursor_pos)
        .await;

      if !lsp_completions.is_empty() {
        let matches: Vec<String> = lsp_completions
          .iter()
          .map(|item| item.new_text.clone())
          .collect();

        let cursor_start = lsp_completions
          .first()
          .map(|item| item.range.start)
          .unwrap_or(cursor_pos);

        let cursor_end = lsp_completions
          .last()
          .map(|item| item.range.end)
          .unwrap_or(cursor_pos);

        msg
          .new_reply()
          .with_content(json!({
            "status": "ok",
            "matches": matches,
            "cursor_start": cursor_start,
            "cursor_end": cursor_end,
            "metadata": {},
          }))
          .send(connection)
          .await?;
      } else {
        let expr = get_expr_from_line_at_pos(user_code, cursor_pos);
        // check if the expression is in the form `obj.prop`
        let (completions, cursor_start) = if let Some(index) = expr.rfind('.')
        {
          let sub_expr = &expr[..index];
          let prop_name = &expr[index + 1..];
          let candidates =
            get_expression_property_names(&mut self.repl_session, sub_expr)
              .await
              .into_iter()
              .filter(|n| {
                !n.starts_with("Symbol(")
                  && n.starts_with(prop_name)
                  && n != &*repl::REPL_INTERNALS_NAME
              })
              .collect();

          // Saturating: the extracted text may be longer than the cursor
          // offset, and a plain `-` would underflow `usize`.
          (candidates, cursor_pos.saturating_sub(prop_name.len()))
        } else {
          // combine results of declarations and globalThis properties
          let mut candidates = get_expression_property_names(
            &mut self.repl_session,
            "globalThis",
          )
          .await
          .into_iter()
          .chain(get_global_lexical_scope_names(&mut self.repl_session).await)
          .filter(|n| n.starts_with(expr) && n != &*repl::REPL_INTERNALS_NAME)
          .collect::<Vec<_>>();

          // sort and remove duplicates
          candidates.sort();
          candidates.dedup(); // make sure to sort first

          // Saturating for the same reason as in the `obj.prop` branch.
          (candidates, cursor_pos.saturating_sub(expr.len()))
        };

        msg
          .new_reply()
          .with_content(json!({
            "status": "ok",
            "matches": completions,
            "cursor_start": cursor_start,
            "cursor_end": cursor_pos,
            "metadata": {},
          }))
          .send(connection)
          .await?;
      }
    }
    "comm_msg" | "comm_info_request" | "history_request" => {
      // We don't handle these messages
    }
    _ => {
      eprintln!("Unrecognized shell message type: {}", msg.message_type());
    }
  }

  msg
    .new_message("status")
    .with_content(json!({"execution_state": "idle"}))
    .send(&mut *self.iopub_socket.lock().await)
    .await?;

  Ok(())
}
/// Executes the code carried by an `execute_request`.
///
/// The request is recorded as the last execution request, an
/// `execute_input` message is published, and the code is evaluated in the
/// REPL session. On success an `execute_result` is published on iopub and
/// an `ok` reply is sent on the shell connection. When evaluation fails or
/// the evaluated code throws, an `error` message is published and an
/// `error` reply is sent instead.
async fn handle_execution_request(
  &mut self,
  msg: JupyterMessage,
  connection: &mut Connection<zeromq::RouterSocket>,
) -> Result<(), AnyError> {
  self.execution_count += 1;
  *self.last_execution_request.borrow_mut() = Some(msg.clone());

  let input_echo = json!({
    "execution_count": self.execution_count,
    "code": msg.code()
  });
  msg
    .new_message("execute_input")
    .with_content(input_echo)
    .send(&mut *self.iopub_socket.lock().await)
    .await?;

  let evaluation = self
    .repl_session
    .evaluate_line_with_object_wrapping(msg.code())
    .await;

  // Either finish the successful path right here, or compute the name that
  // is reported for the failure below.
  let error_name = match evaluation {
    Err(err) => err.to_string(),
    Ok(evaluate_response) => {
      let repl::cdp::EvaluateResponse {
        result,
        exception_details,
      } = evaluate_response.value;

      match exception_details {
        None => {
          let output =
            get_jupyter_display_or_eval_value(&mut self.repl_session, &result)
              .await?;

          let result_content = json!({
            "execution_count": self.execution_count,
            "data": output,
            "metadata": {},
          });
          msg
            .new_message("execute_result")
            .with_content(result_content)
            .send(&mut *self.iopub_socket.lock().await)
            .await?;

          let ok_reply = json!({
            "status": "ok",
            "execution_count": self.execution_count,
          });
          msg
            .new_reply()
            .with_content(ok_reply)
            .send(connection)
            .await?;

          // Let's sleep here for a few ms, so we give a chance to the task
          // that is handling stdout and stderr streams to receive and flush
          // the content. Otherwise, executing multiple cells one-by-one
          // might lead to output from various cells be grouped together in
          // another cell result.
          tokio::time::sleep(std::time::Duration::from_millis(5)).await;

          return Ok(());
        }
        Some(details) => match details.exception {
          None => "Unknown exception".to_string(),
          Some(exception) => {
            let description = exception.description;
            let value = exception.value;
            description
              .or_else(|| value.map(|v| v.to_string()))
              .unwrap_or_else(|| "undefined".to_string())
          }
        },
      }
    }
  };

  // TODO(bartlomieju): fill all the fields
  let error_content = json!({
    "ename": error_name,
    "evalue": "",
    "traceback": [],
  });
  msg
    .new_message("error")
    .with_content(error_content)
    .send(&mut *self.iopub_socket.lock().await)
    .await?;

  let error_reply = json!({
    "status": "error",
    "execution_count": self.execution_count,
  });
  msg
    .new_reply()
    .with_content(error_reply)
    .send(connection)
    .await?;

  Ok(())
}
}
/// Creates a socket of type `S`, binds it to `port` using the transport and
/// IP address from `config`, and wraps it in a `Connection` created with
/// the key from `config`.
///
/// # Errors
///
/// Returns an error if binding the endpoint fails.
async fn bind_socket<S: zeromq::Socket>(
  config: &ConnectionSpec,
  port: u32,
) -> Result<Connection<S>, AnyError> {
  let mut socket = S::new();
  let address = format!("{}://{}:{}", config.transport, config.ip, port);
  socket.bind(&address).await?;
  let connection = Connection::new(socket, &config.key);
  Ok(connection)
}
/// Builds the content of a `kernel_info_reply`, describing this kernel and
/// the language it runs.
fn kernel_info() -> serde_json::Value {
  let language_info = json!({
    "name": "typescript",
    "version": crate::version::TYPESCRIPT,
    "mimetype": "text/x.typescript",
    "file_extension": ".ts",
    "pygments_lexer": "typescript",
    "nb_converter": "script"
  });

  let help_links = json!([{
    "text": "Visit Deno manual",
    "url": "https://deno.land/manual"
  }]);

  json!({
    "status": "ok",
    "protocol_version": "5.3",
    "implementation_version": crate::version::deno(),
    "implementation": "Deno kernel",
    "language_info": language_info,
    "help_links": help_links,
    "banner": "Welcome to Deno kernel",
  })
}
/// Calls the `Symbol.for("Jupyter.display")` method of `evaluate_result`
/// and collects the own, non-indexed properties of the returned object into
/// a map from property name to value.
///
/// Returns `Ok(None)` when no usable result is available: the call threw
/// (for instance because the method does not exist), it returned something
/// that is not an object, the properties could not be fetched, or none of
/// them carried a value.
///
/// # Errors
///
/// Returns an error if the function call itself cannot be performed or the
/// `Runtime.getProperties` response cannot be deserialized.
async fn get_jupyter_display(
  session: &mut repl::ReplSession,
  evaluate_result: &cdp::RemoteObject,
) -> Result<Option<HashMap<String, serde_json::Value>>, AnyError> {
  // NOTE: this is a plain raw string, not a `format!` template, so the
  // doubled braces are passed along verbatim. The function body is then a
  // nested block, which is still valid JavaScript.
  let response = session
    .call_function_on_args(
      r#"function (object) {{
return object[Symbol.for("Jupyter.display")]();
}}"#
        .to_string(),
      &[evaluate_result.clone()],
    )
    .await?;

  if response.exception_details.is_some() {
    return Ok(None);
  }

  // A display method that returns a primitive yields no object id; treat
  // that like "no display data" instead of panicking.
  let Some(object_id) = response.result.object_id else {
    return Ok(None);
  };

  let get_properties_response_result = session
    .post_message_with_event_loop(
      "Runtime.getProperties",
      Some(cdp::GetPropertiesArgs {
        object_id,
        own_properties: Some(true),
        accessor_properties_only: None,
        generate_preview: None,
        non_indexed_properties_only: Some(true),
      }),
    )
    .await;

  let Ok(get_properties_response) = get_properties_response_result else {
    return Ok(None);
  };

  let get_properties_response: cdp::GetPropertiesResponse =
    serde_json::from_value(get_properties_response)?;

  let data: HashMap<String, serde_json::Value> = get_properties_response
    .result
    .into_iter()
    .filter_map(|prop| {
      // Properties without a value descriptor are skipped; a descriptor
      // without a concrete value is stored as `null`.
      let value = prop.value?;
      Some((prop.name, value.value.unwrap_or(serde_json::Value::Null)))
    })
    .collect();

  if data.is_empty() {
    Ok(None)
  } else {
    Ok(Some(data))
  }
}
/// Produces the display data for an evaluation result.
///
/// `undefined` results give an empty map. Otherwise the data from
/// `get_jupyter_display` is used when there is any; failing that, the value
/// is formatted by the REPL internals' `inspectArgs` and returned under the
/// `text/plain` key.
async fn get_jupyter_display_or_eval_value(
  session: &mut repl::ReplSession,
  evaluate_result: &cdp::RemoteObject,
) -> Result<HashMap<String, serde_json::Value>, AnyError> {
  // Printing "undefined" generates a lot of noise, so let's skip
  // these.
  if evaluate_result.kind == "undefined" {
    return Ok(HashMap::new());
  }

  let custom_display = get_jupyter_display(session, evaluate_result).await?;
  if let Some(data) = custom_display {
    return Ok(data);
  }

  let inspect_fn = format!(
    r#"function (object) {{
try {{
return {0}.inspectArgs(["%o", object], {{ colors: !{0}.noColor }});
}} catch (err) {{
return {0}.inspectArgs(["%o", err]);
}}
}}"#,
    *repl::REPL_INTERNALS_NAME
  );

  let response = session
    .call_function_on_args(inspect_fn, &[evaluate_result.clone()])
    .await?;

  // Zero or one entry, depending on whether a value came back.
  Ok(
    response
      .result
      .value
      .into_iter()
      .map(|value| ("text/plain".to_string(), value))
      .collect(),
  )
}
// TODO(bartlomieju): dedup with repl::editor
/// Returns the expression that is being typed at `cursor_pos`: the text
/// between the last word boundary before the cursor and the cursor itself.
///
/// `cursor_pos` is a byte offset into `line`. An offset past the end of the
/// line is clamped to the end, and an offset inside a multi-byte character
/// is moved back to the start of that character, so this function never
/// panics.
///
/// The result never extends past the cursor, so its length is at most
/// `cursor_pos` and callers may compute the start of the expression as
/// `cursor_pos - expr.len()`.
fn get_expr_from_line_at_pos(line: &str, cursor_pos: usize) -> &str {
  let mut end = cursor_pos.min(line.len());
  // Offset 0 is always a char boundary, so this loop terminates.
  while !line.is_char_boundary(end) {
    end -= 1;
  }

  let before_cursor = &line[..end];
  match before_cursor.rfind(is_word_boundary) {
    // Every word boundary is an ASCII character, hence one byte wide.
    Some(boundary) => &before_cursor[boundary + 1..],
    None => before_cursor,
  }
}
// TODO(bartlomieju): dedup with repl::editor
/// Returns `true` for ASCII whitespace and ASCII punctuation, except for
/// `.`, `_` and `$`, which may be part of an expression.
fn is_word_boundary(c: char) -> bool {
  !matches!(c, '.' | '_' | '$')
    && (c.is_ascii_whitespace() || c.is_ascii_punctuation())
}
// TODO(bartlomieju): dedup with repl::editor
/// Returns the names in the global lexical scope of the session's
/// execution context, as reported by `Runtime.globalLexicalScopeNames`.
///
/// This is only used to compute completion candidates, so a failed request
/// or a response that cannot be deserialized results in an empty list
/// rather than a panic.
async fn get_global_lexical_scope_names(
  session: &mut repl::ReplSession,
) -> Vec<String> {
  let args = cdp::GlobalLexicalScopeNamesArgs {
    execution_context_id: Some(session.context_id),
  };

  let Ok(response) = session
    .post_message_with_event_loop(
      "Runtime.globalLexicalScopeNames",
      Some(args),
    )
    .await
  else {
    return Vec::new();
  };

  serde_json::from_value::<cdp::GlobalLexicalScopeNamesResponse>(response)
    .map(|parsed| parsed.names)
    .unwrap_or_default()
}
// TODO(bartlomieju): dedup with repl::editor
/// Lists the property names available on the value of `expr`.
///
/// The properties of the evaluated object are used when they can be
/// obtained. Otherwise the properties of the prototype matching the type of
/// the expression are returned, or an empty list when the type is unknown
/// or has no prototype handled here.
async fn get_expression_property_names(
  session: &mut repl::ReplSession,
  expr: &str,
) -> Vec<String> {
  // try to get the properties from the expression
  let own_properties = get_object_expr_properties(session, expr).await;
  if let Some(names) = own_properties {
    return names;
  }

  // otherwise fall back to the prototype
  let Some(expr_type) = get_expression_type(session, expr).await else {
    return Vec::new();
  };

  // possibilities: https://chromedevtools.github.io/devtools-protocol/v8/Runtime/#type-RemoteObject
  let prototype_expr = match expr_type.as_str() {
    "object" => "Object.prototype",
    "function" => "Function.prototype",
    "string" => "String.prototype",
    "boolean" => "Boolean.prototype",
    "bigint" => "BigInt.prototype",
    "number" => "Number.prototype",
    // undefined, symbol, and unhandled
    _ => return Vec::new(),
  };

  match get_object_expr_properties(session, prototype_expr).await {
    Some(names) => names,
    None => Vec::new(),
  }
}
// TODO(bartlomieju): dedup with repl::editor
/// Evaluates `expr` and returns the `kind` of the resulting remote object,
/// or `None` when the expression could not be evaluated.
async fn get_expression_type(
  session: &mut repl::ReplSession,
  expr: &str,
) -> Option<String> {
  let response = evaluate_expression(session, expr).await?;
  Some(response.result.kind)
}
// TODO(bartlomieju): dedup with repl::editor
/// Evaluates `object_expr` and returns the names of the non-indexed
/// properties of the resulting object.
///
/// Returns `None` when the expression cannot be evaluated, the result has
/// no object id, or the `Runtime.getProperties` request fails or cannot be
/// deserialized.
async fn get_object_expr_properties(
  session: &mut repl::ReplSession,
  object_expr: &str,
) -> Option<Vec<String>> {
  let object_id = evaluate_expression(session, object_expr)
    .await?
    .result
    .object_id?;

  let args = cdp::GetPropertiesArgs {
    object_id,
    own_properties: None,
    accessor_properties_only: None,
    generate_preview: None,
    non_indexed_properties_only: Some(true),
  };

  let raw_response = match session
    .post_message_with_event_loop("Runtime.getProperties", Some(args))
    .await
  {
    Ok(value) => value,
    Err(_) => return None,
  };

  let parsed: cdp::GetPropertiesResponse =
    match serde_json::from_value(raw_response) {
      Ok(parsed) => parsed,
      Err(_) => return None,
    };

  let mut names = Vec::new();
  for prop in parsed.result {
    names.push(prop.name);
  }
  Some(names)
}
// TODO(bartlomieju): dedup with repl::editor
/// Evaluates `expr` in the session's context through `Runtime.evaluate`,
/// with `throw_on_side_effect` enabled and a timeout of 200.
///
/// Returns `None` when the request fails, the response cannot be
/// deserialized, or the evaluation reported an exception.
async fn evaluate_expression(
  session: &mut repl::ReplSession,
  expr: &str,
) -> Option<cdp::EvaluateResponse> {
  let args = cdp::EvaluateArgs {
    expression: expr.to_string(),
    object_group: None,
    include_command_line_api: None,
    silent: None,
    context_id: Some(session.context_id),
    return_by_value: None,
    generate_preview: None,
    user_gesture: None,
    await_promise: None,
    throw_on_side_effect: Some(true),
    timeout: Some(200),
    disable_breaks: None,
    repl_mode: None,
    allow_unsafe_eval_blocked_by_csp: None,
    unique_context_id: None,
  };

  let raw_response = session
    .post_message_with_event_loop("Runtime.evaluate", Some(args))
    .await
    .ok()?;

  let parsed: cdp::EvaluateResponse =
    serde_json::from_value(raw_response).ok()?;

  // Only results without exception details are of use to callers.
  Some(parsed).filter(|response| response.exception_details.is_none())
}