@@ -373,6 +373,58 @@ void shouldResetLockTimeoutAfterTransaction() {
373
373
.verifyComplete ();
374
374
}
375
375
376
+ @ Test
377
+ void shouldGetRowsUpdatedInUpsert () throws Exception {
378
+ createTable (connection );
379
+ String insertSql = "INSERT INTO r2dbc_example (id, first_name, last_name) VALUES(1, 'Jessi', 'Pinkmann')" ;
380
+
381
+
382
+ reactor .core .publisher .Flux <Long > flux = connection
383
+ .createStatement (insertSql )
384
+ .execute ()
385
+ .flatMap (MssqlResult ::getRowsUpdated );
386
+
387
+ java .util .concurrent .CompletableFuture <Long > future = reactor .core .publisher .Mono .from (flux ).toFuture ();
388
+ Long result = future .get (2 , java .util .concurrent .TimeUnit .SECONDS );
389
+ org .junit .jupiter .api .Assertions .assertEquals (1L , result );
390
+
391
+ String updateSql = "UPDATE r2dbc_example SET id = 1, first_name = 'Jesse', last_name = 'Pinkman' WHERE id = 1" ;
392
+ reactor .core .publisher .Flux <Long > fluxUpdate = connection
393
+ .createStatement (updateSql )
394
+ .execute ()
395
+ .flatMap (MssqlResult ::getRowsUpdated );
396
+ java .util .concurrent .CompletableFuture <Long > futureUpdate = reactor .core .publisher .Mono .from (fluxUpdate ).toFuture ();
397
+ Long resultUpdate = futureUpdate .get (2 , java .util .concurrent .TimeUnit .SECONDS );
398
+ org .junit .jupiter .api .Assertions .assertEquals (1L , resultUpdate );
399
+
400
+ //clean the state
401
+ createTable (connection );
402
+
403
+ String upsertSQl = updateSql + " if @@ROWCOUNT = 0 " + insertSql ;
404
+ reactor .core .publisher .Flux <Long > fluxUpsert = connection
405
+ .createStatement (upsertSQl )
406
+ .execute ()
407
+ .flatMap (MssqlResult ::getRowsUpdated );
408
+
409
+ java .util .concurrent .CompletableFuture <Long > futureUpsert = reactor .core .publisher .Mono .from (fluxUpsert ).toFuture ();
410
+ Long resultUpsert = futureUpsert .get (2 , java .util .concurrent .TimeUnit .SECONDS );
411
+
412
+ // should also have 1 updated row (from the insert part of the upsert)
413
+ org .junit .jupiter .api .Assertions .assertEquals (1L , resultUpsert );
414
+
415
+ reactor .core .publisher .Flux <Long > secondFluxUpsert = connection
416
+ .createStatement (upsertSQl )
417
+ .execute ()
418
+ .flatMap (MssqlResult ::getRowsUpdated );
419
+
420
+ java .util .concurrent .CompletableFuture <Long > secondFutureUpsert = reactor .core .publisher .Mono .from (secondFluxUpsert ).toFuture ();
421
+ Long secondResultUpsert = secondFutureUpsert .get (2 , java .util .concurrent .TimeUnit .SECONDS );
422
+
423
+ // should also have 1 updated row (from the update part of the upsert)
424
+ org .junit .jupiter .api .Assertions .assertEquals (1L , secondResultUpsert );
425
+ }
426
+
427
+
376
428
Mono <IsolationLevel > getIsolationLevel () {
377
429
378
430
return connection .createStatement ("SELECT CASE transaction_isolation_level \n " +
0 commit comments