diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml index 7352fa5..8d7458a 100644 --- a/.github/FUNDING.yml +++ b/.github/FUNDING.yml @@ -1,3 +1,3 @@ # These are supported funding model platforms -github: [dr-orlovsky, cloudhead, rust-amplify, ubideco] +github: [ dr-orlovsky, cloudhead, rust-amplify, ubideco, indcs ] diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 71a7f5e..f7bb860 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -81,7 +81,7 @@ jobs: strategy: fail-fast: false matrix: - toolchain: [ nightly, beta, stable, 1.66.0 ] + toolchain: [ nightly, beta, stable, 1.75.0 ] steps: - uses: actions/checkout@v2 - name: Install rust ${{ matrix.toolchain }} diff --git a/Cargo.lock b/Cargo.lock index aa30dde..329af0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,9 +4,9 @@ version = 3 [[package]] name = "amplify" -version = "4.6.0" +version = "4.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e711289a6cb28171b4f0e6c8019c69ff9476050508dc082167575d458ff74d0" +checksum = "448cf0c3afc71439b5f837aac5399a1ef2b223f5f38324dbfb4343deec3b80cc" dependencies = [ "amplify_derive", "amplify_num", @@ -16,9 +16,9 @@ dependencies = [ [[package]] name = "amplify_derive" -version = "4.0.0" +version = "4.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "759dcbfaf94d838367a86d493ec34ccc8aa6fe365cb7880d6bf89006de24d9c1" +checksum = "2a6309e6b8d89b36b9f959b7a8fa093583b94922a0f6438a24fb08936de4d428" dependencies = [ "amplify_syn", "proc-macro2", @@ -28,9 +28,9 @@ dependencies = [ [[package]] name = "amplify_num" -version = "0.5.2" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04c009c5c4de814911b177e2ea59e4930bb918978ed3cce4900d846a6ceb0838" +checksum = "99bcb75a2982047f733547042fc3968c0f460dfcf7d90b90dea3b2744580e9ad" dependencies = [ "wasm-bindgen", ] @@ -54,15 +54,15 @@ checksum = "d92bec98840b8f03a5ff5413de5293bfcd8bf96467cf5452609f939ec6f5de16" [[package]] name = "bitflags" -version = "2.6.0" +version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" +checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" [[package]] name = "bumpalo" -version = "3.16.0" +version = "3.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" +checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf" [[package]] name = "cfg-if" @@ -81,35 +81,29 @@ dependencies = [ [[package]] name = "crossbeam-channel" -version = "0.5.13" +version = "0.5.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" dependencies = [ "crossbeam-utils", ] [[package]] name = "crossbeam-utils" -version = "0.8.20" +version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" [[package]] name = "errno" -version = "0.3.9" +version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba" +checksum = "976dd42dc7e85965fe702eb8164f21f450704bdde31faefd6471dba214cb594e" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.59.0", ] -[[package]] -name = "hermit-abi" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" - [[package]] name = "hermit-abi" version = "0.4.0" @@ -118,11 +112,10 @@ checksum = "fbf6a919d6cf397374f7dfeeea91d974c7c0a7221d0d0f4f20d859d329e53fcc" [[package]] name = "io-reactor" -version = "0.5.2" +version = "0.6.0" dependencies = [ "amplify", "crossbeam-channel", - "libc", "log", "mio", "polling", @@ -131,105 +124,110 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.155" +version = "0.2.172" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" [[package]] name = "linux-raw-sys" -version = "0.4.14" +version = "0.4.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" [[package]] name = "log" -version = "0.4.22" +version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" dependencies = [ "value-bag", ] [[package]] name = "mio" -version = "1.0.0" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4929e1f84c5e54c3ec6141cd5d8b5a5c055f031f80cf78f2072920173cb4d880" +checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" dependencies = [ - "hermit-abi 0.3.9", "libc", "log", "wasi", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] name = "once_cell" -version = "1.19.0" +version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" [[package]] name = "pin-project-lite" -version = "0.2.14" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" [[package]] name = "polling" -version = "3.7.2" +version = "3.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3ed00ed3fbf728b5816498ecd316d1716eecaced9c0c8d2c5a6740ca214985b" +checksum = "a604568c3202727d1507653cb121dbd627a58684eb09a820fd746bee38b4442f" dependencies = [ "cfg-if", "concurrent-queue", - "hermit-abi 0.4.0", + "hermit-abi", "pin-project-lite", "rustix", "tracing", - "windows-sys", + "windows-sys 0.59.0", ] [[package]] name = "popol" -version = "3.0.0" +version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93406933502e4446250941cf95d5e62851feb62a25b742acf7ffce96755c53e3" +checksum = "eaac4c1cff8bea6334f5c2646ca0f007648dc14d0471cf3e522cd34900126568" dependencies = [ "libc", ] [[package]] name = "proc-macro2" -version = "1.0.86" +version = "1.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77" +checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.36" +version = "1.0.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" +checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d" dependencies = [ "proc-macro2", ] [[package]] name = "rustix" -version = "0.38.34" +version = "0.38.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" +checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" dependencies = [ "bitflags", "errno", "libc", "linux-raw-sys", - "windows-sys", + "windows-sys 0.59.0", ] +[[package]] +name = "rustversion" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eded382c5f5f786b989652c49544c4877d9f015cc22e145a5ea8ea66c2921cd2" + [[package]] name = "syn" version = "1.0.109" @@ -243,9 +241,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.68" +version = "2.0.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "901fa70d88b9d6c98022e23b4136f9f3e54e4662c3bc1bd1d84a42a9a0f0c1e9" +checksum = "b09a44accad81e1ba1cd74a32461ba89dee89095ba17b32f5d03683b1b1fc2a0" dependencies = [ "proc-macro2", "quote", @@ -254,9 +252,9 @@ dependencies = [ [[package]] name = "tracing" -version = "0.1.40" +version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "pin-project-lite", "tracing-core", @@ -264,21 +262,21 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.32" +version = "0.1.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" [[package]] name = "unicode-ident" -version = "1.0.12" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" +checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" [[package]] name = "value-bag" -version = "1.9.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a84c137d37ab0142f0f2ddfe332651fdbf252e7b7dbb4e67b6c1f1b2e925101" +checksum = "943ce29a8a743eb10d6082545d861b24f9d1b160b7d741e0f2cdf726bec909c5" [[package]] name = "wasi" @@ -288,34 +286,35 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.92" +version = "0.2.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4be2531df63900aeb2bca0daaaddec08491ee64ceecbee5076636a3b026795a8" +checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5" dependencies = [ "cfg-if", + "once_cell", + "rustversion", "wasm-bindgen-macro", ] [[package]] name = "wasm-bindgen-backend" -version = "0.2.92" +version = "0.2.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "614d787b966d3989fa7bb98a654e369c762374fd3213d212cfc0251257e747da" +checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6" dependencies = [ "bumpalo", "log", - "once_cell", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.100", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-macro" -version = "0.2.92" +version = "0.2.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1f8823de937b71b9460c0c34e25f3da88250760bec0ebac694b49997550d726" +checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -323,22 +322,25 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.92" +version = "0.2.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" +checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.100", "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.92" +version = "0.2.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" +checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d" +dependencies = [ + "unicode-ident", +] [[package]] name = "windows-sys" @@ -349,6 +351,15 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets", +] + [[package]] name = "windows-targets" version = "0.52.6" diff --git a/Cargo.toml b/Cargo.toml index ecbe041..8e64e8d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "io-reactor" -version = "0.5.2" +version = "0.6.0" description = "Concurrent I/O resource management using reactor pattern" authors = [ "Dr. Maxim Orlovsky ", @@ -10,7 +10,7 @@ keywords = ["reactor", "networking", "patterns", "concurrency", "poll"] categories = ["concurrency", "asynchronous", "network-programming", "rust-patterns"] homepage = "https://github.com/rust-amplify" repository = "https://github.com/rust-amplify/io-reactor" -rust-version = "1.66" # Due to amplify dependency +rust-version = "1.75" # Due to amplify dependency edition = "2021" license = "Apache-2.0" readme = "README.md" @@ -19,14 +19,13 @@ readme = "README.md" name = "reactor" [dependencies] -amplify = { version = "4.6.0", features = ["hex"] } -crossbeam-channel = "0.5.13" -popol = { version = "3.0.0", optional = true } -polling = { version = "3.7.2", optional = true } +amplify = { version = "4.8.0", features = ["hex"] } +crossbeam-channel = "0.5.15" +popol = { version = "3.0.1", optional = true } +polling = { version = "3.7.4", optional = true } # epoll = { version = "4.3.1", optional = true } - NB: epoll not supported on MacOS -mio = { version = "1.0.0", optional = true } -log = { version = "0.4.22", optional = true, features = ["kv_unstable"] } -libc = "0.2.155" +mio = { version = "1.0.3", optional = true } +log = { version = "0.4.27", optional = true, features = ["kv_unstable"] } [features] default = ["popol"] diff --git a/LICENSE b/LICENSE index d1b68de..d9a10c0 100644 --- a/LICENSE +++ b/LICENSE @@ -174,28 +174,3 @@ of your accepting any such warranty or additional liability. END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright 2022-2023 UBIDECO Institute, Switzerland - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. diff --git a/MANIFEST.yml b/MANIFEST.yml deleted file mode 100644 index 52261e7..0000000 --- a/MANIFEST.yml +++ /dev/null @@ -1,17 +0,0 @@ -Name: io-reactor -Type: Library -Kind: Free software -License: Apache-2.0 -Language: Rust -Compiler: 1.66 -Author: Maxim Orlovsky -Maintained: UBIDECO Institute, Switzerland -Maintainers: - Maxim Orlovsky: - GitHub: @dr-orlovsky - GPG: EAE730CEC0C663763F028A5860094BAF18A26EC9 - SSH: BoSGFzbyOKC7Jm28MJElFboGepihCpHop60nS8OoG/A - EMail: dr@orlovsky.ch - Alexis Sellier: - GitHub: @cloudhead - SSH: iTDjRHSIaoL8dpHbQ0mv+y0IQqPufGl2hQwk4TbXFlw diff --git a/src/lib.rs b/src/lib.rs index a730b5d..fc6e9aa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,24 +2,22 @@ // // SPDX-License-Identifier: Apache-2.0 // -// Written in 2021-2023 by +// Written in 2021-2025 by // Dr. Maxim Orlovsky // Alexis Sellier // -// Copyright 2022-2023 UBIDECO Institute, Switzerland -// Copyright 2021 Alexis Sellier +// Copyright 2022-2025 UBIDECO Labs, InDCS, Lugano, Switzerland. All Rights reserved. +// Copyright 2021-2023 Alexis Sellier . All Rights reserved. // -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under +// the License. #![deny( non_upper_case_globals, @@ -72,13 +70,14 @@ extern crate amplify; pub mod poller; -mod reactor; mod resource; mod timeouts; +pub mod resources; +pub mod runtimes; +mod reactor; +pub use reactor::Reactor; pub use resource::{ - Io, Resource, ResourceId, ResourceIdGenerator, ResourceType, WriteAtomic, WriteError, + ImpossibleListener, Io, Resource, ResourceId, ResourceIdGenerator, WriteAtomic, WriteError, }; pub use timeouts::{Timer, Timestamp}; - -pub use self::reactor::{Action, Controller, Error, Handler, Reactor, Runtime}; diff --git a/src/poller/epoll.rs b/src/poller/epoll.rs index c09b1dc..ba6b7ec 100644 --- a/src/poller/epoll.rs +++ b/src/poller/epoll.rs @@ -2,21 +2,19 @@ // // SPDX-License-Identifier: Apache-2.0 // -// Written in 2021-2023 by +// Written in 2021-2025 by // Dr. Maxim Orlovsky // Alexis Sellier // -// Copyright 2022-2023 UBIDECO Institute, Switzerland -// Copyright 2021 Alexis Sellier +// Copyright 2022-2025 UBIDECO Labs, InDCS, Lugano, Switzerland. All Rights reserved. +// Copyright 2021-2023 Alexis Sellier . All Rights reserved. // -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under +// the License. diff --git a/src/poller/mio.rs b/src/poller/mio.rs index c09b1dc..ba6b7ec 100644 --- a/src/poller/mio.rs +++ b/src/poller/mio.rs @@ -2,21 +2,19 @@ // // SPDX-License-Identifier: Apache-2.0 // -// Written in 2021-2023 by +// Written in 2021-2025 by // Dr. Maxim Orlovsky // Alexis Sellier // -// Copyright 2022-2023 UBIDECO Institute, Switzerland -// Copyright 2021 Alexis Sellier +// Copyright 2022-2025 UBIDECO Labs, InDCS, Lugano, Switzerland. All Rights reserved. +// Copyright 2021-2023 Alexis Sellier . All Rights reserved. // -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under +// the License. diff --git a/src/poller/mod.rs b/src/poller/mod.rs index ab9d577..b26ec7e 100644 --- a/src/poller/mod.rs +++ b/src/poller/mod.rs @@ -2,24 +2,22 @@ // // SPDX-License-Identifier: Apache-2.0 // -// Written in 2021-2023 by +// Written in 2021-2025 by // Dr. Maxim Orlovsky // Alexis Sellier // -// Copyright 2022-2023 UBIDECO Institute, Switzerland -// Copyright 2021 Alexis Sellier +// Copyright 2022-2025 UBIDECO Labs, InDCS, Lugano, Switzerland. All Rights reserved. +// Copyright 2021-2023 Alexis Sellier . All Rights reserved. // -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under +// the License. //! OS and implementation-specific poll engines. @@ -44,46 +42,57 @@ pub struct IoType { } impl IoType { + #[inline] /// Indicates no I/O operations are tracked. - pub fn none() -> Self { + pub const fn none() -> Self { Self { read: false, write: false, } } + #[inline] /// Indicates interest in only read I/O events. - pub fn read_only() -> Self { + pub const fn read_only() -> Self { Self { read: true, write: false, } } + #[inline] /// Indicates interest in only write I/O events. - pub fn write_only() -> Self { + pub const fn write_only() -> Self { Self { read: false, write: true, } } + #[inline] /// Indicates interest in both read and write I/O events. - pub fn read_write() -> Self { + pub const fn read_write() -> Self { Self { read: true, write: true, } } + #[inline] /// Indicates no I/O operations has happened on a resource. - pub fn is_none(self) -> bool { !self.read && !self.write } + pub const fn is_none(self) -> bool { !self.read && !self.write } + + #[inline] /// Indicates data available to be read from a resource. - pub fn is_read_only(self) -> bool { self.read && !self.write } + pub const fn is_read_only(self) -> bool { self.read && !self.write } + + #[inline] /// Indicates that the resource is ready to accept data. - pub fn is_write_only(self) -> bool { !self.read && self.write } + pub const fn is_write_only(self) -> bool { !self.read && self.write } + + #[inline] /// Indicates that the resource can accept data - and has aa data which can be read. - pub fn is_read_write(self) -> bool { self.read && self.write } + pub const fn is_read_write(self) -> bool { self.read && self.write } } impl ops::Not for IoType { diff --git a/src/poller/polling.rs b/src/poller/polling.rs index c09b1dc..ba6b7ec 100644 --- a/src/poller/polling.rs +++ b/src/poller/polling.rs @@ -2,21 +2,19 @@ // // SPDX-License-Identifier: Apache-2.0 // -// Written in 2021-2023 by +// Written in 2021-2025 by // Dr. Maxim Orlovsky // Alexis Sellier // -// Copyright 2022-2023 UBIDECO Institute, Switzerland -// Copyright 2021 Alexis Sellier +// Copyright 2022-2025 UBIDECO Labs, InDCS, Lugano, Switzerland. All Rights reserved. +// Copyright 2021-2023 Alexis Sellier . All Rights reserved. // -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under +// the License. diff --git a/src/poller/popol.rs b/src/poller/popol.rs index d791a06..68cd942 100644 --- a/src/poller/popol.rs +++ b/src/poller/popol.rs @@ -2,24 +2,22 @@ // // SPDX-License-Identifier: Apache-2.0 // -// Written in 2021-2023 by +// Written in 2021-2025 by // Dr. Maxim Orlovsky // Alexis Sellier // -// Copyright 2022-2023 UBIDECO Institute, Switzerland -// Copyright 2021 Alexis Sellier +// Copyright 2022-2025 UBIDECO Labs, InDCS, Lugano, Switzerland. All Rights reserved. +// Copyright 2021-2023 Alexis Sellier . All Rights reserved. // -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under +// the License. //! Poll engine provided by the [`popol`] crate. diff --git a/src/reactor.rs b/src/reactor.rs index a7f8bb1..7772850 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -2,208 +2,41 @@ // // SPDX-License-Identifier: Apache-2.0 // -// Written in 2021-2023 by +// Written in 2021-2025 by // Dr. Maxim Orlovsky // Alexis Sellier // -// Copyright 2022-2023 UBIDECO Institute, Switzerland -// Copyright 2021 Alexis Sellier +// Copyright 2022-2025 UBIDECO Labs, InDCS, Lugano, Switzerland. All Rights reserved. +// Copyright 2021-2023 Alexis Sellier . All Rights reserved. // -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under +// the License. -#![allow(unused_variables)] // because we need them for feature-gated logger - -use std::collections::HashMap; -use std::fmt::{Debug, Display, Formatter}; -use std::os::unix::io::{AsRawFd, RawFd}; +use std::os::fd::AsRawFd; use std::thread::JoinHandle; -use std::time::Duration; use std::{io, thread}; -use crossbeam_channel as chan; - -use crate::poller::{IoType, Poll, Waker, WakerRecv, WakerSend}; -use crate::resource::WriteError; -use crate::{Resource, ResourceId, ResourceType, Timer, Timestamp, WriteAtomic}; - -/// Maximum amount of time to wait for I/O. -const WAIT_TIMEOUT: Duration = Duration::from_secs(60 * 60); - -/// Reactor errors -#[derive(Error, Display, From)] -#[display(doc_comments)] -pub enum Error { - /// transport {0} got disconnected during poll operation. - ListenerDisconnect(ResourceId, L), - - /// transport {0} got disconnected during poll operation. - TransportDisconnect(ResourceId, T), - - /// polling multiple reactor has failed. Details: {0:?} - Poll(io::Error), -} - -impl Debug for Error { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { Display::fmt(self, f) } -} - -/// Actions which can be provided to the reactor by the [`Handler`]. -/// -/// Reactor reads actions on each event loop using [`Handler`] iterator interface. -#[derive(Display)] -pub enum Action { - /// Register a new listener resource for the reactor poll. - /// - /// Reactor can't instantiate the resource, like bind a network listener. - /// Reactor only can register already active resource for polling in the event loop. - #[display("register_listener")] - RegisterListener(L), - - /// Register a new transport resource for the reactor poll. - /// - /// Reactor can't instantiate the resource, like open a file or establish network connection. - /// Reactor only can register already active resource for polling in the event loop. - #[display("register_transport")] - RegisterTransport(T), - - /// Unregister listener resource from the reactor poll and handover it to the [`Handler`] via - /// [`Handler::handover_listener`]. - /// - /// When the resource is unregistered no action is performed, i.e. the file descriptor is not - /// closed, listener is not unbound, connections are not closed etc. All these actions must be - /// handled by the handler upon the handover event. - #[display("unregister_listener")] - UnregisterListener(ResourceId), - - /// Unregister transport resource from the reactor poll and handover it to the [`Handler`] via - /// [`Handler::handover_transport`]. - /// - /// When the resource is unregistered no action is performed, i.e. the file descriptor is not - /// closed, listener is not unbound, connections are not closed etc. All these actions must be - /// handled by the handler upon the handover event. - #[display("unregister_transport")] - UnregisterTransport(ResourceId), - - /// Write the data to one of the transport resources using [`io::Write`]. - #[display("send_to({0})")] - Send(ResourceId, Vec), - - /// Set a new timer for a given duration from this moment. - /// - /// When the timer fires reactor will timeout poll syscall and call [`Handler::handle_timer`]. - #[display("set_timer({0:?})")] - SetTimer(Duration), -} - -/// A service which handles I/O events generated in the [`Reactor`]. -pub trait Handler: Send + Iterator> { - /// Type for a listener resource. - /// - /// Listener resources are resources which may spawn more resources and can't be written to. A - /// typical example of a listener resource is a [`std::net::TcpListener`], however this may also - /// be a special form of a peripheral device or something else. - type Listener: Resource; - - /// Type for a transport resource. - /// - /// Transport is a "full" resource which can be read from - and written to. Usual files, network - /// connections, database connections etc are all fall into this category. - type Transport: Resource; - - /// A command which may be sent to the [`Handler`] from outside of the [`Reactor`], including - /// other threads. - /// - /// The handler object is owned by the reactor runtime and executes always in the context of the - /// reactor runtime thread. Thus, if other (micro)services within the app needs to communicate - /// to the handler they have to use this data type, which usually is an enumeration for a set of - /// commands supported by the handler. - /// - /// The commands are sent by using reactor [`Controller`] API. - type Command: Debug + Send; - - /// Method called by the reactor on the start of each event loop once the poll has returned. - fn tick(&mut self, time: Timestamp); - - /// Method called by the reactor when a previously set timeout is fired. - /// - /// Related: [`Action::SetTimer`]. - fn handle_timer(&mut self); - - /// Method called by the reactor upon an I/O event on a listener resource. - /// - /// Since listener doesn't support writing, it can be only a read event (indicating that a new - /// resource can be spawned from the listener). - fn handle_listener_event( - &mut self, - id: ResourceId, - event: ::Event, - time: Timestamp, - ); - - /// Method called by the reactor upon I/O event on a transport resource. - fn handle_transport_event( - &mut self, - id: ResourceId, - event: ::Event, - time: Timestamp, - ); - - /// Method called by the reactor when a given resource was successfully registered and provided - /// with a resource id. - /// - /// The resource id will be used later in [`Self::handle_listener_event`], - /// [`Self::handle_transport_event`], [`Self::handover_listener`] and [`handover_transport`] - /// calls to the handler. - fn handle_registered(&mut self, fd: RawFd, id: ResourceId, ty: ResourceType); - - /// Method called by the reactor when a [`Self::Command`] is received for the [`Handler`]. - /// - /// The commands are sent via [`Controller`] from outside of the reactor, including other - /// threads. - fn handle_command(&mut self, cmd: Self::Command); - - /// Method called by the reactor on any kind of error during the event loop, including errors of - /// the poll syscall or I/O errors returned as a part of the poll result events. - /// - /// See [`enum@Error`] for the details on errors which may happen. - fn handle_error(&mut self, err: Error); - - /// Method called by the reactor upon receiving [`Action::UnregisterListener`]. - /// - /// Passes the listener resource to the [`Handler`] when it is already not a part of the reactor - /// poll. From this point of time it is safe to send the resource to other threads (like - /// workers) or close the resource. - fn handover_listener(&mut self, id: ResourceId, listener: Self::Listener); - - /// Method called by the reactor upon receiving [`Action::UnregisterTransport`]. - /// - /// Passes the transport resource to the [`Handler`] when it is already not a part of the - /// reactor poll. From this point of time it is safe to send the resource to other threads - /// (like workers) or close the resource. - fn handover_transport(&mut self, id: ResourceId, transport: Self::Transport); -} +use crate::poller::{Poll, Waker}; +use crate::runtimes::{Controller, ReactorHandler, ReactorRuntime}; /// High-level reactor API wrapping reactor [`Runtime`] into a thread and providing basic thread /// management for it. /// /// Apps running the [`Reactor`] can interface it and a [`Handler`] via use of the [`Controller`] /// API. -pub struct Reactor { +pub struct Reactor, P: Poll> { thread: JoinHandle<()>, - controller: Controller::Send>, + controller: Controller<::Command, ::Send>, } -impl Reactor { +impl, P: Poll> Reactor { /// Creates new reactor using provided [`Poll`] engine and a service exposing [`Handler`] API to /// the reactor. /// @@ -213,11 +46,10 @@ impl Reactor { /// # Error /// /// Errors with a system/OS error if it was impossible to spawn a thread. - pub fn new>(service: H, poller: P) -> Result + pub fn new(service: R::Handler, poller: P) -> Result where - H: 'static, P: 'static, - C: 'static + Send, + R: 'static, { Reactor::with(service, poller, thread::Builder::new()) } @@ -232,15 +64,10 @@ impl Reactor { /// # Error /// /// Errors with a system/OS error if it was impossible to spawn a thread. - pub fn named>( - service: H, - poller: P, - thread_name: String, - ) -> Result + pub fn named(service: R::Handler, poller: P, thread_name: String) -> Result where - H: 'static, P: 'static, - C: 'static + Send, + R: 'static, { Reactor::with(service, poller, thread::Builder::new().name(thread_name)) } @@ -252,27 +79,26 @@ impl Reactor { /// constructed. Both poll engine and the service are sent to the newly created reactor /// thread which runs the reactor [`Runtime`]. /// + /// # Blocking + /// + /// This call is blocking. + /// /// # Error /// /// Errors with a system/OS error if it was impossible to spawn a thread. - pub fn with>( - service: H, + pub fn with( + service: R::Handler, mut poller: P, builder: thread::Builder, ) -> Result where - H: 'static, P: 'static, - C: 'static + Send, { - let (ctl_send, ctl_recv) = chan::unbounded(); + let (ctl_send, ctl_recv) = crossbeam_channel::unbounded(); let (waker_writer, waker_reader) = P::Waker::pair()?; - let controller = Controller { - ctl_send, - waker: waker_writer, - }; + let controller = Controller::new(ctl_send, waker_writer); #[cfg(feature = "log")] log::debug!(target: "reactor-controller", "Initializing reactor thread..."); @@ -283,16 +109,7 @@ impl Reactor { log::debug!(target: "reactor", "Registering waker (fd {})", waker_reader.as_raw_fd()); poller.register_waker(&waker_reader); - let runtime = Runtime { - service, - poller, - controller: runtime_controller, - ctl_recv, - listeners: empty!(), - transports: empty!(), - waker: waker_reader, - timeouts: Timer::new(), - }; + let runtime = R::with(service, poller, runtime_controller, ctl_recv, waker_reader); #[cfg(feature = "log")] log::info!(target: "reactor", "Entering reactor event loop"); @@ -306,527 +123,15 @@ impl Reactor { } /// Provides a copy of a [`Controller`] object which exposes an API to the reactor and a service - /// running inside of its thread. - /// - /// See [`Handler::Command`] for the details. - pub fn controller(&self) -> Controller::Send> { self.controller.clone() } - - /// Joins the reactor thread. - pub fn join(self) -> thread::Result<()> { self.thread.join() } -} - -enum Ctl { - Cmd(C), - Shutdown, -} - -/// Control API to the service which is run inside a reactor. -/// -/// The service is passed to the [`Reactor`] constructor as a parameter and also exposes [`Handler`] -/// API to the reactor itself for receiving reactor-generated events. This API is used by the -/// reactor to inform the service about incoming commands, sent via this [`Controller`] API (see -/// [`Handler::Command`] for the details). -pub struct Controller { - ctl_send: chan::Sender>, - waker: W, -} - -impl Clone for Controller { - fn clone(&self) -> Self { - Controller { - ctl_send: self.ctl_send.clone(), - waker: self.waker.clone(), - } - } -} - -impl Controller { - /// Send a command to the service inside a [`Reactor`] or a reactor [`Runtime`]. - #[allow(unused_mut)] // because of the `log` feature gate - pub fn cmd(&self, mut command: C) -> Result<(), io::Error> - where C: 'static { - #[cfg(feature = "log")] - { - use std::any::Any; - - let cmd = Box::new(command); - let any = cmd as Box; - let any = match any.downcast::>() { - Err(any) => { - log::debug!(target: "reactor-controller", "Sending command to the reactor"); - any - } - Ok(debug) => { - log::debug!(target: "reactor-controller", "Sending command {debug:?} to the reactor"); - debug - } - }; - command = *any.downcast().expect("from upcast"); - } - - self.ctl_send.send(Ctl::Cmd(command)).map_err(|_| io::ErrorKind::BrokenPipe)?; - self.wake()?; - Ok(()) - } - - /// Shutdown the reactor. - pub fn shutdown(self) -> Result<(), Self> { - #[cfg(feature = "log")] - log::info!(target: "reactor-controller", "Initiating reactor shutdown..."); - - let res1 = self.ctl_send.send(Ctl::Shutdown); - let res2 = self.wake(); - res1.or(res2).map_err(|_| self) - } - - fn wake(&self) -> io::Result<()> { - #[cfg(feature = "log")] - log::trace!(target: "reactor-controller", "Wakening the reactor"); - self.waker.wake() - } -} - -/// Internal [`Reactor`] runtime which is run in a dedicated thread. -/// -/// Use this structure direactly only if you'd like to have the full control over the reactor -/// thread. -/// -/// This runtime structure **does not** spawns a thread and is **blocking**. It implements the -/// actual reactor event loop. -pub struct Runtime { - service: H, - poller: P, - controller: Controller::Send>, - ctl_recv: chan::Receiver>, - listeners: HashMap, - transports: HashMap, - waker: ::Recv, - timeouts: Timer, -} - -impl Runtime { - /// Creates new reactor runtime using provided [`Poll`] engine and a service exposing - /// [`Handler`] API to the reactor. - pub fn with(service: H, mut poller: P) -> io::Result { - let (ctl_send, ctl_recv) = chan::unbounded(); - - let (waker_writer, waker_reader) = P::Waker::pair()?; - - #[cfg(feature = "log")] - log::debug!(target: "reactor", "Registering waker (fd {})", waker_reader.as_raw_fd()); - poller.register_waker(&waker_reader); - - let controller = Controller { - ctl_send, - waker: waker_writer, - }; - - Ok(Runtime { - service, - poller, - controller, - ctl_recv, - listeners: empty!(), - transports: empty!(), - waker: waker_reader, - timeouts: Timer::new(), - }) - } - - /// Provides a copy of a [`Controller`] object which exposes an API to the reactor and a service - /// running inside of its thread. + /// running inside its thread. /// /// See [`Handler::Command`] for the details. - pub fn controller(&self) -> Controller::Send> { + pub fn controller( + &self, + ) -> Controller<::Command, ::Send> { self.controller.clone() } - fn run(mut self) { - loop { - let before_poll = Timestamp::now(); - let timeout = self.timeouts.next_expiring_from(before_poll).unwrap_or(WAIT_TIMEOUT); - - for (id, res) in &self.listeners { - self.poller.set_interest(*id, res.interests()); - } - for (id, res) in &self.transports { - self.poller.set_interest(*id, res.interests()); - } - - // Blocking - #[cfg(feature = "log")] - log::trace!(target: "reactor", "Polling with timeout {timeout:?}"); - - let res = self.poller.poll(Some(timeout)); - let now = Timestamp::now(); - self.service.tick(now); - - // Nb. The way this is currently used basically ignores which keys have - // timed out. So as long as *something* timed out, we wake the service. - let timers_fired = self.timeouts.remove_expired_by(now); - if timers_fired > 0 { - #[cfg(feature = "log")] - log::trace!(target: "reactor", "Timer has fired"); - self.service.handle_timer(); - } - - match res { - Ok(0) if timers_fired == 0 => { - #[cfg(feature = "log")] - log::trace!(target: "reactor", "Poll timeout; no I/O events had happened"); - } - Err(err) => { - #[cfg(feature = "log")] - log::error!(target: "reactor", "Error during polling: {err}"); - self.service.handle_error(Error::Poll(err)); - } - _ => {} - } - - let awoken = self.handle_events(now); - - // Process the commands only if we awaken by the waker - if awoken { - loop { - match self.ctl_recv.try_recv() { - Err(chan::TryRecvError::Empty) => break, - Err(chan::TryRecvError::Disconnected) => { - panic!("control channel is broken") - } - Ok(Ctl::Shutdown) => return self.handle_shutdown(), - Ok(Ctl::Cmd(cmd)) => self.service.handle_command(cmd), - } - } - } - - self.handle_actions(now); - } - } - - /// # Returns - /// - /// Whether it was awakened by a waker - fn handle_events(&mut self, time: Timestamp) -> bool { - let mut awoken = false; - - while let Some((id, res)) = self.poller.next() { - if id == ResourceId::WAKER { - if let Err(err) = res { - #[cfg(feature = "log")] - log::error!(target: "reactor", "Polling waker has failed: {err}"); - panic!("waker failure: {err}"); - }; - - #[cfg(feature = "log")] - log::trace!(target: "reactor", "Awoken by the controller"); - - self.waker.reset(); - awoken = true; - } else if self.listeners.contains_key(&id) { - match res { - Ok(io) => { - #[cfg(feature = "log")] - log::trace!(target: "reactor", "Got `{io}` event from listener {id}"); - - let listener = self.listeners.get_mut(&id).expect("resource disappeared"); - for io in io { - if let Some(event) = listener.handle_io(io) { - self.service.handle_listener_event(id, event, time); - } - } - } - Err(err) => { - #[cfg(feature = "log")] - log::trace!(target: "reactor", "Listener {id} {err}"); - let listener = - self.unregister_listener(id).expect("listener has disappeared"); - self.service.handle_error(Error::ListenerDisconnect(id, listener)); - } - } - } else if self.transports.contains_key(&id) { - match res { - Ok(io) => { - #[cfg(feature = "log")] - log::trace!(target: "reactor", "Got `{io}` event from transport {id}"); - - let transport = self.transports.get_mut(&id).expect("resource disappeared"); - for io in io { - if let Some(event) = transport.handle_io(io) { - self.service.handle_transport_event(id, event, time); - } - } - } - Err(err) => { - #[cfg(feature = "log")] - log::trace!(target: "reactor", "Transport {id} {err}"); - let transport = - self.unregister_transport(id).expect("transport has disappeared"); - self.service.handle_error(Error::TransportDisconnect(id, transport)); - } - } - } else { - panic!( - "file descriptor in reactor which is not a known waker, listener or transport" - ) - } - } - - awoken - } - - fn handle_actions(&mut self, time: Timestamp) { - while let Some(action) = self.service.next() { - #[cfg(feature = "log")] - log::trace!(target: "reactor", "Handling action {action} from the service"); - - // NB: Deadlock may happen here if the service will generate events over and over - // in the handle_* calls we may never get out of this loop - if let Err(err) = self.handle_action(action, time) { - #[cfg(feature = "log")] - log::error!(target: "reactor", "Error: {err}"); - self.service.handle_error(err); - } - } - } - - /// # Safety - /// - /// Panics on `Action::Send` for read-only resources or resources which are not ready for a - /// write operation (i.e. returning `false` from [`WriteAtomic::is_ready_to_write`] - /// implementation. - fn handle_action( - &mut self, - action: Action, - time: Timestamp, - ) -> Result<(), Error> { - match action { - Action::RegisterListener(listener) => { - let fd = listener.as_raw_fd(); - - #[cfg(feature = "log")] - log::debug!(target: "reactor", "Registering listener with fd={fd}"); - - let id = self.poller.register(&listener, IoType::read_only()); - self.listeners.insert(id, listener); - self.service.handle_registered(fd, id, ResourceType::Listener); - } - Action::RegisterTransport(transport) => { - let fd = transport.as_raw_fd(); - - #[cfg(feature = "log")] - log::debug!(target: "reactor", "Registering transport with fd={fd}"); - - let id = self.poller.register(&transport, IoType::read_only()); - self.transports.insert(id, transport); - self.service.handle_registered(fd, id, ResourceType::Transport); - } - Action::UnregisterListener(id) => { - let Some(listener) = self.unregister_listener(id) else { - return Ok(()); - }; - #[cfg(feature = "log")] - log::debug!(target: "reactor", "Handling over listener {id}"); - self.service.handover_listener(id, listener); - } - Action::UnregisterTransport(id) => { - let Some(transport) = self.unregister_transport(id) else { - return Ok(()); - }; - #[cfg(feature = "log")] - log::debug!(target: "reactor", "Handling over transport {id}"); - self.service.handover_transport(id, transport); - } - Action::Send(id, data) => { - #[cfg(feature = "log")] - log::trace!(target: "reactor", "Sending {} bytes to {id}", data.len()); - - let Some(transport) = self.transports.get_mut(&id) else { - #[cfg(feature = "log")] - log::error!(target: "reactor", "Transport {id} is not in the reactor"); - - return Ok(()); - }; - match transport.write_atomic(&data) { - Err(WriteError::NotReady) => { - #[cfg(feature = "log")] - log::error!(target: "reactor", internal = true; - "An attempt to write to transport {id} before it got ready"); - panic!( - "application business logic error: write to transport {id} which is \ - read-only or not ready for a write operation" - ); - } - Err(WriteError::Io(e)) => { - #[cfg(feature = "log")] - log::error!(target: "reactor", "Fatal error writing to transport {id}, disconnecting. Error details: {e:?}"); - if let Some(transport) = self.unregister_transport(id) { - return Err(Error::TransportDisconnect(id, transport)); - } - } - Ok(_) => {} - } - } - Action::SetTimer(duration) => { - #[cfg(feature = "log")] - log::debug!(target: "reactor", "Adding timer {duration:?} from now"); - - self.timeouts.set_timeout(duration, time); - } - } - Ok(()) - } - - fn handle_shutdown(self) { - #[cfg(feature = "log")] - log::info!(target: "reactor", "Shutdown"); - - // We just drop here? - } - - fn unregister_listener(&mut self, id: ResourceId) -> Option { - let Some(listener) = self.listeners.remove(&id) else { - #[cfg(feature = "log")] - log::warn!(target: "reactor", "Unregistering non-registered listener {id}"); - return None; - }; - - #[cfg(feature = "log")] - log::debug!(target: "reactor", "Handling over listener {id} (fd={})", listener.as_raw_fd()); - - self.poller.unregister(id); - - Some(listener) - } - - fn unregister_transport(&mut self, id: ResourceId) -> Option { - let Some(transport) = self.transports.remove(&id) else { - #[cfg(feature = "log")] - log::warn!(target: "reactor", "Unregistering non-registered transport {id}"); - return None; - }; - - #[cfg(feature = "log")] - log::debug!(target: "reactor", "Unregistering over transport {id} (fd={})", transport.as_raw_fd()); - - self.poller.unregister(id); - - Some(transport) - } -} - -#[cfg(test)] -mod test { - use std::io::stdout; - use std::thread::sleep; - - use super::*; - use crate::{poller, Io}; - - pub struct DumbRes(Box); - impl DumbRes { - pub fn new() -> DumbRes { DumbRes(Box::new(stdout())) } - } - impl AsRawFd for DumbRes { - fn as_raw_fd(&self) -> RawFd { self.0.as_raw_fd() } - } - impl io::Write for DumbRes { - fn write(&mut self, buf: &[u8]) -> io::Result { Ok(buf.len()) } - fn flush(&mut self) -> io::Result<()> { Ok(()) } - } - impl WriteAtomic for DumbRes { - fn is_ready_to_write(&self) -> bool { true } - fn empty_write_buf(&mut self) -> io::Result { Ok(true) } - fn write_or_buf(&mut self, _buf: &[u8]) -> io::Result<()> { Ok(()) } - } - impl Resource for DumbRes { - type Event = (); - fn interests(&self) -> IoType { IoType::read_write() } - fn handle_io(&mut self, _io: Io) -> Option { None } - } - - #[test] - fn timer() { - #[derive(Clone, Eq, PartialEq, Debug)] - enum Cmd { - Init, - Expect(Vec), - } - #[derive(Clone, Eq, PartialEq, Debug)] - enum Event { - Timer, - } - #[derive(Clone, Debug, Default)] - struct DumbService { - pub add_resource: bool, - pub set_timer: bool, - pub log: Vec, - } - impl Iterator for DumbService { - type Item = Action; - fn next(&mut self) -> Option { - if self.add_resource { - self.add_resource = false; - Some(Action::RegisterTransport(DumbRes::new())) - } else if self.set_timer { - self.set_timer = false; - Some(Action::SetTimer(Duration::from_millis(3))) - } else { - None - } - } - } - impl Handler for DumbService { - type Listener = DumbRes; - type Transport = DumbRes; - type Command = Cmd; - - fn tick(&mut self, _time: Timestamp) {} - fn handle_timer(&mut self) { - self.log.push(Event::Timer); - self.set_timer = true; - } - fn handle_listener_event( - &mut self, - _d: ResourceId, - _event: ::Event, - _time: Timestamp, - ) { - unreachable!() - } - fn handle_transport_event( - &mut self, - _id: ResourceId, - _event: ::Event, - _time: Timestamp, - ) { - unreachable!() - } - fn handle_registered(&mut self, _fd: RawFd, _id: ResourceId, _ty: ResourceType) {} - fn handle_command(&mut self, cmd: Self::Command) { - match cmd { - Cmd::Init => { - self.add_resource = true; - self.set_timer = true; - } - Cmd::Expect(expected) => { - assert_eq!(expected, self.log); - } - } - } - fn handle_error(&mut self, err: Error) { - panic!("{err}") - } - fn handover_listener(&mut self, _id: ResourceId, _listener: Self::Listener) { - unreachable!() - } - fn handover_transport(&mut self, _id: ResourceId, _transport: Self::Transport) { - unreachable!() - } - } - - let reactor = Reactor::new(DumbService::default(), poller::popol::Poller::new()).unwrap(); - reactor.controller().cmd(Cmd::Init).unwrap(); - sleep(Duration::from_secs(2)); - reactor.controller().cmd(Cmd::Expect(vec![Event::Timer; 6])).unwrap(); - } + /// Joins the reactor thread. + pub fn join(self) -> thread::Result<()> { self.thread.join() } } diff --git a/src/resource.rs b/src/resource.rs index c83a75c..c30d588 100644 --- a/src/resource.rs +++ b/src/resource.rs @@ -2,25 +2,24 @@ // // SPDX-License-Identifier: Apache-2.0 // -// Written in 2021-2023 by +// Written in 2021-2025 by // Dr. Maxim Orlovsky // Alexis Sellier // -// Copyright 2022-2023 UBIDECO Institute, Switzerland -// Copyright 2021 Alexis Sellier +// Copyright 2022-2025 UBIDECO Labs, InDCS, Lugano, Switzerland. All Rights reserved. +// Copyright 2021-2023 Alexis Sellier . All Rights reserved. // -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under +// the License. +use std::convert::Infallible; use std::fmt::Debug; use std::hash::Hash; use std::io::{self, ErrorKind}; @@ -38,15 +37,6 @@ pub enum Io { Write, } -/// Type of the resource. -#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] -pub enum ResourceType { - /// Listener resource. - Listener, - /// Transport resource. - Transport, -} - /// Generator for the new [`ResourceId`]s which should be used by pollers implementing [`Poll`] /// trait. #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug, Display)] @@ -99,7 +89,7 @@ pub enum WriteError { /// a connection has not yet established in full or handshake is not /// complete. A specific case in which this error is returned is defined /// by an underlying resource type; however, this error happens only - /// due to a business logic bugs in a [`crate::reactor::Handler`] + /// due to a business logic bugs in a [`crate::controller::Handler`] /// implementation. #[display("resource not ready to accept the data")] NotReady, @@ -165,3 +155,40 @@ pub trait WriteAtomic: io::Write { /// panic. fn write_or_buf(&mut self, buf: &[u8]) -> io::Result<()>; } + +/// Listener which can't be instantiated. Used for reactors which do not have the concept of a +/// listener (for instance file-based). +pub enum ImpossibleListener {} + +impl AsRawFd for ImpossibleListener { + fn as_raw_fd(&self) -> i32 { unreachable!("ImpossibleListener is not a valid resource") } +} + +impl io::Write for ImpossibleListener { + fn write(&mut self, _: &[u8]) -> io::Result { + unreachable!("ImpossibleListener is not a valid resource") + } + fn flush(&mut self) -> io::Result<()> { + unreachable!("ImpossibleListener is not a valid resource") + } +} + +impl WriteAtomic for ImpossibleListener { + fn is_ready_to_write(&self) -> bool { + unreachable!("ImpossibleListener is not a valid resource") + } + fn empty_write_buf(&mut self) -> io::Result { + unreachable!("ImpossibleListener is not a valid resource") + } + fn write_or_buf(&mut self, _: &[u8]) -> io::Result<()> { + unreachable!("ImpossibleListener is not a valid resource") + } +} + +impl Resource for ImpossibleListener { + type Event = Infallible; + fn interests(&self) -> IoType { unreachable!("ImpossibleListener is not a valid resource") } + fn handle_io(&mut self, _: Io) -> Option { + unreachable!("ImpossibleListener is not a valid resource") + } +} diff --git a/src/resources/file.rs b/src/resources/file.rs new file mode 100644 index 0000000..65c1e56 --- /dev/null +++ b/src/resources/file.rs @@ -0,0 +1,265 @@ +// Library for concurrent I/O resource management using reactor pattern. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Written in 2021-2025 by +// Dr. Maxim Orlovsky +// Alexis Sellier +// +// Copyright 2022-2025 UBIDECO Labs, InDCS, Lugano, Switzerland. All Rights reserved. +// Copyright 2021-2023 Alexis Sellier . All Rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under +// the License. + +//! Implementation of a [`FileResource`] resource which can be used inside the I/O reactor. + +use std::collections::VecDeque; +use std::fs::File; +use std::io::{self, Read, Seek, SeekFrom, Write}; +use std::os::fd::{AsRawFd, RawFd}; + +use crate::poller::IoType; +use crate::{Io, Resource, WriteAtomic, WriteError}; + +#[allow(dead_code)] +const NAME: &str = "file"; + +/// I/O events generated by [`FileResource`]. +pub enum FileEvent { + /// New data read from file. + Read(Vec), + /// End of file reached. + Eof, + /// File has been closed due to an error. + Closed(io::Error), +} + +/// File resource usable in a reactor. +#[derive(Debug)] +pub struct FileResource { + file: File, + writable: bool, + write_intent: bool, + write_buffer: VecDeque, + read_buffer: Vec, +} + +impl AsRawFd for FileResource { + fn as_raw_fd(&self) -> RawFd { self.file.as_raw_fd() } +} + +impl Resource for FileResource { + type Event = FileEvent; + + fn interests(&self) -> IoType { + if self.writable && self.write_intent { + IoType::read_write() + } else { + IoType::read_only() + } + } + + fn handle_io(&mut self, io: Io) -> Option { + match io { + Io::Read => self.handle_readable(), + Io::Write => self.handle_writeable(), + } + } +} + +impl WriteAtomic for FileResource { + fn is_ready_to_write(&self) -> bool { self.writable } + + fn empty_write_buf(&mut self) -> io::Result { + let len = self.file.write(self.write_buffer.make_contiguous())?; + self.write_buffer.drain(..len); + if self.write_buf_len() > 0 { + return Err(io::ErrorKind::WouldBlock.into()); + } + Ok(len > 0) + } + + fn write_or_buf(&mut self, buf: &[u8]) -> io::Result<()> { + if buf.is_empty() { + // Write empty data is a non-op + return Ok(()); + } + self.write_buffer.extend(buf); + self.flush_buffer() + } +} + +impl Write for FileResource { + fn write(&mut self, buf: &[u8]) -> io::Result { + match self.write_atomic(buf) { + Ok(_) => Ok(buf.len()), + Err(WriteError::NotReady) => Err(io::ErrorKind::NotConnected.into()), + Err(WriteError::Io(err)) => Err(err), + } + } + + fn flush(&mut self) -> io::Result<()> { self.flush_buffer() } +} + +// TODO: This is blocking, we need to handle it in some other way +impl Seek for FileResource { + fn seek(&mut self, pos: SeekFrom) -> io::Result { self.file.seek(pos) } +} + +/// Configuration for constructing [`FileResource`]. +#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)] +pub struct FileResourceConfig { + /// Is file opened with a write access. + pub writable: bool, + /// Size for the write buffer. + pub write_buf_len: usize, + /// Size for the read buffer. + pub read_buf_len: usize, +} + +impl FileResourceConfig { + /// Configuration for read-only file resource. + pub const fn read_only() -> Self { + Self { + writable: false, + write_buf_len: 1024, + read_buf_len: 1024, + } + } + + /// Configuration for read-write file resource. + pub const fn read_write() -> Self { + let mut me = Self::read_only(); + me.writable = true; + me + } + + /// Adjusted configuration changing read buffer length. + pub const fn set_read_buf_len(mut self, len: usize) -> Self { + self.read_buf_len = len; + self + } + + /// Adjusted configuration changing write buffer length. + pub const fn set_write_buf_len(mut self, len: usize) -> Self { + debug_assert!( + self.writable, + "adjusting size of a write buffer for read-only file doesn't make sense" + ); + self.write_buf_len = len; + self + } +} + +impl FileResource { + /// Constructs file resource from a provided file and given configuration + pub fn new(file: File, conf: FileResourceConfig) -> Self { + Self { + file, + writable: conf.writable, + write_intent: false, + write_buffer: VecDeque::with_capacity(conf.write_buf_len), + read_buffer: Vec::with_capacity(conf.read_buf_len), + } + } + + /// Provides human-readable string describing the file handle. + pub fn display(&self) -> String { format!("fd={}", self.file.as_raw_fd()) } + + fn handle_readable(&mut self) -> Option { + match self.file.read(&mut self.read_buffer) { + Ok(0) => Some(FileEvent::Eof), + Ok(len) => Some(FileEvent::Read(self.read_buffer[..len].to_vec())), + Err(err) if err.kind() == io::ErrorKind::WouldBlock => { + // This shouldn't normally happen, since this function is only called + // when there's data on the socket. We leave it here in case external + // conditions change. + #[cfg(feature = "log")] + log::warn!(target: NAME, + "WOULD_BLOCK on resource which had read intent - probably normal thing to happen" + ); + None + } + Err(err) => Some(self.terminate(err)), + } + } + + fn handle_writeable(&mut self) -> Option { + match self.flush() { + Ok(_) => None, + // In this case, write couldn't complete. Leave `needs_flush` set to be notified when + // the socket is ready to write again. + Err(err) + if [ + io::ErrorKind::WouldBlock, + io::ErrorKind::WriteZero, + io::ErrorKind::OutOfMemory, + io::ErrorKind::Interrupted, + ] + .contains(&err.kind()) => + { + #[cfg(feature = "log")] + log::warn!(target: NAME, "Resource {} was not able to consume any data even though it has announced its write readiness", self.display()); + self.write_intent = true; + None + } + Err(err) => Some(self.terminate(err)), + } + } + + /// Returns the length of the read buffer. + pub fn read_buf_len(&self) -> usize { self.read_buffer.len() } + + /// Returns the length of the write buffer. + pub fn write_buf_len(&self) -> usize { self.write_buffer.len() } + + fn flush_buffer(&mut self) -> io::Result<()> { + let orig_len = self.write_buffer.len(); + #[cfg(feature = "log")] + log::trace!(target: NAME, "Resource {} is flushing its buffer of {orig_len} bytes", self.display()); + let len = + self.file.write(self.write_buffer.make_contiguous()).or_else(|err| { + match err.kind() { + io::ErrorKind::WouldBlock + | io::ErrorKind::OutOfMemory + | io::ErrorKind::WriteZero + | io::ErrorKind::Interrupted => { + #[cfg(feature = "log")] + log::warn!(target: NAME, "Resource {} kernel buffer is fulled (system message is '{err}')", self.display()); + Ok(0) + }, + _ => { + #[cfg(feature = "log")] + log::error!(target: NAME, "Resource {} failed write operation with message '{err}'", self.display()); + Err(err) + }, + } + })?; + if orig_len > len { + #[cfg(feature = "log")] + log::debug!(target: NAME, "Resource {} was able to consume only a part of the buffered data ({len} of {orig_len} bytes)", self.display()); + self.write_intent = true; + } else { + #[cfg(feature = "log")] + log::trace!(target: NAME, "Resource {} was able to consume all the buffered data ({len} of {orig_len} bytes)", self.display()); + self.write_intent = false; + } + self.write_buffer.drain(..len); + Ok(()) + } + + fn terminate(&mut self, reason: io::Error) -> FileEvent { + #[cfg(feature = "log")] + log::trace!(target: NAME, "Terminating session {} due to {reason:?}", self.display()); + + FileEvent::Closed(reason) + } +} diff --git a/src/resources/mod.rs b/src/resources/mod.rs new file mode 100644 index 0000000..151fb8e --- /dev/null +++ b/src/resources/mod.rs @@ -0,0 +1,26 @@ +// Library for concurrent I/O resource management using reactor pattern. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Written in 2021-2025 by +// Dr. Maxim Orlovsky +// Alexis Sellier +// +// Copyright 2022-2025 UBIDECO Labs, InDCS, Lugano, Switzerland. All Rights reserved. +// Copyright 2021-2023 Alexis Sellier . All Rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under +// the License. + +//! Reactor resources. + +mod file; + +pub use file::{FileEvent, FileResource, FileResourceConfig}; diff --git a/src/runtimes/controller.rs b/src/runtimes/controller.rs new file mode 100644 index 0000000..cec0003 --- /dev/null +++ b/src/runtimes/controller.rs @@ -0,0 +1,102 @@ +// Library for concurrent I/O resource management using reactor pattern. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Written in 2021-2025 by +// Dr. Maxim Orlovsky +// Alexis Sellier +// +// Copyright 2022-2025 UBIDECO Labs, InDCS, Lugano, Switzerland. All Rights reserved. +// Copyright 2021-2023 Alexis Sellier . All Rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under +// the License. + +#![allow(unused_variables)] // because we need them for feature-gated logger + +use std::io; + +use crossbeam_channel as chan; + +use crate::poller::WakerSend; + +pub enum Ctl { + Cmd(C), + Shutdown, +} + +/// Control API to the service which is run inside a reactor. +/// +/// The service is passed to the [`Reactor`] constructor as a parameter and also exposes [`Handler`] +/// API to the reactor itself for receiving reactor-generated events. This API is used by the +/// reactor to inform the service about incoming commands, sent via this [`Controller`] API (see +/// [`Handler::Command`] for the details). +pub struct Controller { + ctl_send: chan::Sender>, + waker: W, +} + +impl Clone for Controller { + fn clone(&self) -> Self { + Controller { + ctl_send: self.ctl_send.clone(), + waker: self.waker.clone(), + } + } +} + +impl Controller { + pub(crate) fn new(ctl_send: chan::Sender>, waker: W) -> Self { Self { ctl_send, waker } } + + /// Send a command to the service inside a [`Reactor`] or a reactor [`Runtime`]. + #[allow(unused_mut)] // because of the `log` feature gate + pub fn cmd(&self, mut command: C) -> Result<(), io::Error> + where C: 'static { + #[cfg(feature = "log")] + { + use std::any::Any; + use std::fmt::Debug; + + let cmd = Box::new(command); + let any = cmd as Box; + let any = match any.downcast::>() { + Err(any) => { + log::debug!(target: "reactor-controller", "Sending command to the reactor"); + any + } + Ok(debug) => { + log::debug!(target: "reactor-controller", "Sending command {debug:?} to the reactor"); + debug + } + }; + command = *any.downcast().expect("from upcast"); + } + + self.ctl_send.send(Ctl::Cmd(command)).map_err(|_| io::ErrorKind::BrokenPipe)?; + self.wake()?; + Ok(()) + } + + /// Shutdown the reactor. + pub fn shutdown(self) -> Result<(), Self> { + #[cfg(feature = "log")] + log::info!(target: "reactor-controller", "Initiating reactor shutdown..."); + + let res1 = self.ctl_send.send(Ctl::Shutdown); + let res2 = self.wake(); + res1.or(res2).map_err(|_| self) + } + + pub(crate) fn wake(&self) -> io::Result<()> { + #[cfg(feature = "log")] + log::trace!(target: "reactor-controller", "Wakening the reactor"); + self.waker.wake() + } +} diff --git a/src/runtimes/file.rs b/src/runtimes/file.rs new file mode 100644 index 0000000..3e0ff10 --- /dev/null +++ b/src/runtimes/file.rs @@ -0,0 +1,303 @@ +// Library for concurrent I/O resource management using reactor pattern. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Written in 2021-2025 by +// Dr. Maxim Orlovsky +// Alexis Sellier +// +// Copyright 2022-2025 UBIDECO Labs, InDCS, Lugano, Switzerland. All Rights reserved. +// Copyright 2021-2023 Alexis Sellier . All Rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under +// the License. + +//! File reactor runtime. + +#![allow(unused_variables)] // because we need them for feature-gated logger + +use std::collections::HashMap; +use std::fmt::Debug; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::time::Duration; + +use crossbeam_channel::{Receiver, TryRecvError}; + +use super::{Controller, Error as ReactorError, ReactorHandler, ReactorRuntime}; +use crate::poller::{IoType, Poll, Waker, WakerRecv, WakerSend}; +use crate::resource::WriteError; +use crate::resources::{FileEvent, FileResource}; +use crate::runtimes::controller::Ctl; +use crate::{Resource, ResourceId, Timer, Timestamp, WriteAtomic}; + +/// Reactor errors +#[derive(Debug, Error, Display, From)] +#[display(doc_comments)] +pub enum Error { + /// file {0} got closed during poll operation. + Closed(ResourceId, FileResource), +} + +/// Actions which can be provided to the reactor by the [`Handler`]. +/// +/// Reactor reads actions on each event loop using [`Handler`] iterator interface. +#[derive(Display)] +pub enum Action { + /// Register a new file resource for the reactor poll. + /// + /// NB: Reactor can't open the file. Reactor only can register already opened file as a + /// resource for polling in the event loop. + #[display("register_file")] + RegisterFile(FileResource), + + /// Unregister file resource from the reactor poll and handover it to the [`Handler`] via + /// [`Handler::handover_file`]. + /// + /// When the file is unregistered no action is performed, i.e. the file descriptor is not + /// closed. All these actions must be handled by the handler upon the handover event. + #[display("unregister_transport")] + UnregisterFile(ResourceId), + + /// Write the data to one of the file resources using [`io::Write`]. + #[display("send_to({0})")] + Write(ResourceId, Vec), + + // TODO: Add read, seek and close actions. + /// Set a new timer for a given duration from this moment. + /// + /// When the timer fires reactor will timeout poll syscall and call [`Handler::handle_timer`]. + #[display("set_timer({0:?})")] + SetTimer(Duration), + + /// Graceffully terminate the reactor. + #[display("terminate")] + Terminate, +} + +/// A service which handles I/O events generated in the [`Reactor`]. +pub trait Handler: ReactorHandler { + /// Method called by the reactor when a given file was successfully registered and provided + /// with a resource id. + /// + /// The resource id will be used later in [`Self::handle_event`] and [`handover_file`] + /// calls to the handler. + fn handle_registered(&mut self, fd: RawFd, id: ResourceId); + + /// Method called by the reactor upon I/O event on a file resource. + fn handle_event(&mut self, fd: RawFd, id: ResourceId, event: FileEvent, time: Timestamp); + + /// Method called by the reactor upon receiving [`Action::UnregisterFile`]. + /// + /// Passes the transport resource to the [`Handler`] when it is already not a part of the + /// reactor poll. From this point of time it is safe to send the resource to other threads + /// (like workers) or close the resource. + fn handover_file(&mut self, id: ResourceId, file: FileResource); +} + +/// Internal [`Reactor`] runtime which is run in a dedicated thread. +/// +/// Use this structure direactly only if you'd like to have the full control over the reactor +/// thread. +/// +/// This runtime structure **does not** spawns a thread and is **blocking**. It implements the +/// actual reactor event loop. +pub struct Runtime { + service: H, + poller: P, + controller: Controller::Send>, + ctl_recv: Receiver>, + files: HashMap, + waker: ::Recv, + timeouts: Timer, +} + +impl Runtime { + fn unregister_file(&mut self, id: ResourceId) -> Option { + let Some(file) = self.files.remove(&id) else { + #[cfg(feature = "log")] + log::warn!(target: "reactor", "Unregistering non-registered file {id}"); + return None; + }; + + #[cfg(feature = "log")] + log::debug!(target: "reactor", "Unregistering over file {id} (fd={})", file.as_raw_fd()); + + self.poller.unregister(id); + + Some(file) + } +} + +impl ReactorRuntime

