diff --git a/src/test/java/io/r2dbc/mssql/TransactionIntegrationTests.java b/src/test/java/io/r2dbc/mssql/TransactionIntegrationTests.java index c89e5ab..ac3e285 100644 --- a/src/test/java/io/r2dbc/mssql/TransactionIntegrationTests.java +++ b/src/test/java/io/r2dbc/mssql/TransactionIntegrationTests.java @@ -373,6 +373,58 @@ void shouldResetLockTimeoutAfterTransaction() { .verifyComplete(); } + @Test + void shouldGetRowsUpdatedInUpsert() throws Exception { + createTable(connection); + String insertSql = "INSERT INTO r2dbc_example (id, first_name, last_name) VALUES(1, 'Jessi', 'Pinkmann')"; + + + reactor.core.publisher.Flux flux = connection + .createStatement(insertSql) + .execute() + .flatMap(MssqlResult::getRowsUpdated); + + java.util.concurrent.CompletableFuture future = reactor.core.publisher.Mono.from(flux).toFuture(); + Long result = future.get(2, java.util.concurrent.TimeUnit.SECONDS); + org.junit.jupiter.api.Assertions.assertEquals(1L, result); + + String updateSql = "UPDATE r2dbc_example SET id = 1, first_name = 'Jesse', last_name = 'Pinkman' WHERE id = 1"; + reactor.core.publisher.Flux fluxUpdate = connection + .createStatement(updateSql) + .execute() + .flatMap(MssqlResult::getRowsUpdated); + java.util.concurrent.CompletableFuture futureUpdate = reactor.core.publisher.Mono.from(fluxUpdate).toFuture(); + Long resultUpdate = futureUpdate.get(2, java.util.concurrent.TimeUnit.SECONDS); + org.junit.jupiter.api.Assertions.assertEquals(1L, resultUpdate); + + //clean the state + createTable(connection); + + String upsertSQl = updateSql + " if @@ROWCOUNT = 0 " + insertSql; + reactor.core.publisher.Flux fluxUpsert = connection + .createStatement(upsertSQl) + .execute() + .flatMap(MssqlResult::getRowsUpdated); + + java.util.concurrent.CompletableFuture futureUpsert = reactor.core.publisher.Mono.from(fluxUpsert).toFuture(); + Long resultUpsert = futureUpsert.get(2, java.util.concurrent.TimeUnit.SECONDS); + + // should also have 1 updated row (from the insert part of the upsert) + org.junit.jupiter.api.Assertions.assertEquals(1L, resultUpsert); + + reactor.core.publisher.Flux secondFluxUpsert = connection + .createStatement(upsertSQl) + .execute() + .flatMap(MssqlResult::getRowsUpdated); + + java.util.concurrent.CompletableFuture secondFutureUpsert = reactor.core.publisher.Mono.from(secondFluxUpsert).toFuture(); + Long secondResultUpsert = secondFutureUpsert.get(2, java.util.concurrent.TimeUnit.SECONDS); + + // should also have 1 updated row (from the update part of the upsert) + org.junit.jupiter.api.Assertions.assertEquals(1L, secondResultUpsert); + } + + Mono getIsolationLevel() { return connection.createStatement("SELECT CASE transaction_isolation_level \n" +