@@ -1389,7 +1389,8 @@ mod tests {
13891389 ] ,
13901390 )
13911391 . unwrap ( ) ;
1392- // Use tokio runtime to call async push from sync test context
1392+
1393+ // For sync tests - create a new runtime
13931394 let rt = tokio:: runtime:: Runtime :: new ( ) . unwrap ( ) ;
13941395 rt. block_on ( staging. push (
13951396 "abc" ,
@@ -1401,6 +1402,33 @@ mod tests {
14011402 staging. flush ( true ) ;
14021403 }
14031404
1405+ // Async version for async tests (like #[tokio::test])
1406+ async fn write_log_async ( staging : & StreamRef , schema : & Schema , mins : i64 ) {
1407+ let time: NaiveDateTime = Utc :: now ( )
1408+ . checked_sub_signed ( TimeDelta :: minutes ( mins) )
1409+ . unwrap ( )
1410+ . naive_utc ( ) ;
1411+ let batch = RecordBatch :: try_new (
1412+ Arc :: new ( schema. clone ( ) ) ,
1413+ vec ! [
1414+ Arc :: new( TimestampMillisecondArray :: from( vec![ 1 , 2 , 3 ] ) ) ,
1415+ Arc :: new( Int32Array :: from( vec![ 1 , 2 , 3 ] ) ) ,
1416+ Arc :: new( StringArray :: from( vec![ "a" , "b" , "c" ] ) ) ,
1417+ ] ,
1418+ )
1419+ . unwrap ( ) ;
1420+
1421+ // Direct await - we're already in an async context
1422+ staging. push (
1423+ "abc" ,
1424+ & batch,
1425+ time,
1426+ & HashMap :: new ( ) ,
1427+ StreamType :: UserDefined ,
1428+ ) . await . unwrap ( ) ;
1429+ staging. flush ( true ) ;
1430+ }
1431+
14041432 #[ test]
14051433 fn different_minutes_multiple_arrow_files_to_parquet ( ) {
14061434 let temp_dir = TempDir :: new ( ) . unwrap ( ) ;
@@ -1529,11 +1557,11 @@ mod tests {
15291557
15301558 // 2 logs in the previous minutes
15311559 for i in 0 ..2 {
1532- write_log ( & staging, & schema, i) ;
1560+ write_log_async ( & staging, & schema, i) . await ;
15331561 }
15341562 sleep ( Duration :: from_secs ( 60 ) ) . await ;
15351563
1536- write_log ( & staging, & schema, 0 ) ;
1564+ write_log_async ( & staging, & schema, 0 ) . await ;
15371565
15381566 // verify the arrow files exist in staging
15391567 assert_eq ! ( staging. arrow_files( ) . len( ) , 3 ) ;
0 commit comments