Skip to content

Commit f192d42

Browse files
committed
import uservice and uthread from internal repo
1 parent b43dbaf commit f192d42

File tree

5 files changed

+334
-5
lines changed

5 files changed

+334
-5
lines changed

Cargo.lock

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ crate-type = ["rlib", "staticlib"]
1818

1919
[dependencies]
2020
amplify = "4.7.0"
21+
crossbeam-channel = "0.5.13"
2122
log = { version = "0.4.17", optional = true }
2223

2324
[target.'cfg(target_arch = "wasm32")'.dependencies]
@@ -35,6 +36,7 @@ rustc-args = ["--cfg", "docsrs"]
3536

3637
[features]
3738
default = []
38-
all = ["log"]
39+
all = ["log", "stderr"]
3940

41+
stderr = []
4042
log = ["dep:log"]

src/lib.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
// Written in 2022-2025 by
66
// Dr. Maxim Orlovsky <[email protected]>
77
//
8-
// Copyright (C) 2022-2025 Cyphernet Labs,
9-
// Institute for Distributed and Cognitive Systems,
8+
// Copyright (C) 2022-2025 Cyphernet Labs,
9+
// Institute for Distributed and Cognitive Systems,
1010
// Lugano, Switzerland
1111
// All rights reserved
1212
//
@@ -24,5 +24,8 @@
2424

2525
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
2626

27-
#[macro_use]
28-
extern crate amplify;
27+
mod uservice;
28+
mod uthread;
29+
30+
pub use uservice::{UError, UErrorMsg, UErrorSender, UResponder, UResult, USender, UService};
31+
pub use uthread::UThread;

