Skip to content

Commit b273004

Browse files
committed
flamenco, gossip: use vector as push queue; order-preserving cleanup
1 parent 7951566 commit b273004

File tree

1 file changed

+79
-44
lines changed

1 file changed

+79
-44
lines changed

src/flamenco/gossip/fd_gossip.c

+79-44
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,8 @@ void fd_hash_copy( fd_hash_t * keyd, const fd_hash_t * keys ) {
174174
larger size footprint (100x). Decoupling the two
175175
allows for retaining more metadata without significantly
176176
increasing memory footprint and iteration overhead.
177-
178-
177+
178+
179179
An entry in the vector must have a corresponding entry
180180
in the metadata map, while an entry in the metadata
181181
map may not have an entry in the vector (denoted by a
@@ -189,13 +189,14 @@ struct fd_value {
189189
fd_pubkey_t origin; /* Where did this value originate */
190190
uchar data[PACKET_DATA_SIZE]; /* Serialized form of value (bincode) including signature */
191191
ulong datalen;
192+
ulong del; /* Set to queue for deletion in fd_gossip_cleanup */
192193
};
193194

194195
typedef struct fd_value fd_value_t;
195196

196197
/* Value vector that:
197-
- backs the values pointed by fd_value_meta_t->value
198-
- is used in generating push and pullresp
198+
- backs the values pointed by fd_value_meta_t->value
199+
- is used in generating push and pullresp
199200
messages */
200201
#define VEC_NAME fd_value_vec
201202
#define VEC_T fd_value_t
@@ -405,10 +406,8 @@ struct fd_gossip {
405406
fd_push_state_t * push_states[FD_PUSH_LIST_MAX];
406407
ulong push_states_cnt;
407408
fd_push_state_t * push_states_pool;
408-
/* Queue of values vec ptrs that need pushing */
409-
fd_value_t * * need_push;
409+
/* Index into values vector */
410410
ulong need_push_head;
411-
ulong need_push_cnt;
412411

413412
/* Table of receive statistics */
414413
fd_stats_elem_t * stats;
@@ -459,7 +458,6 @@ fd_gossip_footprint( void ) {
459458
l = FD_LAYOUT_APPEND( l, fd_peer_table_align(), fd_peer_table_footprint(FD_PEER_KEY_MAX) );
460459
l = FD_LAYOUT_APPEND( l, fd_active_table_align(), fd_active_table_footprint(FD_ACTIVE_KEY_MAX) );
461460
l = FD_LAYOUT_APPEND( l, alignof(fd_gossip_peer_addr_t), INACTIVES_MAX*sizeof(fd_gossip_peer_addr_t) );
462-
l = FD_LAYOUT_APPEND( l, alignof(fd_value_t *), FD_NEED_PUSH_MAX*sizeof(fd_value_t *) );
463461
l = FD_LAYOUT_APPEND( l, fd_value_meta_map_align(), fd_value_meta_map_footprint( FD_VALUE_KEY_MAX ) );
464462
l = FD_LAYOUT_APPEND( l, fd_value_vec_align(), fd_value_vec_footprint( FD_VALUE_DATA_MAX ) );
465463
l = FD_LAYOUT_APPEND( l, fd_pending_heap_align(), fd_pending_heap_footprint(FD_PENDING_MAX) );
@@ -487,13 +485,13 @@ fd_gossip_new ( void * shmem, ulong seed ) {
487485
glob->actives = fd_active_table_join(fd_active_table_new(shm, FD_ACTIVE_KEY_MAX, seed));
488486

489487
glob->inactives = (fd_gossip_peer_addr_t*)FD_SCRATCH_ALLOC_APPEND(l, alignof(fd_gossip_peer_addr_t), INACTIVES_MAX*sizeof(fd_gossip_peer_addr_t));
490-
glob->need_push = (fd_value_t * *)FD_SCRATCH_ALLOC_APPEND(l, alignof(fd_value_t *), FD_NEED_PUSH_MAX*sizeof(fd_value_t *));
491488

492489
shm = FD_SCRATCH_ALLOC_APPEND( l, fd_value_meta_map_align(), fd_value_meta_map_footprint( FD_VALUE_KEY_MAX ) );
493490
glob->value_metas = fd_value_meta_map_join( fd_value_meta_map_new( shm, FD_VALUE_KEY_MAX, seed ) );
494491

495492
shm = FD_SCRATCH_ALLOC_APPEND( l, fd_value_vec_align(), fd_value_vec_footprint( FD_VALUE_DATA_MAX ) );
496493
glob->values = fd_value_vec_join( fd_value_vec_new( shm, FD_VALUE_DATA_MAX ) );
494+
glob->need_push_head = 0; // point to start of values
497495

498496
glob->last_contact_time = 0;
499497

@@ -1235,8 +1233,8 @@ fd_gossip_random_pull( fd_gossip_t * glob, fd_pending_event_arg_t * arg ) {
12351233
fd_hash_t * hash = &(ele->key);
12361234

12371235
/* Purge expired value's data entry */
1238-
if (ele->wallclock < expire && ele->value != NULL) {
1239-
fd_value_vec_remove_ele( glob->values, ele->value );
1236+
if( ele->wallclock<expire && ele->value!=NULL ) {
1237+
ele->value->del = 1; // Mark for deletion
12401238
ele->value = NULL;
12411239
continue;
12421240
}
@@ -1511,6 +1509,8 @@ fd_gossip_recv_crds_array( fd_gossip_t * glob, const fd_gossip_peer_addr_t * fro
15111509
fd_crds_value_t * crd = &crds[ i ];
15121510
retained_crds[ num_retained_crds ] = crd; /* for use in insert pass */
15131511

1512+
val->del = 0UL;
1513+
15141514
/* Setup fd_crds_value_processed_t entry */
15151515
switch( crd->data.discriminant ) {
15161516
case fd_crds_data_enum_contact_info_v1:
@@ -1648,15 +1648,6 @@ fd_gossip_recv_crds_array( fd_gossip_t * glob, const fd_gossip_peer_addr_t * fro
16481648
ele->value = val;
16491649
ele->wallclock = val->wallclock;
16501650

1651-
if ( FD_UNLIKELY( glob->need_push_cnt < FD_NEED_PUSH_MAX ) ) {
1652-
/* Remember that I need to push this value */
1653-
ulong i = ((glob->need_push_head + (glob->need_push_cnt++)) & (FD_NEED_PUSH_MAX-1U));
1654-
/* TODO: change this to vector idx/ptrs? */
1655-
glob->need_push[ i ] = val;
1656-
glob->metrics.push_crds_queue_cnt = glob->need_push_cnt;
1657-
} else {
1658-
INC_RECV_CRDS_DROP_METRIC( PUSH_QUEUE_FULL );
1659-
}
16601651

16611652
fd_crds_value_t * crd = retained_crds[ i ];
16621653

@@ -1829,10 +1820,10 @@ fd_gossip_push_updated_contact(fd_gossip_t * glob) {
18291820
if( (glob->now - glob->last_contact_time)<(long)1e9 )
18301821
return;
18311822

1832-
if (glob->last_contact_time != 0) {
1833-
fd_value_meta_t * ele = fd_value_meta_map_query(glob->value_metas, &glob->last_contact_info_v2_key, NULL);
1834-
if (ele != NULL) {
1835-
fd_value_vec_remove_ele( glob->values, ele->value ); /* Remove the old value from the vector */
1823+
if( glob->last_contact_time!=0 ) {
1824+
fd_value_meta_t * ele = fd_value_meta_map_query( glob->value_metas, &glob->last_contact_info_v2_key, NULL );
1825+
if( ele!=NULL ) {
1826+
ele->value->del = 1UL;
18361827
fd_value_meta_map_remove( glob->value_metas, &glob->last_contact_info_v2_key );
18371828
}
18381829
}
@@ -2112,11 +2103,14 @@ fd_gossip_push( fd_gossip_t * glob, fd_pending_event_arg_t * arg ) {
21122103
/* Push an updated version of my contact info into values */
21132104
fd_gossip_push_updated_contact(glob);
21142105

2106+
ulong pending_values_cnt = fd_value_vec_cnt( glob->values ) - glob->need_push_head;
2107+
ulong need_push_cnt = fd_ulong_if( pending_values_cnt < FD_NEED_PUSH_MAX, pending_values_cnt, FD_NEED_PUSH_MAX );
2108+
21152109
/* Iterate across recent values */
2116-
ulong expire = FD_NANOSEC_TO_MILLI(glob->now) - FD_GOSSIP_PULL_TIMEOUT;
2117-
while (glob->need_push_cnt > 0) {
2118-
fd_value_t * msg = glob->need_push[ ((glob->need_push_head++) & (FD_NEED_PUSH_MAX-1)) ];
2119-
glob->need_push_cnt--;
2110+
ulong expire = FD_NANOSEC_TO_MILLI( glob->now ) - FD_GOSSIP_PULL_TIMEOUT;
2111+
while( need_push_cnt>0 ) {
2112+
fd_value_t * msg = &glob->values[ glob->need_push_head++ ];
2113+
need_push_cnt--;
21202114

21212115
if( msg->wallclock<expire )
21222116
continue;
@@ -2239,20 +2233,13 @@ fd_gossip_push_value_nolock( fd_gossip_t * glob, fd_crds_data_t * data, fd_hash_
22392233
val->key = key;
22402234
val->wallclock = ele->wallclock;
22412235
val->origin = *glob->public_key;
2236+
val->del = 0UL;
22422237

22432238
/* We store the serialized form for convenience */
22442239
fd_memcpy( val->data, buf, datalen );
22452240
val->datalen = datalen;
22462241

2247-
if (glob->need_push_cnt < FD_NEED_PUSH_MAX) {
2248-
/* Remember that I need to push this value */
2249-
ulong i = ((glob->need_push_head + (glob->need_push_cnt++)) & (FD_NEED_PUSH_MAX-1U));
2250-
glob->need_push[ i ] = val; /* Store the pointer to the value element */
2251-
} else {
2252-
INC_PUSH_CRDS_DROP_METRIC( PUSH_QUEUE_FULL );
2253-
}
2254-
2255-
glob->metrics.push_crds_queue_cnt = glob->need_push_cnt;
2242+
glob->metrics.push_crds_queue_cnt = fd_value_vec_cnt( glob->values ) - glob->need_push_head;
22562243
glob->metrics.push_crds[ data->discriminant ] += 1UL;
22572244
return 0;
22582245

@@ -2407,25 +2394,73 @@ fd_gossip_gettime( fd_gossip_t * glob ) {
24072394
return glob->now;
24082395
}
24092396

2397+
/* Single pass values vector compaction. This
2398+
preserves ordering, allowing us to use the
2399+
vector as a push queue.
2400+
See fd_gossip_cleanup_values for an example */
2401+
static ulong
2402+
fd_gossip_compact_values( fd_gossip_t * glob ) {
2403+
fd_value_t * vec = glob->values;
2404+
2405+
ulong start = 0;
2406+
ulong cur_count = fd_value_vec_cnt( vec );
2407+
/* find first element to delete */
2408+
for( ; start<cur_count ; start++ ) if( FD_UNLIKELY( vec[start].del ) ) break;
2409+
2410+
ulong next = start + 1;
2411+
2412+
ulong num_deleted = 0UL;
2413+
ulong push_head_snapshot = ULONG_MAX;
2414+
2415+
while( next<cur_count ) {
2416+
if( FD_UNLIKELY( vec[next].del ) ) {
2417+
if( next>(start + 1) ) {
2418+
/* move all values between start and next */
2419+
memmove( &vec[start - num_deleted],
2420+
&vec[start + 1],
2421+
(next - start - 1) * sizeof(fd_value_t) );
2422+
}
2423+
start = next;
2424+
next = start + 1;
2425+
num_deleted++;
2426+
/* Need to adjust push queue */
2427+
if( FD_UNLIKELY( glob->need_push_head > start &&
2428+
glob->need_push_head <= next ) ) {
2429+
push_head_snapshot = num_deleted;
2430+
}
2431+
} else {
2432+
next++;
2433+
}
2434+
}
2435+
2436+
glob->need_push_head -= fd_ulong_if( push_head_snapshot != ULONG_MAX, push_head_snapshot, num_deleted );
2437+
fd_value_vec_contract( glob->values, num_deleted );
2438+
glob->metrics.value_vec_cnt = fd_value_vec_cnt( glob->values );
2439+
FD_LOG_NOTICE(( "GOSSIP compacted %lu values", num_deleted ));
2440+
return num_deleted;
2441+
}
2442+
24102443
static void
24112444
fd_gossip_cleanup_values( fd_gossip_t * glob,
24122445
fd_pending_event_arg_t * arg ) {
24132446
(void)arg;
2414-
fd_gossip_add_pending( glob, fd_gossip_cleanup_values, fd_pending_event_arg_null(), glob->now + (long)3600e9 );
2447+
fd_gossip_add_pending( glob, fd_gossip_cleanup_values, fd_pending_event_arg_null(), glob->now + (long)15e9 );
24152448

24162449
ulong value_expire = FD_NANOSEC_TO_MILLI(glob->now) - FD_GOSSIP_VALUE_EXPIRE;
2417-
// ulong value_purge = value_expire - FD_GOSSIP_VALUE_EXPIRE/4U; // for purging, which removes data entries but retains the key
24182450
for( fd_value_meta_map_iter_t iter = fd_value_meta_map_iter_init( glob->value_metas );
24192451
!fd_value_meta_map_iter_done( glob->value_metas, iter );
24202452
iter = fd_value_meta_map_iter_next( glob->value_metas, iter ) ) {
24212453
fd_value_meta_t * ele = fd_value_meta_map_iter_ele( glob->value_metas, iter );
2422-
if ( ele->wallclock < value_expire) {
2423-
/* This value has expired, remove it from the value set and vector */
2424-
if( ele->value != NULL )
2425-
fd_value_vec_remove_ele( glob->values, ele->value ); /* Remove the old value from the vector */
2454+
if ( ele->wallclock<value_expire ) {
2455+
/* This value has expired, mark it for deletion in the value vector and remove from map */
2456+
if( ele->value!=NULL ){
2457+
ele->value->del = 1UL;
2458+
}
24262459
fd_value_meta_map_remove( glob->value_metas, &ele->key ); /* Remove from the value set */
24272460
}
24282461
}
2462+
2463+
fd_gossip_compact_values( glob );
24292464
glob->metrics.value_map_cnt = fd_value_meta_map_key_cnt( glob->value_metas );
24302465
glob->metrics.value_vec_cnt = fd_value_vec_cnt( glob->values );
24312466
}
@@ -2440,7 +2475,7 @@ fd_gossip_start( fd_gossip_t * glob ) {
24402475
fd_gossip_add_pending( glob, fd_gossip_refresh_push_states, fd_pending_event_arg_null(), glob->now + (long) 20e9 );
24412476
fd_gossip_add_pending( glob, fd_gossip_push, fd_pending_event_arg_null(), glob->now + (long) 1e8 );
24422477
fd_gossip_add_pending( glob, fd_gossip_make_prune, fd_pending_event_arg_null(), glob->now + (long) 30e9 );
2443-
fd_gossip_add_pending( glob, fd_gossip_cleanup_values, fd_pending_event_arg_null(), glob->now + (long)3600e9 );
2478+
fd_gossip_add_pending( glob, fd_gossip_cleanup_values, fd_pending_event_arg_null(), glob->now + (long) 15e9 );
24442479
fd_gossip_unlock( glob );
24452480
return 0;
24462481
}

0 commit comments

Comments
 (0)