Skip to content

Use fd_stem_publish for net out #5557

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 15 additions & 33 deletions src/disco/quic/fd_quic_tile.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
#include <linux/unistd.h>
#include <sys/random.h>

#define OUT_IDX_VERIFY 0
#define OUT_IDX_NET 1

/* fd_quic_tile provides a TPU server tile.

This tile handles incoming transactions that clients request to be
Expand Down Expand Up @@ -89,14 +92,6 @@ legacy_stream_notify( fd_quic_ctx_t * ctx,
}
}

/* Because of the separate mcache for publishing network fragments
back to networking tiles, which is not managed by fd_stem, we
need to periodically update the sync. */
static void
during_housekeeping( fd_quic_ctx_t * ctx ) {
fd_mcache_seq_update( ctx->net_out_sync, ctx->net_out_seq );
}

/* This tile always publishes messages downstream, even if there are
no credits available. It ignores the flow control of the downstream
verify tile. This is OK as the verify tile is written to expect
Expand Down Expand Up @@ -409,19 +404,11 @@ quic_tx_aio_send( void * _ctx,
just indicate where they came from so they don't bounce back */
ulong sig = fd_disco_netmux_sig( ip_dst, 0U, ip_dst, DST_PROTO_OUTGOING, FD_NETMUX_SIG_MIN_HDR_SZ );

long tspub = fd_tickcount();
fd_mcache_publish( ctx->net_out_mcache,
ctx->net_out_depth,
ctx->net_out_seq,
sig,
ctx->net_out_chunk,
sz_l2,
fd_frag_meta_ctl( 0UL, 1, 1, 0 ),
0,
fd_frag_meta_ts_comp( tspub ) );

ctx->net_out_seq = fd_seq_inc( ctx->net_out_seq, 1UL );
ctx->net_out_chunk = fd_dcache_compact_next( ctx->net_out_chunk, FD_NET_MTU, ctx->net_out_chunk0, ctx->net_out_wmark );
ulong chunk = ctx->net_out_chunk;
ulong ctl = fd_frag_meta_ctl( 0UL, 1, 1, 0 );
fd_stem_publish( ctx->stem, OUT_IDX_NET, sig, chunk, sz_l2, ctl, 0L, 0L );

ctx->net_out_chunk = fd_dcache_compact_next( chunk, FD_NET_MTU, ctx->net_out_chunk0, ctx->net_out_wmark );
}

if( FD_LIKELY( opt_batch_idx ) ) {
Expand Down Expand Up @@ -475,8 +462,8 @@ unprivileged_init( fd_topo_t * topo,
}

if( FD_UNLIKELY( tile->out_cnt!=2UL ||
strcmp( topo->links[ tile->out_link_id[ 0UL ] ].name, "quic_verify" ) ||
strcmp( topo->links[ tile->out_link_id[ 1UL ] ].name, "quic_net" ) ) )
strcmp( topo->links[ tile->out_link_id[ OUT_IDX_VERIFY ] ].name, "quic_verify" ) ||
strcmp( topo->links[ tile->out_link_id[ OUT_IDX_NET ] ].name, "quic_net" ) ) )
FD_LOG_ERR(( "quic tile has none or unexpected output links %lu %s %s",
tile->out_cnt, topo->links[ tile->out_link_id[ 0 ] ].name, topo->links[ tile->out_link_id[ 1 ] ].name ));