src/uservice.rs

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
// Channel-based non-blocking microservices without use of async
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
//
5+
// Written in 2022-2025 by
6+
// Dr. Maxim Orlovsky <[email protected]>
7+
//
8+
// Copyright (C) 2022-2025 Cyphernet Labs,
9+
// Institute for Distributed and Cognitive Systems,
10+
// Lugano, Switzerland
11+
// All rights reserved
12+
//
13+
// Licensed under the Apache License, Version 2.0 (the "License");
14+
// you may not use this file except in compliance with the License.
15+
// You may obtain a copy of the License at
16+
//
17+
// http://www.apache.org/licenses/LICENSE-2.0
18+
//
19+
// Unless required by applicable law or agreed to in writing, software
20+
// distributed under the License is distributed on an "AS IS" BASIS,
21+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22+
// See the License for the specific language governing permissions and
23+
// limitations under the License.
24+
25+
use std::fmt::Display;
26+
use std::ops::ControlFlow;
27+
use std::time::{Duration, Instant};
28+
29+
use crossbeam_channel::{SendError, SendTimeoutError, Sender, TrySendError};
30+
31+
pub type UError = Box<dyn Display + Send>;
32+
pub type UResult<T = ()> = Result<T, UError>;
33+
34+
#[derive(Clone, Debug)]
35+
pub struct UResponder<T = (), E = UError>(Option<Sender<Result<T, E>>>);
36+
37+
impl<T, E> UResponder<T, E> {
38+
pub fn respond(&self, msg: Result<T, E>) -> Result<(), SendError<Result<T, E>>> {
39+
if let Some(sender) = &self.0 { sender.send(msg) } else { Ok(()) }
40+
}
41+
}
42+
43+
#[derive(Clone)]
44+
pub(crate) enum UMsg<Msg> {
45+
Msg(Msg),
46+
Terminate,
47+
}
48+
49+
#[derive(Clone, Debug)]
50+
pub struct UErrorMsg {
51+
pub service: String,
52+
pub error: String,
53+
}
54+
55+
pub trait UService: Send + 'static {
56+
type Msg: Send;
57+
type Error: Display + Sync;
58+
const NAME: &'static str;
59+
60+
fn tick(&mut self) -> Result<(), Self::Error> {
61+
// By default, do nothing
62+
Ok(())
63+
}
64+
fn process(&mut self, msg: Self::Msg) -> Result<ControlFlow<u8>, Self::Error>;
65+
fn terminate(&mut self);
66+
fn monitor(&self) -> Option<&USender<UErrorMsg>> { None }
67+
68+
fn error(&self, context: &str, err: impl Display) {
69+
self.error_sender().report(context, err.to_string())
70+
}
71+
72+
fn error_brief(&self, err: impl Display) { self.error_sender().report_brief(err.to_string()) }
73+
74+
fn error_sender(&self) -> UErrorSender {
75+
UErrorSender {
76+
sender: self.monitor().cloned(),
77+
service_name: Self::NAME,
78+
}
79+
}
80+
81+
fn set_self_sender(&mut self, _sender: USender<Self::Msg>) {
82+
// By default, do nothing
83+
}
84+
fn self_sender(&self) -> USender<Self::Msg> {
85+
// By default, panic
86+
panic!("the sender was not set");
87+
}
88+
}
89+
90+
pub struct UErrorSender {
91+
sender: Option<USender<UErrorMsg>>,
92+
service_name: &'static str,
93+
}
94+
95+
impl UErrorSender {
96+
pub fn report(&self, context: &str, err: impl ToString) {
97+
self.report_brief(format!("{context} - {}", err.to_string()))
98+
}
99+
100+
pub fn report_brief(&self, err: impl ToString) {
101+
#[cfg(feature = "log")]
102+
{
103+
let error = err.to_string();
104+
log::error!(target: self.service_name, "{error}");
105+
106+
let Some(sender) = &self.sender else {
107+
return;
108+
};
109+
if sender
110+
.send(UErrorMsg {
111+
service: self.service_name.to_string(),
112+
error,
113+
})
114+
.is_err()
115+
{
116+
log::error!(target: self.service_name, "Broken monitor channel");
117+
}
118+
}
119+
#[cfg(feature = "stderr")]
120+
eprintln!("Error in {}: {}", self.service_name, err.to_string());
121+
}
122+
}
123+
124+
#[derive(Clone, Debug)]
125+
pub struct USender<Msg>(pub(crate) Sender<UMsg<Msg>>);
126+
127+
impl<Msg> USender<Msg> {
128+
fn convert_timeout_error(err: SendTimeoutError<UMsg<Msg>>) -> SendTimeoutError<Msg> {
129+
match err {
130+
SendTimeoutError::Timeout(UMsg::Msg(msg)) => SendTimeoutError::Timeout(msg),
131+
SendTimeoutError::Disconnected(UMsg::Msg(msg)) => SendTimeoutError::Disconnected(msg),
132+
SendTimeoutError::Timeout(UMsg::Terminate)
133+
| SendTimeoutError::Disconnected(UMsg::Terminate) => {
134+
unreachable!()
135+
}
136+
}
137+
}
138+
139+
pub fn send(&self, msg: Msg) -> Result<(), SendError<Msg>> {
140+
self.0.send(UMsg::Msg(msg)).map_err(|SendError(msg)| match msg {
141+
UMsg::Msg(msg) => SendError(msg),
142+
UMsg::Terminate => unreachable!(),
143+
})
144+
}
145+
146+
pub fn try_send(&self, msg: Msg) -> Result<(), TrySendError<Msg>> {
147+
self.0.try_send(UMsg::Msg(msg)).map_err(|err| match err {
148+
TrySendError::Full(UMsg::Msg(msg)) => TrySendError::Full(msg),
149+
TrySendError::Disconnected(UMsg::Msg(msg)) => TrySendError::Disconnected(msg),
150+
TrySendError::Full(UMsg::Terminate) | TrySendError::Disconnected(UMsg::Terminate) => {
151+
unreachable!()
152+
}
153+
})
154+
}
155+
156+
pub fn send_timeout(&self, msg: Msg, timeout: Duration) -> Result<(), SendTimeoutError<Msg>> {
157+
self.0.send_timeout(UMsg::Msg(msg), timeout).map_err(Self::convert_timeout_error)
158+
}
159+
160+
pub fn send_deadline(&self, msg: Msg, deadline: Instant) -> Result<(), SendTimeoutError<Msg>> {
161+
self.0.send_deadline(UMsg::Msg(msg), deadline).map_err(Self::convert_timeout_error)
162+
}
163+
164+
#[inline]
165+
pub fn is_empty(&self) -> bool { self.0.is_empty() }
166+
167+
#[inline]
168+
pub fn is_full(&self) -> bool { self.0.is_full() }
169+
170+
#[inline]
171+
pub fn len(&self) -> usize { self.0.len() }
172+
173+
#[inline]
174+
pub fn capacity(&self) -> Option<usize> { self.0.capacity() }
175+
}

