@@ -7,16 +7,6 @@ use std::{
7
7
time:: Duration ,
8
8
} ;
9
9
10
- use futures:: { FutureExt , StreamExt } ;
11
- use indexmap:: IndexMap ;
12
- use monero_serai:: {
13
- block:: { Block , BlockHeader } ,
14
- transaction:: { Input , Timelock , Transaction , TransactionPrefix } ,
15
- } ;
16
- use proptest:: { collection:: vec, prelude:: * } ;
17
- use tokio:: time:: timeout;
18
- use tower:: { service_fn, Service } ;
19
-
20
10
use cuprate_fixed_bytes:: ByteArrayVec ;
21
11
use cuprate_p2p_core:: {
22
12
client:: { mock_client, Client , InternalPeerID , PeerInformation } ,
@@ -28,10 +18,21 @@ use cuprate_wire::{
28
18
protocol:: { ChainResponse , GetObjectsResponse } ,
29
19
CoreSyncData ,
30
20
} ;
21
+ use futures:: { FutureExt , StreamExt } ;
22
+ use indexmap:: IndexMap ;
23
+ use monero_serai:: {
24
+ block:: { Block , BlockHeader } ,
25
+ transaction:: { Input , Timelock , Transaction , TransactionPrefix } ,
26
+ } ;
27
+ use proptest:: { collection:: vec, prelude:: * } ;
28
+ use tokio:: sync:: mpsc;
29
+ use tokio:: time:: timeout;
30
+ use tower:: buffer:: Buffer ;
31
+ use tower:: { service_fn, Service , ServiceExt } ;
31
32
32
33
use crate :: {
33
34
block_downloader:: { download_blocks, BlockDownloaderConfig , ChainSvcRequest , ChainSvcResponse } ,
34
- client_pool :: ClientPool ,
35
+ peer_set :: PeerSet ,
35
36
} ;
36
37
37
38
proptest ! {
@@ -51,16 +52,18 @@ proptest! {
51
52
#[ expect( clippy:: significant_drop_tightening) ]
52
53
tokio_pool. block_on( async move {
53
54
timeout( Duration :: from_secs( 600 ) , async move {
54
- let client_pool = ClientPool :: new( ) ;
55
+ let ( new_connection_tx, new_connection_rx) = mpsc:: channel( peers) ;
56
+
57
+ let peer_set = PeerSet :: new( new_connection_rx) ;
55
58
56
59
for _ in 0 ..peers {
57
60
let client = mock_block_downloader_client( Arc :: clone( & blockchain) ) ;
58
61
59
- client_pool . add_new_client ( client) ;
62
+ new_connection_tx . try_send ( client) . unwrap ( ) ;
60
63
}
61
64
62
65
let stream = download_blocks(
63
- client_pool ,
66
+ Buffer :: new ( peer_set , 10 ) . boxed_clone ( ) ,
64
67
OurChainSvc {
65
68
genesis: * blockchain. blocks. first( ) . unwrap( ) . 0
66
69
} ,
0 commit comments