for Runtime { + const WAIT_TIMEOUT: Duration = Duration::from_secs(1); + + type Handler = H; + + fn with( + service: Self::Handler, + poller: P, + controller: Controller< + ::Command, + ::Send, + >, + ctl_recv: Receiver::Command>>, + waker_recv: ::Recv, + ) -> Self { + Runtime { + service, + poller, + controller, + ctl_recv, + files: empty!(), + waker: waker_recv, + timeouts: Timer::new(), + } + } + + fn timeouts(&mut self) -> &mut Timer { &mut self.timeouts } + fn poller(&mut self) -> &mut P { &mut self.poller } + fn service(&mut self) -> &mut Self::Handler { &mut self.service } + fn resource_interests(&self) -> impl Iterator { + self.files.iter().map(move |(id, handler)| (*id, handler.interests())) + } + + fn ctl_recv(&self) -> Result::Command>, TryRecvError> { + self.ctl_recv.try_recv() + } + + fn controller(&self) -> Controller<::Command, impl WakerSend> { + self.controller.clone() + } + + /// # Returns + /// + /// Whether it was awakened by a waker + fn handle_events(&mut self, time: Timestamp) -> bool { + let mut awoken = false; + + while let Some((id, res)) = self.poller.next() { + if id == ResourceId::WAKER { + if let Err(err) = res { + #[cfg(feature = "log")] + log::error!(target: "reactor", "Polling waker has failed: {err}"); + panic!("waker failure: {err}"); + }; + + #[cfg(feature = "log")] + log::trace!(target: "reactor", "Awoken by the controller"); + + self.waker.reset(); + awoken = true; + } else if self.files.contains_key(&id) { + match res { + Ok(io) => { + #[cfg(feature = "log")] + log::trace!(target: "reactor", "Got `{io}` event from transport {id}"); + + let transport = self.files.get_mut(&id).expect("resource disappeared"); + for io in io { + if let Some(event) = transport.handle_io(io) { + let fd = transport.as_raw_fd(); + self.service.handle_event(fd, id, event, time); + } + } + } + Err(err) => { + #[cfg(feature = "log")] + log::trace!(target: "reactor", "Transport {id} {err}"); + let transport = + self.unregister_file(id).expect("transport has disappeared"); + self.service + .handle_error(ReactorError::Handler(Error::Closed(id, transport))); + } + } + } else { + panic!( + "file descriptor in reactor which is not a known waker, listener or transport" + ) + } + } + + awoken + } + + /// # Safety + /// + /// Panics on `Action::Send` for read-only resources or resources which are not ready for a + /// write operation (i.e. returning `false` from [`WriteAtomic::is_ready_to_write`] + /// implementation. + fn handle_action(&mut self, action: Action, time: Timestamp) -> Result { + match action { + Action::RegisterFile(file) => { + let fd = file.as_raw_fd(); + + #[cfg(feature = "log")] + log::debug!(target: "reactor", "Registering file with fd={fd}"); + + let id = self.poller.register(&file, IoType::read_only()); + self.files.insert(id, file); + self.service.handle_registered(fd, id); + } + Action::UnregisterFile(id) => { + let Some(transport) = self.unregister_file(id) else { + return Ok(true); + }; + #[cfg(feature = "log")] + log::debug!(target: "reactor", "Handling over file {id}"); + self.service.handover_file(id, transport); + } + Action::Write(id, data) => { + #[cfg(feature = "log")] + log::trace!(target: "reactor", "Sending {} bytes to {id}", data.len()); + + let Some(file) = self.files.get_mut(&id) else { + #[cfg(feature = "log")] + log::error!(target: "reactor", "Transport {id} is not in the reactor"); + + return Ok(true); + }; + match file.write_atomic(&data) { + Err(WriteError::NotReady) => { + #[cfg(feature = "log")] + log::error!(target: "reactor", internal = true; + "An attempt to write to transport {id} before it got ready"); + panic!( + "application business logic error: write to transport {id} which is \ + read-only or not ready for a write operation" + ); + } + Err(WriteError::Io(e)) => { + #[cfg(feature = "log")] + log::error!(target: "reactor", "Fatal error writing to transport {id}, disconnecting. Error details: {e:?}"); + if let Some(transport) = self.unregister_file(id) { + return Err(Error::Closed(id, transport)); + } + } + Ok(_) => {} + } + } + Action::SetTimer(duration) => { + #[cfg(feature = "log")] + log::debug!(target: "reactor", "Adding timer {duration:?} from now"); + + self.timeouts.set_timeout(duration, time); + } + Action::Terminate => return Ok(false), + } + Ok(true) + } + + fn handle_shutdown(self) { + #[cfg(feature = "log")] + log::info!(target: "reactor", "Shutdown"); + + // We just drop here? + } +} diff --git a/src/runtimes/mod.rs b/src/runtimes/mod.rs new file mode 100644 index 0000000..ebfb530 --- /dev/null +++ b/src/runtimes/mod.rs @@ -0,0 +1,239 @@ +// Library for concurrent I/O resource management using reactor pattern. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Written in 2021-2025 by +// Dr. Maxim Orlovsky +// Alexis Sellier +// +// Copyright 2022-2025 UBIDECO Labs, InDCS, Lugano, Switzerland. All Rights reserved. +// Copyright 2021-2023 Alexis Sellier . All Rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under +// the License. + +//! Reactor runtimes. + +mod controller; +pub mod net; +pub mod file; + +use std::error::Error as StdError; +use std::fmt::{Debug, Display}; +use std::io; +use std::time::Duration; + +pub use controller::Controller; +use crossbeam_channel::Receiver; + +use crate::poller::{IoType, Poll, Waker, WakerSend}; +use crate::runtimes::controller::Ctl; +use crate::{ResourceId, Timer, Timestamp}; + +/// Reactor errors +#[derive(Debug, Error, Display)] +#[display(doc_comments)] +pub enum Error { + /// file {0} got closed during poll operation. + Handler(E), + + /// polling multiple resources has failed. Details: {0:?} + Poll(io::Error), +} + +/// Handler for a service run by a reactor. +pub trait ReactorHandler: Send + Iterator { + /// A command which may be sent to the [`Handler`] from outside the [`Reactor`], including + /// other threads. + /// + /// The handler object is owned by the reactor runtime and executes always in the context of the + /// reactor runtime thread. Thus, if other (micro)services within the app needs to communicate + /// to the handler they have to use this data type, which usually is an enumeration for a set of + /// commands supported by the handler. + /// + /// The commands are sent by using reactor [`Controller`] API. + type Command: Debug + Send; + + /// I/O action generated within the service handler. + type Action: Display; + + /// Error type returned by the service handler. + type Error: StdError; + + /// Method called by the reactor on any kind of error during the event loop, including errors of + /// the poll syscall or I/O errors returned as a part of the poll result events. + /// + /// See [`enum@Error`] for the details on errors which may happen. + fn handle_error(&mut self, err: Error); + + /// Method called by the reactor on the start of each event loop once the poll has returned. + fn tick(&mut self, time: Timestamp); + + /// Method called by the reactor when a previously set timeout is fired. + /// + /// Related: [`Action::SetTimer`]. + fn handle_timer(&mut self); + + /// Method called by the reactor when a [`Self::Command`] is received for the [`Handler`]. + /// + /// The commands are sent via [`Controller`] from outside the reactor, including other + /// threads. + fn handle_command(&mut self, cmd: Self::Command); +} + +/// Trait for specific reactor runtime implementations. +/// +/// Reactor runtime runs inside the reactor thread and manages resources inside the reactor. +pub trait ReactorRuntime: Sized { + /// Maximum amount of time to wait for I/O. + const WAIT_TIMEOUT: Duration; + + /// Handler for a service run by a reactor. + type Handler: ReactorHandler + 'static; + + /// Creates new reactor runtime using provided [`Poll`] engine and a service exposing + /// [`ReactorHandler`] API to the reactor. + fn with( + service: Self::Handler, + poller: P, + controller: Controller< + ::Command, + ::Send, + >, + ctl_recv: Receiver::Command>>, + waker_recv: ::Recv, + ) -> Self; + + /// Accessor for timer. + fn timeouts(&mut self) -> &mut Timer; + /// Accessor for poller. + fn poller(&mut self) -> &mut P; + /// Accessor for service. + fn service(&mut self) -> &mut Self::Handler; + /// Iterator over resource interests. + fn resource_interests(&self) -> impl Iterator; + /// Reads control command send to the service from outside the reactor. + fn ctl_recv( + &self, + ) -> Result::Command>, crossbeam_channel::TryRecvError>; + + /// Executes reactor even loop. + fn run(mut self) { + loop { + let before_poll = Timestamp::now(); + let timeout = + self.timeouts().next_expiring_from(before_poll).unwrap_or(Self::WAIT_TIMEOUT); + + let resources = self.resource_interests().collect::>(); + for (id, io) in resources { + self.poller().set_interest(id, io); + } + + // Blocking + #[cfg(feature = "log")] + log::trace!(target: "reactor", "Polling with timeout {timeout:?}"); + + let res = self.poller().poll(Some(timeout)); + let now = Timestamp::now(); + self.service().tick(now); + + // Nb. The way this is currently used basically ignores which keys have + // timed out. So as long as *something* timed out, we wake the service. + let timers_fired = self.timeouts().remove_expired_by(now); + if timers_fired > 0 { + #[cfg(feature = "log")] + log::trace!(target: "reactor", "Timer has fired"); + self.service().handle_timer(); + } + + match res { + Ok(0) if timers_fired == 0 => { + #[cfg(feature = "log")] + log::trace!(target: "reactor", "Poll timeout; no I/O events had happened"); + } + Err(err) => { + #[cfg(feature = "log")] + log::error!(target: "reactor", "Error during polling: {err}"); + self.service().handle_error(Error::Poll(err)); + } + _ => {} + } + + let awoken = self.handle_events(now); + + // Process the commands only if we awaken by the waker + if awoken { + loop { + match self.ctl_recv() { + Err(crossbeam_channel::TryRecvError::Empty) => break, + Err(crossbeam_channel::TryRecvError::Disconnected) => { + panic!("control channel is broken") + } + Ok(Ctl::Shutdown) => return self.handle_shutdown(), + Ok(Ctl::Cmd(cmd)) => self.service().handle_command(cmd), + } + } + } + + if !self.handle_actions(now) { + break; + }; + } + } + + /// Provides a copy of a [`Controller`] object which exposes an API to the reactor and a service + /// running inside its thread. + fn controller(&self) -> Controller<::Command, impl WakerSend>; + + /// Handles the actions from the queue. + /// + /// # Return + /// + /// Return value indicates whether the reactor must proceed operating (`true`) or should + /// terminate (`false`). + fn handle_actions(&mut self, time: Timestamp) -> bool { + let mut result = true; + while let Some(action) = self.service().next() { + #[cfg(feature = "log")] + log::trace!(target: "reactor", "Handling action {action} from the service"); + + // NB: Deadlock may happen here if the service will generate events over and over + // in the handle_* calls we may never get out of this loop + match self.handle_action(action, time) { + Ok(ret) => result |= ret, + Err(err) => { + #[cfg(feature = "log")] + log::error!(target: "reactor", "Error: {err}"); + self.service().handle_error(Error::Handler(err)); + } + } + } + result + } + + /// # Panics + /// + /// Panics on `Action::Send` for read-only resources or resources which are not ready for a + /// write operation (i.e. returning `false` from [`WriteAtomic::is_ready_to_write`] + /// implementation. + fn handle_action( + &mut self, + action: ::Action, + time: Timestamp, + ) -> Result::Error>; + + /// # Returns + /// + /// Whether it was awakened by a waker. + fn handle_events(&mut self, time: Timestamp) -> bool; + + /// Handler fo a shutdown action. + fn handle_shutdown(self); +} diff --git a/src/runtimes/net.rs b/src/runtimes/net.rs new file mode 100644 index 0000000..41c5329 --- /dev/null +++ b/src/runtimes/net.rs @@ -0,0 +1,569 @@ +// Library for concurrent I/O resource management using reactor pattern. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Written in 2021-2025 by +// Dr. Maxim Orlovsky +// Alexis Sellier +// +// Copyright 2022-2025 UBIDECO Labs, InDCS, Lugano, Switzerland. All Rights reserved. +// Copyright 2021-2023 Alexis Sellier . All Rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under +// the License. + +//! Networking reactor runtime. + +#![allow(unused_variables)] // because we need them for feature-gated logger + +use std::collections::HashMap; +use std::fmt::{Debug, Display, Formatter}; +use std::os::fd::{AsRawFd, RawFd}; +use std::time::Duration; + +use crossbeam_channel::{Receiver, TryRecvError}; + +use crate::poller::{IoType, Poll, Waker, WakerRecv, WakerSend}; +use crate::resource::WriteError; +use crate::runtimes::controller::Ctl; +use crate::runtimes::{Controller, Error as RuntimeError, ReactorHandler, ReactorRuntime}; +use crate::{Resource, ResourceId, Timer, Timestamp, WriteAtomic}; + +/// Type of the resource. +#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] +pub enum ResourceType { + /// Listener resource. + Listener, + /// Transport resource. + Transport, +} + +/// Reactor errors +#[derive(Error, Display, From)] +#[display(doc_comments)] +pub enum Error { + /// transport {0} got disconnected during poll operation. + ListenerDisconnect(ResourceId, L), + + /// transport {0} got disconnected during poll operation. + TransportDisconnect(ResourceId, T), +} + +impl Debug for Error { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { Display::fmt(self, f) } +} + +/// Actions which can be provided to the reactor by the [`Handler`]. +/// +/// Reactor reads actions on each event loop using [`Handler`] iterator interface. +#[derive(Display)] +pub enum Action { + /// Register a new listener resource for the reactor poll. + /// + /// Reactor can't instantiate the resource, like bind a network listener. + /// Reactor only can register already active resource for polling in the event loop. + #[display("register_listener")] + RegisterListener(L), + + /// Register a new transport resource for the reactor poll. + /// + /// Reactor can't instantiate the resource, like open a file or establish network connection. + /// Reactor only can register already active resource for polling in the event loop. + #[display("register_transport")] + RegisterTransport(T), + + /// Unregister listener resource from the reactor poll and handover it to the [`Handler`] via + /// [`Handler::handover_listener`]. + /// + /// When the resource is unregistered no action is performed, i.e. the file descriptor is not + /// closed, listener is not unbound, connections are not closed etc. All these actions must be + /// handled by the handler upon the handover event. + #[display("unregister_listener")] + UnregisterListener(ResourceId), + + /// Unregister transport resource from the reactor poll and handover it to the [`Handler`] via + /// [`Handler::handover_transport`]. + /// + /// When the resource is unregistered no action is performed, i.e. the file descriptor is not + /// closed, listener is not unbound, connections are not closed etc. All these actions must be + /// handled by the handler upon the handover event. + #[display("unregister_transport")] + UnregisterTransport(ResourceId), + + /// Write the data to one of the transport resources using [`io::Write`]. + #[display("send_to({0})")] + Send(ResourceId, Vec), + + /// Set a new timer for a given duration from this moment. + /// + /// When the timer fires reactor will timeout poll syscall and call [`Handler::handle_timer`]. + #[display("set_timer({0:?})")] + SetTimer(Duration), + + /// Graceffully terminate the reactor. + #[display("terminate")] + Terminate, +} + +/// A service which handles I/O events generated in the [`Reactor`]. +pub trait Handler: + ReactorHandler< + Action = Action, + Error = Error, +> +{ + /// Type for a listener resource. + /// + /// Listener resources are resources which may spawn more resources and can't be written to. A + /// typical example of a listener resource is a [`std::net::TcpListener`], however this may also + /// be a special form of a peripheral device or something else. + type Listener: Resource; + + /// Type for a transport resource. + /// + /// Transport is a "full" resource which can be read from - and written to. Usual files, network + /// connections, database connections etc are all fall into this category. + type Transport: Resource; + + /// Method called by the reactor upon an I/O event on a listener resource. + /// + /// Since listener doesn't support writing, it can be only a read event (indicating that a new + /// resource can be spawned from the listener). + fn handle_listener_event( + &mut self, + fd: RawFd, + id: ResourceId, + event: ::Event, + time: Timestamp, + ); + + /// Method called by the reactor upon I/O event on a transport resource. + fn handle_transport_event( + &mut self, + fd: RawFd, + id: ResourceId, + event: ::Event, + time: Timestamp, + ); + + /// Method called by the reactor when a given resource was successfully registered and provided + /// with a resource id. + /// + /// The resource id will be used later in [`Self::handle_listener_event`], + /// [`Self::handle_transport_event`], [`Self::handover_listener`] and [`handover_transport`] + /// calls to the handler. + fn handle_registered(&mut self, fd: RawFd, id: ResourceId, ty: ResourceType); + + /// Method called by the reactor upon receiving [`Action::UnregisterListener`]. + /// + /// Passes the listener resource to the [`Handler`] when it is already not a part of the reactor + /// poll. From this point of time it is safe to send the resource to other threads (like + /// workers) or close the resource. + fn handover_listener(&mut self, id: ResourceId, listener: Self::Listener); + + /// Method called by the reactor upon receiving [`Action::UnregisterTransport`]. + /// + /// Passes the transport resource to the [`Handler`] when it is already not a part of the + /// reactor poll. From this point of time it is safe to send the resource to other threads + /// (like workers) or close the resource. + fn handover_transport(&mut self, id: ResourceId, transport: Self::Transport); +} + +/// Internal [`Reactor`] runtime which is run in a dedicated thread. +/// +/// Use this structure direactly only if you'd like to have the full control over the reactor +/// thread. +/// +/// This runtime structure **does not** spawns a thread and is **blocking**. It implements the +/// actual reactor event loop. +pub struct Runtime { + service: H, + poller: P, + controller: Controller::Send>, + ctl_recv: Receiver>, + listeners: HashMap, + transports: HashMap, + waker: ::Recv, + timeouts: Timer, +} + +impl Runtime { + fn unregister_listener(&mut self, id: ResourceId) -> Option { + let Some(listener) = self.listeners.remove(&id) else { + #[cfg(feature = "log")] + log::warn!(target: "reactor", "Unregistering non-registered listener {id}"); + return None; + }; + + #[cfg(feature = "log")] + log::debug!(target: "reactor", "Handling over listener {id} (fd={})", listener.as_raw_fd()); + + self.poller.unregister(id); + + Some(listener) + } + + fn unregister_transport(&mut self, id: ResourceId) -> Option { + let Some(transport) = self.transports.remove(&id) else { + #[cfg(feature = "log")] + log::warn!(target: "reactor", "Unregistering non-registered transport {id}"); + return None; + }; + + #[cfg(feature = "log")] + log::debug!(target: "reactor", "Unregistering over transport {id} (fd={})", transport.as_raw_fd()); + + self.poller.unregister(id); + + Some(transport) + } +} + +impl ReactorRuntime

for Runtime { + const WAIT_TIMEOUT: Duration = Duration::from_secs(60 * 60); + + type Handler = H; + + fn with( + service: Self::Handler, + poller: P, + controller: Controller< + ::Command, + ::Send, + >, + ctl_recv: Receiver::Command>>, + waker_recv: ::Recv, + ) -> Self { + Runtime { + service, + poller, + controller, + ctl_recv, + listeners: empty!(), + transports: empty!(), + waker: waker_recv, + timeouts: Timer::new(), + } + } + + fn timeouts(&mut self) -> &mut Timer { &mut self.timeouts } + fn poller(&mut self) -> &mut P { &mut self.poller } + fn service(&mut self) -> &mut Self::Handler { &mut self.service } + fn resource_interests(&self) -> impl Iterator { + self.listeners + .iter() + .map(move |(id, handler)| (*id, handler.interests())) + .chain(self.transports.iter().map(move |(id, transport)| (*id, transport.interests()))) + } + + fn ctl_recv(&self) -> Result::Command>, TryRecvError> { + self.ctl_recv.try_recv() + } + + fn controller(&self) -> Controller<::Command, impl WakerSend> { + self.controller.clone() + } + + /// # Returns + /// + /// Whether it was awakened by a waker + fn handle_events(&mut self, time: Timestamp) -> bool { + let mut awoken = false; + + while let Some((id, res)) = self.poller.next() { + if id == ResourceId::WAKER { + if let Err(err) = res { + #[cfg(feature = "log")] + log::error!(target: "reactor", "Polling waker has failed: {err}"); + panic!("waker failure: {err}"); + }; + + #[cfg(feature = "log")] + log::trace!(target: "reactor", "Awoken by the controller"); + + self.waker.reset(); + awoken = true; + } else if self.listeners.contains_key(&id) { + match res { + Ok(io) => { + #[cfg(feature = "log")] + log::trace!(target: "reactor", "Got `{io}` event from listener {id}"); + + let listener = self.listeners.get_mut(&id).expect("resource disappeared"); + for io in io { + if let Some(event) = listener.handle_io(io) { + let fd = listener.as_raw_fd(); + self.service.handle_listener_event(fd, id, event, time); + } + } + } + Err(err) => { + #[cfg(feature = "log")] + log::trace!(target: "reactor", "Listener {id} {err}"); + let listener = + self.unregister_listener(id).expect("listener has disappeared"); + self.service.handle_error(RuntimeError::Handler( + Error::ListenerDisconnect(id, listener), + )); + } + } + } else if self.transports.contains_key(&id) { + match res { + Ok(io) => { + #[cfg(feature = "log")] + log::trace!(target: "reactor", "Got `{io}` event from transport {id}"); + + let transport = self.transports.get_mut(&id).expect("resource disappeared"); + for io in io { + if let Some(event) = transport.handle_io(io) { + let fd = transport.as_raw_fd(); + self.service.handle_transport_event(fd, id, event, time); + } + } + } + Err(err) => { + #[cfg(feature = "log")] + log::trace!(target: "reactor", "Transport {id} {err}"); + let transport = + self.unregister_transport(id).expect("transport has disappeared"); + self.service.handle_error(RuntimeError::Handler( + Error::TransportDisconnect(id, transport), + )); + } + } + } else { + panic!( + "file descriptor in reactor which is not a known waker, listener or transport" + ) + } + } + + awoken + } + + /// # Safety + /// + /// Panics on `Action::Send` for read-only resources or resources which are not ready for a + /// write operation (i.e. returning `false` from [`WriteAtomic::is_ready_to_write`] + /// implementation. + fn handle_action( + &mut self, + action: Action, + time: Timestamp, + ) -> Result> { + match action { + Action::RegisterListener(listener) => { + let fd = listener.as_raw_fd(); + + #[cfg(feature = "log")] + log::debug!(target: "reactor", "Registering listener with fd={fd}"); + + let id = self.poller.register(&listener, IoType::read_only()); + self.listeners.insert(id, listener); + self.service.handle_registered(fd, id, ResourceType::Listener); + } + Action::RegisterTransport(transport) => { + let fd = transport.as_raw_fd(); + + #[cfg(feature = "log")] + log::debug!(target: "reactor", "Registering transport with fd={fd}"); + + let id = self.poller.register(&transport, IoType::read_only()); + self.transports.insert(id, transport); + self.service.handle_registered(fd, id, ResourceType::Transport); + } + Action::UnregisterListener(id) => { + let Some(listener) = self.unregister_listener(id) else { + return Ok(true); + }; + #[cfg(feature = "log")] + log::debug!(target: "reactor", "Handling over listener {id}"); + self.service.handover_listener(id, listener); + } + Action::UnregisterTransport(id) => { + let Some(transport) = self.unregister_transport(id) else { + return Ok(true); + }; + #[cfg(feature = "log")] + log::debug!(target: "reactor", "Handling over transport {id}"); + self.service.handover_transport(id, transport); + } + Action::Send(id, data) => { + #[cfg(feature = "log")] + log::trace!(target: "reactor", "Sending {} bytes to {id}", data.len()); + + let Some(transport) = self.transports.get_mut(&id) else { + #[cfg(feature = "log")] + log::error!(target: "reactor", "Transport {id} is not in the reactor"); + + return Ok(true); + }; + match transport.write_atomic(&data) { + Err(WriteError::NotReady) => { + #[cfg(feature = "log")] + log::error!(target: "reactor", internal = true; + "An attempt to write to transport {id} before it got ready"); + panic!( + "application business logic error: write to transport {id} which is \ + read-only or not ready for a write operation" + ); + } + Err(WriteError::Io(e)) => { + #[cfg(feature = "log")] + log::error!(target: "reactor", "Fatal error writing to transport {id}, disconnecting. Error details: {e:?}"); + if let Some(transport) = self.unregister_transport(id) { + return Err(Error::TransportDisconnect(id, transport)); + } + } + Ok(_) => {} + } + } + Action::SetTimer(duration) => { + #[cfg(feature = "log")] + log::debug!(target: "reactor", "Adding timer {duration:?} from now"); + + self.timeouts.set_timeout(duration, time); + } + Action::Terminate => return Ok(false), + } + Ok(true) + } + + fn handle_shutdown(self) { + #[cfg(feature = "log")] + log::info!(target: "reactor", "Shutdown"); + + // We just drop here? + } +} + +#[cfg(test)] +mod test { + use std::io::{self, stdout}; + use std::thread::sleep; + + use super::*; + use crate::{poller, Io, Reactor}; + + pub struct DumbRes(Box); + impl DumbRes { + pub fn new() -> DumbRes { DumbRes(Box::new(stdout())) } + } + impl AsRawFd for DumbRes { + fn as_raw_fd(&self) -> RawFd { self.0.as_raw_fd() } + } + impl io::Write for DumbRes { + fn write(&mut self, buf: &[u8]) -> io::Result { Ok(buf.len()) } + fn flush(&mut self) -> io::Result<()> { Ok(()) } + } + impl WriteAtomic for DumbRes { + fn is_ready_to_write(&self) -> bool { true } + fn empty_write_buf(&mut self) -> io::Result { Ok(true) } + fn write_or_buf(&mut self, _buf: &[u8]) -> io::Result<()> { Ok(()) } + } + impl Resource for DumbRes { + type Event = (); + fn interests(&self) -> IoType { IoType::read_write() } + fn handle_io(&mut self, _io: Io) -> Option { None } + } + + #[test] + fn timer() { + #[derive(Clone, Eq, PartialEq, Debug)] + enum Cmd { + Init, + Expect(Vec), + } + #[derive(Clone, Eq, PartialEq, Debug)] + enum Event { + Timer, + } + #[derive(Clone, Debug, Default)] + struct DumbService { + pub add_resource: bool, + pub set_timer: bool, + pub log: Vec, + } + impl Iterator for DumbService { + type Item = Action; + fn next(&mut self) -> Option { + if self.add_resource { + self.add_resource = false; + Some(Action::RegisterTransport(DumbRes::new())) + } else if self.set_timer { + self.set_timer = false; + Some(Action::SetTimer(Duration::from_millis(3))) + } else { + None + } + } + } + impl ReactorHandler for DumbService { + type Command = Cmd; + type Action = Action; + type Error = Error; + + fn tick(&mut self, _time: Timestamp) {} + fn handle_timer(&mut self) { + self.log.push(Event::Timer); + self.set_timer = true; + } + fn handle_command(&mut self, cmd: Self::Command) { + match cmd { + Cmd::Init => { + self.add_resource = true; + self.set_timer = true; + } + Cmd::Expect(expected) => { + assert_eq!(expected, self.log); + } + } + } + fn handle_error(&mut self, err: RuntimeError) { panic!("{err}") } + } + impl Handler for DumbService { + type Listener = DumbRes; + type Transport = DumbRes; + + fn handle_listener_event( + &mut self, + _fd: RawFd, + _d: ResourceId, + _event: ::Event, + _time: Timestamp, + ) { + unreachable!() + } + fn handle_transport_event( + &mut self, + _fd: RawFd, + _id: ResourceId, + _event: ::Event, + _time: Timestamp, + ) { + unreachable!() + } + fn handle_registered(&mut self, _fd: RawFd, _id: ResourceId, _ty: ResourceType) {} + fn handover_listener(&mut self, _id: ResourceId, _listener: Self::Listener) { + unreachable!() + } + fn handover_transport(&mut self, _id: ResourceId, _transport: Self::Transport) { + unreachable!() + } + } + + let reactor = + Reactor::, _>::new(DumbService::default(), poller::popol::Poller::new()) + .unwrap(); + reactor.controller().cmd(Cmd::Init).unwrap(); + sleep(Duration::from_secs(2)); + reactor.controller().cmd(Cmd::Expect(vec![Event::Timer; 6])).unwrap(); + } +} diff --git a/src/timeouts.rs b/src/timeouts.rs index 768d8ad..daa25ab 100644 --- a/src/timeouts.rs +++ b/src/timeouts.rs @@ -2,24 +2,22 @@ // // SPDX-License-Identifier: Apache-2.0 // -// Written in 2021-2023 by +// Written in 2021-2025 by // Dr. Maxim Orlovsky // Alexis Sellier // -// Copyright 2022-2023 UBIDECO Institute, Switzerland -// Copyright 2021 Alexis Sellier +// Copyright 2022-2025 UBIDECO Labs, InDCS, Lugano, Switzerland. All Rights reserved. +// Copyright 2021-2023 Alexis Sellier . All Rights reserved. // -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under +// the License. use std::collections::BTreeSet; use std::ops::{Add, AddAssign, Sub, SubAssign};