Skip to content

funk: clean up join API #4853

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions src/app/ledger/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ struct fd_ledger_args {
fd_wksp_t * status_cache_wksp; /* wksp for status cache. */
fd_blockstore_t blockstore_ljoin;
fd_blockstore_t * blockstore; /* blockstore for replay */
fd_funk_t * funk; /* handle to funk */
fd_funk_t funk[1]; /* handle to funk */
fd_alloc_t * alloc; /* handle to alloc */
char const * cmd; /* user passed command to fd_ledger */
ulong start_slot; /* start slot for offline replay */
Expand Down Expand Up @@ -894,13 +894,13 @@ void
init_funk( fd_ledger_args_t * args ) {
fd_funk_t * funk;
if( args->restore_funk ) {
funk = fd_funk_recover_checkpoint( args->funk_file, 1, args->restore_funk, &args->funk_close_args );
funk = fd_funk_recover_checkpoint( args->funk, args->funk_file, 1, args->restore_funk, &args->funk_close_args );
} else {
funk = fd_funk_open_file( args->funk_file, 1, args->hashseed, args->txns_max, args->index_max, args->funk_page_cnt*(1UL<<30), FD_FUNK_OVERWRITE, &args->funk_close_args );
funk = fd_funk_open_file( args->funk, args->funk_file, 1, args->hashseed, args->txns_max, args->index_max, args->funk_page_cnt*(1UL<<30), FD_FUNK_OVERWRITE, &args->funk_close_args );
}
args->funk = funk;
if( FD_UNLIKELY( !funk ) ) FD_LOG_ERR(( "Failed to join funk" ));
args->funk_wksp = fd_funk_wksp( funk );
FD_LOG_NOTICE(( "funky at global address 0x%016lx", fd_wksp_gaddr_fast( args->funk_wksp, funk ) ));
FD_LOG_NOTICE(( "Funk database is at %s:0x%lx", fd_wksp_name( args->wksp ), fd_wksp_gaddr_fast( args->funk_wksp, funk ) ));
}

void
Expand Down Expand Up @@ -1171,9 +1171,9 @@ ingest( fd_ledger_args_t * args ) {

#ifdef FD_FUNK_HANDHOLDING
if( args->verify_funk ) {
FD_LOG_NOTICE(( "verifying funky" ));
FD_LOG_NOTICE(( "fd_funk_verify() start" ));
if( fd_funk_verify( funk ) ) {
FD_LOG_ERR(( "verification failed" ));
FD_LOG_ERR(( "fd_funk_verify() failed" ));
}
}
#endif
Expand Down
17 changes: 10 additions & 7 deletions src/app/rpcserver/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ init_args( int * argc, char *** argv, fd_rpcserver_args_t * args ) {
char const * funk_file = fd_env_strip_cmdline_cstr( argc, argv, "--funk-file", NULL, NULL );
if( FD_UNLIKELY( !funk_file ))
FD_LOG_ERR(( "--funk-file argument is required" ));
args->funk = fd_funk_open_file( funk_file, 1, 0, 0, 0, 0, FD_FUNK_READONLY, NULL );
if( args->funk == NULL ) {
FD_LOG_ERR(( "failed to join a funky" ));
fd_funk_t * funk = fd_funk_open_file( args->funk, funk_file, 1, 0, 0, 0, 0, FD_FUNK_READONLY, NULL );
if( !funk ) {
FD_LOG_ERR(( "failed to join funk" ));
}

char const * blockstore_file = fd_env_strip_cmdline_cstr( argc, argv, "--blockstore-file", NULL, NULL );
Expand Down Expand Up @@ -107,10 +107,13 @@ init_args_offline( int * argc, char *** argv, fd_rpcserver_args_t * args ) {
if( FD_UNLIKELY( !funk_file ))
FD_LOG_ERR(( "--funk-file argument is required" ));
char const * restore = fd_env_strip_cmdline_cstr ( argc, argv, "--restore-funk", NULL, NULL );
if( restore != NULL )
args->funk = fd_funk_recover_checkpoint( funk_file, 1, restore, NULL );
else
args->funk = fd_funk_open_file( funk_file, 1, 0, 0, 0, 0, FD_FUNK_READONLY, NULL );
fd_funk_t * funk = NULL;
if( restore != NULL ) {
funk = fd_funk_recover_checkpoint( args->funk, funk_file, 1, restore, NULL );
} else {
funk = fd_funk_open_file( args->funk, funk_file, 1, 0, 0, 0, 0, FD_FUNK_READONLY, NULL );
}
if( FD_UNLIKELY( !funk ) ) FD_LOG_ERR(( "Failed to join funk database" ));

fd_wksp_t * wksp;
const char * wksp_name = fd_env_strip_cmdline_cstr ( argc, argv, "--wksp-name-blockstore", NULL, NULL );
Expand Down
6 changes: 3 additions & 3 deletions src/choreo/forks/fd_forks.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ slot_ctx_restore( ulong slot,
fd_exec_epoch_ctx_t * epoch_ctx,
fd_spad_t * runtime_spad,
fd_exec_slot_ctx_t * slot_ctx_out ) {
fd_funk_txn_map_t txn_map = fd_funk_txn_map( funk, fd_funk_wksp( funk ) );
fd_funk_txn_map_t * txn_map = fd_funk_txn_map( funk );
bool block_exists = fd_blockstore_shreds_complete( blockstore, slot );

FD_LOG_DEBUG( ( "Current slot %lu", slot ) );
Expand All @@ -204,11 +204,11 @@ slot_ctx_restore( ulong slot,
fd_funk_rec_key_t id = fd_runtime_slot_bank_key();
for( ; ; ) {
fd_funk_txn_start_read( funk );
fd_funk_txn_t * txn = fd_funk_txn_query( &xid, &txn_map );
fd_funk_txn_t * txn = fd_funk_txn_query( &xid, txn_map );
if( !txn ) {
memset( xid.uc, 0, sizeof( fd_funk_txn_xid_t ) );
xid.ul[0] = slot;
txn = fd_funk_txn_query( &xid, &txn_map );
txn = fd_funk_txn_query( &xid, txn_map );
if( !txn ) {
FD_LOG_ERR( ( "missing txn, parent slot %lu", slot ) );
}
Expand Down
19 changes: 7 additions & 12 deletions src/discof/batch/fd_batch_tile.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ struct fd_snapshot_tile_ctx {
/* Shared data structures. */
fd_txncache_t * status_cache;
ulong * is_constipated;
fd_funk_t * funk;
fd_funk_t funk[1];

/* File descriptors used for snapshot generation. */
int tmp_fd;
Expand Down Expand Up @@ -442,20 +442,15 @@ after_credit( fd_snapshot_tile_ctx_t * ctx,
if( FD_UNLIKELY( !ctx->is_funk_active ) ) {
/* Setting these parameters are not required because we are joining the
funk that was setup in the replay tile. */
ctx->funk = fd_funk_open_file( ctx->funk_file,
1UL,
0UL,
0UL,
0UL,
0UL,
FD_FUNK_READ_WRITE,
NULL );
if( FD_UNLIKELY( !ctx->funk ) ) {
FD_LOG_ERR(( "failed to join a funky" ));
fd_funk_t * funk = fd_funk_open_file(
ctx->funk, ctx->funk_file,
1UL, 0UL, 0UL, 0UL, 0UL, FD_FUNK_READ_WRITE, NULL );
if( FD_UNLIKELY( !funk ) ) {
FD_LOG_ERR(( "Failed to join a funk database" ));
}
ctx->is_funk_active = 1;

FD_LOG_WARNING(( "Just joined funk at file=%s", ctx->funk_file ));
FD_LOG_WARNING(( "Joined funk database at file=%s", ctx->funk_file ));
}

if( fd_batch_fseq_is_snapshot( batch_fseq ) ) {
Expand Down
7 changes: 4 additions & 3 deletions src/discof/consensus/test_consensus.c
Original file line number Diff line number Diff line change
Expand Up @@ -444,13 +444,14 @@ main( void ) {
// /**********************************************************************/

// fd_wksp_tag_query_info_t funk_info;
// fd_funk_t * funk = NULL;
// fd_funk_t funk_[1];
// fd_funk_t * funk = NULL;
// ulong funk_tag = FD_FUNK_MAGIC;
// if( fd_wksp_tag_query( wksp, &funk_tag, 1, &funk_info, 1 ) > 0 ) {
// void * shmem = fd_wksp_laddr_fast( wksp, funk_info.gaddr_lo );
// funk = fd_funk_join( shmem );
// funk = fd_funk_join( funk_, shmem );
// }
// if( funk == NULL ) FD_LOG_ERR( ( "failed to join a funky" ) );
// if( !funk ) FD_LOG_ERR( ( "failed to join a funky" ) );

// /**********************************************************************/
// /* blockstore */
Expand Down
26 changes: 10 additions & 16 deletions src/discof/exec/fd_exec_tile.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ struct fd_exec_tile_ctx {
int pending_epoch_pop;

/* Funk-specific setup. */
fd_funk_t * funk;
fd_funk_t funk[1];
fd_wksp_t * funk_wksp;

/* Data structures related to managing and executing the transaction.
Expand Down Expand Up @@ -241,13 +241,13 @@ prepare_new_slot_execution( fd_exec_tile_ctx_t * ctx,
fd_spad_push( ctx->exec_spad );
ctx->pending_slot_pop = 1;

fd_funk_txn_map_t txn_map = fd_funk_txn_map( ctx->funk, ctx->funk_wksp );
if( FD_UNLIKELY( !txn_map.map ) ) {
fd_funk_txn_map_t * txn_map = fd_funk_txn_map( ctx->funk );
if( FD_UNLIKELY( !txn_map->map ) ) {
FD_LOG_ERR(( "Could not find valid funk transaction map" ));
}
fd_funk_txn_xid_t xid = { .ul = { slot_msg->slot, slot_msg->slot } };
fd_funk_txn_start_read( ctx->funk );
fd_funk_txn_t * funk_txn = fd_funk_txn_query( &xid, &txn_map );
fd_funk_txn_t * funk_txn = fd_funk_txn_query( &xid, txn_map );
if( FD_UNLIKELY( !funk_txn ) ) {
FD_LOG_ERR(( "Could not find valid funk transaction" ));
}
Expand Down Expand Up @@ -664,19 +664,13 @@ unprivileged_init( fd_topo_t * topo,
the funk that was setup in the replay tile. */
FD_LOG_NOTICE(( "Trying to join funk at file=%s", tile->exec.funk_file ));
fd_funk_txn_start_write( NULL );
ctx->funk = fd_funk_open_file( tile->exec.funk_file,
1UL,
0UL,
0UL,
0UL,
0UL,
FD_FUNK_READONLY,
NULL );
if( FD_UNLIKELY( !fd_funk_open_file(
ctx->funk, tile->exec.funk_file,
1UL, 0UL, 0UL, 0UL, 0UL, FD_FUNK_READONLY, NULL ) ) ) {
FD_LOG_ERR(( "fd_funk_open_file(%s) failed", tile->exec.funk_file ));
}
fd_funk_txn_end_write( NULL );
ctx->funk_wksp = fd_funk_wksp( ctx->funk );
if( FD_UNLIKELY( !ctx->funk ) ) {
FD_LOG_ERR(( "failed to join a funk" ));
}

FD_LOG_NOTICE(( "Just joined funk at file=%s", tile->exec.funk_file ));

Expand All @@ -693,7 +687,7 @@ unprivileged_init( fd_topo_t * topo,
// FIXME account for this in exec spad footprint
uchar * txn_ctx_mem = fd_spad_alloc( ctx->exec_spad, FD_EXEC_TXN_CTX_ALIGN, FD_EXEC_TXN_CTX_FOOTPRINT );
ctx->txn_ctx = fd_exec_txn_ctx_join( fd_exec_txn_ctx_new( txn_ctx_mem ), ctx->exec_spad, ctx->exec_spad_wksp );
ctx->txn_ctx->funk = ctx->funk;
*ctx->txn_ctx->funk = *ctx->funk;

ctx->txn_ctx->runtime_pub_wksp = ctx->runtime_public_wksp;
if( FD_UNLIKELY( !ctx->txn_ctx->runtime_pub_wksp ) ) {
Expand Down
12 changes: 6 additions & 6 deletions src/discof/geyser/fd_geyser.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include "sham_link.h"

struct fd_geyser {
fd_funk_t * funk;
fd_funk_t funk[1];
fd_blockstore_t blockstore_ljoin;
fd_blockstore_t * blockstore;
int blockstore_fd;
Expand Down Expand Up @@ -72,9 +72,9 @@ fd_geyser_new( void * mem, fd_geyser_args_t * args ) {
ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, 1UL );
FD_TEST( scratch_top <= (ulong)mem + fd_geyser_footprint() );

self->funk = fd_funk_open_file( args->funk_file, 1, 0, 0, 0, 0, FD_FUNK_READONLY, NULL );
if( self->funk == NULL ) {
FD_LOG_ERR(( "failed to join a funky" ));
fd_funk_t * funk = fd_funk_open_file( self->funk, args->funk_file, 1, 0, 0, 0, 0, FD_FUNK_READONLY, NULL );
if( !funk ) {
FD_LOG_ERR(( "fd_funk_open_file(%s) failed", args->funk_file ));
}

fd_wksp_t * wksp = fd_wksp_attach( args->blockstore_wksp );
Expand Down Expand Up @@ -216,8 +216,8 @@ replay_sham_link_during_frag( fd_geyser_t * ctx, fd_replay_notif_msg_t * state,

static const void *
read_account_with_xid( fd_geyser_t * ctx, fd_funk_rec_key_t * recid, fd_funk_txn_xid_t * xid, ulong * result_len ) {
fd_funk_txn_map_t txn_map = fd_funk_txn_map( ctx->funk, fd_funk_wksp( ctx->funk ) );
fd_funk_txn_t * txn = fd_funk_txn_query( xid, &txn_map );
fd_funk_txn_map_t * txn_map = fd_funk_txn_map( ctx->funk );
fd_funk_txn_t * txn = fd_funk_txn_query( xid, txn_map );
return fd_funk_rec_query_copy( ctx->funk, txn, recid, fd_scratch_virtual(), result_len );
}

Expand Down
43 changes: 22 additions & 21 deletions src/discof/replay/fd_replay_tile.c
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ struct fd_replay_tile_ctx {

fd_alloc_t * alloc;
fd_valloc_t valloc;
fd_funk_t * funk;
fd_funk_t funk[1];
fd_exec_epoch_ctx_t * epoch_ctx;
fd_epoch_t * epoch;
fd_forks_t * forks;
Expand Down Expand Up @@ -746,9 +746,9 @@ checkpt( fd_replay_tile_ctx_t * ctx ) {
static void FD_FN_UNUSED
funk_cancel( fd_replay_tile_ctx_t * ctx, ulong mismatch_slot ) {
fd_funk_txn_start_write( ctx->funk );
fd_funk_txn_xid_t xid = { .ul = { mismatch_slot, mismatch_slot } };
fd_funk_txn_map_t txn_map = fd_funk_txn_map( ctx->funk, fd_funk_wksp( ctx->funk ) );
fd_funk_txn_t * mismatch_txn = fd_funk_txn_query( &xid, &txn_map );
fd_funk_txn_xid_t xid = { .ul = { mismatch_slot, mismatch_slot } };
fd_funk_txn_map_t * txn_map = fd_funk_txn_map( ctx->funk );
fd_funk_txn_t * mismatch_txn = fd_funk_txn_query( &xid, txn_map );
FD_TEST( fd_funk_txn_cancel( ctx->funk, mismatch_txn, 1 ) );
fd_funk_txn_end_write( ctx->funk );
}
Expand All @@ -773,7 +773,7 @@ txncache_publish( fd_replay_tile_ctx_t * ctx,
fd_funk_txn_start_read( ctx->funk );

fd_funk_txn_t * txn = to_root_txn;
fd_funk_txn_pool_t txn_pool = fd_funk_txn_pool( ctx->funk, fd_funk_wksp( ctx->funk ) );
fd_funk_txn_pool_t * txn_pool = fd_funk_txn_pool( ctx->funk );
while( txn!=rooted_txn ) {
ulong slot = txn->xid.ul[0];
if( FD_LIKELY( !fd_txncache_get_is_constipated( ctx->slot_ctx->status_cache ) ) ) {
Expand All @@ -783,7 +783,7 @@ txncache_publish( fd_replay_tile_ctx_t * ctx,
FD_LOG_INFO(( "Registering constipated slot %lu", slot ));
fd_txncache_register_constipated_slot( ctx->slot_ctx->status_cache, slot );
}
txn = fd_funk_txn_parent( txn, &txn_pool );
txn = fd_funk_txn_parent( txn, txn_pool );
}

fd_funk_txn_end_read( ctx->funk );
Expand Down Expand Up @@ -875,7 +875,7 @@ funk_publish( fd_replay_tile_ctx_t * ctx,
fd_funk_txn_start_write( ctx->funk );

fd_epoch_bank_t * epoch_bank = fd_exec_epoch_ctx_epoch_bank( ctx->slot_ctx->epoch_ctx );
fd_funk_txn_pool_t txn_pool = fd_funk_txn_pool( ctx->funk, fd_funk_wksp( ctx->funk ) );
fd_funk_txn_pool_t * txn_pool = fd_funk_txn_pool( ctx->funk );

/* Try to publish into Funk */
if( is_constipated ) {
Expand All @@ -895,7 +895,7 @@ funk_publish( fd_replay_tile_ctx_t * ctx,
if( FD_UNLIKELY( fd_funk_txn_publish_into_parent( ctx->funk, txn, 0 ) ) ) {
FD_LOG_ERR(( "Can't publish funk transaction" ));
}
txn = fd_funk_txn_parent( txn, &txn_pool );
txn = fd_funk_txn_parent( txn, txn_pool );
}

} else {
Expand All @@ -916,7 +916,7 @@ funk_publish( fd_replay_tile_ctx_t * ctx,
we will calculate the epoch account hash for. */

fd_funk_txn_t * txn = to_root_txn;
fd_funk_txn_t * parent_txn = fd_funk_txn_parent( txn, &txn_pool );
fd_funk_txn_t * parent_txn = fd_funk_txn_parent( txn, txn_pool );
while( parent_txn ) {
/* We need to be careful here because the eah start slot may be skipped
so the actual slot that we calculate the eah for may be greater than
Expand All @@ -931,7 +931,7 @@ funk_publish( fd_replay_tile_ctx_t * ctx,
break;
}
txn = parent_txn;
parent_txn = fd_funk_txn_parent( txn, &txn_pool );
parent_txn = fd_funk_txn_parent( txn, txn_pool );
}

/* At this point, we know txn is the funk txn that we will want to
Expand Down Expand Up @@ -982,17 +982,17 @@ get_rooted_txn( fd_replay_tile_ctx_t * ctx,
we must also register it into the status cache because we don't register
the root in txncache_publish to avoid registering the same slot multiple times. */

fd_funk_txn_pool_t txn_pool = fd_funk_txn_pool( ctx->funk, fd_funk_wksp( ctx->funk ) );
fd_funk_txn_pool_t * txn_pool = fd_funk_txn_pool( ctx->funk );

if( is_constipated ) {

if( FD_UNLIKELY( !ctx->false_root ) ) {

fd_funk_txn_t * txn = to_root_txn;
fd_funk_txn_t * parent_txn = fd_funk_txn_parent( txn, &txn_pool );
fd_funk_txn_t * parent_txn = fd_funk_txn_parent( txn, txn_pool );
while( parent_txn ) {
txn = parent_txn;
parent_txn = fd_funk_txn_parent( txn, &txn_pool );
parent_txn = fd_funk_txn_parent( txn, txn_pool );
}

ctx->false_root = txn;
Expand Down Expand Up @@ -1056,8 +1056,8 @@ funk_and_txncache_publish( fd_replay_tile_ctx_t * ctx, ulong wmk, fd_funk_txn_xi
/* Handle updates to funk and the status cache. */

fd_funk_txn_start_read( ctx->funk );
fd_funk_txn_map_t txn_map = fd_funk_txn_map( ctx->funk, fd_funk_wksp( ctx->funk ) );
fd_funk_txn_t * to_root_txn = fd_funk_txn_query( xid, &txn_map );
fd_funk_txn_map_t * txn_map = fd_funk_txn_map( ctx->funk );
fd_funk_txn_t * to_root_txn = fd_funk_txn_query( xid, txn_map );
if( FD_UNLIKELY( !to_root_txn ) ) {
FD_LOG_ERR(( "Unable to find funk transaction for xid %lu", xid->ul[0] ));
}
Expand Down Expand Up @@ -2763,26 +2763,27 @@ privileged_init( fd_topo_t * topo,
if( strcmp( snapshot, "funk" ) == 0 ) {
/* Funk database already exists. The parameters are actually mostly ignored. */
funk = fd_funk_open_file(
tile->replay.funk_file, 1, ctx->funk_seed, tile->replay.funk_txn_max,
ctx->funk,
tile->replay.funk_file, 1, ctx->funk_seed, tile->replay.funk_txn_max,
tile->replay.funk_rec_max, tile->replay.funk_sz_gb * (1UL<<30),
FD_FUNK_READ_WRITE, NULL );
} else if( strncmp( snapshot, "wksp:", 5 ) == 0) {
/* Recover funk database from a checkpoint. */
funk = fd_funk_recover_checkpoint( tile->replay.funk_file, 1, snapshot+5, NULL );
funk = fd_funk_recover_checkpoint( ctx->funk, tile->replay.funk_file, 1, snapshot+5, NULL );
} else {
FD_LOG_NOTICE(( "Trying to create new funk at file=%s", tile->replay.funk_file ));
/* Create new funk database */
funk = fd_funk_open_file(
tile->replay.funk_file, 1, ctx->funk_seed, tile->replay.funk_txn_max,
ctx->funk,
tile->replay.funk_file, 1, ctx->funk_seed, tile->replay.funk_txn_max,
tile->replay.funk_rec_max, tile->replay.funk_sz_gb * (1UL<<30),
FD_FUNK_OVERWRITE, NULL );
FD_LOG_NOTICE(( "Opened funk file at %s", tile->replay.funk_file ));
}
if( FD_UNLIKELY( funk == NULL ) ) {
FD_LOG_ERR(( "no funk loaded" ));
if( FD_UNLIKELY( !funk ) ) {
FD_LOG_ERR(( "Failed to join funk database" ));
}
fd_funk_txn_end_write( NULL );
ctx->funk = funk;
ctx->funk_wksp = fd_funk_wksp( funk );
if( FD_UNLIKELY( ctx->funk_wksp == NULL ) ) {
FD_LOG_ERR(( "no funk wksp" ));
Expand Down
Loading
Loading