@@ -19,7 +19,7 @@ use prism_storage::{
19
19
use rand:: { rngs:: StdRng , Rng , SeedableRng } ;
20
20
use sp1_sdk:: { HashableKey , Prover as _, ProverClient } ;
21
21
use std:: sync:: Arc ;
22
- use tokio:: { spawn, time:: Duration } ;
22
+ use tokio:: { spawn, sync :: mpsc , time:: Duration } ;
23
23
24
24
use tempfile:: TempDir ;
25
25
@@ -76,6 +76,7 @@ async fn test_light_client_prover_talking() -> Result<()> {
76
76
) ?) ;
77
77
78
78
let event_channel = EventChannel :: new ( ) ;
79
+ let ( shutdown_tx, mut shutdown_rx) = mpsc:: channel :: < ( ) > ( 1 ) ;
79
80
80
81
let lightclient = Arc :: new ( LightClient :: new (
81
82
lc_da_layer. clone ( ) ,
@@ -86,18 +87,18 @@ async fn test_light_client_prover_talking() -> Result<()> {
86
87
) ) ;
87
88
88
89
let prover_clone = prover. clone ( ) ;
89
- spawn ( async move {
90
+ let _prover_handle = spawn ( async move {
90
91
debug ! ( "starting prover" ) ;
91
92
prover_clone. run ( ) . await . unwrap ( ) ;
92
93
} ) ;
93
94
94
95
let lc_clone = lightclient. clone ( ) ;
95
- spawn ( async move {
96
+ let _lc_handle = spawn ( async move {
96
97
debug ! ( "starting light client" ) ;
97
98
lc_clone. run ( ) . await . unwrap ( ) ;
98
99
} ) ;
99
100
100
- spawn ( async move {
101
+ let tx_handle = spawn ( async move {
101
102
let mut transaction_builder = TestTransactionBuilder :: new ( ) ;
102
103
let register_service_req = transaction_builder
103
104
. register_service_with_random_keys ( service_algorithm, "test_service" )
@@ -109,6 +110,12 @@ async fn test_light_client_prover_talking() -> Result<()> {
109
110
let mut added_account_ids: Vec < String > = Vec :: new ( ) ;
110
111
111
112
loop {
113
+ // Check if we should shut down
114
+ if shutdown_rx. try_recv ( ) . is_ok ( ) {
115
+ debug ! ( "Transaction generator received shutdown signal" ) ;
116
+ break ;
117
+ }
118
+
112
119
// Create 1 to 3 new accounts
113
120
let num_new_accounts = rng. gen_range ( 1 ..=3 ) ;
114
121
for _ in 0 ..num_new_accounts {
@@ -173,14 +180,32 @@ async fn test_light_client_prover_talking() -> Result<()> {
173
180
}
174
181
} ) ;
175
182
176
- let mut rx = lc_da_layer. clone ( ) . subscribe_to_heights ( ) ;
177
- let initial_height = rx. recv ( ) . await . unwrap ( ) ;
178
- while let Ok ( height) = rx. recv ( ) . await {
179
- debug ! ( "received height {}" , height) ;
180
- if height >= initial_height + 50 {
181
- break ;
183
+ // Monitor height and stop test when target height is reached
184
+ let height_monitor = spawn ( async move {
185
+ let mut rx = bridge_da_layer. clone ( ) . subscribe_to_heights ( ) ;
186
+ let initial_height = rx. recv ( ) . await . unwrap ( ) ;
187
+ debug ! ( "Initial height: {}" , initial_height) ;
188
+ let target_height = initial_height + 50 ;
189
+
190
+ while let Ok ( height) = rx. recv ( ) . await {
191
+ debug ! ( "Received height {}" , height) ;
192
+
193
+ if height >= target_height {
194
+ info ! ( "Reached target height {}. Stopping test." , target_height) ;
195
+ let _ = shutdown_tx. send ( ( ) ) . await ;
196
+ break ;
197
+ }
182
198
}
183
- }
199
+ } ) ;
200
+
201
+ // Wait for height monitor to complete
202
+ height_monitor. await ?;
203
+
204
+ // Wait for transaction generator to complete
205
+ tx_handle. await ?;
206
+
207
+ // We could add code to gracefully shut down the prover and light client here
208
+ // but for test purposes, we'll just return
184
209
185
210
Ok ( ( ) )
186
211
}
0 commit comments