@@ -71,7 +71,7 @@ impl Default for ConnectionRegistry {
71
71
}
72
72
}
73
73
74
- async fn refresh_routing_tables (
74
+ async fn refresh_all_routing_tables (
75
75
config : Config ,
76
76
connection_registry : Arc < ConnectionRegistry > ,
77
77
provider : Arc < dyn RoutingTableProvider > ,
@@ -200,7 +200,7 @@ pub(crate) fn start_background_updater(
200
200
// This thread is in charge of refreshing the routing table periodically
201
201
tokio:: spawn ( async move {
202
202
let mut bookmarks = vec ! [ ] ;
203
- let mut ttl = refresh_routing_tables (
203
+ let mut ttl = refresh_all_routing_tables (
204
204
config_clone. clone ( ) ,
205
205
registry. clone ( ) ,
206
206
provider. clone ( ) ,
@@ -210,12 +210,14 @@ pub(crate) fn start_background_updater(
210
210
. expect ( "Failed to get routing table. Exiting..." ) ;
211
211
debug ! ( "Starting background updater with TTL: {}" , ttl) ;
212
212
let mut interval = tokio:: time:: interval ( Duration :: from_secs ( ttl) ) ;
213
+ let now = std:: time:: Instant :: now ( ) ;
213
214
interval. tick ( ) . await ; // first tick is immediate
214
215
loop {
215
216
tokio:: select! {
216
217
// Trigger periodic updates
217
218
_ = interval. tick( ) => {
218
- ttl = match refresh_routing_tables( config_clone. clone( ) , registry. clone( ) , provider. clone( ) , bookmarks. as_slice( ) ) . await {
219
+ debug!( "Refreshing all routing tables ({})" , registry. databases. len( ) ) ;
220
+ ttl = match refresh_all_routing_tables( config_clone. clone( ) , registry. clone( ) , provider. clone( ) , bookmarks. as_slice( ) ) . await {
219
221
Ok ( ttl) => ttl,
220
222
Err ( e) => {
221
223
debug!( "Failed to refresh routing table: {}" , e) ;
@@ -228,16 +230,24 @@ pub(crate) fn start_background_updater(
228
230
match cmd {
229
231
Some ( RegistryCommand :: RefreshSingleTable ( ( db, new_bookmarks) ) ) => {
230
232
let db_name = db. as_ref( ) . map( |d| d. to_string( ) ) . unwrap_or_default( ) ;
233
+ debug!( "Forcing refresh of routing table for database: {}" , db_name) ;
231
234
bookmarks = new_bookmarks;
232
235
ttl = match refresh_routing_table( & config_clone, & registry. pool_registry, provider. clone( ) , bookmarks. as_slice( ) , db) . await {
233
236
Ok ( table) => {
234
237
registry. databases. insert( db_name. clone( ) , table. resolve( ) ) ;
235
- debug!( "Successfully refreshed routing table for database {}" , db_name) ;
236
- table. ttl
238
+ // we don't want to lose the initial TTL synchronization: if the forced update is triggered,
239
+ // we derive the TTL from the initial time. Example:
240
+ // if the TTL is 60 seconds and the forced update is triggered after 10 seconds,
241
+ // we want to set the TTL to 50 seconds, so that the next update will be in 50 seconds
242
+ ttl - ( now. elapsed( ) . as_secs( ) % table. ttl)
237
243
}
238
244
Err ( e) => {
239
245
debug!( "Failed to refresh routing table: {}" , e) ;
240
- ttl
246
+ // we don't want to lose the initial TTL synchronization: if the forced update is triggered,
247
+ // we derive the TTL from the initial time. Example:
248
+ // if the TTL is 60 seconds and the forced update is triggered after 10 seconds,
249
+ // we want to set the TTL to 50 seconds, so that the next update will be in 50 seconds
250
+ ttl - ( now. elapsed( ) . as_secs( ) % ttl)
241
251
}
242
252
} ;
243
253
}
@@ -250,7 +260,8 @@ pub(crate) fn start_background_updater(
250
260
}
251
261
252
262
debug ! ( "Resetting interval with TTL: {}" , ttl) ;
253
- interval = tokio:: time:: interval ( Duration :: from_secs ( ttl) ) ; // recreate interval with the new TTL
263
+ // recreate interval with the new TTL or the derived one in case of a forced update
264
+ interval = tokio:: time:: interval ( Duration :: from_secs ( ttl) ) ;
254
265
interval. tick ( ) . await ;
255
266
}
256
267
} ) ;
@@ -295,8 +306,6 @@ impl ConnectionRegistry {
295
306
. map ( |entry| entry. value ( ) . clone ( ) )
296
307
. unwrap_or_default ( )
297
308
} else {
298
- debug ! ( "Creating new registry for database: {}" , db_name) ;
299
- self . databases . insert ( db_name. clone ( ) , Vec :: new ( ) ) ;
300
309
vec ! [ ]
301
310
}
302
311
}
@@ -409,7 +418,7 @@ mod tests {
409
418
tls_config : ConnectionTLSConfig :: None ,
410
419
} ;
411
420
let registry = Arc :: new ( ConnectionRegistry :: default ( ) ) ;
412
- let ttl = refresh_routing_tables (
421
+ let ttl = refresh_all_routing_tables (
413
422
config. clone ( ) ,
414
423
registry. clone ( ) ,
415
424
Arc :: new ( TestRoutingTableProvider :: new ( & [ cluster_routing_table] ) ) ,
@@ -525,7 +534,7 @@ mod tests {
525
534
cluster_routing_table_1,
526
535
cluster_routing_table_2,
527
536
] ) ) ;
528
- refresh_routing_tables ( config. clone ( ) , registry. clone ( ) , provider. clone ( ) , & [ ] )
537
+ refresh_all_routing_tables ( config. clone ( ) , registry. clone ( ) , provider. clone ( ) , & [ ] )
529
538
. await
530
539
. unwrap ( ) ;
531
540
@@ -535,7 +544,7 @@ mod tests {
535
544
536
545
let _ = registry. servers ( Some ( "db1" . into ( ) ) ) ; // ensure db1 is initialized
537
546
let _ = registry. servers ( Some ( "db2" . into ( ) ) ) ; // ensure db2 is initialized
538
- let ttl = refresh_routing_tables ( config. clone ( ) , registry. clone ( ) , provider. clone ( ) , & [ ] )
547
+ let ttl = refresh_all_routing_tables ( config. clone ( ) , registry. clone ( ) , provider. clone ( ) , & [ ] )
539
548
. await
540
549
. unwrap ( ) ;
541
550
0 commit comments