@@ -46,7 +46,7 @@ fn clickhouse_task_runner(
4646 tracing:: debug!( "performing write" ) ;
4747
4848 let response = client
49- . send ( encoded_rows)
49+ . send ( encoded_rows, RetryConfig :: default ( ) )
5050 . await
5151 . map_err ( RunTaskError :: Other ) ?;
5252
@@ -121,6 +121,22 @@ where
121121 }
122122}
123123
/// Retry policy consumed by `ClickhouseClient::send`.
///
/// The delay before retry number `attempt` (0-based) is
/// `initial_backoff_ms * 2^attempt`, scaled by a random factor drawn from
/// `1 - jitter_factor / 2` up to (but not including) `1 + jitter_factor / 2`.
///
/// The type is `Copy` so one policy value can be handed to several `send`
/// calls (which take it by value), and `Debug`/`PartialEq` so it can be
/// logged and compared in tests.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct RetryConfig {
    /// Base delay, in milliseconds, before the first retry; doubled for each
    /// subsequent retry.
    initial_backoff_ms: f64,
    /// Number of retries after the first attempt, so at most
    /// `max_retries + 1` requests are sent in total.
    max_retries: usize,
    /// Total width of the random jitter band as a fraction of the delay;
    /// expected to be between 0 and 1 (not validated). A value of `0.2`
    /// varies each delay by up to 10% in either direction.
    jitter_factor: f64,
}
129+
130+ impl Default for RetryConfig {
131+ fn default ( ) -> Self {
132+ Self {
133+ initial_backoff_ms : 500.0 ,
134+ max_retries : 4 ,
135+ jitter_factor : 0.2 ,
136+ }
137+ }
138+ }
139+
124140#[ derive( Clone ) ]
125141pub struct ClickhouseClient {
126142 client : Client ,
@@ -163,15 +179,12 @@ impl ClickhouseClient {
163179 }
164180 }
165181
166- pub async fn send ( & self , body : Vec < u8 > ) -> anyhow:: Result < Response > {
167- const MAX_RETRIES : usize = 4 ;
168- const INITIAL_BACKOFF_MS : u64 = 50 ;
169-
182+ pub async fn send ( & self , body : Vec < u8 > , retry_config : RetryConfig ) -> anyhow:: Result < Response > {
170183 // Convert to Bytes once for efficient cloning since sending the request
171184 // moves the body into the request body.
172185 let body_bytes = bytes:: Bytes :: from ( body) ;
173186
174- for attempt in 0 ..=MAX_RETRIES {
187+ for attempt in 0 ..=retry_config . max_retries {
175188 let res = self
176189 . client
177190 . post ( & self . url )
@@ -192,11 +205,11 @@ impl ClickhouseClient {
192205 . await
193206 . unwrap_or_else ( |_| "unknown error" . to_string ( ) ) ;
194207
195- if attempt == MAX_RETRIES {
208+ if attempt == retry_config . max_retries {
196209 counter ! ( "rust_consumer.clickhouse_insert_error" , 1 , "status" => status, "retried" => "false" ) ;
197210 anyhow:: bail!(
198211 "error writing to clickhouse after {} attempts: {}" ,
199- MAX_RETRIES + 1 ,
212+ retry_config . max_retries + 1 ,
200213 error_text
201214 ) ;
202215 }
@@ -205,18 +218,18 @@ impl ClickhouseClient {
205218 tracing:: warn!(
206219 "ClickHouse write failed (attempt {}/{}): status={}, error={}" ,
207220 attempt + 1 ,
208- MAX_RETRIES + 1 ,
221+ retry_config . max_retries + 1 ,
209222 status,
210223 error_text
211224 ) ;
212225 }
213226 }
214227 Err ( e) => {
215- if attempt == MAX_RETRIES {
228+ if attempt == retry_config . max_retries {
216229 counter ! ( "rust_consumer.clickhouse_insert_error" , 1 , "status" => "network_error" , "retried" => "false" ) ;
217230 anyhow:: bail!(
218231 "error writing to clickhouse after {} attempts: {}" ,
219- MAX_RETRIES + 1 ,
232+ retry_config . max_retries + 1 ,
220233 e
221234 ) ;
222235 }
@@ -225,21 +238,26 @@ impl ClickhouseClient {
225238 tracing:: warn!(
226239 "ClickHouse write failed (attempt {}/{}): {}" ,
227240 attempt + 1 ,
228- MAX_RETRIES + 1 ,
241+ retry_config . max_retries + 1 ,
229242 e
230243 ) ;
231244 }
232245 }
233246
234247 // Calculate exponential backoff delay
235- if attempt < MAX_RETRIES {
236- let backoff_ms = INITIAL_BACKOFF_MS * ( 2_u64 . pow ( attempt as u32 ) ) ;
237- let delay = Duration :: from_millis ( backoff_ms) ;
248+ if attempt < retry_config. max_retries {
249+ let backoff_ms =
250+ retry_config. initial_backoff_ms * ( 2_u64 . pow ( attempt as u32 ) as f64 ) ;
251+ // add/subtract up to 10% jitter (by default) to avoid every consumer retrying at the same time
252+ // causing too many simultaneous queries
253+ let jitter = rand:: random :: < f64 > ( ) * retry_config. jitter_factor
254+ - retry_config. jitter_factor / 2.0 ; // Random value between (-jitter_factor/2, jitter_factor/2)
255+ let delay = Duration :: from_millis ( ( backoff_ms * ( 1.0 + jitter) ) . round ( ) as u64 ) ;
238256 tracing:: debug!(
239257 "Retrying in {:?} (attempt {}/{})" ,
240258 delay,
241259 attempt + 1 ,
242- MAX_RETRIES
260+ retry_config . max_retries
243261 ) ;
244262 tokio:: time:: sleep ( delay) . await ;
245263 }
@@ -280,7 +298,7 @@ mod tests {
280298 assert ! ( client. url. contains( "load_balancing" ) ) ;
281299 assert ! ( client. url. contains( "insert_distributed_sync" ) ) ;
282300 println ! ( "running test" ) ;
283- let res = client. send ( b"[]" . to_vec ( ) ) . await ;
301+ let res = client. send ( b"[]" . to_vec ( ) , RetryConfig :: default ( ) ) . await ;
284302 println ! ( "Response status {}" , res. unwrap( ) . status( ) ) ;
285303 Ok ( ( ) )
286304 }
@@ -302,15 +320,24 @@ mod tests {
302320 let client = ClickhouseClient :: new ( & config, "test_table" ) ;
303321
304322 let start_time = Instant :: now ( ) ;
305- let result = client. send ( b"test data" . to_vec ( ) ) . await ;
323+ let result = client
324+ . send (
325+ b"test data" . to_vec ( ) ,
326+ RetryConfig {
327+ initial_backoff_ms : 100.0 ,
328+ max_retries : 4 ,
329+ jitter_factor : 0.1 ,
330+ } ,
331+ )
332+ . await ;
306333 let elapsed = start_time. elapsed ( ) ;
307334
308335 // Should fail after all retries
309336 assert ! ( result. is_err( ) ) ;
310337
311338 // Should have taken at least the sum of our backoff delays
312- // 50ms + 100ms + 200ms + 400ms = 750ms minimum
313- assert ! ( elapsed >= Duration :: from_millis( 750 ) ) ;
339+ // With jitter_factor 0.1 each delay is at least 95% of its base (95ms + 190ms + 380ms + 760ms = 1425ms), so 1350ms is a safe lower bound
340+ assert ! ( elapsed >= Duration :: from_millis( 1350 ) ) ;
314341
315342 // Error message should mention the number of attempts
316343 let error_msg = result. unwrap_err ( ) . to_string ( ) ;
0 commit comments