Skip to content

Commit d0228c2

Browse files
committed
feat(replay): replay incoming batches from repair tile
1 parent c71a3a1 commit d0228c2

File tree

8 files changed

+32
-26
lines changed

8 files changed

+32
-26
lines changed

src/disco/fd_disco_base.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -236,16 +236,17 @@ FD_FN_CONST static inline uint fd_disco_repair_shred_sig_last_shred_idx( ulong s
236236

237237

238238
FD_FN_CONST static inline ulong
239-
fd_disco_repair_replay_sig( ulong slot, uint data_cnt, ushort parent_off, int slot_complete ) {
239+
fd_disco_repair_replay_sig( ulong slot, ushort parent_off, uint data_cnt, int slot_complete ) {
240240
/*
241-
| slot (32) | data_cnt (15) | parent_off (15) | slot_complete(1)
242-
| [32, 63] | [17, 31] | [1, 16] | [0]
241+
| slot (32) | parent_off (16) | data_cnt (15) | slot_complete(1)
242+
| [32, 63] | [16, 31] | [1, 16] | [0]
243243
*/
244244
ulong slot_ul = fd_ulong_min( slot, (ulong)UINT_MAX );
245245
ulong data_cnt_ul = fd_ulong_min( (ulong)data_cnt, (ulong)FD_SHRED_BLK_MAX );
246246
ulong parent_off_ul = (ulong)parent_off;
247+
ulong data_cnt_ul = fd_ulong_min( (ulong)data_cnt, (ulong)FD_SHRED_BLK_MAX );
247248
ulong slot_complete_ul = !!slot_complete;
248-
return slot_ul << 32 | data_cnt_ul << 17 | parent_off_ul << 1 | slot_complete_ul;
249+
return slot_ul << 32 | parent_off_ul << 16 | data_cnt_ul << 1 | slot_complete_ul;
249250
}
250251

251252
FD_FN_CONST static inline ulong fd_disco_repair_replay_sig_slot ( ulong sig ) { return fd_ulong_extract ( sig, 32, 63 ); }

src/disco/net/xdp/fd_xdp_tile.c

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -752,9 +752,8 @@ net_rx_packet( fd_net_ctx_t * ctx,
752752
out = ctx->gossip_out;
753753
} else if( FD_UNLIKELY( udp_dstport==ctx->repair_intake_listen_port ) ) {
754754
proto = DST_PROTO_REPAIR;
755-
out = ctx->repair_out;
756-
// if( FD_UNLIKELY( sz == REPAIR_PING_SZ ) ) out = ctx->repair_out; /* ping-pong */
757-
// else out = ctx->shred_out;
755+
if( FD_UNLIKELY( sz == REPAIR_PING_SZ ) ) out = ctx->repair_out; /* ping-pong */
756+
else out = ctx->shred_out;
758757
} else if( FD_UNLIKELY( udp_dstport==ctx->repair_serve_listen_port ) ) {
759758
proto = DST_PROTO_REPAIR;
760759
out = ctx->repair_out;

src/disco/shred/fd_shred_tile.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -506,7 +506,8 @@ during_frag( fd_shred_ctx_t * ctx,
506506
uchar const * dcache_entry = (uchar const *)fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk ) + ctl;
507507
ulong hdr_sz = fd_disco_netmux_sig_hdr_sz( sig );
508508
FD_TEST( hdr_sz <= sz ); /* Should be ensured by the net tile */
509-
fd_shred_t const * shred = fd_shred_parse( dcache_entry+hdr_sz, sz-hdr_sz );
509+
ulong repair_nonce_sz = fd_disco_netmux_sig_proto( sig )==DST_PROTO_REPAIR*4;
510+
fd_shred_t const * shred = fd_shred_parse( dcache_entry+hdr_sz, sz-hdr_sz-repair_nonce_sz );
510511
if( FD_UNLIKELY( !shred ) ) {
511512
ctx->skip_frag = 1;
512513
return;
@@ -520,7 +521,7 @@ during_frag( fd_shred_ctx_t * ctx,
520521
return;
521522
}
522523
fd_memcpy( ctx->shred_buffer, dcache_entry+hdr_sz, sz-hdr_sz );
523-
ctx->shred_buffer_sz = sz-hdr_sz;
524+
ctx->shred_buffer_sz = sz-hdr_sz-repair_nonce_sz;
524525
}
525526
}
526527

src/discof/repair/fd_fec_chainer.c

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -193,15 +193,15 @@ link_orphans( fd_fec_chainer_t * chainer ) {
193193
/* Verify the chained merkle root. */
194194

195195
if ( FD_UNLIKELY( memcmp( ele->chained_merkle_root, parent->merkle_root, FD_SHRED_MERKLE_ROOT_SZ ) ) ) {
196-
fd_fec_out_push_tail( chainer->out, (fd_fec_out_t){ .slot = ele->slot, .fec_set_idx = ele->fec_set_idx, .err = FD_FEC_CHAINER_ERR_MERKLE } );
196+
fd_fec_out_push_tail( chainer->out, (fd_fec_out_t){ .slot = ele->slot, .parent_off = ele->parent_off, .fec_set_idx = ele->fec_set_idx, .data_cnt = ele->data_cnt, .data_complete = ele->data_complete, .slot_complete = ele->slot_complete, .err = FD_FEC_CHAINER_SUCCESS } );
197197
continue;
198198
}
199199

200200
/* Insert into frontier (ele is either advancing a fork or starting
201201
a new fork) and deliver to `out`. */
202202

203203
fd_fec_frontier_ele_insert( chainer->frontier, ele, chainer->pool );
204-
fd_fec_out_push_tail( chainer->out, (fd_fec_out_t){ .slot = ele->slot, .fec_set_idx = ele->fec_set_idx, .err = FD_FEC_CHAINER_SUCCESS } );
204+
fd_fec_out_push_tail( chainer->out, (fd_fec_out_t){ .slot = ele->slot, .parent_off = ele->parent_off, .fec_set_idx = ele->fec_set_idx, .data_cnt = ele->data_cnt, .data_complete = ele->data_complete, .slot_complete = ele->slot_complete, .err = FD_FEC_CHAINER_SUCCESS } );
205205

206206
/* Check whether any of ele's children are orphaned and can be
207207
chained into the frontier. */
@@ -232,7 +232,7 @@ fd_fec_ele_t *
232232
fd_fec_chainer_insert( fd_fec_chainer_t * chainer,
233233
ulong slot,
234234
uint fec_set_idx,
235-
uint data_cnt,
235+
ushort data_cnt,
236236
int data_complete,
237237
int slot_complete,
238238
ushort parent_off,
@@ -241,7 +241,7 @@ fd_fec_chainer_insert( fd_fec_chainer_t * chainer,
241241
ulong key = slot << 32 | fec_set_idx;
242242

243243
if( FD_UNLIKELY( fd_fec_chainer_query( chainer, slot, fec_set_idx ) ) ) {
244-
fd_fec_out_push_tail( chainer->out, (fd_fec_out_t){ slot, fec_set_idx, .err = FD_FEC_CHAINER_ERR_UNIQUE } );
244+
fd_fec_out_push_tail( chainer->out, (fd_fec_out_t){ slot, parent_off, fec_set_idx, data_cnt, data_complete, slot_complete, .err = FD_FEC_CHAINER_ERR_UNIQUE } );
245245
return NULL;
246246
}
247247

@@ -300,7 +300,6 @@ fd_fec_chainer_insert( fd_fec_chainer_t * chainer,
300300
fd_fec_parent_t * parent = fd_fec_parents_insert( chainer->parents, child_key );
301301
parent->parent_key = key;
302302
} else {
303-
FD_LOG_WARNING(( "already inserted %lu %u", slot, fec_set_idx + data_cnt ));
304303
fd_fec_parent_t * parent = fd_fec_parents_query( chainer->parents, child_key, NULL );
305304
FD_TEST( parent->parent_key == key );
306305
}

src/discof/repair/fd_fec_chainer.h

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ struct fd_fec_ele {
176176

177177
ulong slot;
178178
uint fec_set_idx;
179-
uint data_cnt;
179+
ushort data_cnt;
180180
int data_complete;
181181
int slot_complete;
182182
ushort parent_off;
@@ -242,9 +242,13 @@ typedef struct fd_fec_children fd_fec_children_t;
242242
#include "../../util/tmpl/fd_deque_dynamic.c"
243243

244244
struct fd_fec_out {
245-
ulong slot;
246-
uint fec_set_idx;
247-
int err;
245+
ulong slot;
246+
ushort parent_off;
247+
uint fec_set_idx;
248+
ushort data_cnt;
249+
int data_complete;
250+
int slot_complete;
251+
int err;
248252
};
249253
typedef struct fd_fec_out fd_fec_out_t;
250254

@@ -354,7 +358,7 @@ fd_fec_ele_t *
354358
fd_fec_chainer_insert( fd_fec_chainer_t * chainer,
355359
ulong slot,
356360
uint fec_set_idx,
357-
uint data_cnt,
361+
ushort data_cnt,
358362
int data_complete,
359363
int slot_complete,
360364
ushort parent_off,

src/discof/repair/fd_repair_tile.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ handle_new_cluster_contact_info( fd_repair_tile_ctx_t * ctx,
231231
}
232232
}
233233

234-
static inline void
234+
FD_FN_UNUSED static inline void
235235
handle_new_repair_requests( fd_repair_tile_ctx_t * ctx,
236236
uchar const * buf,
237237
ulong buf_sz ) {
@@ -495,7 +495,7 @@ after_frag( fd_repair_tile_ctx_t * ctx,
495495
}
496496

497497
if( FD_UNLIKELY( in_kind==IN_KIND_STORE ) ) {
498-
handle_new_repair_requests( ctx, ctx->buffer, sz );
498+
// handle_new_repair_requests( ctx, ctx->buffer, sz );
499499
return;
500500
}
501501

@@ -533,7 +533,7 @@ after_frag( fd_repair_tile_ctx_t * ctx,
533533
FD_TEST( fd_fec_pool_free( ctx->fec_chainer->pool ) );
534534
int data_complete = shred->data.flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE;
535535
int slot_complete = shred->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE;
536-
FD_TEST( fd_fec_chainer_insert( ctx->fec_chainer, shred->slot, shred->fec_set_idx, fec->data_cnt, data_complete, slot_complete, shred->data.parent_off, merkle, merkle ) );
536+
FD_TEST( fd_fec_chainer_insert( ctx->fec_chainer, shred->slot, shred->fec_set_idx, (ushort)fec->data_cnt, data_complete, slot_complete, shred->data.parent_off, merkle, merkle ) );
537537
fd_fec_repair_remove( ctx->fec_repair, fec->key );
538538
/* TODO set range ops */
539539
for( uint idx = shred->fec_set_idx; idx < shred->fec_set_idx + fec->data_cnt; idx++ ) {

src/discof/store/fd_storei_tile.c

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ during_frag( fd_store_tile_ctx_t * ctx,
202202
ulong chunk,
203203
ulong sz,
204204
ulong ctl FD_PARAM_UNUSED ) {
205+
return;
205206
if( FD_UNLIKELY( in_idx==STAKE_IN_IDX ) ) {
206207
if( FD_UNLIKELY( chunk<ctx->stake_in_chunk0 || chunk>ctx->stake_in_wmark ) )
207208
FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
@@ -259,6 +260,7 @@ after_frag( fd_store_tile_ctx_t * ctx,
259260
ulong tsorig FD_PARAM_UNUSED,
260261
ulong tspub FD_PARAM_UNUSED,
261262
fd_stem_context_t * stem FD_PARAM_UNUSED ) {
263+
return;
262264
if( FD_UNLIKELY( in_idx==STAKE_IN_IDX ) ) {
263265
fd_stake_ci_stake_msg_fini( ctx->stake_ci );
264266
return;
@@ -309,7 +311,7 @@ after_frag( fd_store_tile_ctx_t * ctx,
309311
}
310312

311313
/* everything else is shred */
312-
FD_TEST( (ctx->s34_buffer->shred_cnt>0UL) & (ctx->s34_buffer->shred_cnt<=34UL) );
314+
//FD_TEST( (ctx->s34_buffer->shred_cnt>0UL) & (ctx->s34_buffer->shred_cnt<=34UL) );
313315

314316
if( FD_UNLIKELY( ctx->is_trusted ) ) {
315317
/* this slot is coming from our leader pipeline */
@@ -469,7 +471,7 @@ fd_store_tile_slot_prepare( fd_store_tile_ctx_t * ctx,
469471
if( FD_UNLIKELY( fd_block_set_test( data_complete_idxs, idx ) ) ) {
470472
uint data_cnt = consumed_idx != UINT_MAX ? idx - consumed_idx : idx + 1;
471473

472-
replay_sig = fd_disco_repair_replay_sig( slot, data_cnt, (ushort)( slot - parent_slot ), complete_idx == idx );
474+
replay_sig = fd_disco_repair_replay_sig( slot, (ushort)( slot - parent_slot ), data_cnt, complete_idx == idx );
473475
fd_stem_publish( stem, REPLAY_OUT_IDX, replay_sig, ctx->replay_out_chunk, 0, 0UL, tsorig, tspub );
474476
ctx->replay_out_chunk = fd_dcache_compact_next( ctx->replay_out_chunk, 0, ctx->replay_out_chunk0, ctx->replay_out_wmark );
475477
consumed_idx = idx;

src/flamenco/repair/fd_repair.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1067,7 +1067,7 @@ fd_repair_create_needed_request( fd_repair_t * glob, int type, ulong slot, uint
10671067
if( dupelem == NULL ) {
10681068
dupelem = fd_dupdetect_table_insert( glob->dupdetect, &dupkey );
10691069
dupelem->last_send_time = 0L;
1070-
} else if( ( dupelem->last_send_time+(long)200e6 )<glob->now ) {
1070+
} else if( ( dupelem->last_send_time+(long)100e6 )<glob->now ) {
10711071
fd_repair_unlock( glob );
10721072
return 0;
10731073
}
@@ -1077,7 +1077,7 @@ fd_repair_create_needed_request( fd_repair_t * glob, int type, ulong slot, uint
10771077

10781078
if (fd_needed_table_is_full(glob->needed)) {
10791079
fd_repair_unlock( glob );
1080-
FD_LOG_NOTICE(("table full"));
1080+
// FD_LOG_NOTICE(("table full"));
10811081
( *glob->deliver_fail_fun )(ids[0], slot, shred_index, glob->fun_arg, FD_REPAIR_DELIVER_FAIL_REQ_LIMIT_EXCEEDED );
10821082
return -1;
10831083
}

0 commit comments

Comments
 (0)