1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-21 13:00:36 -05:00

core: implement 'AsyncRefCell' and 'ResourceTable2' (#8273)

This commit is contained in:
Bert Belder 2020-11-25 00:38:23 +01:00
parent 605874ee98
commit 8d12653738
No known key found for this signature in database
GPG key ID: 7A77887B2E2ED461
13 changed files with 1264 additions and 168 deletions

250
Cargo.lock generated
View file

@ -103,11 +103,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9021768bcce77296b64648cc7a7460e3df99979b97ed5c925c38d1cc83778d98"
dependencies = [
"brotli",
"bytes",
"bytes 0.5.6",
"flate2",
"futures-core",
"memchr",
"pin-project-lite",
"pin-project-lite 0.1.7",
]
[[package]]
@ -242,6 +242,12 @@ version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38"
[[package]]
name = "bytes"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0dcbc35f504eb6fc275a6d20e4ebcda18cf50d40ba6fabff8c711fa16cb3b16"
[[package]]
name = "cargo_gn"
version = "0.0.15"
@ -260,6 +266,12 @@ version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.15"
@ -295,6 +307,15 @@ dependencies = [
"bitflags",
]
[[package]]
name = "cloudabi"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4344512281c643ae7638bbabc3af17a11307803ec8f0fcad9fae512a8bf36467"
dependencies = [
"bitflags",
]
[[package]]
name = "const-random"
version = "0.1.8"
@ -336,7 +357,7 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1"
dependencies = [
"cfg-if",
"cfg-if 0.1.10",
]
[[package]]
@ -356,7 +377,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8"
dependencies = [
"autocfg 1.0.1",
"cfg-if",
"cfg-if 0.1.10",
"lazy_static",
]
@ -402,7 +423,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f260e2fc850179ef410018660006951c1b55b79e8087e87111a2c388994b9b5"
dependencies = [
"ahash",
"cfg-if",
"cfg-if 0.1.10",
"num_cpus",
]
@ -413,7 +434,7 @@ dependencies = [
"atty",
"base64 0.12.3",
"byteorder",
"bytes",
"bytes 0.5.6",
"chrono",
"clap",
"deno_core",
@ -453,7 +474,7 @@ dependencies = [
"tempfile",
"termcolor",
"test_util",
"tokio",
"tokio 0.2.22",
"tokio-rustls",
"tokio-tungstenite",
"uuid",
@ -479,7 +500,7 @@ dependencies = [
"serde",
"serde_json",
"smallvec",
"tokio",
"tokio 0.3.4",
"url",
]
@ -640,7 +661,7 @@ version = "0.8.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a51b8cf747471cb9499b6d59e59b0444f4c90eba8968c4e44874e92b5b64ace2"
dependencies = [
"cfg-if",
"cfg-if 0.1.10",
]
[[package]]
@ -691,7 +712,7 @@ version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ed85775dcc68644b5c950ac06a2b23768d3bc9390464151aaf27136998dcf9e"
dependencies = [
"cfg-if",
"cfg-if 0.1.10",
"libc",
"redox_syscall",
"winapi 0.3.9",
@ -709,7 +730,7 @@ version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "766d0e77a2c1502169d4a93ff3b8c15a71fd946cd0126309752104e5f3c46d94"
dependencies = [
"cfg-if",
"cfg-if 0.1.10",
"crc32fast",
"libc",
"miniz_oxide",
@ -913,7 +934,7 @@ version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc587bc0ec293155d5bfa6b9891ec18a1e330c234f896ea47fbada4cadbe47e6"
dependencies = [
"cfg-if",
"cfg-if 0.1.10",
"libc",
"wasi 0.9.0+wasi-snapshot-preview1",
]
@ -924,7 +945,7 @@ version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "993f9e0baeed60001cf565546b0d3dbe6a6ad23f2bd31644a133c641eccf6d53"
dependencies = [
"bytes",
"bytes 0.5.6",
"fnv",
"futures-core",
"futures-sink",
@ -932,7 +953,7 @@ dependencies = [
"http",
"indexmap",
"slab",
"tokio",
"tokio 0.2.22",
"tokio-util",
"tracing",
]
@ -951,7 +972,7 @@ checksum = "ed18eb2459bf1a09ad2d6b1547840c3e5e62882fa09b9a6a20b1de8e3228848f"
dependencies = [
"base64 0.12.3",
"bitflags",
"bytes",
"bytes 0.5.6",
"headers-core",
"http",
"mime",
@ -983,7 +1004,7 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28d569972648b2c512421b5f2a405ad6ac9666547189d0c5477a3f200f3e02f9"
dependencies = [
"bytes",
"bytes 0.5.6",
"fnv",
"itoa",
]
@ -994,7 +1015,7 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b"
dependencies = [
"bytes",
"bytes 0.5.6",
"http",
]
@ -1019,7 +1040,7 @@ version = "0.13.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e68a8dd9716185d9e64ea473ea6ef63529252e3e27623295a0378a19665d5eb"
dependencies = [
"bytes",
"bytes 0.5.6",
"futures-channel",
"futures-core",
"futures-util",
@ -1031,7 +1052,7 @@ dependencies = [
"pin-project 0.4.23",
"socket2",
"time",
"tokio",
"tokio 0.2.22",
"tower-service",
"tracing",
"want",
@ -1043,12 +1064,12 @@ version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37743cc83e8ee85eacfce90f2f4102030d9ff0a95244098d781e9bee4a90abb6"
dependencies = [
"bytes",
"bytes 0.5.6",
"futures-util",
"hyper",
"log",
"rustls",
"tokio",
"tokio 0.2.22",
"tokio-rustls",
"webpki",
]
@ -1112,7 +1133,16 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19a8a95243d5a0398cae618ec29477c6e3cb631152be5c19481f80bc71559754"
dependencies = [
"bytes",
"bytes 0.5.6",
]
[[package]]
name = "instant"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec"
dependencies = [
"cfg-if 1.0.0",
]
[[package]]
@ -1206,7 +1236,7 @@ checksum = "db65c6da02e61f55dae90a0ae427b2a5f6b3e8db09f58d10efab23af92592616"
dependencies = [
"arrayvec",
"bitflags",
"cfg-if",
"cfg-if 0.1.10",
"ryu",
"static_assertions",
]
@ -1217,13 +1247,22 @@ version = "0.2.80"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d58d1b70b004888f764dfbf6a26a3b0342a1632d33968e4a179d8011c760614"
[[package]]
name = "lock_api"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd96ffd135b2fd7b973ac026d28085defbe8983df057ced3eb4f2130b0831312"
dependencies = [
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fabed175da42fed1fa0746b0ea71f412aa9d35e76e95e59b192c64b9dc2bf8b"
dependencies = [
"cfg-if",
"cfg-if 0.1.10",
]
[[package]]
@ -1276,7 +1315,7 @@ version = "0.6.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fce347092656428bc8eaf6201042cb551b8d67855af7374542a92a0fbfcac430"
dependencies = [
"cfg-if",
"cfg-if 0.1.10",
"fuchsia-zircon",
"fuchsia-zircon-sys",
"iovec",
@ -1289,6 +1328,19 @@ dependencies = [
"winapi 0.2.8",
]
[[package]]
name = "mio"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f33bc887064ef1fd66020c9adfc45bb9f33d75a42096c81e7c56c65b75dd1a8b"
dependencies = [
"libc",
"log",
"miow 0.3.6",
"ntapi",
"winapi 0.3.9",
]
[[package]]
name = "mio-extras"
version = "2.0.6"
@ -1297,7 +1349,7 @@ checksum = "52403fe290012ce777c4626790c8951324a2b9e3316b3143779c72b029742f19"
dependencies = [
"lazycell",
"log",
"mio",
"mio 0.6.22",
"slab",
]
@ -1308,8 +1360,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0840c1c50fd55e521b247f949c241c9997709f23bd7f023b9762cd561e935656"
dependencies = [
"log",
"mio",
"miow 0.3.5",
"mio 0.6.22",
"miow 0.3.6",
"winapi 0.3.9",
]
@ -1321,7 +1373,7 @@ checksum = "afcb699eb26d4332647cc848492bbc15eafb26f08d0304550d5aa1f612e066f0"
dependencies = [
"iovec",
"libc",
"mio",
"mio 0.6.22",
]
[[package]]
@ -1338,9 +1390,9 @@ dependencies = [
[[package]]
name = "miow"
version = "0.3.5"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07b88fb9795d4d36d62a012dfbf49a8f5cf12751f36d31a9dbe66d528e58979e"
checksum = "5a33c1b55807fbed163481b5ba66db4b2fa6cde694a5027be10fb724206c5897"
dependencies = [
"socket2",
"winapi 0.3.9",
@ -1370,7 +1422,7 @@ version = "0.2.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ebc3ec692ed7c9a255596c67808dee269f64655d8baf7b4f0638e51ba1d6853"
dependencies = [
"cfg-if",
"cfg-if 0.1.10",
"libc",
"winapi 0.3.9",
]
@ -1389,7 +1441,7 @@ checksum = "83450fe6a6142ddd95fb064b746083fc4ef1705fe81f64a64e1d4b39f54a1055"
dependencies = [
"bitflags",
"cc",
"cfg-if",
"cfg-if 0.1.10",
"libc",
]
@ -1418,12 +1470,21 @@ dependencies = [
"fsevent-sys",
"inotify",
"libc",
"mio",
"mio 0.6.22",
"mio-extras",
"walkdir",
"winapi 0.3.9",
]
[[package]]
name = "ntapi"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44"
dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "num-bigint"
version = "0.2.6"
@ -1511,6 +1572,32 @@ dependencies = [
"stable_deref_trait",
]
[[package]]
name = "parking_lot"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb"
dependencies = [
"instant",
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c361aa727dd08437f2f1447be8b59a33b0edd15e0fcee698f935613d9efbca9b"
dependencies = [
"cfg-if 0.1.10",
"cloudabi 0.1.0",
"instant",
"libc",
"redox_syscall",
"smallvec",
"winapi 0.3.9",
]
[[package]]
name = "percent-encoding"
version = "2.1.0"
@ -1617,6 +1704,12 @@ version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282adbf10f2698a7a77f8e983a74b2d18176c19a7fd32a45446139ae7b02b715"
[[package]]
name = "pin-project-lite"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b063f57ec186e6140e2b8b6921e5f1bd89c7356dda5b33acc5401203ca6131c"
[[package]]
name = "pin-utils"
version = "0.1.0"
@ -1837,7 +1930,7 @@ version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b75f676a1e053fc562eafbb47838d67c84801e38fc1ba459e8f180deabd5071"
dependencies = [
"cloudabi",
"cloudabi 0.0.3",
"fuchsia-cprng",
"libc",
"rand_core 0.4.2",
@ -1929,7 +2022,7 @@ checksum = "e9eaa17ac5d7b838b7503d118fa16ad88f440498bf9ffe5424e621f93190d61e"
dependencies = [
"async-compression",
"base64 0.12.3",
"bytes",
"bytes 0.5.6",
"encoding_rs",
"futures-core",
"futures-util",
@ -1944,11 +2037,11 @@ dependencies = [
"mime",
"mime_guess",
"percent-encoding",
"pin-project-lite",
"pin-project-lite 0.1.7",
"rustls",
"serde",
"serde_urlencoded",
"tokio",
"tokio 0.2.22",
"tokio-rustls",
"url",
"wasm-bindgen",
@ -2020,7 +2113,7 @@ version = "6.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f0d5e7b0219a3eadd5439498525d4765c59b7c993ef0c12244865cd2d988413"
dependencies = [
"cfg-if",
"cfg-if 0.1.10",
"libc",
"log",
"memchr",
@ -2169,7 +2262,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "170a36ea86c864a3f16dd2687712dd6646f7019f301e57537c7f4dc9f5916770"
dependencies = [
"block-buffer 0.9.0",
"cfg-if",
"cfg-if 0.1.10",
"cpuid-bool",
"digest 0.9.0",
"opaque-debug 0.3.0",
@ -2211,11 +2304,11 @@ checksum = "fbee7696b84bbf3d89a1c2eccff0850e3047ed46bfcd2e92c29a2d074d57e252"
[[package]]
name = "socket2"
version = "0.3.15"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1fa70dc5c8104ec096f4fe7ede7a221d35ae13dcd19ba1ad9a81d2cab9a1c44"
checksum = "2c29947abdee2a218277abeca306f25789c938e500ea5a9d4b12a5a504466902"
dependencies = [
"cfg-if",
"cfg-if 1.0.0",
"libc",
"redox_syscall",
"winapi 0.3.9",
@ -2348,7 +2441,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1dd78e25ecc138a4667f5e5ea4d1a1c35d424477882b549d4fc011062eecd50e"
dependencies = [
"ast_node",
"cfg-if",
"cfg-if 0.1.10",
"either",
"from_variant",
"fxhash",
@ -2616,7 +2709,7 @@ version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9"
dependencies = [
"cfg-if",
"cfg-if 0.1.10",
"libc",
"rand 0.7.3",
"redox_syscall",
@ -2646,14 +2739,14 @@ dependencies = [
name = "test_util"
version = "0.1.0"
dependencies = [
"bytes",
"bytes 0.5.6",
"futures",
"lazy_static",
"os_pipe",
"pty",
"regex",
"tempfile",
"tokio",
"tokio 0.2.22",
"warp",
]
@ -2718,21 +2811,43 @@ version = "0.2.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d34ca54d84bf2b5b4d7d31e901a8464f7b60ac145a284fba25ceb801f2ddccd"
dependencies = [
"bytes",
"bytes 0.5.6",
"fnv",
"futures-core",
"iovec",
"lazy_static",
"libc",
"memchr",
"mio",
"mio 0.6.22",
"mio-named-pipes",
"mio-uds",
"num_cpus",
"pin-project-lite",
"pin-project-lite 0.1.7",
"signal-hook-registry",
"slab",
"tokio-macros",
"tokio-macros 0.2.5",
"winapi 0.3.9",
]
[[package]]
name = "tokio"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9dfe2523e6fa84ddf5e688151d4e5fddc51678de9752c6512a24714c23818d61"
dependencies = [
"autocfg 1.0.1",
"bytes 0.6.0",
"futures-core",
"lazy_static",
"libc",
"memchr",
"mio 0.7.6",
"num_cpus",
"parking_lot",
"pin-project-lite 0.2.0",
"signal-hook-registry",
"slab",
"tokio-macros 0.3.1",
"winapi 0.3.9",
]
@ -2747,6 +2862,17 @@ dependencies = [
"syn 1.0.48",
]
[[package]]
name = "tokio-macros"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21d30fdbb5dc2d8f91049691aa1a9d4d4ae422a21c334ce8936e5886d30c5c45"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.48",
]
[[package]]
name = "tokio-rustls"
version = "0.14.1"
@ -2755,7 +2881,7 @@ checksum = "e12831b255bcfa39dc0436b01e19fea231a37db570686c06ee72c423479f889a"
dependencies = [
"futures-core",
"rustls",
"tokio",
"tokio 0.2.22",
"webpki",
]
@ -2768,7 +2894,7 @@ dependencies = [
"futures-util",
"log",
"pin-project 0.4.23",
"tokio",
"tokio 0.2.22",
"tungstenite",
]
@ -2778,12 +2904,12 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499"
dependencies = [
"bytes",
"bytes 0.5.6",
"futures-core",
"futures-sink",
"log",
"pin-project-lite",
"tokio",
"pin-project-lite 0.1.7",
"tokio 0.2.22",
]
[[package]]
@ -2807,7 +2933,7 @@ version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d79ca061b032d6ce30c660fded31189ca0b9922bf483cd70759f13a2d86786c"
dependencies = [
"cfg-if",
"cfg-if 0.1.10",
"log",
"tracing-core",
]
@ -2845,7 +2971,7 @@ checksum = "f0308d80d86700c5878b9ef6321f020f29b1bb9d5ff3cab25e75e23f3a492a23"
dependencies = [
"base64 0.12.3",
"byteorder",
"bytes",
"bytes 0.5.6",
"http",
"httparse",
"input_buffer",
@ -3006,7 +3132,7 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f41be6df54c97904af01aa23e613d4521eed7ab23537cede692d4058f6449407"
dependencies = [
"bytes",
"bytes 0.5.6",
"futures",
"headers",
"http",
@ -3020,7 +3146,7 @@ dependencies = [
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio 0.2.22",
"tokio-rustls",
"tokio-tungstenite",
"tower-service",
@ -3047,7 +3173,7 @@ version = "0.2.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ac64ead5ea5f05873d7c12b545865ca2b8d28adfc50a49b84770a3a97265d42"
dependencies = [
"cfg-if",
"cfg-if 0.1.10",
"serde",
"serde_json",
"wasm-bindgen-macro",
@ -3074,7 +3200,7 @@ version = "0.4.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7866cab0aa01de1edf8b5d7936938a7e397ee50ce24119aef3e1eaa3b6171da"
dependencies = [
"cfg-if",
"cfg-if 0.1.10",
"js-sys",
"wasm-bindgen",
"web-sys",

View file

@ -14,7 +14,7 @@ path = "lib.rs"
[dependencies]
anyhow = "1.0.32"
futures = "0.3.5"
futures = "0.3.8"
indexmap = "1.6.0"
lazy_static = "1.4.0"
libc = "0.2.77"
@ -35,4 +35,4 @@ path = "examples/http_bench_json_ops.rs"
# These dependendencies are only used for the 'http_bench_*_ops' examples.
[dev-dependencies]
tokio = { version = "0.2.22", features = ["full"] }
tokio = { version = "0.3.4", features = ["full"] }

713
core/async_cell.rs Normal file
View file

@ -0,0 +1,713 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use std::any::Any;
use std::borrow::Borrow;
use std::cell::Cell;
use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::ops::Deref;
use std::rc::Rc;
use self::internal as i;
pub type AsyncRef<T> = i::AsyncBorrowImpl<T, i::Shared>;
pub type AsyncMut<T> = i::AsyncBorrowImpl<T, i::Exclusive>;
pub type AsyncRefFuture<T> = i::AsyncBorrowFutureImpl<T, i::Shared>;
pub type AsyncMutFuture<T> = i::AsyncBorrowFutureImpl<T, i::Exclusive>;
pub struct AsyncRefCell<T> {
value: UnsafeCell<T>,
borrow_count: Cell<i::BorrowCount>,
waiters: Cell<VecDeque<Option<i::Waiter>>>,
turn: Cell<usize>,
}
impl<T: 'static> AsyncRefCell<T> {
/// Create a new `AsyncRefCell` that encapsulates the specified value.
/// Note that in order to borrow the inner value, the `AsyncRefCell`
/// needs to be wrapped in an `Rc` or an `RcRef`. These can be created
/// either manually, or by using the convenience method
/// `AsyncRefCell::new_rc()`.
pub fn new(value: T) -> Self {
Self {
value: UnsafeCell::new(value),
borrow_count: Default::default(),
waiters: Default::default(),
turn: Default::default(),
}
}
pub fn new_rc(value: T) -> Rc<Self> {
Rc::new(Self::new(value))
}
pub fn as_ptr(&self) -> *mut T {
self.value.get()
}
}
impl<T: Default + 'static> Default for AsyncRefCell<T> {
fn default() -> Self {
Self::new(Default::default())
}
}
impl<T: Default + 'static> AsyncRefCell<T> {
pub fn default_rc() -> Rc<Self> {
Rc::new(Default::default())
}
}
impl<T: 'static> From<T> for AsyncRefCell<T> {
fn from(value: T) -> Self {
Self::new(value)
}
}
impl<T> AsyncRefCell<T> {
pub fn borrow(self: &Rc<Self>) -> AsyncRefFuture<T> {
AsyncRefFuture::new(self)
}
pub fn borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<T> {
AsyncMutFuture::new(self)
}
pub fn try_borrow(self: &Rc<Self>) -> Option<AsyncRef<T>> {
Self::borrow_sync(self)
}
pub fn try_borrow_mut(self: &Rc<Self>) -> Option<AsyncMut<T>> {
Self::borrow_sync(self)
}
}
impl<T> RcRef<AsyncRefCell<T>> {
pub fn borrow(&self) -> AsyncRefFuture<T> {
AsyncRefFuture::new(self)
}
pub fn borrow_mut(&self) -> AsyncMutFuture<T> {
AsyncMutFuture::new(self)
}
pub fn try_borrow(&self) -> Option<AsyncRef<T>> {
AsyncRefCell::<T>::borrow_sync(self)
}
pub fn try_borrow_mut(&self) -> Option<AsyncMut<T>> {
AsyncRefCell::<T>::borrow_sync(self)
}
}
/// An `RcRef` encapsulates a reference counted pointer, just like a regular
/// `std::rc::Rc`. However, unlike a regular `Rc`, it can be remapped so that
/// it dereferences to any value that's reachable through the reference-counted
/// pointer. This is achieved through the associated method, `RcRef::map()`,
/// similar to how `std::cell::Ref::map()` works. Example:
///
/// ```rust
/// # use std::rc::Rc;
/// # use deno_core::async_cell::RcRef;
///
/// struct Stuff {
/// foo: u32,
/// bar: String,
/// }
///
/// let stuff_rc = Rc::new(Stuff {
/// foo: 42,
/// bar: "hello".to_owned(),
/// });
///
/// // `foo_rc` and `bar_rc` dereference to different types, however
/// // they share a reference count.
/// let foo_rc: RcRef<u32> = RcRef::map(stuff_rc.clone(), |v| &v.foo);
/// let bar_rc: RcRef<String> = RcRef::map(stuff_rc, |v| &v.bar);
/// ```
pub struct RcRef<T> {
rc: Rc<dyn Any>,
value: *const T,
}
impl<T: 'static> RcRef<T> {
pub fn new(value: T) -> Self {
Self::from(Rc::new(value))
}
pub fn map<S: 'static, R: i::RcLike<S>, F: FnOnce(&S) -> &T>(
source: R,
map_fn: F,
) -> RcRef<T> {
let RcRef::<S> { rc, value } = source.into();
let value = map_fn(unsafe { &*value });
RcRef { rc, value }
}
}
impl<T: Default + 'static> Default for RcRef<T> {
fn default() -> Self {
Self::new(Default::default())
}
}
impl<T: 'static> From<Rc<T>> for RcRef<T> {
fn from(rc: Rc<T>) -> Self {
Self {
value: &*rc,
rc: rc as Rc<_>,
}
}
}
impl<T> Clone for RcRef<T> {
fn clone(&self) -> Self {
Self {
rc: self.rc.clone(),
value: self.value,
}
}
}
impl<T> Deref for RcRef<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
unsafe { &*self.value }
}
}
impl<T> Borrow<T> for RcRef<T> {
fn borrow(&self) -> &T {
&**self
}
}
impl<T> AsRef<T> for RcRef<T> {
fn as_ref(&self) -> &T {
&**self
}
}
mod internal {
use super::AsyncRefCell;
use super::RcRef;
use futures::future::Future;
use futures::ready;
use futures::task::Context;
use futures::task::Poll;
use futures::task::Waker;
use std::borrow::Borrow;
use std::borrow::BorrowMut;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::ops::Deref;
use std::ops::DerefMut;
use std::pin::Pin;
use std::rc::Rc;
impl<T> AsyncRefCell<T> {
/// Borrow the cell's contents synchronouslym without creating an
/// intermediate future. If the cell has already been borrowed and either
/// the existing or the requested borrow is exclusive, this function returns
/// `None`.
pub(super) fn borrow_sync<
M: BorrowModeTrait,
R: RcLike<AsyncRefCell<T>>,
>(
cell: &R,
) -> Option<AsyncBorrowImpl<T, M>> {
// Don't allow synchronous borrows to cut in line; if there are any
// enqueued waiters, return `None`, even if the current borrow is a shared
// one and the requested borrow is too.
let waiters = unsafe { &mut *cell.waiters.as_ptr() };
if waiters.is_empty() {
// There are no enqueued waiters, but it is still possible that the cell
// is currently borrowed. If there are no current borrows, or both the
// existing and requested ones are shared, `try_add()` returns the
// adjusted borrow count.
let new_borrow_count =
cell.borrow_count.get().try_add(M::borrow_mode())?;
cell.borrow_count.set(new_borrow_count);
Some(AsyncBorrowImpl::<T, M>::new(cell.clone().into()))
} else {
None
}
}
fn drop_borrow<M: BorrowModeTrait>(&self) {
let new_borrow_count = self.borrow_count.get().remove(M::borrow_mode());
self.borrow_count.set(new_borrow_count);
if new_borrow_count.is_empty() {
self.wake_waiters()
}
}
fn create_waiter<M: BorrowModeTrait>(&self) -> usize {
let waiter = Waiter::new(M::borrow_mode());
let turn = self.turn.get();
let index = {
let waiters = unsafe { &mut *self.waiters.as_ptr() };
waiters.push_back(Some(waiter));
waiters.len() - 1
};
if index == 0 {
// SAFETY: the `waiters` reference used above *must* be dropped here.
self.wake_waiters()
}
// Return the new waiter's id.
turn + index
}
fn poll_waiter<M: BorrowModeTrait>(
&self,
id: usize,
cx: &mut Context,
) -> Poll<()> {
let borrow_count = self.borrow_count.get();
let turn = self.turn.get();
if id < turn {
// This waiter made it to the front of the line; we reserved a borrow
// for it, woke its Waker, and removed the waiter from the queue.
// Assertion: BorrowCount::remove() will panic if `mode` is incorrect.
let _ = borrow_count.remove(M::borrow_mode());
Poll::Ready(())
} else {
// This waiter is still in line and has not yet been woken.
let waiters = unsafe { &mut *self.waiters.as_ptr() };
// Sanity check: id cannot be higher than the last queue element.
assert!(id < turn + waiters.len());
// Sanity check: since we always call wake_waiters() when the queue head
// is updated, it should be impossible to add it to the current borrow.
assert!(id > turn || borrow_count.try_add(M::borrow_mode()).is_none());
// Save or update the waiter's Waker.
// TODO(piscisaureus): Use will_wake() to make this more efficient.
let waiter_mut = waiters[id - turn].as_mut().unwrap();
waiter_mut.set_waker(cx.waker().clone());
Poll::Pending
}
}
fn wake_waiters(&self) {
let mut borrow_count = self.borrow_count.get();
let waiters = unsafe { &mut *self.waiters.as_ptr() };
let mut turn = self.turn.get();
loop {
let waiter_entry = match waiters.front().map(Option::as_ref) {
None => break, // Queue empty.
Some(w) => w,
};
let borrow_mode = match waiter_entry {
None => {
// Queue contains a hole. This happens when a Waiter is dropped
// before it makes it to the front of the queue.
waiters.pop_front();
turn += 1;
continue;
}
Some(waiter) => waiter.borrow_mode(),
};
// See if the waiter at the front of the queue can borrow the cell's
// value now. If it does, `try_add()` returns the new borrow count,
// effectively "reserving" the borrow until the associated
// AsyncBorrowFutureImpl future gets polled and produces the actual
// borrow.
borrow_count = match borrow_count.try_add(borrow_mode) {
None => break, // Can't borrow yet.
Some(b) => b,
};
// Drop from queue.
let mut waiter = waiters.pop_front().unwrap().unwrap();
turn += 1;
// Wake this waiter, so the AsyncBorrowFutureImpl future gets polled.
if let Some(waker) = waiter.take_waker() {
waker.wake()
}
}
// Save updated counters.
self.borrow_count.set(borrow_count);
self.turn.set(turn);
}
fn drop_waiter<M: BorrowModeTrait>(&self, id: usize) {
let turn = self.turn.get();
if id < turn {
// We already made a borrow count reservation for this waiter but the
// borrow will never be picked up and removesequently, never dropped.
// Therefore, call the borrow drop handler here.
self.drop_borrow::<M>();
} else {
// This waiter is still in the queue, take it out and leave a "hole".
let waiters = unsafe { &mut *self.waiters.as_ptr() };
waiters[id - turn].take().unwrap();
}
if id == turn {
// Since the first entry in the waiter queue was touched we have to
// reprocess the waiter queue.
self.wake_waiters()
}
}
}
pub struct AsyncBorrowFutureImpl<T: 'static, M: BorrowModeTrait> {
cell: Option<RcRef<AsyncRefCell<T>>>,
id: usize,
_phantom: PhantomData<M>,
}
impl<T, M: BorrowModeTrait> AsyncBorrowFutureImpl<T, M> {
pub fn new<R: RcLike<AsyncRefCell<T>>>(cell: &R) -> Self {
Self {
cell: Some(cell.clone().into()),
id: cell.create_waiter::<M>(),
_phantom: PhantomData,
}
}
}
impl<T: 'static, M: BorrowModeTrait> Future for AsyncBorrowFutureImpl<T, M> {
type Output = AsyncBorrowImpl<T, M>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(self.cell.as_ref().unwrap().poll_waiter::<M>(self.id, cx));
let self_mut = unsafe { Pin::get_unchecked_mut(self) };
let cell = self_mut.cell.take().unwrap();
Poll::Ready(AsyncBorrowImpl::<T, M>::new(cell))
}
}
impl<T, M: BorrowModeTrait> Drop for AsyncBorrowFutureImpl<T, M> {
fn drop(&mut self) {
// The expected mode of operation is that this future gets polled until it
// is ready and yields a value of type `AsyncBorrowImpl`, which has a drop
// handler that adjusts the `AsyncRefCell` borrow counter. However if the
// `cell` field still holds a value at this point, it means that the
// future was never polled to completion and no `AsyncBorrowImpl` was ever
// created, so we have to adjust the borrow count here.
if let Some(cell) = self.cell.take() {
cell.drop_waiter::<M>(self.id)
}
}
}
pub struct AsyncBorrowImpl<T: 'static, M: BorrowModeTrait> {
cell: RcRef<AsyncRefCell<T>>,
_phantom: PhantomData<M>,
}
impl<T, M: BorrowModeTrait> AsyncBorrowImpl<T, M> {
fn new(cell: RcRef<AsyncRefCell<T>>) -> Self {
Self {
cell,
_phantom: PhantomData,
}
}
}
impl<T, M: BorrowModeTrait> Deref for AsyncBorrowImpl<T, M> {
type Target = T;
fn deref(&self) -> &Self::Target {
unsafe { &*self.cell.as_ptr() }
}
}
impl<T, M: BorrowModeTrait> Borrow<T> for AsyncBorrowImpl<T, M> {
fn borrow(&self) -> &T {
&**self
}
}
impl<T, M: BorrowModeTrait> AsRef<T> for AsyncBorrowImpl<T, M> {
fn as_ref(&self) -> &T {
&**self
}
}
impl<T> DerefMut for AsyncBorrowImpl<T, Exclusive> {
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { &mut *self.cell.as_ptr() }
}
}
impl<T> BorrowMut<T> for AsyncBorrowImpl<T, Exclusive> {
fn borrow_mut(&mut self) -> &mut T {
&mut **self
}
}
impl<T> AsMut<T> for AsyncBorrowImpl<T, Exclusive> {
fn as_mut(&mut self) -> &mut T {
&mut **self
}
}
impl<T, M: BorrowModeTrait> Drop for AsyncBorrowImpl<T, M> {
fn drop(&mut self) {
self.cell.drop_borrow::<M>()
}
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum BorrowMode {
Shared,
Exclusive,
}
pub trait BorrowModeTrait: Copy {
fn borrow_mode() -> BorrowMode;
}
#[derive(Copy, Clone, Debug)]
pub struct Shared;
impl BorrowModeTrait for Shared {
fn borrow_mode() -> BorrowMode {
BorrowMode::Shared
}
}
#[derive(Copy, Clone, Debug)]
pub struct Exclusive;
impl BorrowModeTrait for Exclusive {
fn borrow_mode() -> BorrowMode {
BorrowMode::Exclusive
}
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum BorrowCount {
Shared(usize),
Exclusive,
}
impl Default for BorrowCount {
fn default() -> Self {
Self::Shared(0)
}
}
impl BorrowCount {
pub fn is_empty(self) -> bool {
matches!(self, BorrowCount::Shared(0))
}
pub fn try_add(self, mode: BorrowMode) -> Option<BorrowCount> {
match (self, mode) {
(BorrowCount::Shared(refs), BorrowMode::Shared) => {
Some(BorrowCount::Shared(refs + 1))
}
(BorrowCount::Shared(0), BorrowMode::Exclusive) => {
Some(BorrowCount::Exclusive)
}
_ => None,
}
}
#[allow(dead_code)]
pub fn add(self, mode: BorrowMode) -> BorrowCount {
match self.try_add(mode) {
Some(value) => value,
None => panic!("Can't add {:?} to {:?}", mode, self),
}
}
pub fn try_remove(self, mode: BorrowMode) -> Option<BorrowCount> {
match (self, mode) {
(BorrowCount::Shared(refs), BorrowMode::Shared) if refs > 0 => {
Some(BorrowCount::Shared(refs - 1))
}
(BorrowCount::Exclusive, BorrowMode::Exclusive) => {
Some(BorrowCount::Shared(0))
}
_ => None,
}
}
pub fn remove(self, mode: BorrowMode) -> BorrowCount {
match self.try_remove(mode) {
Some(value) => value,
None => panic!("Can't remove {:?} from {:?}", mode, self),
}
}
}
/// The `waiters` queue that is associated with an individual `AsyncRefCell`
/// contains elements of the `Waiter` type.
pub struct Waiter {
borrow_mode: BorrowMode,
waker: Option<Waker>,
}
impl Waiter {
pub fn new(borrow_mode: BorrowMode) -> Self {
Self {
borrow_mode,
waker: None,
}
}
pub fn borrow_mode(&self) -> BorrowMode {
self.borrow_mode
}
pub fn set_waker(&mut self, waker: Waker) {
self.waker.replace(waker);
}
pub fn take_waker(&mut self) -> Option<Waker> {
self.waker.take()
}
}
/// The `RcLike` trait provides an abstraction over `std::rc::Rc` and `RcRef`,
/// so that applicable methods can operate on either type.
pub trait RcLike<T>: Clone + Deref<Target = T> + Into<RcRef<T>> {}
impl<T: 'static> RcLike<T> for Rc<T> {}
impl<T: 'static> RcLike<T> for RcRef<T> {}
}
#[cfg(test)]
mod tests {
use super::*;
#[derive(Default)]
struct Thing {
touch_count: usize,
_private: (),
}
impl Thing {
pub fn look(&self) -> usize {
self.touch_count
}
pub fn touch(&mut self) -> usize {
self.touch_count += 1;
self.touch_count
}
}
#[tokio::test]
async fn async_ref_cell_borrow() {
let cell = AsyncRefCell::<Thing>::default_rc();
let fut1 = cell.borrow();
let fut2 = cell.borrow_mut();
let fut3 = cell.borrow();
let fut4 = cell.borrow();
let fut5 = cell.borrow();
let fut6 = cell.borrow();
let fut7 = cell.borrow_mut();
let fut8 = cell.borrow();
// The `try_borrow` and `try_borrow_mut` methods should always return `None`
// if there's a queue of async borrowers.
assert!(cell.try_borrow().is_none());
assert!(cell.try_borrow_mut().is_none());
assert_eq!(fut1.await.look(), 0);
assert_eq!(fut2.await.touch(), 1);
{
let ref5 = fut5.await;
let ref4 = fut4.await;
let ref3 = fut3.await;
let ref6 = fut6.await;
assert_eq!(ref3.look(), 1);
assert_eq!(ref4.look(), 1);
assert_eq!(ref5.look(), 1);
assert_eq!(ref6.look(), 1);
}
{
let mut ref7 = fut7.await;
assert_eq!(ref7.look(), 1);
assert_eq!(ref7.touch(), 2);
}
{
let ref8 = fut8.await;
assert_eq!(ref8.look(), 2);
}
}
#[test]
fn async_ref_cell_try_borrow() {
let cell = AsyncRefCell::<Thing>::default_rc();
{
let ref1 = cell.try_borrow().unwrap();
assert_eq!(ref1.look(), 0);
assert!(cell.try_borrow_mut().is_none());
}
{
let mut ref2 = cell.try_borrow_mut().unwrap();
assert_eq!(ref2.touch(), 1);
assert!(cell.try_borrow().is_none());
assert!(cell.try_borrow_mut().is_none());
}
{
let ref3 = cell.try_borrow().unwrap();
let ref4 = cell.try_borrow().unwrap();
let ref5 = cell.try_borrow().unwrap();
let ref6 = cell.try_borrow().unwrap();
assert_eq!(ref3.look(), 1);
assert_eq!(ref4.look(), 1);
assert_eq!(ref5.look(), 1);
assert_eq!(ref6.look(), 1);
assert!(cell.try_borrow_mut().is_none());
}
{
let mut ref7 = cell.try_borrow_mut().unwrap();
assert_eq!(ref7.look(), 1);
assert_eq!(ref7.touch(), 2);
assert!(cell.try_borrow().is_none());
assert!(cell.try_borrow_mut().is_none());
}
{
let ref8 = cell.try_borrow().unwrap();
assert_eq!(ref8.look(), 2);
assert!(cell.try_borrow_mut().is_none());
assert!(cell.try_borrow().is_some());
}
}
#[derive(Default)]
struct ThreeThings {
pub thing1: AsyncRefCell<Thing>,
pub thing2: AsyncRefCell<Thing>,
pub thing3: AsyncRefCell<Thing>,
}
#[tokio::test]
async fn rc_ref_map() {
let three_cells = Rc::new(ThreeThings::default());
let rc1 = RcRef::map(three_cells.clone(), |things| &things.thing1);
let rc2 = RcRef::map(three_cells.clone(), |things| &things.thing2);
let rc3 = RcRef::map(three_cells, |things| &things.thing3);
let mut ref1 = rc1.borrow_mut().await;
let ref2 = rc2.borrow().await;
let mut ref3 = rc3.borrow_mut().await;
assert_eq!(ref1.look(), 0);
assert_eq!(ref3.touch(), 1);
assert_eq!(ref1.touch(), 1);
assert_eq!(ref2.look(), 0);
assert_eq!(ref3.touch(), 2);
assert_eq!(ref1.look(), 1);
assert_eq!(ref1.touch(), 2);
assert_eq!(ref3.touch(), 3);
assert_eq!(ref1.touch(), 3);
}
}

View file

@ -134,7 +134,6 @@ async function main() {
for (;;) {
const rid = await accept(listenerRid);
// Deno.core.print(`accepted ${rid}`);
if (rid < 0) {
Deno.core.print(`accept error ${rid}`);
return;

View file

@ -3,16 +3,21 @@
#[macro_use]
extern crate log;
use deno_core::AsyncMutFuture;
use deno_core::AsyncRefCell;
use deno_core::AsyncRefFuture;
use deno_core::BufVec;
use deno_core::JsRuntime;
use deno_core::Op;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ZeroCopyBuf;
use futures::future::poll_fn;
use futures::future::FutureExt;
use futures::future::TryFuture;
use futures::future::TryFutureExt;
use std::cell::RefCell;
use std::convert::TryFrom;
use std::convert::TryInto;
use std::env;
use std::fmt::Debug;
@ -20,14 +25,10 @@ use std::io::Error;
use std::io::ErrorKind;
use std::mem::size_of;
use std::net::SocketAddr;
use std::pin::Pin;
use std::ptr;
use std::rc::Rc;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::runtime;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
struct Logger;
@ -45,6 +46,64 @@ impl log::Log for Logger {
fn flush(&self) {}
}
// Note: it isn't actually necessary to wrap the `tokio::net::TcpListener` in
// a cell, because it only supports one op (`accept`) which does not require
// a mutable reference to the listener.
struct TcpListener(AsyncRefCell<tokio::net::TcpListener>);
impl Resource for TcpListener {}
impl TcpListener {
/// Returns a future that yields a shared borrow of the TCP listener.
fn borrow(self: Rc<Self>) -> AsyncRefFuture<tokio::net::TcpListener> {
RcRef::map(self, |r| &r.0).borrow()
}
}
impl TryFrom<std::net::TcpListener> for TcpListener {
type Error = Error;
fn try_from(l: std::net::TcpListener) -> Result<Self, Self::Error> {
tokio::net::TcpListener::try_from(l)
.map(AsyncRefCell::new)
.map(Self)
}
}
struct TcpStream {
rd: AsyncRefCell<tokio::net::tcp::OwnedReadHalf>,
wr: AsyncRefCell<tokio::net::tcp::OwnedWriteHalf>,
}
impl Resource for TcpStream {}
impl TcpStream {
/// Returns a future that yields an exclusive borrow of the read end of the
/// tcp stream.
fn rd_borrow_mut(
self: Rc<Self>,
) -> AsyncMutFuture<tokio::net::tcp::OwnedReadHalf> {
RcRef::map(self, |r| &r.rd).borrow_mut()
}
/// Returns a future that yields an exclusive borrow of the write end of the
/// tcp stream.
fn wr_borrow_mut(
self: Rc<Self>,
) -> AsyncMutFuture<tokio::net::tcp::OwnedWriteHalf> {
RcRef::map(self, |r| &r.wr).borrow_mut()
}
}
impl From<tokio::net::TcpStream> for TcpStream {
fn from(s: tokio::net::TcpStream) -> Self {
let (rd, wr) = s.into_split();
Self {
rd: rd.into(),
wr: wr.into(),
}
}
}
#[derive(Copy, Clone, Debug, PartialEq)]
struct Record {
promise_id: u32,
@ -94,8 +153,9 @@ fn op_listen(
debug!("listen");
let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
let std_listener = std::net::TcpListener::bind(&addr)?;
let listener = TcpListener::from_std(std_listener)?;
let rid = state.resource_table.add("tcpListener", Box::new(listener));
std_listener.set_nonblocking(true)?;
let listener = TcpListener::try_from(std_listener)?;
let rid = state.resource_table_2.add(listener);
Ok(rid)
}
@ -106,7 +166,7 @@ fn op_close(
) -> Result<u32, Error> {
debug!("close rid={}", rid);
state
.resource_table
.resource_table_2
.close(rid)
.map(|_| 0)
.ok_or_else(bad_resource_id)
@ -119,56 +179,52 @@ async fn op_accept(
) -> Result<u32, Error> {
debug!("accept rid={}", rid);
poll_fn(move |cx| {
let resource_table = &mut state.borrow_mut().resource_table;
let listener_rc = state
.borrow()
.resource_table_2
.get::<TcpListener>(rid)
.ok_or_else(bad_resource_id)?;
let listener_ref = listener_rc.borrow().await;
let listener = resource_table
.get_mut::<TcpListener>(rid)
.ok_or_else(bad_resource_id)?;
listener.poll_accept(cx).map_ok(|(stream, _addr)| {
resource_table.add("tcpStream", Box::new(stream))
})
})
.await
let stream: TcpStream = listener_ref.accept().await?.0.into();
let rid = state.borrow_mut().resource_table_2.add(stream);
Ok(rid)
}
fn op_read(
async fn op_read(
state: Rc<RefCell<OpState>>,
rid: u32,
bufs: BufVec,
) -> impl TryFuture<Ok = usize, Error = Error> {
mut bufs: BufVec,
) -> Result<usize, Error> {
assert_eq!(bufs.len(), 1, "Invalid number of arguments");
let mut buf = bufs[0].clone();
debug!("read rid={}", rid);
poll_fn(move |cx| {
let resource_table = &mut state.borrow_mut().resource_table;
let stream_rc = state
.borrow()
.resource_table_2
.get::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
let mut rd_stream_mut = stream_rc.rd_borrow_mut().await;
let stream = resource_table
.get_mut::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
Pin::new(stream).poll_read(cx, &mut buf)
})
rd_stream_mut.read(&mut bufs[0]).await
}
fn op_write(
async fn op_write(
state: Rc<RefCell<OpState>>,
rid: u32,
bufs: BufVec,
) -> impl TryFuture<Ok = usize, Error = Error> {
) -> Result<usize, Error> {
assert_eq!(bufs.len(), 1, "Invalid number of arguments");
let buf = bufs[0].clone();
debug!("write rid={}", rid);
poll_fn(move |cx| {
let resource_table = &mut state.borrow_mut().resource_table;
let stream_rc = state
.borrow()
.resource_table_2
.get::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
let mut wr_stream_mut = stream_rc.wr_borrow_mut().await;
let stream = resource_table
.get_mut::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
Pin::new(stream).poll_write(cx, &buf)
})
wr_stream_mut.write(&bufs[0]).await
}
fn register_op_bin_sync<F>(
@ -247,8 +303,7 @@ fn main() {
deno_core::v8_set_flags(env::args().collect());
let mut js_runtime = create_js_runtime();
let mut runtime = runtime::Builder::new()
.basic_scheduler()
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

View file

@ -5,25 +5,25 @@ extern crate log;
use deno_core::error::bad_resource_id;
use deno_core::error::AnyError;
use deno_core::AsyncMutFuture;
use deno_core::AsyncRefCell;
use deno_core::AsyncRefFuture;
use deno_core::BufVec;
use deno_core::JsRuntime;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ZeroCopyBuf;
use futures::future::poll_fn;
use futures::future::Future;
use serde_json::Value;
use std::cell::RefCell;
use std::convert::TryFrom;
use std::convert::TryInto;
use std::env;
use std::io::Error;
use std::net::SocketAddr;
use std::pin::Pin;
use std::rc::Rc;
use std::task::Poll;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::runtime;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
struct Logger;
@ -41,6 +41,64 @@ impl log::Log for Logger {
fn flush(&self) {}
}
// Note: it isn't actually necessary to wrap the `tokio::net::TcpListener` in
// a cell, because it only supports one op (`accept`) which does not require
// a mutable reference to the listener.
struct TcpListener(AsyncRefCell<tokio::net::TcpListener>);
impl Resource for TcpListener {}
impl TcpListener {
/// Returns a future that yields a shared borrow of the TCP listener.
fn borrow(self: Rc<Self>) -> AsyncRefFuture<tokio::net::TcpListener> {
RcRef::map(self, |r| &r.0).borrow()
}
}
impl TryFrom<std::net::TcpListener> for TcpListener {
type Error = Error;
fn try_from(l: std::net::TcpListener) -> Result<Self, Self::Error> {
tokio::net::TcpListener::try_from(l)
.map(AsyncRefCell::new)
.map(Self)
}
}
struct TcpStream {
rd: AsyncRefCell<tokio::net::tcp::OwnedReadHalf>,
wr: AsyncRefCell<tokio::net::tcp::OwnedWriteHalf>,
}
impl Resource for TcpStream {}
impl TcpStream {
/// Returns a future that yields an exclusive borrow of the read end of the
/// tcp stream.
fn rd_borrow_mut(
self: Rc<Self>,
) -> AsyncMutFuture<tokio::net::tcp::OwnedReadHalf> {
RcRef::map(self, |r| &r.rd).borrow_mut()
}
/// Returns a future that yields an exclusive borrow of the write end of the
/// tcp stream.
fn wr_borrow_mut(
self: Rc<Self>,
) -> AsyncMutFuture<tokio::net::tcp::OwnedWriteHalf> {
RcRef::map(self, |r| &r.wr).borrow_mut()
}
}
impl From<tokio::net::TcpStream> for TcpStream {
fn from(s: tokio::net::TcpStream) -> Self {
let (rd, wr) = s.into_split();
Self {
rd: rd.into(),
wr: wr.into(),
}
}
}
fn create_js_runtime() -> JsRuntime {
let mut runtime = JsRuntime::new(Default::default());
runtime.register_op("listen", deno_core::json_op_sync(op_listen));
@ -59,8 +117,9 @@ fn op_listen(
debug!("listen");
let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
let std_listener = std::net::TcpListener::bind(&addr)?;
let listener = TcpListener::from_std(std_listener)?;
let rid = state.resource_table.add("tcpListener", Box::new(listener));
std_listener.set_nonblocking(true)?;
let listener = TcpListener::try_from(std_listener)?;
let rid = state.resource_table_2.add(listener);
Ok(serde_json::json!({ "rid": rid }))
}
@ -78,17 +137,17 @@ fn op_close(
.unwrap();
debug!("close rid={}", rid);
state
.resource_table
.resource_table_2
.close(rid)
.map(|_| serde_json::json!(()))
.ok_or_else(bad_resource_id)
}
fn op_accept(
async fn op_accept(
state: Rc<RefCell<OpState>>,
args: Value,
_bufs: BufVec,
) -> impl Future<Output = Result<Value, AnyError>> {
) -> Result<Value, AnyError> {
let rid: u32 = args
.get("rid")
.unwrap()
@ -98,26 +157,24 @@ fn op_accept(
.unwrap();
debug!("accept rid={}", rid);
poll_fn(move |cx| {
let resource_table = &mut state.borrow_mut().resource_table;
let listener_rc = state
.borrow()
.resource_table_2
.get::<TcpListener>(rid)
.ok_or_else(bad_resource_id)?;
let listener_ref = listener_rc.borrow().await;
let listener = resource_table
.get_mut::<TcpListener>(rid)
.ok_or_else(bad_resource_id)?;
listener.poll_accept(cx)?.map(|(stream, _addr)| {
let rid = resource_table.add("tcpStream", Box::new(stream));
Ok(serde_json::json!({ "rid": rid }))
})
})
let stream: TcpStream = listener_ref.accept().await?.0.into();
let rid = state.borrow_mut().resource_table_2.add(stream);
Ok(serde_json::json!({ "rid": rid }))
}
fn op_read(
async fn op_read(
state: Rc<RefCell<OpState>>,
args: Value,
mut bufs: BufVec,
) -> impl Future<Output = Result<Value, AnyError>> {
) -> Result<Value, AnyError> {
assert_eq!(bufs.len(), 1, "Invalid number of arguments");
let rid: u32 = args
.get("rid")
.unwrap()
@ -127,25 +184,23 @@ fn op_read(
.unwrap();
debug!("read rid={}", rid);
poll_fn(move |cx| -> Poll<Result<Value, AnyError>> {
let resource_table = &mut state.borrow_mut().resource_table;
let stream_rc = state
.borrow()
.resource_table_2
.get::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
let mut rd_stream_mut = stream_rc.rd_borrow_mut().await;
let stream = resource_table
.get_mut::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
Pin::new(stream)
.poll_read(cx, &mut bufs[0])?
.map(|nread| Ok(serde_json::json!({ "nread": nread })))
})
let nread = rd_stream_mut.read(&mut bufs[0]).await?;
Ok(serde_json::json!({ "nread": nread }))
}
fn op_write(
async fn op_write(
state: Rc<RefCell<OpState>>,
args: Value,
bufs: BufVec,
) -> impl Future<Output = Result<Value, AnyError>> {
) -> Result<Value, AnyError> {
assert_eq!(bufs.len(), 1, "Invalid number of arguments");
let rid: u32 = args
.get("rid")
.unwrap()
@ -155,16 +210,15 @@ fn op_write(
.unwrap();
debug!("write rid={}", rid);
poll_fn(move |cx| {
let resource_table = &mut state.borrow_mut().resource_table;
let stream_rc = state
.borrow()
.resource_table_2
.get::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
let mut wr_stream_mut = stream_rc.wr_borrow_mut().await;
let stream = resource_table
.get_mut::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
Pin::new(stream)
.poll_write(cx, &bufs[0])?
.map(|nwritten| Ok(serde_json::json!({ "nwritten": nwritten })))
})
let nwritten = wr_stream_mut.write(&bufs[0]).await?;
Ok(serde_json::json!({ "nwritten": nwritten }))
}
fn main() {
@ -180,8 +234,7 @@ fn main() {
deno_core::v8_set_flags(env::args().collect());
let mut js_runtime = create_js_runtime();
let mut runtime = runtime::Builder::new()
.basic_scheduler()
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

View file

@ -5,6 +5,7 @@ extern crate lazy_static;
#[macro_use]
extern crate log;
mod async_cell;
mod bindings;
pub mod error;
mod flags;
@ -15,6 +16,7 @@ mod normalize_path;
mod ops;
pub mod plugin_api;
mod resources;
mod resources2;
mod runtime;
mod shared_queue;
mod zero_copy_buf;
@ -26,6 +28,12 @@ pub use serde;
pub use serde_json;
pub use url;
pub use crate::async_cell::AsyncMut;
pub use crate::async_cell::AsyncMutFuture;
pub use crate::async_cell::AsyncRef;
pub use crate::async_cell::AsyncRefCell;
pub use crate::async_cell::AsyncRefFuture;
pub use crate::async_cell::RcRef;
pub use crate::flags::v8_set_flags;
pub use crate::module_specifier::ModuleResolutionError;
pub use crate::module_specifier::ModuleSpecifier;
@ -47,6 +55,9 @@ pub use crate::ops::OpId;
pub use crate::ops::OpState;
pub use crate::ops::OpTable;
pub use crate::resources::ResourceTable;
pub use crate::resources2::Resource;
pub use crate::resources2::ResourceId;
pub use crate::resources2::ResourceTable2;
pub use crate::runtime::GetErrorClassFn;
pub use crate::runtime::JsRuntime;
pub use crate::runtime::RuntimeOptions;

View file

@ -34,6 +34,7 @@ pub enum Op {
/// Maintains the resources and ops inside a JS runtime.
pub struct OpState {
pub resource_table: crate::ResourceTable,
pub resource_table_2: crate::resources2::ResourceTable,
pub op_table: OpTable,
pub get_error_class_fn: crate::runtime::GetErrorClassFn,
gotham_state: GothamState,
@ -45,10 +46,11 @@ impl Default for OpState {
// pub(crate) fn new() -> OpState
fn default() -> OpState {
OpState {
resource_table: crate::ResourceTable::default(),
resource_table: Default::default(),
resource_table_2: Default::default(),
op_table: OpTable::default(),
get_error_class_fn: &|_| "Error",
gotham_state: GothamState::default(),
gotham_state: Default::default(),
}
}
}

View file

@ -6,13 +6,10 @@
// Resources may or may not correspond to a real operating system file
// descriptor (hence the different name).
use crate::resources2::ResourceId;
use std::any::Any;
use std::collections::HashMap;
/// ResourceId is Deno's version of a file descriptor. ResourceId is also referred
/// to as `rid` in the code base.
pub type ResourceId = u32;
/// These store Deno's file descriptors. These are not necessarily the operating
/// system ones.
type ResourceMap = HashMap<ResourceId, (String, Box<dyn Any>)>;

140
core/resources2.rs Normal file
View file

@ -0,0 +1,140 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
// Think of Resources as File Descriptors. They are integers that are allocated
// by the privileged side of Deno which refer to various rust objects that need
// to be persisted between various ops. For example, network sockets are
// resources. Resources may or may not correspond to a real operating system
// file descriptor (hence the different name).
use std::any::type_name;
use std::any::Any;
use std::any::TypeId;
use std::borrow::Cow;
use std::collections::HashMap;
use std::iter::Iterator;
use std::rc::Rc;
/// All objects that can be store in the resource table should implement the
/// `Resource` trait.
pub trait Resource: Any + 'static {
/// Returns a string representation of the resource which is made available
/// to JavaScript code through `op_resources`. The default implementation
/// returns the Rust type name, but specific resource types may override this
/// trait method.
fn name(&self) -> Cow<str> {
type_name::<Self>().into()
}
}
impl dyn Resource {
#[inline(always)]
fn is<T: Resource>(&self) -> bool {
self.type_id() == TypeId::of::<T>()
}
#[inline(always)]
fn downcast_rc<'a, T: Resource>(self: &'a Rc<Self>) -> Option<&'a Rc<T>> {
if self.is::<T>() {
let ptr = self as *const Rc<_> as *const Rc<T>;
Some(unsafe { &*ptr })
} else {
None
}
}
}
/// A `ResourceId` is an integer value referencing a resource. It could be
/// considered to be the Deno equivalent of a `file descriptor` in POSIX like
/// operating systems. Elsewhere in the code base it is commonly abbreviated
/// to `rid`.
// TODO: use `u64` instead?
pub type ResourceId = u32;
/// Temporary alias for `crate::resources2::ResourceTable`.
// TODO: remove this when the old `ResourceTable` is obsolete.
pub type ResourceTable2 = ResourceTable;
/// Map-like data structure storing Deno's resources (equivalent to file
/// descriptors).
///
/// Provides basic methods for element access. A resource can be of any type.
/// Different types of resources can be stored in the same map, and provided
/// with a name for description.
///
/// Each resource is identified through a _resource ID (rid)_, which acts as
/// the key in the map.
#[derive(Default)]
pub struct ResourceTable {
index: HashMap<ResourceId, Rc<dyn Resource>>,
next_rid: ResourceId,
}
impl ResourceTable {
/// Returns true if any resource with the given `rid` is exists.
pub fn has(&self, rid: ResourceId) -> bool {
self.index.contains_key(&rid)
}
/// Returns a reference counted pointer to the resource of type `T` with the
/// given `rid`. If `rid` is not present or has a type different than `T`,
/// this function returns `None`.
pub fn get<T: Resource>(&self, rid: ResourceId) -> Option<Rc<T>> {
self
.index
.get(&rid)
.and_then(|resource| resource.downcast_rc::<T>())
.map(Clone::clone)
}
/// Inserts resource into the resource table, which takes ownership of it.
///
/// The resource type is erased at runtime and must be statically known
/// when retrieving it through `get()`.
///
/// Returns a unique resource ID, which acts as a key for this resource.
pub fn add<T: Resource>(&mut self, resource: T) -> ResourceId {
self.add_rc(Rc::new(resource))
}
/// Inserts a `Rc`-wrapped resource into the resource table.
///
/// The resource type is erased at runtime and must be statically known
/// when retrieving it through `get()`.
///
/// Returns a unique resource ID, which acts as a key for this resource.
pub fn add_rc<T: Resource>(&mut self, resource: Rc<T>) -> ResourceId {
let resource = resource as Rc<dyn Resource>;
let rid = self.next_rid;
let removed_resource = self.index.insert(rid, resource);
assert!(removed_resource.is_none());
self.next_rid += 1;
rid
}
/// Removes the resource with the given `rid` from the resource table. If the
/// only reference to this resource existed in the resource table, this will
/// cause the resource to be dropped. However, since resources are reference
/// counted, therefore pending ops are not automatically cancelled.
pub fn close(&mut self, rid: ResourceId) -> Option<()> {
self.index.remove(&rid).map(|_| ())
}
/// Returns an iterator that yields a `(id, name)` pair for every resource
/// that's currently in the resource table. This can be used for debugging
/// purposes or to implement the `op_resources` op. Note that the order in
/// which items appear is not specified.
///
/// # Example
///
/// ```
/// # use deno_core::resources2::ResourceTable;
/// # let resource_table = ResourceTable::default();
/// let resource_names = resource_table.names().collect::<Vec<_>>();
/// ```
pub fn names(&self) -> impl Iterator<Item = (ResourceId, Cow<str>)> {
self
.index
.iter()
.map(|(&id, resource)| (id, resource.name()))
}
}

View file

@ -19,4 +19,4 @@ idna = "0.2.0"
serde = { version = "1.0.116", features = ["derive"] }
[dev-dependencies]
futures = "0.3.5"
futures = "0.3.8"

View file

@ -11,7 +11,7 @@ publish = false
crate-type = ["cdylib"]
[dependencies]
futures = "0.3.5"
futures = "0.3.8"
deno_core = { path = "../core" }
[dev-dependencies]

View file

@ -13,7 +13,7 @@ path = "src/test_server.rs"
[dependencies]
tokio = { version = "0.2.22", features = ["full"] }
futures = "0.3.5"
futures = "0.3.8"
bytes = "0.5.6"
lazy_static = "1.4.0"
os_pipe = "0.9.2"