@@ -373,6 +373,79 @@ void shouldResetLockTimeoutAfterTransaction() {
373
373
.verifyComplete ();
374
374
}
375
375
376
+ @ Test
377
+ void shouldGetRowsUpdatedForUpsert () {
378
+ createTable (connection );
379
+
380
+ String upsertSQl = "UPDATE r2dbc_example SET id = 1, first_name = 'Jesse', last_name = 'Pinkman' WHERE id = 1 if @@ROWCOUNT = 0 " +
381
+ "INSERT INTO r2dbc_example (id, first_name, last_name)VALUES(1, 'Jesse', 'Pinkmann')" ;
382
+
383
+ // should trigger the "INSERT" part of the query
384
+ connection .createStatement (upsertSQl )
385
+ .execute ().flatMap (Result ::getRowsUpdated )
386
+ .as (StepVerifier ::create )
387
+ .expectNext (1L )
388
+ .verifyComplete ();
389
+
390
+ // should trigger the "UPDATE" part of the query
391
+ connection .createStatement (upsertSQl )
392
+ .execute ().flatMap (Result ::getRowsUpdated )
393
+ .as (StepVerifier ::create )
394
+ .expectNext (1L )
395
+ .verifyComplete ();
396
+ }
397
+
398
+ @ Test
399
+ void shouldGetRowsUpdatedInUpsert () throws Exception {
400
+ createTable (connection );
401
+ String insertSql = "INSERT INTO r2dbc_example (id, first_name, last_name) VALUES(1, 'Jessi', 'Pinkmann')" ;
402
+
403
+ reactor .core .publisher .Flux <Long > flux = connection
404
+ .createStatement (insertSql )
405
+ .execute ()
406
+ .flatMap (MssqlResult ::getRowsUpdated );
407
+
408
+ java .util .concurrent .CompletableFuture <Long > future = reactor .core .publisher .Mono .from (flux ).toFuture ();
409
+ Long result = future .get (2 , java .util .concurrent .TimeUnit .SECONDS );
410
+ org .junit .jupiter .api .Assertions .assertEquals (1L , result );
411
+
412
+ String updateSql = "UPDATE r2dbc_example SET id = 1, first_name = 'Jesse', last_name = 'Pinkman' WHERE id = 1" ;
413
+ reactor .core .publisher .Flux <Long > fluxUpdate = connection
414
+ .createStatement (updateSql )
415
+ .execute ()
416
+ .flatMap (MssqlResult ::getRowsUpdated );
417
+ java .util .concurrent .CompletableFuture <Long > futureUpdate = reactor .core .publisher .Mono .from (fluxUpdate ).toFuture ();
418
+ Long resultUpdate = futureUpdate .get (2 , java .util .concurrent .TimeUnit .SECONDS );
419
+ org .junit .jupiter .api .Assertions .assertEquals (1L , resultUpdate );
420
+
421
+ //clean the state
422
+ createTable (connection );
423
+
424
+ String upsertSQl = updateSql + " if @@ROWCOUNT = 0 " + insertSql ;
425
+ reactor .core .publisher .Flux <Long > fluxUpsert = connection
426
+ .createStatement (upsertSQl )
427
+ .execute ()
428
+ .flatMap (MssqlResult ::getRowsUpdated );
429
+
430
+ java .util .concurrent .CompletableFuture <Long > futureUpsert = reactor .core .publisher .Mono .from (fluxUpsert ).toFuture ();
431
+ Long resultUpsert = futureUpsert .get (2 , java .util .concurrent .TimeUnit .SECONDS );
432
+
433
+ // should also have 1 updated row (from the insert part of the upsert)
434
+ org .junit .jupiter .api .Assertions .assertEquals (1L , resultUpsert );
435
+
436
+ reactor .core .publisher .Flux <Long > secondFluxUpsert = connection
437
+ .createStatement (upsertSQl )
438
+ .execute ()
439
+ .flatMap (MssqlResult ::getRowsUpdated );
440
+
441
+ java .util .concurrent .CompletableFuture <Long > secondFutureUpsert = reactor .core .publisher .Mono .from (secondFluxUpsert ).toFuture ();
442
+ Long secondResultUpsert = secondFutureUpsert .get (2 , java .util .concurrent .TimeUnit .SECONDS );
443
+
444
+ // should also have 1 updated row (from the update part of the upsert)
445
+ org .junit .jupiter .api .Assertions .assertEquals (1L , secondResultUpsert );
446
+ }
447
+
448
+
376
449
Mono <IsolationLevel > getIsolationLevel () {
377
450
378
451
return connection .createStatement ("SELECT CASE transaction_isolation_level \n " +
0 commit comments