src/uthread.rs

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
// Channel-based non-blocking microservices without use of async
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
//
5+
// Written in 2022-2025 by
6+
// Dr. Maxim Orlovsky <[email protected]>
7+
//
8+
// Copyright (C) 2022-2025 Cyphernet Labs,
9+
// Institute for Distributed and Cognitive Systems,
10+
// Lugano, Switzerland
11+
// All rights reserved
12+
//
13+
// Licensed under the Apache License, Version 2.0 (the "License");
14+
// you may not use this file except in compliance with the License.
15+
// You may obtain a copy of the License at
16+
//
17+
// http://www.apache.org/licenses/LICENSE-2.0
18+
//
19+
// Unless required by applicable law or agreed to in writing, software
20+
// distributed under the License is distributed on an "AS IS" BASIS,
21+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22+
// See the License for the specific language governing permissions and
23+
// limitations under the License.
24+
25+
use std::ops::ControlFlow;
26+
use std::thread;
27+
use std::thread::JoinHandle;
28+
use std::time::Duration;
29+
30+
use crossbeam_channel::{RecvTimeoutError, Sender};
31+
32+
use crate::uservice::UMsg;
33+
use crate::{USender, UService};
34+
35+
#[derive(Debug)]
36+
pub struct UThread<S: UService> {
37+
thread: Option<JoinHandle<()>>,
38+
sender: Sender<UMsg<S::Msg>>,
39+
}
40+
41+
impl<S: UService> UThread<S> {
42+
pub fn new(mut service: S, ticks: Option<Duration>) -> Self {
43+
let (sender, receiver) = crossbeam_channel::unbounded();
44+
service.set_self_sender(USender(sender.clone()));
45+
let thread = thread::spawn(move || {
46+
loop {
47+
let recv = || {
48+
if let Some(timeout) = ticks {
49+
receiver.recv_timeout(timeout)
50+
} else {
51+
receiver.recv().map_err(|_| RecvTimeoutError::Disconnected)
52+
}
53+
};
54+
let msg = match recv() {
55+
Ok(UMsg::Msg(msg)) => msg,
56+
Ok(UMsg::Terminate) => {
57+
#[cfg(feature = "log")]
58+
log::debug!(target: S::NAME, "got terminate command");
59+
service.terminate();
60+
break;
61+
}
62+
Err(RecvTimeoutError::Timeout) => {
63+
#[cfg(feature = "log")]
64+
log::trace!(target: S::NAME, "timed out, restarting the event loop");
65+
if let Err(err) = service.tick() {
66+
service.error("service tick error", err)
67+
};
68+
continue;
69+
}
70+
Err(RecvTimeoutError::Disconnected) => {
71+
#[cfg(feature = "log")]
72+
log::error!(target: S::NAME, "service channel got disconnected");
73+
service.error("channel to the service is broken", "disconnected");
74+
break;
75+
}
76+
};
77+
match service.process(msg) {
78+
Err(err) => {
79+
service.error("service process error", err);
80+
}
81+
Ok(ControlFlow::Break(code)) => {
82+
if code == 0 {
83+
#[cfg(feature = "log")]
84+
log::info!(target: S::NAME, "thread is stopping on service request");
85+
} else {
86+
#[cfg(feature = "log")]
87+
log::debug!(target: S::NAME, "stopping thread due to status {code} returned from the service");
88+
}
89+
service.terminate();
90+
break;
91+
}
92+
Ok(ControlFlow::Continue(())) => {}
93+
}
94+
}
95+
#[cfg(feature = "log")]
96+
log::info!(target: S::NAME, "thread is stopped");
97+
});
98+
99+
Self {
100+
thread: Some(thread),
101+
sender,
102+
}
103+
}
104+
105+
pub fn sender(&self) -> USender<S::Msg> { USender(self.sender.clone()) }
106+
107+
pub fn join(&mut self) -> thread::Result<()> {
108+
if let Some(thread) = self.thread.take() {
109+
return thread.join().inspect_err(|_| {
110+
#[cfg(feature = "log")]
111+
log::error!(target: S::NAME, "unable to complete thread")
112+
});
113+
}
114+
Ok(())
115+
}
116+
}
117+
118+
impl<S: UService> Drop for UThread<S> {
119+
fn drop(&mut self) {
120+
#[cfg(feature = "log")]
121+
log::debug!(target: S::NAME, "ordering service to terminate");
122+
self.sender.send(UMsg::Terminate).unwrap_or_else(|err| {
123+
panic!("unable to send terminate command to the {} thread: {err}", S::NAME)
124+
});
125+
if let Some(thread) = self.thread.take() {
126+
#[cfg(feature = "log")]
127+
log::info!(target: S::NAME, "waiting for the service thread to complete");
128+
thread
129+
.join()
130+
.unwrap_or_else(|err| panic!("unable to join the {} thread: {err:?}", S::NAME))
131+
}
132+
}
133+
}

0 commit comments

Comments
 (0)