Skip to content

Fix tx modes #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 15 additions & 22 deletions tests/test_connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,48 +26,39 @@ def _test_isolation_level_read_only(
isolation_level: str,
read_only: bool,
) -> None:
connection.set_isolation_level("AUTOCOMMIT")
cursor = connection.cursor()
with suppress(dbapi.DatabaseError):
maybe_await(cursor.execute("DROP TABLE foo"))

cursor = connection.cursor()
maybe_await(cursor.execute(
maybe_await(cursor.execute_scheme("DROP TABLE foo"))
maybe_await(cursor.execute_scheme(
"CREATE TABLE foo(id Int64 NOT NULL, PRIMARY KEY (id))"
))

connection.set_isolation_level(isolation_level)
cursor = connection.cursor()

query = "UPSERT INTO foo(id) VALUES (1)"
if read_only:
with pytest.raises(dbapi.DatabaseError):
maybe_await(cursor.execute(query))

else:
maybe_await(cursor.execute(query))

maybe_await(connection.rollback())

connection.set_isolation_level("AUTOCOMMIT")

cursor = connection.cursor()

maybe_await(cursor.execute("DROP TABLE foo"))
maybe_await(cursor.execute_scheme("DROP TABLE foo"))

def _test_connection(self, connection: dbapi.Connection) -> None:
maybe_await(connection.commit())
maybe_await(connection.rollback())

cur = connection.cursor()
with suppress(dbapi.DatabaseError):
maybe_await(cur.execute("DROP TABLE foo"))
maybe_await(cur.execute_scheme("DROP TABLE foo"))

assert not maybe_await(connection.check_exists("/local/foo"))
with pytest.raises(dbapi.ProgrammingError):
maybe_await(connection.describe("/local/foo"))

maybe_await(cur.execute(
maybe_await(cur.execute_scheme(
"CREATE TABLE foo(id Int64 NOT NULL, PRIMARY KEY (id))"
))

Expand All @@ -77,17 +68,17 @@ def _test_connection(self, connection: dbapi.Connection) -> None:
assert col.name == "id"
assert col.type == ydb.PrimitiveType.Int64

maybe_await(cur.execute("DROP TABLE foo"))
maybe_await(cur.execute_scheme("DROP TABLE foo"))
maybe_await(cur.close())

def _test_cursor_raw_query(self, connection: dbapi.Connection) -> None:
cur = connection.cursor()
assert cur

with suppress(dbapi.DatabaseError):
maybe_await(cur.execute("DROP TABLE test"))
maybe_await(cur.execute_scheme("DROP TABLE test"))

maybe_await(cur.execute(
maybe_await(cur.execute_scheme(
"CREATE TABLE test(id Int64 NOT NULL, text Utf8, PRIMARY KEY (id))"
))

Expand All @@ -112,7 +103,7 @@ def _test_cursor_raw_query(self, connection: dbapi.Connection) -> None:
},
))

maybe_await(cur.execute("DROP TABLE test"))
maybe_await(cur.execute_scheme("DROP TABLE test"))

maybe_await(cur.close())

Expand All @@ -130,7 +121,7 @@ def _test_errors(
cur = connection.cursor()

with suppress(dbapi.DatabaseError):
maybe_await(cur.execute("DROP TABLE test"))
maybe_await(cur.execute_scheme("DROP TABLE test"))

with pytest.raises(dbapi.DataError):
maybe_await(cur.execute("SELECT 18446744073709551616"))
Expand All @@ -144,7 +135,7 @@ def _test_errors(
with pytest.raises(dbapi.ProgrammingError):
maybe_await(cur.execute("SELECT * FROM test"))

maybe_await(cur.execute(
maybe_await(cur.execute_scheme(
"CREATE TABLE test(id Int64, PRIMARY KEY (id))"
))

Expand All @@ -153,7 +144,7 @@ def _test_errors(
with pytest.raises(dbapi.IntegrityError):
maybe_await(cur.execute("INSERT INTO test(id) VALUES(1)"))

maybe_await(cur.execute("DROP TABLE test"))
maybe_await(cur.execute_scheme("DROP TABLE test"))
maybe_await(cur.close())


Expand Down Expand Up @@ -211,7 +202,9 @@ def connect() -> dbapi.AsyncConnection:
try:
yield conn
finally:
await greenlet_spawn(conn.close)
def close() -> None:
maybe_await(conn.close())
await greenlet_spawn(close)

@pytest.mark.asyncio
@pytest.mark.parametrize(
Expand Down
4 changes: 2 additions & 2 deletions tests/test_cursors.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def _test_cursor_fetch_all_multiple_result_sets(
class TestCursor(BaseCursorTestSuit):
@pytest.fixture
def sync_cursor(self, session_sync: ydb.QuerySession) -> Generator[Cursor]:
cursor = Cursor(session_sync)
cursor = Cursor(session_sync, ydb.QuerySerializableReadWrite())
yield cursor
cursor.close()

Expand Down Expand Up @@ -175,7 +175,7 @@ class TestAsyncCursor(BaseCursorTestSuit):
async def async_cursor(
self, session: ydb.aio.QuerySession
) -> AsyncGenerator[Cursor]:
cursor = AsyncCursor(session)
cursor = AsyncCursor(session, ydb.QuerySerializableReadWrite())
yield cursor
await greenlet_spawn(cursor.close)

Expand Down
43 changes: 26 additions & 17 deletions ydb_dbapi/connections.py

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: commit and rollback could raise exception in case of TLI or BrokenSession. I think we should wrap them to interface exceptions

Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ class _IsolationSettings(NamedTuple):
ydb.QuerySerializableReadWrite(), interactive=True
),
IsolationLevel.ONLINE_READONLY: _IsolationSettings(
ydb.QueryOnlineReadOnly(), interactive=True
ydb.QueryOnlineReadOnly(), interactive=False
),
IsolationLevel.ONLINE_READONLY_INCONSISTENT: _IsolationSettings(
ydb.QueryOnlineReadOnly().with_allow_inconsistent_reads(),
interactive=True,
interactive=False,
),
IsolationLevel.STALE_READONLY: _IsolationSettings(
ydb.QueryStaleReadOnly(), interactive=True
ydb.QueryStaleReadOnly(), interactive=False
),
IsolationLevel.SNAPSHOT_READONLY: _IsolationSettings(
ydb.QuerySnapshotReadOnly(), interactive=True
Expand Down Expand Up @@ -78,10 +78,11 @@ def __init__(

self.connection_kwargs: dict = kwargs

self._tx_mode: ydb.BaseQueryTxMode = ydb.QuerySerializableReadWrite()
self._shared_session_pool: bool = False

self._tx_context: TxContext | AsyncTxContext | None = None
self._tx_mode: ydb.BaseQueryTxMode = ydb.QuerySerializableReadWrite()
self.interactive_transaction: bool = False
self._shared_session_pool: bool = False

if ydb_session_pool is not None:
self._shared_session_pool = True
Expand All @@ -99,11 +100,13 @@ def __init__(
self._session: ydb.QuerySession | ydb.aio.QuerySession | None = None

def set_isolation_level(self, isolation_level: IsolationLevel) -> None:
ydb_isolation_settings = _ydb_isolation_settings_map[isolation_level]
if self._tx_context and self._tx_context.tx_id:
raise InternalError(
"Failed to set transaction mode: transaction is already began"
)

ydb_isolation_settings = _ydb_isolation_settings_map[isolation_level]

self._tx_mode = ydb_isolation_settings.ydb_mode
self.interactive_transaction = ydb_isolation_settings.interactive

Expand All @@ -113,7 +116,7 @@ def get_isolation_level(self) -> str:
return IsolationLevel.SERIALIZABLE
return IsolationLevel.AUTOCOMMIT
if self._tx_mode.name == ydb.QueryOnlineReadOnly().name:
if self._tx_mode.settings.allow_inconsistent_reads:
if self._tx_mode.allow_inconsistent_reads:
return IsolationLevel.ONLINE_READONLY_INCONSISTENT
return IsolationLevel.ONLINE_READONLY
if self._tx_mode.name == ydb.QueryStaleReadOnly().name:
Expand All @@ -123,6 +126,12 @@ def get_isolation_level(self) -> str:
msg = f"{self._tx_mode.name} is not supported"
raise NotSupportedError(msg)

def _maybe_init_tx(
self, session: ydb.QuerySession | ydb.aio.QuerySession
) -> None:
if self._tx_context is None and self.interactive_transaction:
self._tx_context = session.transaction(self._tx_mode)


class Connection(BaseConnection):
_driver_cls = ydb.Driver
Expand Down Expand Up @@ -154,15 +163,12 @@ def cursor(self) -> Cursor:
if self._session is None:
raise RuntimeError("Connection is not ready, use wait_ready.")

if self.interactive_transaction:
self._tx_context = self._session.transaction(self._tx_mode)
else:
self._tx_context = None
self._maybe_init_tx(self._session)

self._current_cursor = self._cursor_cls(
session=self._session,
tx_mode=self._tx_mode,
tx_context=self._tx_context,
autocommit=(not self.interactive_transaction),
)
return self._current_cursor

Expand All @@ -181,16 +187,19 @@ def wait_ready(self, timeout: int = 10) -> None:

self._session = self._session_pool.acquire()

@handle_ydb_errors
def commit(self) -> None:
if self._tx_context and self._tx_context.tx_id:
self._tx_context.commit()
self._tx_context = None

@handle_ydb_errors
def rollback(self) -> None:
if self._tx_context and self._tx_context.tx_id:
self._tx_context.rollback()
self._tx_context = None

@handle_ydb_errors
def close(self) -> None:
self.rollback()

Expand Down Expand Up @@ -281,15 +290,12 @@ def cursor(self) -> AsyncCursor:
if self._session is None:
raise RuntimeError("Connection is not ready, use wait_ready.")

if self.interactive_transaction:
self._tx_context = self._session.transaction(self._tx_mode)
else:
self._tx_context = None
self._maybe_init_tx(self._session)

self._current_cursor = self._cursor_cls(
session=self._session,
tx_mode=self._tx_mode,
tx_context=self._tx_context,
autocommit=(not self.interactive_transaction),
)
return self._current_cursor

Expand All @@ -308,16 +314,19 @@ async def wait_ready(self, timeout: int = 10) -> None:

self._session = await self._session_pool.acquire()

@handle_ydb_errors
async def commit(self) -> None:
if self._tx_context and self._tx_context.tx_id:
await self._tx_context.commit()
self._tx_context = None

@handle_ydb_errors
async def rollback(self) -> None:
if self._tx_context and self._tx_context.tx_id:
await self._tx_context.rollback()
self._tx_context = None

@handle_ydb_errors
async def close(self) -> None:
await self.rollback()

Expand Down
Loading
Loading