Skip to content

Commit fc84a43

Browse files
fix: OnEvent is not propagating data in some scenarios with postgres (getdozer#637)
* chore: simplify stream_pipeline_request * fix: multiple client subscribe * chore: add log if broadcast error * fix: Fix `Field` to `grpc::types::Value` conversion * chore: fix issue delay async Co-authored-by: chubei <[email protected]>
1 parent 2b1d614 commit fc84a43

File tree

2 files changed

+62
-34
lines changed

2 files changed

+62
-34
lines changed

dozer-api/src/grpc/internal/internal_pipeline_server.rs

Lines changed: 61 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -6,45 +6,90 @@ use crate::grpc::internal_grpc::{
66
use crossbeam::channel::Receiver;
77
use dozer_types::{crossbeam, log::info, models::app_config::Config, tracing::warn};
88
use std::{net::ToSocketAddrs, pin::Pin};
9-
use tokio::runtime::Runtime;
10-
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
9+
use tokio::{
10+
runtime::Runtime,
11+
sync::broadcast::{self, Sender},
12+
};
13+
use tokio_stream::wrappers::ReceiverStream;
1114
use tonic::{codegen::futures_core::Stream, transport::Server, Response, Status};
1215

1316
pub struct InternalPipelineServer {
1417
app_config: Config,
15-
receiver: Receiver<PipelineResponse>,
18+
receiver: broadcast::Receiver<PipelineResponse>,
19+
}
20+
impl InternalPipelineServer {
21+
pub fn new(app_config: Config, receiver: Receiver<PipelineResponse>) -> Self {
22+
let (tx, rx1) = broadcast::channel::<PipelineResponse>(16);
23+
tokio::spawn(async move {
24+
Self::setup_broad_cast_channel(tx, receiver);
25+
});
26+
Self {
27+
app_config,
28+
receiver: rx1,
29+
}
30+
}
31+
32+
fn setup_broad_cast_channel(
33+
tx: Sender<PipelineResponse>,
34+
receiver: Receiver<PipelineResponse>,
35+
) {
36+
loop {
37+
let message = receiver.recv();
38+
match message {
39+
Ok(message) => {
40+
let result = tx.send(message);
41+
if let Err(e) = result {
42+
warn!("Internal Pipeline server - Error sending message to broadcast channel: {:?}", e);
43+
}
44+
}
45+
Err(err) => {
46+
warn!(
47+
"Internal Pipeline server - message reveived error: {:?}",
48+
err
49+
);
50+
break;
51+
}
52+
}
53+
}
54+
}
1655
}
1756
type ResponseStream = Pin<Box<dyn Stream<Item = Result<PipelineResponse, Status>> + Send>>;
1857

1958
#[tonic::async_trait]
2059
impl InternalPipelineService for InternalPipelineServer {
2160
type StreamPipelineRequestStream = ResponseStream;
22-
2361
async fn stream_pipeline_request(
2462
&self,
2563
_request: tonic::Request<PipelineRequest>,
2664
) -> Result<Response<ResponseStream>, Status> {
2765
let (tx, rx) = tokio::sync::mpsc::channel(1000);
28-
let iterator = InternalIterator {
29-
receiver: self.receiver.to_owned(),
30-
};
31-
let in_stream = tokio_stream::iter(iterator);
32-
let mut stream = Box::pin(in_stream);
66+
let mut receiver = self.receiver.resubscribe();
3367
tokio::spawn(async move {
34-
while let Some(item) = stream.next().await {
35-
if let Err(_item) = tx.send(Result::<_, Status>::Ok(item)).await {
36-
warn!("output_stream was build from rx and both are dropped");
37-
break;
68+
loop {
69+
let result = receiver.try_recv();
70+
match result {
71+
Ok(message) => {
72+
let result = tx.send(Ok(message)).await;
73+
if let Err(e) = result {
74+
warn!("Error sending message to mpsc channel: {:?}", e);
75+
break;
76+
}
77+
}
78+
Err(err) => {
79+
if err == broadcast::error::TryRecvError::Closed {
80+
break;
81+
}
82+
}
3883
}
84+
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
3985
}
40-
warn!("client disconnected");
4186
});
42-
4387
let output_stream = ReceiverStream::new(rx);
4488
Ok(Response::new(
4589
Box::pin(output_stream) as Self::StreamPipelineRequestStream
4690
))
4791
}
92+
4893
async fn get_config(
4994
&self,
5095
_request: tonic::Request<GetAppConfigRequest>,
@@ -72,10 +117,7 @@ async fn _start_internal_pipeline_server(
72117
app_config: Config,
73118
receiver: Receiver<PipelineResponse>,
74119
) -> Result<(), tonic::transport::Error> {
75-
let server = InternalPipelineServer {
76-
app_config: app_config.to_owned(),
77-
receiver,
78-
};
120+
let server = InternalPipelineServer::new(app_config.to_owned(), receiver);
79121

80122
let internal_config = app_config
81123
.api
@@ -95,17 +137,3 @@ async fn _start_internal_pipeline_server(
95137
.serve(addr.next().unwrap())
96138
.await
97139
}
98-
99-
struct InternalIterator {
100-
receiver: Receiver<PipelineResponse>,
101-
}
102-
impl Iterator for InternalIterator {
103-
type Item = PipelineResponse;
104-
105-
fn next(&mut self) -> Option<Self::Item> {
106-
match self.receiver.recv() {
107-
Ok(msg) => Some(msg),
108-
Err(_) => None,
109-
}
110-
}
111-
}

dozer-api/src/grpc/types_helper.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ fn field_to_prost_value(f: Field) -> Value {
4747
value: Some(value::Value::IntValue(n)),
4848
},
4949
Field::Float(n) => Value {
50-
value: Some(value::Value::FloatValue(n.0 as f32)),
50+
value: Some(value::Value::DoubleValue(n.0)),
5151
},
5252

5353
Field::Boolean(n) => Value {

0 commit comments

Comments
 (0)