Skip to content

Commit 3c93341

Browse files
committed
forest: fix forest publish edge cases
1 parent 090c57a commit 3c93341

File tree

6 files changed

+228
-12
lines changed

6 files changed

+228
-12
lines changed

src/discof/forest/fd_forest.c

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -438,10 +438,26 @@ fd_forest_publish( fd_forest_t * forest, ulong new_root_slot ) {
438438
fd_forest_ele_t * old_root_ele = fd_forest_pool_ele( pool, forest->root );
439439
fd_forest_ele_t * new_root_ele = ancestry_frontier_query( forest, new_root_slot );
440440

441-
# if FD_FOREST_USE_HANDHOLDING
442-
FD_TEST( new_root_ele ); /* caller error - not found */
443-
FD_TEST( new_root_ele->slot > old_root_ele->slot ); /* caller error - inval */
444-
# endif
441+
#if FD_FOREST_USE_HANDHOLDING
442+
if( FD_LIKELY( new_root_ele ) ) {
443+
FD_TEST( new_root_ele->slot > old_root_ele->slot ); /* caller error - inval */
444+
}
445+
#endif
446+
447+
/* Edge case where if we haven't been getting repairs, and we have a
448+
gap between the root and orphans. we publish forward to a slot that
449+
we don't have. This only case this should be happening is when we
450+
load a second incremental and that incremental slot lives in the
451+
gap. In that case this isn't a bug, but we should be treating this
452+
new root like the snapshot slot / init root. Should be happening
453+
very rarely given a well-functioning repair. */
454+
455+
if( FD_UNLIKELY( !new_root_ele ) ) {
456+
new_root_ele = acquire( forest, new_root_slot );
457+
new_root_ele->complete_idx = 0;
458+
new_root_ele->buffered_idx = 0;
459+
fd_forest_frontier_ele_insert( frontier, new_root_ele, pool );
460+
}
445461

446462
/* First, remove the previous root, and add it to a FIFO prune queue.
447463
head points to the queue head (initialized with old_root_ele). */
@@ -475,7 +491,56 @@ fd_forest_publish( fd_forest_t * forest, ulong new_root_slot ) {
475491
}
476492

477493
new_root_ele->parent = null; /* unlink new root from parent */
478-
forest->root = fd_forest_ancestry_idx_query( ancestry, &new_root_slot, null, pool );
494+
forest->root = fd_forest_ancestry_idx_query( ancestry, &new_root_slot, null, pool );
495+
forest->root = fd_ulong_if( forest->root == null, fd_forest_frontier_idx_query( frontier, &new_root_slot, null, pool ), forest->root );
496+
497+
/* If there is nothing on the frontier, we have hit an edge case
498+
during catching up where all of our frontiers were < the new root.
499+
In that case we need to continue repairing from the new root, so
500+
add it to the frontier. */
501+
502+
if( FD_UNLIKELY( fd_forest_frontier_iter_done( fd_forest_frontier_iter_init( frontier, pool ), frontier, pool ) ) ) {
503+
fd_forest_frontier_ele_insert( frontier, new_root_ele, pool );
504+
new_root_ele->complete_idx = 0;
505+
new_root_ele->buffered_idx = 0;
506+
advance_frontier( forest, new_root_ele->slot, 0 );
507+
}
508+
509+
/* Lastly, cleanup orphans if there orphan heads < new_root_slot.
510+
First, add any relevant orphans to the prune queue. FIXME: NEED TO REMOVE BEFORE MODIFYING NEXT PTR */
511+
512+
fd_forest_orphaned_t * orphaned = fd_forest_orphaned( forest );
513+
head = NULL;
514+
for( fd_forest_orphaned_iter_t iter = fd_forest_orphaned_iter_init( orphaned, pool );
515+
!fd_forest_orphaned_iter_done( iter, orphaned, pool );
516+
iter = fd_forest_orphaned_iter_next( iter, orphaned, pool ) ) {
517+
fd_forest_ele_t * ele = fd_forest_orphaned_iter_ele( iter, orphaned, pool );
518+
if( FD_UNLIKELY( ele->slot < new_root_slot ) ) {
519+
if( FD_UNLIKELY( head ) ) {
520+
head = ele;
521+
head->next = null;
522+
tail = ele;
523+
} else {
524+
tail->next = iter.ele_idx;
525+
tail = fd_forest_pool_ele( pool, tail->next );
526+
tail->next = null;
527+
}
528+
}
529+
}
530+
531+
/* Now BFS and clean up children of these orphan heads */
532+
while( head ) {
533+
fd_forest_ele_t * child = fd_forest_pool_ele( pool, head->child );
534+
while( FD_LIKELY( child ) ) { /* add children to prune queue first */
535+
tail->next = fd_forest_pool_idx( pool, child );
536+
tail = fd_forest_pool_ele( pool, tail->next );
537+
tail->next = null;
538+
child = fd_forest_pool_ele( pool, child->sibling );
539+
}
540+
fd_forest_orphaned_ele_remove( orphaned, &head->slot, NULL, pool ); /* remove myself */
541+
head = fd_forest_pool_ele( pool, head->next ); /* FIFO pop */
542+
fd_forest_pool_ele_release( pool, head ); /* free head */
543+
}
479544
return new_root_ele;
480545
}
481546