Expand Down Expand Up @@ -549,10 +536,6 @@ unprivileged_init( fd_topo_t * topo,

fd_topo_link_t * net_out = &topo->links[ tile->out_link_id[ 1 ] ];

ctx->net_out_mcache = net_out->mcache;
ctx->net_out_sync = fd_mcache_seq_laddr( ctx->net_out_mcache );
ctx->net_out_depth = fd_mcache_depth( ctx->net_out_mcache );
ctx->net_out_seq = fd_mcache_seq_query( ctx->net_out_sync );
ctx->net_out_mem = topo->workspaces[ topo->objs[ net_out->dcache_obj_id ].wksp_id ].wksp;
ctx->net_out_chunk0 = fd_dcache_compact_chunk0( ctx->net_out_mem, net_out->dcache );
ctx->net_out_wmark = fd_dcache_compact_wmark ( ctx->net_out_mem, net_out->dcache, net_out->mtu );
Expand Down Expand Up @@ -615,12 +598,11 @@ populate_allowed_fds( fd_topo_t const * topo,
#define STEM_CALLBACK_CONTEXT_TYPE fd_quic_ctx_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_quic_ctx_t)

#define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
#define STEM_CALLBACK_METRICS_WRITE metrics_write
#define STEM_CALLBACK_BEFORE_CREDIT before_credit
#define STEM_CALLBACK_BEFORE_FRAG before_frag
#define STEM_CALLBACK_DURING_FRAG during_frag
#define STEM_CALLBACK_AFTER_FRAG after_frag
#define STEM_CALLBACK_METRICS_WRITE metrics_write
#define STEM_CALLBACK_BEFORE_CREDIT before_credit
#define STEM_CALLBACK_BEFORE_FRAG before_frag
#define STEM_CALLBACK_DURING_FRAG during_frag
#define STEM_CALLBACK_AFTER_FRAG after_frag

#include "../stem/fd_stem.c"

Expand Down
5 changes: 0 additions & 5 deletions src/disco/quic/fd_quic_tile.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ typedef struct {

fd_net_rx_bounds_t net_in_bounds[ FD_QUIC_TILE_IN_MAX ];

fd_frag_meta_t * net_out_mcache;
ulong * net_out_sync;
ulong net_out_depth;
ulong net_out_seq;

fd_wksp_t * net_out_mem;
ulong net_out_chunk0;
ulong net_out_wmark;
Expand Down
24 changes: 8 additions & 16 deletions src/disco/shred/fd_shred_tile.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,6 @@ typedef struct {
fd_shred_in_ctx_t in[ 32 ];
int in_kind[ 32 ];

fd_frag_meta_t * net_out_mcache;
ulong * net_out_sync;
ulong net_out_depth;
ulong net_out_seq;

fd_wksp_t * net_out_mem;
ulong net_out_chunk0;
ulong net_out_wmark;
Expand Down Expand Up @@ -620,6 +615,7 @@ during_frag( fd_shred_ctx_t * ctx,

static inline void
send_shred( fd_shred_ctx_t * ctx,
fd_stem_context_t * stem,
fd_shred_t const * shred,
fd_shred_dest_weighted_t const * dest,
ulong tsorig ) {
Expand Down Expand Up @@ -673,9 +669,9 @@ send_shred( fd_shred_ctx_t * ctx,
ulong pkt_sz = shred_sz + sizeof(fd_ip4_udp_hdrs_t);
ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
ulong sig = fd_disco_netmux_sig( dest->ip4, dest->port, dest->ip4, DST_PROTO_OUTGOING, sizeof(fd_ip4_udp_hdrs_t) );
fd_mcache_publish( ctx->net_out_mcache, ctx->net_out_depth, ctx->net_out_seq, sig, ctx->net_out_chunk, pkt_sz, 0UL, tsorig, tspub );
ctx->net_out_seq = fd_seq_inc( ctx->net_out_seq, 1UL );
ctx->net_out_chunk = fd_dcache_compact_next( ctx->net_out_chunk, pkt_sz, ctx->net_out_chunk0, ctx->net_out_wmark );
ulong const chunk = ctx->net_out_chunk;
fd_stem_publish( stem, NET_OUT_IDX, sig, chunk, pkt_sz, 0UL, tsorig, tspub );
ctx->net_out_chunk = fd_dcache_compact_next( chunk, pkt_sz, ctx->net_out_chunk0, ctx->net_out_wmark );
}

static void
Expand Down Expand Up @@ -832,8 +828,8 @@ after_frag( fd_shred_ctx_t * ctx,
fd_shred_dest_idx_t * dests = fd_shred_dest_compute_children( sdest, &shred, 1UL, ctx->scratchpad_dests, 1UL, fanout, fanout, max_dest_cnt );
if( FD_UNLIKELY( !dests ) ) break;

send_shred( ctx, *out_shred, ctx->adtl_dest, ctx->tsorig );
for( ulong j=0UL; j<*max_dest_cnt; j++ ) send_shred( ctx, *out_shred, fd_shred_dest_idx_to_dest( sdest, dests[ j ]), ctx->tsorig );
send_shred( ctx, stem, *out_shred, ctx->adtl_dest, ctx->tsorig );
for( ulong j=0UL; j<*max_dest_cnt; j++ ) send_shred( ctx, stem, *out_shred, fd_shred_dest_idx_to_dest( sdest, dests[ j ]), ctx->tsorig );
} while( 0 );
}

Expand Down Expand Up @@ -985,8 +981,8 @@ after_frag( fd_shred_ctx_t * ctx,

/* Send only the ones we didn't receive. */
for( ulong i=0UL; i<k; i++ ) {
send_shred( ctx, new_shreds[ i ], ctx->adtl_dest, ctx->tsorig );
for( ulong j=0UL; j<*max_dest_cnt; j++ ) send_shred( ctx, new_shreds[ i ], fd_shred_dest_idx_to_dest( sdest, dests[ j*out_stride+i ]), ctx->tsorig );
send_shred( ctx, stem, new_shreds[ i ], ctx->adtl_dest, ctx->tsorig );
for( ulong j=0UL; j<*max_dest_cnt; j++ ) send_shred( ctx, stem, new_shreds[ i ], fd_shred_dest_idx_to_dest( sdest, dests[ j*out_stride+i ]), ctx->tsorig );
}
}
}
Expand Down Expand Up @@ -1180,10 +1176,6 @@ unprivileged_init( fd_topo_t * topo,

fd_topo_link_t * net_out = &topo->links[ tile->out_link_id[ NET_OUT_IDX ] ];

ctx->net_out_mcache = net_out->mcache;
ctx->net_out_sync = fd_mcache_seq_laddr( ctx->net_out_mcache );
ctx->net_out_depth = fd_mcache_depth( ctx->net_out_mcache );
ctx->net_out_seq = fd_mcache_seq_query( ctx->net_out_sync );
ctx->net_out_chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( net_out->dcache ), net_out->dcache );
ctx->net_out_mem = topo->workspaces[ topo->objs[ net_out->dcache_obj_id ].wksp_id ].wksp;
ctx->net_out_wmark = fd_dcache_compact_wmark ( ctx->net_out_mem, net_out->dcache, net_out->mtu );
Expand Down
2 changes: 1 addition & 1 deletion src/discof/backtest/fd_backtest_tile.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include "../../disco/tiles.h"
#include "../../disco/topo/fd_topo.h"
#include "../../disco/fd_disco.h"
#include "../../disco/stem/fd_stem.h"
#include "../../choreo/tower/fd_tower.h"
Expand Down
Loading
Loading