diff --git a/src/app/firedancer-dev/main.c b/src/app/firedancer-dev/main.c index 6cfffd2f2be..943248e8c2c 100644 --- a/src/app/firedancer-dev/main.c +++ b/src/app/firedancer-dev/main.c @@ -97,9 +97,7 @@ extern fd_topo_run_tile_t fd_tile_archiver_writer; extern fd_topo_run_tile_t fd_tile_archiver_playback; extern fd_topo_run_tile_t fd_tile_archiver_backtest; -extern fd_topo_run_tile_t fd_tile_bencho; -extern fd_topo_run_tile_t fd_tile_benchg; -extern fd_topo_run_tile_t fd_tile_benchs; +extern fd_topo_run_tile_t fd_tile_snapshot_restore_FileRd; fd_topo_run_tile_t * TILES[] = { &fd_tile_net, @@ -138,6 +136,7 @@ fd_topo_run_tile_t * TILES[] = { &fd_tile_bencho, &fd_tile_benchg, &fd_tile_benchs, + &fd_tile_snapshot_restore_FileRd, NULL, }; diff --git a/src/disco/stem/fd_stem.h b/src/disco/stem/fd_stem.h index 5ed398bd2bb..9fd290a4a6c 100644 --- a/src/disco/stem/fd_stem.h +++ b/src/disco/stem/fd_stem.h @@ -8,7 +8,7 @@ struct fd_stem_context { fd_frag_meta_t ** mcaches; ulong * seqs; - ulong * depths; + ulong const * depths; ulong * cr_avail; ulong cr_decrement_amount; diff --git a/src/disco/topo/fd_topo.h b/src/disco/topo/fd_topo.h index 6c91f3dbb0d..de173d29d45 100644 --- a/src/disco/topo/fd_topo.h +++ b/src/disco/topo/fd_topo.h @@ -435,6 +435,10 @@ typedef struct { int archive_fd; } archiver; + struct { + char file_path[ PATH_MAX ]; + } filerd; + }; } fd_topo_tile_t; diff --git a/src/disco/topo/fd_topob.c b/src/disco/topo/fd_topob.c index 2ce07b9f9ea..653587e597f 100644 --- a/src/disco/topo/fd_topob.c +++ b/src/disco/topo/fd_topob.c @@ -64,7 +64,7 @@ fd_topob_obj( fd_topo_t * topo, return obj; } -void +fd_topo_link_t * fd_topob_link( fd_topo_t * topo, char const * link_name, char const * wksp_name, @@ -100,6 +100,8 @@ fd_topob_link( fd_topo_t * topo, FD_TEST( fd_pod_insertf_ulong( topo->props, mtu, "obj.%lu.mtu", obj->id ) ); } topo->link_cnt++; + + return link; } void diff --git a/src/disco/topo/fd_topob.h b/src/disco/topo/fd_topob.h index cbb6d100b05..162237d18d4 100644 --- a/src/disco/topo/fd_topob.h +++ b/src/disco/topo/fd_topob.h @@ -72,7 +72,7 @@ fd_topob_tile_uses( fd_topo_t * topo, can have no backing data buffer, a dcache, or a reassembly buffer behind it. */ -void +fd_topo_link_t * fd_topob_link( fd_topo_t * topo, char const * link_name, char const * wksp_name, diff --git a/src/discof/restore/Local.mk b/src/discof/restore/Local.mk new file mode 100644 index 00000000000..da3369e6a57 --- /dev/null +++ b/src/discof/restore/Local.mk @@ -0,0 +1 @@ +$(call add-objs,fd_filerd_tile,fd_discof) diff --git a/src/discof/restore/README.md b/src/discof/restore/README.md new file mode 100644 index 00000000000..db0be4decc7 --- /dev/null +++ b/src/discof/restore/README.md @@ -0,0 +1,35 @@ +# Snapshot Restore + +## Stream link conventions + +Various snapshot components use byte streams, not packet streams. + +These require custom conventions. + +**Stream fragment descriptors** + +Byte streams use `fd_frag_stream_meta_t` (defined in `fd_restore_base.h`). + +These have the following changes: +- `chunk` is replaced by `goff` and `loff`, which are 64-bit offsets + describing the stream offset and dcache offset respectively +- `tsorig` / `tspub` are removed (latency is less relevant) +- `sig` is removed (cannot filter without looking at stream data) +- `sz` is widened to 32 bits. + +`**Dcache allocations** + +Payloads in stream dcaches are unaligned. Payloads are addressed with +uncompressed byte offsets relative to the workspace start. + +(Compare this to the usual compact packet dcaches, which use 64 byte +aligned chunks with compressed addressing.) + +**Stream backpressure** + +Byte streams naturally require a reliable transport. + +Consumers periodically publish their progress in `fseq`. +- `fseq[0]` is the lowest sequence number not yet consumed (standard) +- `fseq[1]` is the stream offset of the next byte not yet consumed +` \ No newline at end of file diff --git a/src/discof/restore/fd_filerd_tile.c b/src/discof/restore/fd_filerd_tile.c new file mode 100644 index 00000000000..7c2574a1936 --- /dev/null +++ b/src/discof/restore/fd_filerd_tile.c @@ -0,0 +1,347 @@ +#include "fd_restore_base.h" +#include "../../disco/topo/fd_topo.h" +#include "../../disco/metrics/fd_metrics.h" +#include +#include +#include +#include + +#define NAME "FileRd" + +struct fd_filerd_tile { + int fd; + + uchar * buf; /* dcache */ + ulong buf_off; + ulong buf_sz; + ulong goff; +}; + +typedef struct fd_filerd_tile fd_filerd_tile_t; + +static ulong +scratch_align( void ) { + return alignof(fd_filerd_tile_t); +} + +static ulong +scratch_footprint( fd_topo_tile_t const * tile ) { + (void)tile; + return sizeof(fd_filerd_tile_t); +} + +static void +privileged_init( fd_topo_t * topo, + fd_topo_tile_t * tile ) { + fd_filerd_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id ); + fd_memset( ctx, 0, sizeof(fd_filerd_tile_t) ); + + if( FD_UNLIKELY( tile->in_cnt !=0UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 0", tile->in_cnt )); + if( FD_UNLIKELY( tile->out_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 1", tile->out_cnt )); + + ctx->fd = open( tile->filerd.file_path, O_RDONLY|O_CLOEXEC ); + if( FD_UNLIKELY( ctx->fd<0 ) ) FD_LOG_ERR(( "open() failed (%i-%s)", errno, fd_io_strerror( errno ) )); +} + +static void +unprivileged_init( fd_topo_t * topo, + fd_topo_tile_t * tile ) { + fd_filerd_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id ); + + void * out_dcache = fd_dcache_join( fd_topo_obj_laddr( topo, topo->links[ tile->out_link_id[ 0 ] ].dcache_obj_id ) ); + FD_TEST( out_dcache ); + + ctx->buf = out_dcache; + ctx->buf_off = 0UL; + ctx->buf_sz = fd_dcache_data_sz( out_dcache ); + ctx->goff = 0UL; +} + +static void +during_housekeeping( fd_filerd_tile_t * ctx ) { + (void)ctx; +} + +static void +metrics_write( fd_filerd_tile_t * ctx ) { + (void)ctx; +} + +static void +close_file( fd_filerd_tile_t * ctx ) { + if( FD_UNLIKELY( ctx->fd<0 ) ) return; + if( FD_UNLIKELY( close( ctx->fd ) ) ) { + FD_LOG_ERR(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) )); + } + ctx->fd = -1; +} + +static void +after_credit( fd_filerd_tile_t * ctx, + fd_frag_stream_meta_t * out_mcache, + ulong const out_depth, + ulong * restrict out_seq, + ulong * restrict cr_frag_avail, + ulong * restrict cr_byte_avail, + int * restrict charge_busy_after ) { + /* Assumes *cr_frag_avail>=2 */ + + int fd = ctx->fd; + if( FD_UNLIKELY( fd<0 ) ) return; + + if( FD_UNLIKELY( ctx->buf_off >= ctx->buf_sz ) ) { + FD_LOG_CRIT(( "Buffer overflow (buf_off=%lu buf_sz=%lu)", ctx->buf_off, ctx->buf_sz )); + } + + ulong const iov0_sz = fd_ulong_min( *cr_byte_avail, ctx->buf_sz - ctx->buf_off ); + struct iovec iov[2]; + iov[ 0 ].iov_base = ctx->buf + ctx->buf_off; + iov[ 0 ].iov_len = iov0_sz; + iov[ 1 ].iov_base = ctx->buf; + iov[ 1 ].iov_len = fd_ulong_min( (ulong)fd_long_max( 0L, (long)*cr_byte_avail-(long)iov0_sz ), ctx->buf_off ); + + long res = readv( fd, iov, 2 ); + if( FD_UNLIKELY( res<=0L ) ) { + if( FD_UNLIKELY( res==0 ) ) { + FD_LOG_INFO(( "Reached end of file" )); + close_file( ctx ); + return; + } + if( FD_LIKELY( errno==EAGAIN ) ) return; + FD_LOG_ERR(( "readv() failed (%i-%s)", errno, fd_io_strerror( errno ) )); + /* aborts app */ + } + + ulong sz = (ulong)res; + cr_byte_avail[0] -= sz; + *charge_busy_after = 1; + + ulong frag0_sz = fd_ulong_min( iov0_sz, sz ); + ulong frag1_sz = (ulong)res - frag0_sz; + + fd_mcache_publish_stream( out_mcache, out_depth, out_seq[0], ctx->goff, ctx->buf_off, frag0_sz ); + out_seq[0] = fd_seq_inc( out_seq[0], 1UL ); + cr_frag_avail[0]--; + ctx->goff += frag0_sz; + ctx->buf_off += frag0_sz; + if( ctx->buf_off >= ctx->buf_sz ) ctx->buf_off = 0UL; /* cmov */ + + if( FD_UNLIKELY( frag1_sz ) ) { + fd_mcache_publish_stream( out_mcache, out_depth, out_seq[0], ctx->goff, 0UL, frag1_sz ); + out_seq[0] = fd_seq_inc( out_seq[0], 1UL ); + cr_frag_avail[0]--; + ctx->goff += frag1_sz; + ctx->buf_off += frag1_sz; + } +} + +/* run/run1 are a custom run loop based on fd_stem.c. */ + +__attribute__((noinline)) static void +fd_filerd_run1( + fd_filerd_tile_t * ctx, + fd_frag_stream_meta_t * out_mcache, + void * out_dcache, + ulong cons_cnt, + ushort * restrict event_map, /* cnt=1+cons_cnt */ + ulong ** restrict cons_fseq, /* cnt= cons_cnt points to each consumer's fseq */ + ulong volatile ** restrict cons_slow, /* cnt= cons_cnt points to 'slow' metrics */ + ulong * restrict cons_seq, /* cnt=2*cons_cnt cache of recent fseq observations */ + long lazy, + fd_rng_t * rng +) { + + /* out flow control state */ + ulong cr_byte_avail; /* byte burst quota */ + ulong cr_frag_avail; /* frag burst quota */ + + /* housekeeping state */ + ulong event_cnt; + ulong event_seq; + ulong async_min; /* min number of ticks between a housekeeping event */ + + /* performance metrics */ + ulong metric_in_backp; + ulong metric_backp_cnt; + ulong metric_regime_ticks[9]; + + metric_in_backp = 1UL; + metric_backp_cnt = 0UL; + memset( metric_regime_ticks, 0, sizeof( metric_regime_ticks ) ); + + /* out frag stream init */ + + cr_byte_avail = 0UL; + cr_frag_avail = 0UL; + + ulong const out_depth = fd_mcache_depth( out_mcache->f ); + ulong out_seq = 0UL; + + ulong const out_bufsz = fd_dcache_data_sz( out_dcache ); + + ulong const cr_byte_max = out_bufsz; + ulong const cr_frag_max = out_depth; + + ulong const burst_byte = 512UL; /* don't producing frags smaller than this */ + ulong const burst_frag = 2UL; + + for( ulong cons_idx=0UL; cons_idx=0L ) ) { + ulong event_idx = (ulong)event_map[ event_seq ]; + + if( FD_LIKELY( event_idxgoff, cons_seq[ 2*cons_idx+1 ] ), 0L ), 0L ); + slowest_cons = fd_ulong_if( cons_cr_byte_avail=event_cnt ) ) { + event_seq = 0UL; + ulong swap_idx = (ulong)fd_rng_uint_roll( rng, (uint)event_cnt ); + ushort map_tmp = event_map[ swap_idx ]; + event_map[ swap_idx ] = event_map[ 0 ]; + event_map[ 0 ] = map_tmp; + } + + /* Reload housekeeping timer */ + then = now + (long)fd_tempo_async_reload( rng, async_min ); + long next = fd_tickcount(); + housekeeping_ticks = (ulong)(next - now); + now = next; + } + + /* Check if we are backpressured. */ + + if( FD_UNLIKELY( cr_byte_availlinks[ tile->out_link_id[ 0 ] ].mcache ); + FD_TEST( out_mcache ); + + ulong reliable_cons_cnt = 0UL; + ulong * cons_fseq[ FD_TOPO_MAX_LINKS ]; + for( ulong i=0UL; itile_cnt; i++ ) { + fd_topo_tile_t * consumer_tile = &topo->tiles[ i ]; + for( ulong j=0UL; jin_cnt; j++ ) { + if( FD_UNLIKELY( consumer_tile->in_link_id[ j ]==tile->out_link_id[0] && consumer_tile->in_link_reliable[ j ] ) ) { + cons_fseq[ reliable_cons_cnt ] = consumer_tile->in_link_fseq[ j ]; + FD_TEST( cons_fseq[ reliable_cons_cnt ] ); + reliable_cons_cnt++; + FD_TEST( reliable_cons_cnttile_obj_id ); + ushort event_map[ 1+reliable_cons_cnt ]; + ulong volatile * cons_slow[ reliable_cons_cnt ]; + ulong cons_seq [ 2*reliable_cons_cnt ]; + fd_filerd_run1( ctx, out_mcache, ctx->buf, reliable_cons_cnt, event_map, cons_fseq, cons_slow, cons_seq, 0L, rng ); +} + +fd_topo_run_tile_t fd_tile_snapshot_restore_FileRd = { + .name = NAME, + .scratch_align = scratch_align, + .scratch_footprint = scratch_footprint, + .privileged_init = privileged_init, + .unprivileged_init = unprivileged_init, + .run = fd_filerd_run, +}; + +#undef NAME diff --git a/src/discof/restore/fd_restore_base.h b/src/discof/restore/fd_restore_base.h new file mode 100644 index 00000000000..de6b4898425 --- /dev/null +++ b/src/discof/restore/fd_restore_base.h @@ -0,0 +1,62 @@ +#include "../../tango/mcache/fd_mcache.h" + +/* fd_frag_stream_meta_t is a variation of fd_frag_meta_t optimized for + stream I/O. */ + +union fd_frag_stream_meta { + + struct { + + ulong seq; /* frag sequence number */ + ulong goff; /* global offset */ + + uint sz; + uint unused; + ulong loff; /* dcache offset */ + + }; + + fd_frag_meta_t f[1]; + +}; + +typedef union fd_frag_stream_meta fd_frag_stream_meta_t; + +FD_PROTOTYPES_BEGIN + +#if FD_HAS_SSE + +FD_FN_CONST static inline __m128i +fd_frag_stream_meta_sse0( ulong seq, + ulong goff ) { + return _mm_set_epi64x( (long)goff, (long)seq ); +} + +FD_FN_CONST static inline __m128i +fd_frag_stream_meta_sse1( ulong sz, /* Assumed 32-bit */ + ulong loff ) { + return _mm_set_epi64x( (long)loff, (long)(sz) ); +} + +#endif /* FD_HAS_SSE */ + +static inline void +fd_mcache_publish_stream( fd_frag_stream_meta_t * mcache, + ulong depth, + ulong seq, + ulong goff, + ulong loff, + ulong sz ) { + fd_frag_stream_meta_t * meta = mcache + fd_mcache_line_idx( seq, depth ); + FD_COMPILER_MFENCE(); + meta->seq = fd_seq_dec( seq, 1UL ); + FD_COMPILER_MFENCE(); + meta->goff = goff; + meta->loff = loff; + meta->sz = (uint)sz; + FD_COMPILER_MFENCE(); + meta->seq = seq; + FD_COMPILER_MFENCE(); +} + +FD_PROTOTYPES_END