From 52407015fc5e119d4564afff8307dfbb181773d7 Mon Sep 17 00:00:00 2001 From: Emily Wang Date: Thu, 26 Jun 2025 19:29:48 +0000 Subject: [PATCH] repair: fix forest and fec chainer publish edge cases --- src/app/firedancer/config/default.toml | 2 +- src/discof/forest/fd_forest.c | 187 +++++++++++++------------ src/discof/forest/fd_forest.h | 16 ++- src/discof/forest/test_forest.c | 110 +++++++++++++++ src/discof/repair/fd_fec_chainer.c | 81 ++++++++++- src/discof/repair/fd_fec_chainer.h | 6 + src/discof/repair/fd_repair_tile.c | 4 +- src/discof/repair/test_fec_chainer.c | 72 ++++++++++ 8 files changed, 381 insertions(+), 97 deletions(-) diff --git a/src/app/firedancer/config/default.toml b/src/app/firedancer/config/default.toml index f05933224a..10fb511007 100644 --- a/src/app/firedancer/config/default.toml +++ b/src/app/firedancer/config/default.toml @@ -1148,7 +1148,7 @@ user = "" # startup, when the validator is catching up from the snapshot # slot, but still maintaining and receiving new turbine slots. # Must be power of 2 - slot_max = 1024 + slot_max = 4096 [tiles.replay] cluster_version = "1.18.0" diff --git a/src/discof/forest/fd_forest.c b/src/discof/forest/fd_forest.c index c63daec8ef..be16a333a7 100644 --- a/src/discof/forest/fd_forest.c +++ b/src/discof/forest/fd_forest.c @@ -38,19 +38,21 @@ fd_forest_new( void * shmem, ulong ele_max, ulong seed ) { FD_SCRATCH_ALLOC_INIT( l, shmem ); forest = FD_SCRATCH_ALLOC_APPEND( l, fd_forest_align(), sizeof(fd_forest_t) ); void * ver = FD_SCRATCH_ALLOC_APPEND( l, fd_fseq_align(), fd_fseq_footprint() ); - void * pool = FD_SCRATCH_ALLOC_APPEND( l, fd_forest_pool_align(), fd_forest_pool_footprint( ele_max ) ); + void * pool = FD_SCRATCH_ALLOC_APPEND( l, fd_forest_pool_align(), fd_forest_pool_footprint ( ele_max ) ); void * ancestry = FD_SCRATCH_ALLOC_APPEND( l, fd_forest_ancestry_align(), fd_forest_ancestry_footprint( ele_max ) ); void * frontier = FD_SCRATCH_ALLOC_APPEND( l, fd_forest_frontier_align(), fd_forest_frontier_footprint( ele_max ) ); void * orphaned = FD_SCRATCH_ALLOC_APPEND( l, fd_forest_orphaned_align(), fd_forest_orphaned_footprint( ele_max ) ); + void * deque = FD_SCRATCH_ALLOC_APPEND( l, fd_forest_deque_align(), fd_forest_deque_footprint ( ele_max ) ); FD_TEST( FD_SCRATCH_ALLOC_FINI( l, fd_forest_align() ) == (ulong)shmem + footprint ); forest->root = ULONG_MAX; forest->wksp_gaddr = fd_wksp_gaddr_fast( wksp, forest ); - forest->ver_gaddr = fd_wksp_gaddr_fast( wksp, fd_fseq_join ( fd_fseq_new ( ver, FD_FOREST_VER_UNINIT ) ) ); - forest->pool_gaddr = fd_wksp_gaddr_fast( wksp, fd_forest_pool_join ( fd_forest_pool_new ( pool, ele_max ) ) ); - forest->ancestry_gaddr = fd_wksp_gaddr_fast( wksp, fd_forest_ancestry_join( fd_forest_ancestry_new( ancestry, ele_max, seed ) ) ); - forest->frontier_gaddr = fd_wksp_gaddr_fast( wksp, fd_forest_frontier_join( fd_forest_frontier_new( frontier, ele_max, seed ) ) ); - forest->orphaned_gaddr = fd_wksp_gaddr_fast( wksp, fd_forest_orphaned_join( fd_forest_orphaned_new( orphaned, ele_max, seed ) ) ); + forest->ver_gaddr = fd_wksp_gaddr_fast( wksp, fd_fseq_join ( fd_fseq_new ( ver, FD_FOREST_VER_UNINIT ) ) ); + forest->pool_gaddr = fd_wksp_gaddr_fast( wksp, fd_forest_pool_join ( fd_forest_pool_new ( pool, ele_max ) ) ); + forest->ancestry_gaddr = fd_wksp_gaddr_fast( wksp, fd_forest_ancestry_join( fd_forest_ancestry_new( ancestry, ele_max, seed ) ) ); + forest->frontier_gaddr = fd_wksp_gaddr_fast( wksp, fd_forest_frontier_join( fd_forest_frontier_new( frontier, ele_max, seed ) ) ); + forest->orphaned_gaddr = fd_wksp_gaddr_fast( wksp, fd_forest_orphaned_join( fd_forest_orphaned_new( orphaned, ele_max, seed ) ) ); + forest->deque_gaddr = fd_wksp_gaddr_fast( wksp, fd_forest_deque_join ( fd_forest_deque_new ( deque, ele_max ) ) ); FD_COMPILER_MFENCE(); FD_VOLATILE( forest->magic ) = FD_FOREST_MAGIC; @@ -126,7 +128,6 @@ fd_forest_init( fd_forest_t * forest, ulong root_slot ) { fd_forest_ele_t * root_ele = fd_forest_pool_ele_acquire( pool ); root_ele->slot = root_slot; - root_ele->prev = null; root_ele->parent = null; root_ele->child = null; root_ele->sibling = null; @@ -188,16 +189,9 @@ fd_forest_verify( fd_forest_t const * forest ) { return 0; } -/* query queries for a connected ele keyed by slot. does not return - orphaned ele. */ - -static fd_forest_ele_t * -ancestry_frontier_query( fd_forest_t * forest, ulong slot ) { - fd_forest_ele_t * pool = fd_forest_pool( forest ); - fd_forest_ele_t * ele = NULL; - ele = fd_forest_ancestry_ele_query( fd_forest_ancestry( forest ), &slot, NULL, pool ); - ele = fd_ptr_if( !ele, fd_forest_frontier_ele_query( fd_forest_frontier( forest ), &slot, NULL, pool ), ele ); - return ele; +FD_FN_PURE static inline ulong * +fd_forest_deque( fd_forest_t * forest ) { + return fd_wksp_laddr_fast( fd_forest_wksp( forest ), forest->deque_gaddr ); } /* remove removes and returns a connected ele from ancestry or frontier @@ -233,41 +227,6 @@ link( fd_forest_t * forest, fd_forest_ele_t * parent, fd_forest_ele_t * child ) child->parent = fd_forest_pool_idx( pool, parent ); } -/* link_orphans performs a BFS beginning from head using BFS. head is - the first element of a linked list representing the BFS queue. If the - starting orphan is connected to the ancestry tree (ie. its parent is - in the map), it is linked to the tree and removed from the orphaned - map, and any of its orphaned children are added to the queue (linking - a parent also links its direct children). Otherwise it remains in the - orphaned map. The BFS continues until the queue is empty. */ - -FD_FN_UNUSED static void -link_orphans( fd_forest_t * forest, fd_forest_ele_t * head ) { - fd_forest_ele_t * pool = fd_forest_pool( forest ); - ulong null = fd_forest_pool_idx_null( pool ); - fd_forest_ancestry_t * ancestry = fd_forest_ancestry( forest ); - fd_forest_orphaned_t * orphaned = fd_forest_orphaned( forest ); - fd_forest_ele_t * tail = head; - fd_forest_ele_t * prev = NULL; - while( FD_LIKELY( head ) ) { /* while queue is non-empty */ - if( FD_LIKELY( fd_forest_orphaned_ele_remove( orphaned, &head->slot, NULL, pool ) ) ) { /* head is orphan root */ - fd_forest_ancestry_ele_insert( ancestry, head, pool ); - fd_forest_ele_t * child = fd_forest_pool_ele( pool, head->child ); - while( FD_LIKELY( child ) ) { /* append children to frontier */ - tail->prev = fd_forest_pool_idx( pool, child ); /* safe to overwrite prev */ - tail = fd_forest_pool_ele( pool, tail->prev ); - tail->prev = null; - ulong sibling = child->sibling; - child->sibling = null; - child = fd_forest_pool_ele( pool, sibling ); - } - } - prev = head; - head = fd_forest_pool_ele( pool, head->prev ); - prev->prev = null; - } -} - /* advance_frontier attempts to advance the frontier beginning from slot using BFS. head is the first element of a linked list representing the BFS queue. A slot can be advanced if all shreds for the block @@ -276,20 +235,24 @@ link_orphans( fd_forest_t * forest, fd_forest_ele_t * head ) { static void advance_frontier( fd_forest_t * forest, ulong slot, ushort parent_off ) { fd_forest_ele_t * pool = fd_forest_pool( forest ); - ulong null = fd_forest_pool_idx_null( pool ); fd_forest_ancestry_t * ancestry = fd_forest_ancestry( forest ); fd_forest_frontier_t * frontier = fd_forest_frontier( forest ); + ulong * queue = fd_forest_deque( forest ); fd_forest_ele_t * ele; ele = fd_forest_frontier_ele_query( fd_forest_frontier( forest ), &slot, NULL, pool ); ulong parent_slot = slot - parent_off; ele = fd_ptr_if( !ele, fd_forest_frontier_ele_query( fd_forest_frontier( forest ), &parent_slot, NULL, pool ), ele ); + if( FD_UNLIKELY( !ele ) ) return; - fd_forest_ele_t * head = ele; - fd_forest_ele_t * tail = head; - fd_forest_ele_t * prev = NULL; +# if FD_FOREST_USE_HANDHOLDING + FD_TEST( fd_forest_deque_cnt( queue ) == 0 ); +# endif - while( FD_LIKELY( head ) ) { + /* BFS elements as pool idxs */ + fd_forest_deque_push_tail( queue, fd_forest_pool_idx( pool, ele ) ); + while( FD_LIKELY( fd_forest_deque_cnt( queue ) ) ) { + fd_forest_ele_t * head = fd_forest_pool_ele( pool, fd_forest_deque_pop_head( queue ) ); fd_forest_ele_t * child = fd_forest_pool_ele( pool, head->child ); if( FD_LIKELY( child && head->complete_idx != UINT_MAX && head->buffered_idx == head->complete_idx ) ) { fd_forest_frontier_ele_remove( frontier, &head->slot, NULL, pool ); @@ -297,15 +260,11 @@ advance_frontier( fd_forest_t * forest, ulong slot, ushort parent_off ) { while( FD_LIKELY( child ) ) { /* append children to frontier */ fd_forest_ancestry_ele_remove( ancestry, &child->slot, NULL, pool ); fd_forest_frontier_ele_insert( frontier, child, pool ); - tail->prev = fd_forest_pool_idx( pool, child ); - tail = fd_forest_pool_ele( pool, tail->prev ); - tail->prev = fd_forest_pool_idx_null( pool ); - child = fd_forest_pool_ele( pool, child->sibling ); + + fd_forest_deque_push_tail( queue, fd_forest_pool_idx( pool, child ) ); + child = fd_forest_pool_ele( pool, child->sibling ); } } - prev = head; - head = fd_forest_pool_ele( pool, head->prev ); - prev->prev = null; } } @@ -330,7 +289,6 @@ acquire( fd_forest_t * forest, ulong slot ) { ulong null = fd_forest_pool_idx_null( pool ); ele->slot = slot; - ele->prev = null; ele->next = null; ele->parent = null; ele->child = null; @@ -382,9 +340,6 @@ insert( fd_forest_t * forest, ulong slot, ushort parent_off ) { fd_forest_ele_t * fd_forest_query( fd_forest_t * forest, ulong slot ) { -# if FD_FOREST_USE_HANDHOLDING - FD_TEST( slot > fd_forest_root_slot( forest ) ); /* caller error - inval */ -# endif return query( forest, slot ); } @@ -433,50 +388,108 @@ fd_forest_publish( fd_forest_t * forest, ulong new_root_slot ) { fd_forest_ancestry_t * ancestry = fd_forest_ancestry( forest ); fd_forest_frontier_t * frontier = fd_forest_frontier( forest ); + fd_forest_orphaned_t * orphaned = fd_forest_orphaned( forest ); fd_forest_ele_t * pool = fd_forest_pool( forest ); ulong null = fd_forest_pool_idx_null( pool ); + ulong * queue = fd_forest_deque( forest ); fd_forest_ele_t * old_root_ele = fd_forest_pool_ele( pool, forest->root ); - fd_forest_ele_t * new_root_ele = ancestry_frontier_query( forest, new_root_slot ); + fd_forest_ele_t * new_root_ele = query( forest, new_root_slot ); # if FD_FOREST_USE_HANDHOLDING - FD_TEST( new_root_ele ); /* caller error - not found */ - FD_TEST( new_root_ele->slot > old_root_ele->slot ); /* caller error - inval */ + if( FD_LIKELY( new_root_ele ) ) { + FD_TEST( new_root_ele->slot > old_root_ele->slot ); /* caller error - inval */ + } # endif + /* Edge case where if we haven't been getting repairs, and we have a + gap between the root and orphans. we publish forward to a slot that + we don't have. This only case this should be happening is when we + load a second incremental and that incremental slot lives in the + gap. In that case this isn't a bug, but we should be treating this + new root like the snapshot slot / init root. Should be happening + very rarely given a well-functioning repair. */ + + if( FD_UNLIKELY( !new_root_ele ) ) { + new_root_ele = acquire( forest, new_root_slot ); + new_root_ele->complete_idx = 0; + new_root_ele->buffered_idx = 0; + fd_forest_frontier_ele_insert( frontier, new_root_ele, pool ); + } + /* First, remove the previous root, and add it to a FIFO prune queue. head points to the queue head (initialized with old_root_ele). */ - +# if FD_FOREST_USE_HANDHOLDING + FD_TEST( fd_forest_deque_cnt( queue ) == 0 ); +# endif fd_forest_ele_t * head = ancestry_frontier_remove( forest, old_root_ele->slot ); - head->next = null; - fd_forest_ele_t * tail = head; + if( FD_LIKELY( head ) ) fd_forest_deque_push_tail( queue, fd_forest_pool_idx( pool, head ) ); /* Second, BFS down the tree, inserting each ele into the prune queue except for the new root. Loop invariant: head always descends from old_root_ele and never descends from new_root_ele. */ - while( head ) { + while( FD_LIKELY( fd_forest_deque_cnt( queue ) ) ) { + head = fd_forest_pool_ele( pool, fd_forest_deque_pop_head( queue ) ); fd_forest_ele_t * child = fd_forest_pool_ele( pool, head->child ); while( FD_LIKELY( child ) ) { if( FD_LIKELY( child != new_root_ele ) ) { /* do not prune new root or descendants */ - ulong idx = fd_forest_ancestry_idx_remove( ancestry, &child->slot, null, pool ); - idx = fd_ulong_if( idx != null, idx, fd_forest_frontier_idx_remove( frontier, &child->slot, null, pool ) ); - tail->next = idx; /* insert prune queue */ -# if FD_FOREST_USE_HANDHOLDING - FD_TEST( tail->next != null ); /* programming error in BFS */ -# endif - tail = fd_forest_pool_ele( pool, tail->next ); /* advance prune queue */ - tail->next = null; + ulong idx = fd_forest_ancestry_idx_remove( ancestry, &child->slot, null, pool ); + idx = fd_ulong_if( idx != null, idx, fd_forest_frontier_idx_remove( frontier, &child->slot, null, pool ) ); + fd_forest_deque_push_tail( queue, idx ); } child = fd_forest_pool_ele( pool, child->sibling ); } - fd_forest_ele_t * next = fd_forest_pool_ele( pool, head->next ); /* FIFO pop */ - fd_forest_pool_ele_release( pool, head ); /* free head */ - head = next; + fd_forest_pool_ele_release( pool, head ); + } + + /* If there is nothing on the frontier, we have hit an edge case + during catching up where all of our frontiers were < the new root. + In that case we need to continue repairing from the new root, so + add it to the frontier. */ + + if( FD_UNLIKELY( fd_forest_frontier_iter_done( fd_forest_frontier_iter_init( frontier, pool ), frontier, pool ) ) ) { + fd_forest_ele_t * remove = fd_forest_ancestry_ele_remove( ancestry, &new_root_ele->slot, NULL, pool ); + if( FD_UNLIKELY( !remove ) ) { + /* Very rare case where during second incremental load we could publish to an orphaned slot */ + remove = fd_forest_orphaned_ele_remove( orphaned, &new_root_ele->slot, NULL, pool ); + } + FD_TEST( remove == new_root_ele ); + fd_forest_frontier_ele_insert( frontier, new_root_ele, pool ); + new_root_ele->complete_idx = 0; + new_root_ele->buffered_idx = 0; + advance_frontier( forest, new_root_ele->slot, 0 ); } new_root_ele->parent = null; /* unlink new root from parent */ - forest->root = fd_forest_ancestry_idx_query( ancestry, &new_root_slot, null, pool ); + forest->root = fd_forest_pool_idx( pool, new_root_ele ); + + /* Lastly, cleanup orphans if there orphan heads < new_root_slot. + First, add any relevant orphans to the prune queue. */ + + for( fd_forest_orphaned_iter_t iter = fd_forest_orphaned_iter_init( orphaned, pool ); + !fd_forest_orphaned_iter_done( iter, orphaned, pool ); + iter = fd_forest_orphaned_iter_next( iter, orphaned, pool ) ) { + fd_forest_ele_t * ele = fd_forest_orphaned_iter_ele( iter, orphaned, pool ); + if( FD_UNLIKELY( ele->slot < new_root_slot ) ) { + fd_forest_deque_push_tail( queue, fd_forest_pool_idx( pool, ele ) ); + } + } + + /* Now BFS and clean up children of these orphan heads */ + while( FD_UNLIKELY( fd_forest_deque_cnt( queue ) ) ) { + head = fd_forest_pool_ele( pool, fd_forest_deque_pop_head( queue ) ); + fd_forest_ele_t * child = fd_forest_pool_ele( pool, head->child ); + while( FD_LIKELY( child ) ) { + if( FD_LIKELY( child != new_root_ele ) ) { + fd_forest_deque_push_tail( queue, fd_forest_pool_idx( pool, child ) ); + } + child = fd_forest_pool_ele( pool, child->sibling ); + } + ulong remove = fd_forest_orphaned_idx_remove( orphaned, &head->slot, null, pool ); /* remove myself */ + remove = fd_ulong_if( remove == null, fd_forest_ancestry_idx_remove( ancestry, &head->slot, null, pool ), remove ); + fd_forest_pool_ele_release( pool, head ); /* free head */ + } return new_root_ele; } diff --git a/src/discof/forest/fd_forest.h b/src/discof/forest/fd_forest.h index 530fcba4be..cb17b572b3 100644 --- a/src/discof/forest/fd_forest.h +++ b/src/discof/forest/fd_forest.h @@ -45,7 +45,6 @@ struct __attribute__((aligned(128UL))) fd_forest_ele { ulong slot; /* map key */ - ulong prev; /* internal use by link_orphans */ ulong next; /* internal use by fd_pool, fd_map_chain */ ulong parent; /* pool idx of the parent in the tree */ ulong child; /* pool idx of the left-child */ @@ -79,6 +78,12 @@ typedef struct fd_forest_ele fd_forest_ele_t; #define MAP_KEY slot #include "../../util/tmpl/fd_map_chain.c" +/* Internal use only for BFSing */ +#define DEQUE_NAME fd_forest_deque +#define DEQUE_T ulong +#include "../../util/tmpl/fd_deque_dynamic.c" + + /* fd_forest_t is the top-level structure that holds the root of the tree, as well as the memory pools and map structures. @@ -111,6 +116,7 @@ struct __attribute__((aligned(128UL))) fd_forest { ulong ancestry_gaddr; /* wksp_gaddr of fd_forest_ancestry */ ulong frontier_gaddr; /* map of slot to ele (leaf that needs repair) */ ulong orphaned_gaddr; /* map of parent_slot to singly-linked list of ele orphaned by that parent slot */ + ulong deque_gaddr; /* wksp gaddr of fd_forest_deque. internal use only for BFSing */ ulong magic; /* ==FD_FOREST_MAGIC */ }; typedef struct fd_forest fd_forest_t; @@ -137,13 +143,15 @@ fd_forest_footprint( ulong ele_max ) { FD_LAYOUT_APPEND( FD_LAYOUT_APPEND( FD_LAYOUT_APPEND( + FD_LAYOUT_APPEND( FD_LAYOUT_INIT, - alignof(fd_forest_t), sizeof(fd_forest_t) ), - fd_fseq_align(), fd_fseq_footprint() ), - fd_forest_pool_align(), fd_forest_pool_footprint( ele_max ) ), + alignof(fd_forest_t), sizeof(fd_forest_t) ), + fd_fseq_align(), fd_fseq_footprint() ), + fd_forest_pool_align(), fd_forest_pool_footprint ( ele_max ) ), fd_forest_ancestry_align(), fd_forest_ancestry_footprint( ele_max ) ), fd_forest_frontier_align(), fd_forest_frontier_footprint( ele_max ) ), fd_forest_orphaned_align(), fd_forest_orphaned_footprint( ele_max ) ), + fd_forest_deque_align(), fd_forest_deque_footprint ( ele_max ) ), fd_forest_align() ); } diff --git a/src/discof/forest/test_forest.c b/src/discof/forest/test_forest.c index 7e6f0a2b0a..c9a66910b1 100644 --- a/src/discof/forest/test_forest.c +++ b/src/discof/forest/test_forest.c @@ -79,6 +79,115 @@ test_publish( fd_wksp_t * wksp ) { } } +void +test_publish_incremental( fd_wksp_t * wksp ){ + /* as the name suggests. tests the complications introduced by loading + two incremental snapshots */ + + ulong ele_max = 32UL; + void * mem = fd_wksp_alloc_laddr( wksp, fd_forest_align(), fd_forest_footprint( ele_max ), 1UL ); + FD_TEST( mem ); + fd_forest_t * forest = fd_forest_join( fd_forest_new( mem, ele_max, 42UL /* seed */ ) ); + + /* 1. Try publishing to a slot that doesnt exist + + 0 10 -> 11 + + */ + + fd_forest_init( forest, 0 ); + fd_forest_data_shred_insert( forest, 11, 1, 0, 0, 1, 1 ); + + ulong new_root = 1; + fd_forest_publish( forest, new_root ); + FD_TEST( fd_forest_root_slot( forest ) == new_root ); + FD_TEST( fd_forest_frontier_ele_query( fd_forest_frontier( forest ), &new_root, NULL, fd_forest_pool( forest ) ) ); + FD_TEST( !fd_forest_query( forest, 0 ) ); + + /* 2. Try publishing to a slot on the frontier + + 1 -> 2 -> 3 10 -> 11 + + */ + + fd_forest_data_shred_insert( forest, 2, 1, 0, 0, 1, 1 ); + fd_forest_data_shred_insert( forest, 3, 1, 0, 0, 1, 1 ); + + ulong frontier = 3; + FD_TEST( fd_forest_frontier_ele_query( fd_forest_frontier( forest ), &frontier, NULL, fd_forest_pool( forest ) ) ); + fd_forest_publish( forest, frontier ); + FD_TEST( fd_forest_root_slot( forest ) == frontier ); + FD_TEST( fd_forest_frontier_ele_query( fd_forest_frontier( forest ), &frontier, NULL, fd_forest_pool( forest ) ) ); + FD_TEST( !fd_forest_query( forest, 1 ) ); + FD_TEST( !fd_forest_query( forest, 2 ) ); + FD_TEST( fd_forest_query( forest, 10 ) ); + FD_TEST( fd_forest_query( forest, 11 ) ); + + /* 3. Try publishing to a slot in ancestry but in front of the frontier + + frontier new_root + 3 -> 4 -> 5 -> 6 -> 7 10 -> 11 + + */ + + fd_forest_data_shred_insert( forest, 4, 1, 0, 0, 0, 0 ); + fd_forest_data_shred_insert( forest, 5, 1, 0, 0, 0, 0 ); + fd_forest_data_shred_insert( forest, 6, 1, 0, 0, 0, 0 ); + fd_forest_data_shred_insert( forest, 7, 1, 0, 0, 0, 0 ); + + frontier = 4; + new_root = 6; + FD_TEST( fd_forest_frontier_ele_query( fd_forest_frontier( forest ), &frontier, NULL, fd_forest_pool( forest ) ) ); + fd_forest_publish( forest, new_root ); + FD_TEST( fd_forest_root_slot( forest ) == new_root ); + frontier = 7; + FD_TEST( fd_forest_frontier_ele_query( fd_forest_frontier( forest ), &frontier, NULL, fd_forest_pool( forest ) ) ); + FD_TEST( !fd_forest_query( forest, 3 ) ); + FD_TEST( !fd_forest_query( forest, 4 ) ); + FD_TEST( !fd_forest_query( forest, 5 ) ); + + /* 4. Try publishing to an orphan slot + + 6 -> 7 10 -> 11 + 8 -> 9 (should get pruned) + */ + + fd_forest_data_shred_insert( forest, 9, 1, 0, 0, 0, 0 ); + + new_root = 10; + frontier = 11; + + fd_forest_publish( forest, new_root); + FD_TEST( !fd_forest_verify( forest ) ); + FD_TEST( fd_forest_root_slot( forest ) == new_root ); + FD_TEST( fd_forest_frontier_ele_query( fd_forest_frontier( forest ), &frontier, NULL, fd_forest_pool( forest ) ) ); + FD_TEST( !fd_forest_query( forest, 6 ) ); + FD_TEST( !fd_forest_query( forest, 7 ) ); + FD_TEST( !fd_forest_query( forest, 8 ) ); + FD_TEST( !fd_forest_query( forest, 9 ) ); + FD_TEST( fd_forest_query( forest, 10 ) ); + FD_TEST( fd_forest_query( forest, 11 ) ); + + /* 5. Try publishing to an orphan slot that is not a "head" of orphans + (publish) + 10 -> 11 14 -> 15 -> 16 + + */ + + fd_forest_data_shred_insert( forest, 14, 1, 0, 0, 0, 0 ); + fd_forest_data_shred_insert( forest, 15, 1, 0, 0, 0, 0 ); + fd_forest_data_shred_insert( forest, 16, 1, 0, 0, 0, 0 ); + + new_root = 15; + frontier = 16; + fd_forest_publish( forest, new_root ); + FD_TEST( !fd_forest_verify( forest ) ); + FD_TEST( fd_forest_root_slot( forest ) == new_root ); + FD_TEST( fd_forest_frontier_ele_query( fd_forest_frontier( forest ), &frontier, NULL, fd_forest_pool( forest ) ) ); + FD_TEST( !fd_forest_query( forest, 10 ) ); + FD_TEST( !fd_forest_query( forest, 11 ) ); + FD_TEST( !fd_forest_query( forest, 14 ) ); +} #define SORT_NAME sort #define SORT_KEY_T ulong #include "../../util/tmpl/fd_sort.c" @@ -520,6 +629,7 @@ main( int argc, char ** argv ) { FD_TEST( wksp ); test_publish( wksp ); + test_publish_incremental( wksp ); test_out_of_order( wksp ); test_forks( wksp ); // test_print_tree( wksp ); diff --git a/src/discof/repair/fd_fec_chainer.c b/src/discof/repair/fd_fec_chainer.c index fa1590d165..be0dbb208b 100644 --- a/src/discof/repair/fd_fec_chainer.c +++ b/src/discof/repair/fd_fec_chainer.c @@ -108,9 +108,9 @@ fd_fec_chainer_init( fd_fec_chainer_t * chainer, ulong slot, uchar merkle_root[s FD_TEST( fd_fec_pool_free( chainer->pool ) ); fd_fec_ele_t * root = fd_fec_pool_ele_acquire( chainer->pool ); FD_TEST( root ); - root->key = slot << 32 | ( UINT_MAX-1 ); // maintain invariant that no fec_set_idx=UINT_MAX lives in pool_ele + root->key = slot << 32 | ( 0 ); // maintain invariant that no fec_set_idx=UINT_MAX lives in pool_ele root->slot = slot; - root->fec_set_idx = UINT_MAX-1; + root->fec_set_idx = 0; root->data_cnt = 0; root->data_complete = 1; root->slot_complete = 1; @@ -125,9 +125,10 @@ fd_fec_chainer_init( fd_fec_chainer_t * chainer, ulong slot, uchar merkle_root[s UINT_MAX -> UINT_MAX-1 -> get_pool_ele(UINT_MAX-1)*/ fd_fec_parent_t * p = fd_fec_parents_insert( chainer->parents, slot << 32 | UINT_MAX ); - p->parent_key = (slot << 32) | ( UINT_MAX - 1 ); + p->parent_key = (slot << 32) | ( 0 ); fd_fec_frontier_ele_insert( chainer->frontier, root, chainer->pool ); + chainer->root_fec = fd_fec_pool_idx( chainer->pool, root ); return root; } @@ -146,9 +147,17 @@ fd_fec_chainer_query( fd_fec_chainer_t * chainer, ulong slot, uint fec_set_idx ) return fec; } +static fd_fec_ele_t * +fd_fec_chainer_remove( fd_fec_chainer_t * chainer, ulong key ) { + fd_fec_ele_t * fec = fd_fec_frontier_ele_remove( chainer->frontier, &key, NULL, chainer->pool ); + fec = fd_ptr_if( !fec, fd_fec_ancestry_ele_remove( chainer->ancestry, &key, NULL, chainer->pool ), fec ); + fec = fd_ptr_if( !fec, fd_fec_orphaned_ele_remove( chainer->orphaned, &key, NULL, chainer->pool ), fec ); + return fec; +} + static int is_last_fec( ulong key ){ - return ( (uint)fd_ulong_extract( key, 0, 31 ) & UINT_MAX ) == UINT_MAX; // lol fix + return (uint)fd_ulong_extract( key, 0, 31 ) == UINT_MAX; } static void @@ -328,3 +337,67 @@ fd_fec_chainer_insert( fd_fec_chainer_t * chainer, return ele; } + +void +fd_fec_chainer_publish( fd_fec_chainer_t * chainer, ulong new_root_slot ) { + fd_fec_ele_t * old_root = fd_fec_pool_ele( chainer->pool, chainer->root_fec ); + fd_fec_ele_t * new_root = fd_fec_chainer_query( chainer, new_root_slot, 0 ); + + FD_TEST( old_root ); + if( FD_UNLIKELY( !new_root ) ) { + /* It is possible to not have a fec element for the new root during + second incremental snapshot load */ + + new_root = fd_fec_pool_ele_acquire( chainer->pool ); + new_root->key = new_root_slot << 32; /* fec_set_idx 0, similar to chainer_init */ + new_root->slot = new_root_slot; + new_root->fec_set_idx = 0; + new_root->data_cnt = 0; + new_root->data_complete = 1; + new_root->slot_complete = 1; + new_root->parent_off = 0; + memset( new_root->chained_merkle_root, 0, FD_SHRED_MERKLE_ROOT_SZ ); + + fd_fec_parent_t * p = fd_fec_parents_insert( chainer->parents, new_root_slot << 32 | UINT_MAX ); + p->parent_key = new_root_slot << 32; + + fd_fec_frontier_ele_insert( chainer->frontier, new_root, chainer->pool ); + } + + /* Prune children of the old root */ + + fd_fec_queue_push_tail( chainer->queue, old_root->key ); + + while( FD_LIKELY( !fd_fec_queue_empty( chainer->queue ) ) ) { + ulong key = fd_fec_queue_pop_head( chainer->queue ); + fd_fec_ele_t * ele = fd_fec_chainer_query( chainer, key >> 32, (uint)key ); + if( FD_UNLIKELY( !ele ) ) continue; + + if( FD_UNLIKELY( ele->slot_complete ) ) { + fd_fec_children_t * fec_children = fd_fec_children_query( chainer->children, ele->slot, NULL ); + if( FD_UNLIKELY( fec_children ) ) { + for( ulong off = fd_slot_child_offs_const_iter_init( fec_children->child_offs ); + !fd_slot_child_offs_const_iter_done( off ); + off = fd_slot_child_offs_const_iter_next( fec_children->child_offs, off ) ) { + ulong child_slot = ele->slot + off; + + if( FD_UNLIKELY( child_slot == new_root_slot ) ) continue; + + fd_fec_queue_push_tail( chainer->queue, child_slot << 32 | 0 ); + } + } + } else { + ulong child_key = (ele->slot << 32) | (ele->key + ele->data_cnt); + fd_fec_queue_push_tail( chainer->queue, child_key ); + } + + /* Remove ele from the chainer. */ + + fd_fec_ele_t * remove = fd_fec_chainer_remove( chainer, ele->key ); + FD_TEST( remove == ele ); + } + + /* Update the root_fec */ + + chainer->root_fec = fd_fec_pool_idx( chainer->pool, new_root ); +} diff --git a/src/discof/repair/fd_fec_chainer.h b/src/discof/repair/fd_fec_chainer.h index 38a85e96fc..b21a242c52 100644 --- a/src/discof/repair/fd_fec_chainer.h +++ b/src/discof/repair/fd_fec_chainer.h @@ -267,6 +267,7 @@ struct __attribute__((aligned(128UL))) fd_fec_chainer { fd_fec_children_t * children; /* map of slot->child_offs for fast O(1) querying */ ulong * queue; /* queue of FEC keys for BFS chaining */ fd_fec_out_t * out; /* queue of FEC keys to deliver to application */ + ulong root_fec; /* pool idx of the root FEC set */ }; FD_PROTOTYPES_BEGIN @@ -365,6 +366,11 @@ fd_fec_chainer_insert( fd_fec_chainer_t * chainer, uchar const merkle_root[static FD_SHRED_MERKLE_ROOT_SZ], uchar const chained_merkle_root[static FD_SHRED_MERKLE_ROOT_SZ] ); +/* fd_fec_chainer_publish prunes the fec tree when the wmk is updated. */ + +void +fd_fec_chainer_publish( fd_fec_chainer_t * chainer, ulong new_root ); + FD_PROTOTYPES_END #endif /* HEADER_fd_src_discof_repair_fd_fec_chainer_h */ diff --git a/src/discof/repair/fd_repair_tile.c b/src/discof/repair/fd_repair_tile.c index 0b471be3bd..1d0fe09360 100644 --- a/src/discof/repair/fd_repair_tile.c +++ b/src/discof/repair/fd_repair_tile.c @@ -766,15 +766,17 @@ after_frag( fd_repair_tile_ctx_t * ctx, ulong wmark = fd_fseq_query( ctx->wmark ); if( FD_UNLIKELY( fd_forest_root_slot( ctx->forest ) == ULONG_MAX ) ) { + FD_LOG_NOTICE(( "Forest initializing with root %lu", wmark )); fd_forest_init( ctx->forest, wmark ); uchar mr[ FD_SHRED_MERKLE_ROOT_SZ ] = { 0 }; /* FIXME */ fd_fec_chainer_init( ctx->fec_chainer, wmark, mr ); FD_TEST( fd_forest_root_slot( ctx->forest ) != ULONG_MAX ); - FD_LOG_NOTICE(( "Forest initialized with root %lu", wmark )); ctx->prev_wmark = wmark; } + if( FD_UNLIKELY( ctx->prev_wmark < wmark ) ) { fd_forest_publish( ctx->forest, wmark ); + fd_fec_chainer_publish( ctx->fec_chainer, wmark ); ctx->prev_wmark = wmark; // invalidate our repair iterator ctx->repair_iter = fd_forest_iter_init( ctx->forest ); diff --git a/src/discof/repair/test_fec_chainer.c b/src/discof/repair/test_fec_chainer.c index 2cfd32b1a5..d7bf21800b 100644 --- a/src/discof/repair/test_fec_chainer.c +++ b/src/discof/repair/test_fec_chainer.c @@ -183,6 +183,77 @@ test_single_fec( fd_wksp_t * wksp ){ } +void +test_publish( fd_wksp_t * wksp ){ + ulong fec_max = 32; + + void * mem = fd_wksp_alloc_laddr( wksp, fd_fec_chainer_align(), fd_fec_chainer_footprint( fec_max ), 1UL ); + FD_TEST( mem ); + fd_fec_chainer_t * chainer = fd_fec_chainer_join( fd_fec_chainer_new( mem, fec_max, 0UL ) ); + + uchar mr_root[FD_SHRED_MERKLE_ROOT_SZ] = { 1 }; + fd_fec_ele_t * f0_64 = fd_fec_chainer_init( chainer, 1, mr_root ); + + FD_TEST( fd_fec_frontier_ele_query( chainer->frontier, &f0_64->key, NULL, chainer->pool ) == f0_64 ); + + /* Typical startup behavior, turbine orphan FECs added */ + fd_fec_chainer_insert( chainer, 10, 0, 32, 1, 1, 1, mr_root, mr_root ); + fd_fec_chainer_insert( chainer, 9, 0, 32, 1, 1, 1, mr_root, mr_root ); + + /* simulating no FECs chained, but a new root is published */ + ulong new_root = 2UL << 32 | 0; + fd_fec_chainer_publish( chainer, 2 ); + + FD_TEST( fd_fec_frontier_ele_query( chainer->frontier, &new_root, NULL, chainer->pool ) ); + + /* Chain off of root slot for a bit */ + fd_fec_chainer_insert( chainer, 3, 0, 32, 1, 1, 1, mr_root, mr_root ); + fd_fec_chainer_insert( chainer, 4, 0, 32, 1, 1, 1, mr_root, mr_root ); + ulong new_frontier = 4UL << 32 | 0; + FD_TEST( fd_fec_frontier_ele_query( chainer->frontier, &new_frontier, NULL, chainer->pool ) ); + + /* Publish to ancestor */ + fd_fec_chainer_publish( chainer, 3 ); + FD_TEST( fd_fec_frontier_ele_query( chainer->frontier, &new_frontier, NULL, chainer->pool ) ); + + /* Make a tree + + 3 - 4 - 8 - 9 - 10 + \ 5 - 6 - 7 + + */ + + fd_fec_chainer_insert( chainer, 5, 0, 32, 1, 1, 2, mr_root, mr_root ); + fd_fec_chainer_insert( chainer, 6, 0, 32, 1, 1, 1, mr_root, mr_root ); + fd_fec_chainer_insert( chainer, 7, 0, 32, 1, 1, 1, mr_root, mr_root ); + fd_fec_chainer_insert( chainer, 8, 0, 32, 1, 1, 4, mr_root, mr_root ); + + uint frontier_cnt = 0; + ulong frontier_keys[2] = { 7UL << 32 | 0, 10UL << 32 | 0 }; + for( fd_fec_frontier_iter_t iter = fd_fec_frontier_iter_init( chainer->frontier, chainer->pool ); !fd_fec_frontier_iter_done( iter, chainer->frontier, chainer->pool ); iter = fd_fec_frontier_iter_next( iter, chainer->frontier, chainer->pool ) ){ + frontier_cnt++; + } + FD_TEST( frontier_cnt == sizeof(frontier_keys) / sizeof(ulong) ); + for( uint i = 0; i < sizeof(frontier_keys) / sizeof(ulong); i++ ){ + FD_TEST( fd_fec_frontier_ele_query( chainer->frontier, &frontier_keys[i], NULL, chainer->pool ) ); + } + + /* Publish down the tree */ + fd_fec_chainer_publish( chainer, 4 ); + new_frontier = 10UL << 32 | 0; + FD_TEST( fd_fec_frontier_ele_query( chainer->frontier, &new_frontier, NULL, chainer->pool ) ); + + FD_TEST( fd_fec_chainer_query( chainer, 4, 0 ) ); + FD_TEST( !fd_fec_chainer_query( chainer, 5, 0 ) ); + FD_TEST( !fd_fec_chainer_query( chainer, 6, 0 ) ); + FD_TEST( !fd_fec_chainer_query( chainer, 7, 0 ) ); + FD_TEST( fd_fec_chainer_query( chainer, 8, 0 ) ); + FD_TEST( fd_fec_chainer_query( chainer, 9, 0 ) ); + FD_TEST( fd_fec_chainer_query( chainer, 10, 0 ) ); + + fd_wksp_free_laddr( fd_fec_chainer_delete( fd_fec_chainer_leave( chainer ) ) ); +} + int main( int argc, char ** argv ) { fd_boot( &argc, &argv ); @@ -195,6 +266,7 @@ main( int argc, char ** argv ) { // test_fec_ordering( wksp ); // test_single_fec( wksp ); + test_publish( wksp ); ulong sig = fd_disco_repair_replay_sig( 3508496, 1, 32, 128 ); FD_TEST( fd_disco_repair_replay_sig_slot( sig ) == 3508496 );