Skip to content

Commit 98dc11c

Browse files
committed
Merge branch 'main' into fix_tx_modes
2 parents 5870e24 + 5f50b2c commit 98dc11c

File tree

7 files changed

+96
-43
lines changed

7 files changed

+96
-43
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1+
## 0.0.1b1 ##
12
* YDB DBAPI based on QueryService

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "ydb-dbapi"
3-
version = "0.0.0" # AUTOVERSION
3+
version = "0.0.1b1" # AUTOVERSION
44
description = "YDB Python DBAPI which complies with PEP 249"
55
authors = ["Yandex LLC <ydb@yandex-team.ru>"]
66
readme = "README.md"

tests/test_connections.py

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,10 @@ def _test_isolation_level_read_only(
2626
isolation_level: str,
2727
read_only: bool,
2828
) -> None:
29-
connection.set_isolation_level("AUTOCOMMIT")
3029
cursor = connection.cursor()
3130
with suppress(dbapi.DatabaseError):
32-
maybe_await(cursor.execute("DROP TABLE foo"))
33-
cursor = connection.cursor()
34-
maybe_await(cursor.execute(
31+
maybe_await(cursor.execute_scheme("DROP TABLE foo"))
32+
maybe_await(cursor.execute_scheme(
3533
"CREATE TABLE foo(id Int64 NOT NULL, PRIMARY KEY (id))"
3634
))
3735

@@ -46,23 +44,21 @@ def _test_isolation_level_read_only(
4644

4745
maybe_await(connection.rollback())
4846

49-
connection.set_isolation_level("AUTOCOMMIT")
50-
cursor = connection.cursor()
51-
maybe_await(cursor.execute("DROP TABLE foo"))
47+
maybe_await(cursor.execute_scheme("DROP TABLE foo"))
5248

5349
def _test_connection(self, connection: dbapi.Connection) -> None:
5450
maybe_await(connection.commit())
5551
maybe_await(connection.rollback())
5652

5753
cur = connection.cursor()
5854
with suppress(dbapi.DatabaseError):
59-
maybe_await(cur.execute("DROP TABLE foo"))
55+
maybe_await(cur.execute_scheme("DROP TABLE foo"))
6056

6157
assert not maybe_await(connection.check_exists("/local/foo"))
6258
with pytest.raises(dbapi.ProgrammingError):
6359
maybe_await(connection.describe("/local/foo"))
6460

65-
maybe_await(cur.execute(
61+
maybe_await(cur.execute_scheme(
6662
"CREATE TABLE foo(id Int64 NOT NULL, PRIMARY KEY (id))"
6763
))
6864

@@ -72,17 +68,17 @@ def _test_connection(self, connection: dbapi.Connection) -> None:
7268
assert col.name == "id"
7369
assert col.type == ydb.PrimitiveType.Int64
7470

75-
maybe_await(cur.execute("DROP TABLE foo"))
71+
maybe_await(cur.execute_scheme("DROP TABLE foo"))
7672
maybe_await(cur.close())
7773

7874
def _test_cursor_raw_query(self, connection: dbapi.Connection) -> None:
7975
cur = connection.cursor()
8076
assert cur
8177

8278
with suppress(dbapi.DatabaseError):
83-
maybe_await(cur.execute("DROP TABLE test"))
79+
maybe_await(cur.execute_scheme("DROP TABLE test"))
8480

85-
maybe_await(cur.execute(
81+
maybe_await(cur.execute_scheme(
8682
"CREATE TABLE test(id Int64 NOT NULL, text Utf8, PRIMARY KEY (id))"
8783
))
8884

@@ -107,7 +103,7 @@ def _test_cursor_raw_query(self, connection: dbapi.Connection) -> None:
107103
},
108104
))
109105

110-
maybe_await(cur.execute("DROP TABLE test"))
106+
maybe_await(cur.execute_scheme("DROP TABLE test"))
111107

112108
maybe_await(cur.close())
113109

@@ -125,7 +121,7 @@ def _test_errors(
125121
cur = connection.cursor()
126122

127123
with suppress(dbapi.DatabaseError):
128-
maybe_await(cur.execute("DROP TABLE test"))
124+
maybe_await(cur.execute_scheme("DROP TABLE test"))
129125

130126
with pytest.raises(dbapi.DataError):
131127
maybe_await(cur.execute("SELECT 18446744073709551616"))
@@ -139,7 +135,7 @@ def _test_errors(
139135
with pytest.raises(dbapi.ProgrammingError):
140136
maybe_await(cur.execute("SELECT * FROM test"))
141137

142-
maybe_await(cur.execute(
138+
maybe_await(cur.execute_scheme(
143139
"CREATE TABLE test(id Int64, PRIMARY KEY (id))"
144140
))
145141

@@ -148,7 +144,7 @@ def _test_errors(
148144
with pytest.raises(dbapi.IntegrityError):
149145
maybe_await(cur.execute("INSERT INTO test(id) VALUES(1)"))
150146

151-
maybe_await(cur.execute("DROP TABLE test"))
147+
maybe_await(cur.execute_scheme("DROP TABLE test"))
152148
maybe_await(cur.close())
153149

154150

tests/test_cursors.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ def _test_cursor_fetch_all_multiple_result_sets(
140140
class TestCursor(BaseCursorTestSuit):
141141
@pytest.fixture
142142
def sync_cursor(self, session_sync: ydb.QuerySession) -> Generator[Cursor]:
143-
cursor = Cursor(session_sync)
143+
cursor = Cursor(session_sync, ydb.QuerySerializableReadWrite())
144144
yield cursor
145145
cursor.close()
146146

@@ -175,7 +175,7 @@ class TestAsyncCursor(BaseCursorTestSuit):
175175
async def async_cursor(
176176
self, session: ydb.aio.QuerySession
177177
) -> AsyncGenerator[Cursor]:
178-
cursor = AsyncCursor(session)
178+
cursor = AsyncCursor(session, ydb.QuerySerializableReadWrite())
179179
yield cursor
180180
await greenlet_spawn(cursor.close)
181181

ydb_dbapi/connections.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ class _IsolationSettings(NamedTuple):
3535

3636

3737
_ydb_isolation_settings_map = {
38-
IsolationLevel.AUTOCOMMIT: _IsolationSettings(None, interactive=False),
38+
IsolationLevel.AUTOCOMMIT: _IsolationSettings(
39+
ydb.QuerySerializableReadWrite(), interactive=False
40+
),
3941
IsolationLevel.SERIALIZABLE: _IsolationSettings(
4042
ydb.QuerySerializableReadWrite(), interactive=True
4143
),
@@ -79,7 +81,7 @@ def __init__(
7981
self._shared_session_pool: bool = False
8082

8183
self._tx_context: TxContext | AsyncTxContext | None = None
82-
self._tx_mode: ydb.BaseQueryTxMode | None = None
84+
self._tx_mode: ydb.BaseQueryTxMode = ydb.QuerySerializableReadWrite()
8385
self.interactive_transaction: bool = False
8486

8587
if ydb_session_pool is not None:
@@ -105,15 +107,14 @@ def set_isolation_level(self, isolation_level: IsolationLevel) -> None:
105107

106108
ydb_isolation_settings = _ydb_isolation_settings_map[isolation_level]
107109

108-
self._tx_context = None
109110
self._tx_mode = ydb_isolation_settings.ydb_mode
110111
self.interactive_transaction = ydb_isolation_settings.interactive
111112

112113
def get_isolation_level(self) -> str:
113-
if self._tx_mode is None:
114-
return IsolationLevel.AUTOCOMMIT
115114
if self._tx_mode.name == ydb.QuerySerializableReadWrite().name:
116-
return IsolationLevel.SERIALIZABLE
115+
if self.interactive_transaction:
116+
return IsolationLevel.SERIALIZABLE
117+
return IsolationLevel.AUTOCOMMIT
117118
if self._tx_mode.name == ydb.QueryOnlineReadOnly().name:
118119
if self._tx_mode.allow_inconsistent_reads:
119120
return IsolationLevel.ONLINE_READONLY_INCONSISTENT
@@ -128,7 +129,7 @@ def get_isolation_level(self) -> str:
128129
def _maybe_init_tx(
129130
self, session: ydb.QuerySession | ydb.aio.QuerySession
130131
) -> None:
131-
if self._tx_context is None and self._tx_mode is not None:
132+
if self._tx_context is None and self.interactive_transaction:
132133
self._tx_context = session.transaction(self._tx_mode)
133134

134135

@@ -166,8 +167,8 @@ def cursor(self) -> Cursor:
166167

167168
self._current_cursor = self._cursor_cls(
168169
session=self._session,
170+
tx_mode=self._tx_mode,
169171
tx_context=self._tx_context,
170-
autocommit=(not self.interactive_transaction),
171172
)
172173
return self._current_cursor
173174

@@ -290,8 +291,8 @@ def cursor(self) -> AsyncCursor:
290291

291292
self._current_cursor = self._cursor_cls(
292293
session=self._session,
294+
tx_mode=self._tx_mode,
293295
tx_context=self._tx_context,
294-
autocommit=(not self.interactive_transaction),
295296
)
296297
return self._current_cursor
297298

ydb_dbapi/cursors.py

Lines changed: 69 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from collections.abc import AsyncIterator
55
from collections.abc import Generator
66
from collections.abc import Iterator
7+
from collections.abc import Sequence
78
from typing import Any
89
from typing import Union
910

@@ -138,15 +139,15 @@ class Cursor(BufferedCursor):
138139
def __init__(
139140
self,
140141
session: ydb.QuerySession,
142+
tx_mode: ydb.BaseQueryTxMode,
141143
tx_context: ydb.QueryTxContext | None = None,
142144
table_path_prefix: str = "",
143-
autocommit: bool = True,
144145
) -> None:
145146
super().__init__()
146147
self._session = session
148+
self._tx_mode = tx_mode
147149
self._tx_context = tx_context
148150
self._table_path_prefix = table_path_prefix
149-
self._autocommit = autocommit
150151

151152
self._stream: Iterator | None = None
152153

@@ -166,6 +167,18 @@ def _execute_generic_query(
166167
) -> Iterator[ydb.convert.ResultSet]:
167168
return self._session.execute(query=query, parameters=parameters)
168169

170+
@handle_ydb_errors
171+
def _execute_session_query(
172+
self,
173+
query: str,
174+
parameters: ParametersType | None = None,
175+
) -> Iterator[ydb.convert.ResultSet]:
176+
return self._session.transaction(self._tx_mode).execute(
177+
query=query,
178+
parameters=parameters,
179+
commit_tx=True,
180+
)
181+
169182
@handle_ydb_errors
170183
def _execute_transactional_query(
171184
self,
@@ -176,8 +189,21 @@ def _execute_transactional_query(
176189
return tx_context.execute(
177190
query=query,
178191
parameters=parameters,
179-
commit_tx=self._autocommit,
192+
commit_tx=False,
193+
)
194+
195+
def execute_scheme(
196+
self,
197+
query: str,
198+
parameters: ParametersType | None = None,
199+
) -> None:
200+
self._raise_if_closed()
201+
202+
self._stream = self._execute_generic_query(
203+
query=query, parameters=parameters
180204
)
205+
self._begin_query()
206+
self._scroll_stream(replace_current=False)
181207

182208
def execute(
183209
self,
@@ -191,16 +217,18 @@ def execute(
191217
tx_context=self._tx_context, query=query, parameters=parameters
192218
)
193219
else:
194-
self._stream = self._execute_generic_query(
220+
self._stream = self._execute_session_query(
195221
query=query, parameters=parameters
196222
)
197223

198224
self._begin_query()
199-
200225
self._scroll_stream(replace_current=False)
201226

202-
async def executemany(self) -> None:
203-
pass
227+
def executemany(
228+
self, query: str, seq_of_parameters: Sequence[ParametersType]
229+
) -> None:
230+
for parameters in seq_of_parameters:
231+
self.execute(query, parameters)
204232

205233
@handle_ydb_errors
206234
def nextset(self, replace_current: bool = True) -> bool:
@@ -249,15 +277,15 @@ class AsyncCursor(BufferedCursor):
249277
def __init__(
250278
self,
251279
session: ydb.aio.QuerySession,
280+
tx_mode: ydb.BaseQueryTxMode,
252281
tx_context: ydb.aio.QueryTxContext | None = None,
253282
table_path_prefix: str = "",
254-
autocommit: bool = True,
255283
) -> None:
256284
super().__init__()
257285
self._session = session
286+
self._tx_mode = tx_mode
258287
self._tx_context = tx_context
259288
self._table_path_prefix = table_path_prefix
260-
self._autocommit = autocommit
261289

262290
self._stream: AsyncIterator | None = None
263291

@@ -277,6 +305,18 @@ async def _execute_generic_query(
277305
) -> AsyncIterator[ydb.convert.ResultSet]:
278306
return await self._session.execute(query=query, parameters=parameters)
279307

308+
@handle_ydb_errors
309+
async def _execute_session_query(
310+
self,
311+
query: str,
312+
parameters: ParametersType | None = None,
313+
) -> AsyncIterator[ydb.convert.ResultSet]:
314+
return await self._session.transaction(self._tx_mode).execute(
315+
query=query,
316+
parameters=parameters,
317+
commit_tx=True,
318+
)
319+
280320
@handle_ydb_errors
281321
async def _execute_transactional_query(
282322
self,
@@ -287,8 +327,21 @@ async def _execute_transactional_query(
287327
return await tx_context.execute(
288328
query=query,
289329
parameters=parameters,
290-
commit_tx=self._autocommit,
330+
commit_tx=False,
331+
)
332+
333+
async def execute_scheme(
334+
self,
335+
query: str,
336+
parameters: ParametersType | None = None,
337+
) -> None:
338+
self._raise_if_closed()
339+
340+
self._stream = await self._execute_generic_query(
341+
query=query, parameters=parameters
291342
)
343+
self._begin_query()
344+
await self._scroll_stream(replace_current=False)
292345

293346
async def execute(
294347
self,
@@ -302,16 +355,18 @@ async def execute(
302355
tx_context=self._tx_context, query=query, parameters=parameters
303356
)
304357
else:
305-
self._stream = await self._execute_generic_query(
358+
self._stream = await self._execute_session_query(
306359
query=query, parameters=parameters
307360
)
308361

309362
self._begin_query()
310-
311363
await self._scroll_stream(replace_current=False)
312364

313-
async def executemany(self) -> None:
314-
pass
365+
async def executemany(
366+
self, query: str, seq_of_parameters: Sequence[ParametersType]
367+
) -> None:
368+
for parameters in seq_of_parameters:
369+
await self.execute(query, parameters)
315370

316371
@handle_ydb_errors
317372
async def nextset(self, replace_current: bool = True) -> bool:

ydb_dbapi/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
VERSION = "0.0.1"
1+
VERSION = "0.0.1b1"

0 commit comments

Comments
 (0)