src/discof/repair/fd_fec_chainer.c

Lines changed: 72 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,9 @@ fd_fec_chainer_init( fd_fec_chainer_t * chainer, ulong slot, uchar merkle_root[s
108108
FD_TEST( fd_fec_pool_free( chainer->pool ) );
109109
fd_fec_ele_t * root = fd_fec_pool_ele_acquire( chainer->pool );
110110
FD_TEST( root );
111-
root->key = slot << 32 | ( UINT_MAX-1 ); // maintain invariant that no fec_set_idx=UINT_MAX lives in pool_ele
111+
root->key = slot << 32 | ( 0 ); // maintain invariant that no fec_set_idx=UINT_MAX lives in pool_ele
112112
root->slot = slot;
113-
root->fec_set_idx = UINT_MAX-1;
113+
root->fec_set_idx = 0;
114114
root->data_cnt = 0;
115115
root->data_complete = 1;
116116
root->slot_complete = 1;
@@ -125,9 +125,10 @@ fd_fec_chainer_init( fd_fec_chainer_t * chainer, ulong slot, uchar merkle_root[s
125125
UINT_MAX -> UINT_MAX-1 -> get_pool_ele(UINT_MAX-1)*/
126126

127127
fd_fec_parent_t * p = fd_fec_parents_insert( chainer->parents, slot << 32 | UINT_MAX );
128-
p->parent_key = (slot << 32) | ( UINT_MAX - 1 );
128+
p->parent_key = (slot << 32) | ( 0 );
129129

130130
fd_fec_frontier_ele_insert( chainer->frontier, root, chainer->pool );
131+
chainer->root_fec = fd_fec_pool_idx( chainer->pool, root );
131132
return root;
132133
}
133134

@@ -146,9 +147,17 @@ fd_fec_chainer_query( fd_fec_chainer_t * chainer, ulong slot, uint fec_set_idx )
146147
return fec;
147148
}
148149

150+
static fd_fec_ele_t *
151+
fd_fec_chainer_remove( fd_fec_chainer_t * chainer, ulong key ) {
152+
fd_fec_ele_t * fec = fd_fec_frontier_ele_remove( chainer->frontier, &key, NULL, chainer->pool );
153+
fec = fd_ptr_if( !fec, fd_fec_ancestry_ele_remove( chainer->ancestry, &key, NULL, chainer->pool ), fec );
154+
fec = fd_ptr_if( !fec, fd_fec_orphaned_ele_remove( chainer->orphaned, &key, NULL, chainer->pool ), fec );
155+
return fec;
156+
}
157+
149158
static int
150159
is_last_fec( ulong key ){
151-
return ( (uint)fd_ulong_extract( key, 0, 31 ) & UINT_MAX ) == UINT_MAX; // lol fix
160+
return (uint)fd_ulong_extract( key, 0, 31 ) == UINT_MAX;
152161
}
153162

154163
static void
@@ -328,3 +337,62 @@ fd_fec_chainer_insert( fd_fec_chainer_t * chainer,
328337

329338
return ele;
330339
}
340+
341+
void
342+
fd_fec_chainer_publish( fd_fec_chainer_t * chainer, ulong new_root_slot ) {
343+
344+
/* 1. Prune children of the old root */
345+
fd_fec_ele_t * old_root = fd_fec_pool_ele( chainer->pool, chainer->root_fec );
346+
fd_fec_ele_t * new_root = fd_fec_chainer_query( chainer, new_root_slot, 0 );
347+
FD_TEST( old_root );
348+
if( FD_UNLIKELY( !new_root ) ) {
349+
/* Happens during second incremental snapshot load */
350+
351+
new_root = fd_fec_pool_ele_acquire( chainer->pool );
352+
new_root->key = new_root_slot << 32 | ( 0 ); // maintain invariant that no fec_set_idx=UINT_MAX lives in pool_ele
353+
new_root->slot = new_root_slot;
354+
new_root->fec_set_idx = 0;
355+
new_root->data_cnt = 0;
356+
new_root->data_complete = 1;
357+
new_root->slot_complete = 1;
358+
new_root->parent_off = 0;
359+
memset( new_root->chained_merkle_root, 0, FD_SHRED_MERKLE_ROOT_SZ );
360+
361+
fd_fec_parent_t * p = fd_fec_parents_insert( chainer->parents, new_root_slot << 32 | UINT_MAX );
362+
p->parent_key = (new_root_slot << 32) | ( 0 );
363+
364+
fd_fec_frontier_ele_insert( chainer->frontier, new_root, chainer->pool );
365+
}
366+
367+
fd_fec_queue_push_tail( chainer->queue, old_root->key );
368+
369+
while( FD_LIKELY( !fd_fec_queue_empty( chainer->queue ) ) ) {
370+
ulong key = fd_fec_queue_pop_head( chainer->queue );
371+
fd_fec_ele_t * ele = fd_fec_chainer_query( chainer, key >> 32, (uint)key );
372+
if( FD_UNLIKELY( !ele ) ) continue;
373+
374+
if( FD_UNLIKELY( ele->slot_complete ) ) {
375+
fd_fec_children_t * fec_children = fd_fec_children_query( chainer->children, ele->slot, NULL );
376+
if( FD_UNLIKELY( fec_children ) ) {
377+
for( ulong off = fd_slot_child_offs_const_iter_init( fec_children->child_offs );
378+
!fd_slot_child_offs_const_iter_done( off );
379+
off = fd_slot_child_offs_const_iter_next( fec_children->child_offs, off ) ) {
380+
ulong child_slot = ele->slot + off;
381+
382+
if( FD_UNLIKELY( child_slot == new_root_slot ) ) continue;
383+
384+
fd_fec_queue_push_tail( chainer->queue, child_slot << 32 | 0 );
385+
}
386+
}
387+
} else {
388+
ulong child_key = (ele->slot << 32) | (ele->key + ele->data_cnt);
389+
fd_fec_queue_push_tail( chainer->queue, child_key );
390+
}
391+
392+
/* Remove ele from the chainer. */
393+
394+
fd_fec_ele_t * remove = fd_fec_chainer_remove( chainer, ele->key );
395+
FD_TEST( remove == ele );
396+
}
397+
chainer->root_fec = fd_fec_pool_idx( chainer->pool, new_root );
398+
}

src/discof/repair/fd_fec_chainer.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,7 @@ struct __attribute__((aligned(128UL))) fd_fec_chainer {
267267
fd_fec_children_t * children; /* map of slot->child_offs for fast O(1) querying */
268268
ulong * queue; /* queue of FEC keys for BFS chaining */
269269
fd_fec_out_t * out; /* queue of FEC keys to deliver to application */
270+
ulong root_fec; /* pool idx of the root FEC set */
270271
};
271272

272273
FD_PROTOTYPES_BEGIN
@@ -365,6 +366,11 @@ fd_fec_chainer_insert( fd_fec_chainer_t * chainer,
365366
uchar const merkle_root[static FD_SHRED_MERKLE_ROOT_SZ],
366367
uchar const chained_merkle_root[static FD_SHRED_MERKLE_ROOT_SZ] );
367368

369+
/* fd_fec_chainer_publish prunes the fec tree when the wmk is updated. */
370+
371+
void
372+
fd_fec_chainer_publish( fd_fec_chainer_t * chainer, ulong new_root );
373+
368374
FD_PROTOTYPES_END
369375

370376
#endif /* HEADER_fd_src_discof_repair_fd_fec_chainer_h */

src/discof/repair/fd_repair_tile.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -767,15 +767,17 @@ after_frag( fd_repair_tile_ctx_t * ctx,
767767

768768
ulong wmark = fd_fseq_query( ctx->wmark );
769769
if( FD_UNLIKELY( fd_forest_root_slot( ctx->forest ) == ULONG_MAX ) ) {
770+
FD_LOG_NOTICE(( "Forest initializing with root %lu", wmark ));
770771
fd_forest_init( ctx->forest, wmark );
771772
uchar mr[ FD_SHRED_MERKLE_ROOT_SZ ] = { 0 }; /* FIXME */
772773
fd_fec_chainer_init( ctx->fec_chainer, wmark, mr );
773774
FD_TEST( fd_forest_root_slot( ctx->forest ) != ULONG_MAX );
774-
FD_LOG_NOTICE(( "Forest initialized with root %lu", wmark ));
775775
ctx->prev_wmark = wmark;
776776
}
777+
777778
if( FD_UNLIKELY( ctx->prev_wmark < wmark ) ) {
778779
fd_forest_publish( ctx->forest, wmark );
780+
fd_fec_chainer_publish( ctx->fec_chainer, wmark );
779781
ctx->prev_wmark = wmark;
780782
// invalidate our repair iterator
781783
ctx->repair_iter = fd_forest_iter_init( ctx->forest );
@@ -841,6 +843,7 @@ after_frag( fd_repair_tile_ctx_t * ctx,
841843
ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
842844
reasm->cnt = out.fec_set_idx + out.data_cnt;
843845
fd_stem_publish( ctx->stem, REPLAY_OUT_IDX, sig, 0, 0, 0, tsorig, tspub );
846+
FD_LOG_INFO(( "REPLAY out %lu %u %u %d", out.slot, out.fec_set_idx, cnt, out.slot_complete ));
844847
if( FD_UNLIKELY( out.slot_complete ) ) {
845848
fd_reasm_remove( ctx->reasm, reasm );
846849
}

src/discof/repair/test_fec_chainer.c

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,77 @@ test_single_fec( fd_wksp_t * wksp ){
183183

184184
}
185185

186+
void
187+
test_publish( fd_wksp_t * wksp ){
188+
ulong fec_max = 32;
189+
190+
void * mem = fd_wksp_alloc_laddr( wksp, fd_fec_chainer_align(), fd_fec_chainer_footprint( fec_max ), 1UL );
191+
FD_TEST( mem );
192+
fd_fec_chainer_t * chainer = fd_fec_chainer_join( fd_fec_chainer_new( mem, fec_max, 0UL ) );
193+
194+
uchar mr_root[FD_SHRED_MERKLE_ROOT_SZ] = { 1 };
195+
fd_fec_ele_t * f0_64 = fd_fec_chainer_init( chainer, 1, mr_root );
196+
197+
FD_TEST( fd_fec_frontier_ele_query( chainer->frontier, &f0_64->key, NULL, chainer->pool ) == f0_64 );
198+
199+
/* Typical startup behavior, turbine orphan FECs added */
200+
fd_fec_chainer_insert( chainer, 10, 0, 32, 1, 1, 1, mr_root, mr_root );
201+
fd_fec_chainer_insert( chainer, 9, 0, 32, 1, 1, 1, mr_root, mr_root );
202+
203+
/* simulating no FECs chained, but a new root is published */
204+
ulong new_root = 2UL << 32 | 0;
205+
fd_fec_chainer_publish( chainer, 2 );
206+
207+
FD_TEST( fd_fec_frontier_ele_query( chainer->frontier, &new_root, NULL, chainer->pool ) );
208+
209+
/* Chain off of root slot for a bit */
210+
fd_fec_chainer_insert( chainer, 3, 0, 32, 1, 1, 1, mr_root, mr_root );
211+
fd_fec_chainer_insert( chainer, 4, 0, 32, 1, 1, 1, mr_root, mr_root );
212+
ulong new_frontier = 4UL << 32 | 0;
213+
FD_TEST( fd_fec_frontier_ele_query( chainer->frontier, &new_frontier, NULL, chainer->pool ) );
214+
215+
/* Publish to ancestor */
216+
fd_fec_chainer_publish( chainer, 3 );
217+
FD_TEST( fd_fec_frontier_ele_query( chainer->frontier, &new_frontier, NULL, chainer->pool ) );
218+
219+
/* Make a tree
220+
221+
3 - 4 - 8 - 9 - 10
222+
\ 5 - 6 - 7
223+
224+
*/
225+
226+
fd_fec_chainer_insert( chainer, 5, 0, 32, 1, 1, 2, mr_root, mr_root );
227+
fd_fec_chainer_insert( chainer, 6, 0, 32, 1, 1, 1, mr_root, mr_root );
228+
fd_fec_chainer_insert( chainer, 7, 0, 32, 1, 1, 1, mr_root, mr_root );
229+
fd_fec_chainer_insert( chainer, 8, 0, 32, 1, 1, 4, mr_root, mr_root );
230+
231+
uint frontier_cnt = 0;
232+
ulong frontier_keys[2] = { 7UL << 32 | 0, 10UL << 32 | 0 };
233+
for( fd_fec_frontier_iter_t iter = fd_fec_frontier_iter_init( chainer->frontier, chainer->pool ); !fd_fec_frontier_iter_done( iter, chainer->frontier, chainer->pool ); iter = fd_fec_frontier_iter_next( iter, chainer->frontier, chainer->pool ) ){
234+
frontier_cnt++;
235+
}
236+
FD_TEST( frontier_cnt == sizeof(frontier_keys) / sizeof(ulong) );
237+
for( uint i = 0; i < sizeof(frontier_keys) / sizeof(ulong); i++ ){
238+
FD_TEST( fd_fec_frontier_ele_query( chainer->frontier, &frontier_keys[i], NULL, chainer->pool ) );
239+
}
240+
241+
/* Publish down the tree */
242+
fd_fec_chainer_publish( chainer, 4 );
243+
new_frontier = 10UL << 32 | 0;
244+
FD_TEST( fd_fec_frontier_ele_query( chainer->frontier, &new_frontier, NULL, chainer->pool ) );
245+
246+
FD_TEST( fd_fec_chainer_query( chainer, 4, 0 ) );
247+
FD_TEST( !fd_fec_chainer_query( chainer, 5, 0 ) );
248+
FD_TEST( !fd_fec_chainer_query( chainer, 6, 0 ) );
249+
FD_TEST( !fd_fec_chainer_query( chainer, 7, 0 ) );
250+
FD_TEST( fd_fec_chainer_query( chainer, 8, 0 ) );
251+
FD_TEST( fd_fec_chainer_query( chainer, 9, 0 ) );
252+
FD_TEST( fd_fec_chainer_query( chainer, 10, 0 ) );
253+
254+
fd_wksp_free_laddr( fd_fec_chainer_delete( fd_fec_chainer_leave( chainer ) ) );
255+
}
256+
186257
int
187258
main( int argc, char ** argv ) {
188259
fd_boot( &argc, &argv );
@@ -195,6 +266,7 @@ main( int argc, char ** argv ) {
195266

196267
// test_fec_ordering( wksp );
197268
// test_single_fec( wksp );
269+
test_publish( wksp );
198270

199271
ulong sig = fd_disco_repair_replay_sig( 3508496, 1, 32, 128 );
200272
FD_TEST( fd_disco_repair_replay_sig_slot( sig ) == 3508496 );

src/discof/replay/fd_replay_tile.c

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1656,6 +1656,7 @@ read_snapshot( void * _ctx,
16561656
/* TODO: If prefetching the manifest is enabled it leads to
16571657
incorrect snapshot loads. This needs to be looked into. */
16581658
if( strlen( incremental )>0UL ) {
1659+
FD_LOG_NOTICE(("LOADING INCREMENTAL SNAPSHOT"));
16591660
uchar * tmp_mem = fd_spad_alloc_check( ctx->runtime_spad, fd_snapshot_load_ctx_align(), fd_snapshot_load_ctx_footprint() );
16601661

16611662
fd_snapshot_load_ctx_t * tmp_snap_ctx = fd_snapshot_load_new( tmp_mem,
@@ -1696,13 +1697,15 @@ read_snapshot( void * _ctx,
16961697
/* If we don't have an incremental snapshot, load the manifest and the status cache and initialize
16971698
the objects because we don't have these from the incremental snapshot. */
16981699
if( strlen( incremental )<=0UL ) {
1700+
FD_LOG_NOTICE(("LOADING MANIFEST AND STATUS CACHE BECAUSE NO INCREMENTAL SNAPSHOT"));
16991701
fd_snapshot_load_manifest_and_status_cache( snap_ctx, NULL,
17001702
FD_SNAPSHOT_RESTORE_MANIFEST | FD_SNAPSHOT_RESTORE_STATUS_CACHE );
17011703

17021704
kickoff_repair_orphans( ctx, stem );
17031705
/* If we don't have an incremental snapshot, we can still kick off
17041706
sending the stake weights and snapshot slot to repair. */
17051707
} else {
1708+
FD_LOG_NOTICE(("LOADING MANIFEST AND STATUS CACHE W/ INCREMENTAL SNAPSHOT"));
17061709
/* If we have an incremental snapshot, load the manifest and the status cache,
17071710
and don't initialize the objects because we did this above from the incremental snapshot. */
17081711
fd_snapshot_load_manifest_and_status_cache( snap_ctx, NULL, FD_SNAPSHOT_RESTORE_NONE );
@@ -1717,6 +1720,7 @@ read_snapshot( void * _ctx,
17171720

17181721
/* The slot of the full snapshot should be used as the base slot to verify the incremental snapshot,
17191722
not the slot context's slot - which is the slot of the incremental, not the full snapshot. */
1723+
FD_LOG_NOTICE(( "LOADING ANOTHER INCREMENTAL SNAPSHOT"));
17201724
fd_snapshot_load_all( incremental,
17211725
ctx->incremental_src_type,
17221726
NULL,
@@ -1885,10 +1889,8 @@ init_after_snapshot( fd_replay_tile_ctx_t * ctx,
18851889
are no funk txns to publish, and all rooted slots have already
18861890
been registered in the txncache when we loaded the snapshot. */
18871891

1888-
fd_funk_txn_xid_t xid = { .ul = { root, root } };
18891892
if( FD_LIKELY( ctx->blockstore ) ) fd_blockstore_publish( ctx->blockstore, ctx->blockstore_fd, root );
18901893
if( FD_LIKELY( ctx->forks ) ) fd_forks_publish( ctx->forks, root );
1891-
if( FD_LIKELY( ctx->funk ) ) funk_and_txncache_publish( ctx, root, &xid );
18921894
if( FD_LIKELY( ctx->epoch_forks ) ) fd_epoch_forks_publish( ctx->epoch_forks, root );
18931895

18941896
fd_fseq_update( ctx->published_wmark, root );

0 commit comments

Comments
 (0)