Skip to content

Commit 45bcf5e

Browse files
committed
Add pool retries to execute calls
1 parent 3404c63 commit 45bcf5e

File tree

7 files changed

+261
-244
lines changed

7 files changed

+261
-244
lines changed

poetry.lock

Lines changed: 150 additions & 150 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ readme = "README.md"
77

88
[tool.poetry.dependencies]
99
python = "^3.9"
10-
ydb = "^3.18.3"
10+
ydb = "^3.18.8"
1111

1212
[tool.poetry.group.dev.dependencies]
1313
pre-commit = "^4.0.1"

tests/conftest.py

Lines changed: 22 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,17 @@ async def session_pool(
163163
"""
164164
)
165165

166+
await session_pool.execute_with_retries(
167+
f"""
168+
DELETE FROM {name};
169+
INSERT INTO {name} (id, val) VALUES
170+
(0, 0),
171+
(1, 1),
172+
(2, 2),
173+
(3, 3)
174+
"""
175+
)
176+
166177
yield session_pool
167178

168179

@@ -184,46 +195,15 @@ def session_pool_sync(
184195
"""
185196
)
186197

187-
yield session_pool
188-
189-
190-
@pytest.fixture
191-
async def session(
192-
session_pool: ydb.aio.QuerySessionPool,
193-
) -> AsyncGenerator[ydb.aio.QuerySession]:
194-
for name in ["table", "table1", "table2"]:
195-
await session_pool.execute_with_retries(
196-
f"""
197-
DELETE FROM {name};
198-
INSERT INTO {name} (id, val) VALUES
199-
(0, 0),
200-
(1, 1),
201-
(2, 2),
202-
(3, 3)
203-
"""
204-
)
205-
206-
session = await session_pool.acquire()
207-
yield session
208-
await session_pool.release(session)
209-
210-
211-
@pytest.fixture
212-
def session_sync(
213-
session_pool_sync: ydb.QuerySessionPool,
214-
) -> Generator[ydb.QuerySession]:
215-
for name in ["table", "table1", "table2"]:
216-
session_pool_sync.execute_with_retries(
217-
f"""
218-
DELETE FROM {name};
219-
INSERT INTO {name} (id, val) VALUES
220-
(0, 0),
221-
(1, 1),
222-
(2, 2),
223-
(3, 3)
224-
"""
225-
)
198+
session_pool.execute_with_retries(
199+
f"""
200+
DELETE FROM {name};
201+
INSERT INTO {name} (id, val) VALUES
202+
(0, 0),
203+
(1, 1),
204+
(2, 2),
205+
(3, 3)
206+
"""
207+
)
226208

227-
session = session_pool_sync.acquire()
228-
yield session
229-
session_pool_sync.release(session)
209+
yield session_pool

tests/test_connections.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ def _test_isolation_level_read_only(
3535

3636
connection.set_isolation_level(isolation_level)
3737
cursor = connection.cursor()
38+
maybe_await(connection.begin())
39+
3840
query = "UPSERT INTO foo(id) VALUES (1)"
3941
if read_only:
4042
with pytest.raises(dbapi.DatabaseError):

tests/test_cursors.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,10 @@ def _test_cursor_fetch_all_multiple_result_sets(
139139

140140
class TestCursor(BaseCursorTestSuit):
141141
@pytest.fixture
142-
def sync_cursor(self, session_sync: ydb.QuerySession) -> Generator[Cursor]:
143-
cursor = Cursor(session_sync, ydb.QuerySerializableReadWrite())
142+
def sync_cursor(
143+
self, session_pool_sync: ydb.QuerySessionPool
144+
) -> Generator[Cursor]:
145+
cursor = Cursor(session_pool_sync, ydb.QuerySerializableReadWrite())
144146
yield cursor
145147
cursor.close()
146148

@@ -173,9 +175,9 @@ def test_cursor_fetch_all_multiple_result_sets(
173175
class TestAsyncCursor(BaseCursorTestSuit):
174176
@pytest.fixture
175177
async def async_cursor(
176-
self, session: ydb.aio.QuerySession
178+
self, session_pool: ydb.aio.QuerySessionPool
177179
) -> AsyncGenerator[Cursor]:
178-
cursor = AsyncCursor(session, ydb.QuerySerializableReadWrite())
180+
cursor = AsyncCursor(session_pool, ydb.QuerySerializableReadWrite())
179181
yield cursor
180182
await greenlet_spawn(cursor.close)
181183

ydb_dbapi/connections.py

Lines changed: 38 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,15 @@ def __init__(
8787
if ydb_session_pool is not None:
8888
self._shared_session_pool = True
8989
self._session_pool = ydb_session_pool
90+
settings = self._get_client_settings()
91+
self._session_pool._query_client_settings = settings
9092
self._driver = self._session_pool._driver
9193
else:
9294
driver_config = ydb.DriverConfig(
9395
endpoint=self.endpoint,
9496
database=self.database,
9597
credentials=self.credentials,
98+
query_client_settings=self._get_client_settings(),
9699
)
97100
self._driver = self._driver_cls(driver_config)
98101
self._session_pool = self._pool_cls(self._driver, size=5)
@@ -126,11 +129,15 @@ def get_isolation_level(self) -> str:
126129
msg = f"{self._tx_mode.name} is not supported"
127130
raise NotSupportedError(msg)
128131

129-
def _maybe_init_tx(
130-
self, session: ydb.QuerySession | ydb.aio.QuerySession
131-
) -> None:
132-
if self._tx_context is None and self.interactive_transaction:
133-
self._tx_context = session.transaction(self._tx_mode)
132+
def _get_client_settings(self) -> ydb.QueryClientSettings:
133+
return (
134+
ydb.QueryClientSettings()
135+
.with_native_date_in_result_sets(True)
136+
.with_native_datetime_in_result_sets(True)
137+
.with_native_timestamp_in_result_sets(True)
138+
.with_native_interval_in_result_sets(True)
139+
.with_native_json_in_result_sets(False)
140+
)
134141

135142

136143
class Connection(BaseConnection):
@@ -160,17 +167,11 @@ def __init__(
160167
self._current_cursor: Cursor | None = None
161168

162169
def cursor(self) -> Cursor:
163-
if self._session is None:
164-
raise RuntimeError("Connection is not ready, use wait_ready.")
165-
166-
self._maybe_init_tx(self._session)
167-
168-
self._current_cursor = self._cursor_cls(
169-
session=self._session,
170+
return self._cursor_cls(
171+
session_pool=self._session_pool,
170172
tx_mode=self._tx_mode,
171173
tx_context=self._tx_context,
172174
)
173-
return self._current_cursor
174175

175176
def wait_ready(self, timeout: int = 10) -> None:
176177
try:
@@ -185,27 +186,33 @@ def wait_ready(self, timeout: int = 10) -> None:
185186
)
186187
raise InterfaceError(msg) from e
187188

188-
self._session = self._session_pool.acquire()
189+
@handle_ydb_errors
190+
def begin(self) -> None:
191+
self._tx_context = None
192+
if self.interactive_transaction:
193+
self._session = self._session_pool.acquire()
194+
self._tx_context = self._session.transaction(self._tx_mode)
189195

190196
@handle_ydb_errors
191197
def commit(self) -> None:
192198
if self._tx_context and self._tx_context.tx_id:
193199
self._tx_context.commit()
200+
self._session_pool.release(self._session)
194201
self._tx_context = None
202+
self._session = None
195203

196204
@handle_ydb_errors
197205
def rollback(self) -> None:
198206
if self._tx_context and self._tx_context.tx_id:
199207
self._tx_context.rollback()
208+
self._session_pool.release(self._session)
200209
self._tx_context = None
210+
self._session = None
201211

202212
@handle_ydb_errors
203213
def close(self) -> None:
204214
self.rollback()
205215

206-
if self._current_cursor:
207-
self._current_cursor.close()
208-
209216
if self._session:
210217
self._session_pool.release(self._session)
211218

@@ -287,17 +294,11 @@ def __init__(
287294
self._current_cursor: AsyncCursor | None = None
288295

289296
def cursor(self) -> AsyncCursor:
290-
if self._session is None:
291-
raise RuntimeError("Connection is not ready, use wait_ready.")
292-
293-
self._maybe_init_tx(self._session)
294-
295-
self._current_cursor = self._cursor_cls(
296-
session=self._session,
297+
return self._cursor_cls(
298+
session_pool=self._session_pool,
297299
tx_mode=self._tx_mode,
298300
tx_context=self._tx_context,
299301
)
300-
return self._current_cursor
301302

302303
async def wait_ready(self, timeout: int = 10) -> None:
303304
try:
@@ -312,27 +313,33 @@ async def wait_ready(self, timeout: int = 10) -> None:
312313
)
313314
raise InterfaceError(msg) from e
314315

315-
self._session = await self._session_pool.acquire()
316+
@handle_ydb_errors
317+
async def begin(self) -> None:
318+
self._tx_context = None
319+
if self.interactive_transaction:
320+
self._session = await self._session_pool.acquire()
321+
self._tx_context = self._session.transaction(self._tx_mode)
316322

317323
@handle_ydb_errors
318324
async def commit(self) -> None:
319-
if self._tx_context and self._tx_context.tx_id:
325+
if self._session and self._tx_context and self._tx_context.tx_id:
320326
await self._tx_context.commit()
327+
await self._session_pool.release(self._session)
328+
self._session = None
321329
self._tx_context = None
322330

323331
@handle_ydb_errors
324332
async def rollback(self) -> None:
325-
if self._tx_context and self._tx_context.tx_id:
333+
if self._session and self._tx_context and self._tx_context.tx_id:
326334
await self._tx_context.rollback()
335+
await self._session_pool.release(self._session)
336+
self._session = None
327337
self._tx_context = None
328338

329339
@handle_ydb_errors
330340
async def close(self) -> None:
331341
await self.rollback()
332342

333-
if self._current_cursor:
334-
await self._current_cursor.close()
335-
336343
if self._session:
337344
await self._session_pool.release(self._session)
338345

ydb_dbapi/cursors.py

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -138,13 +138,13 @@ def _fetchall_from_buffer(self) -> list:
138138
class Cursor(BufferedCursor):
139139
def __init__(
140140
self,
141-
session: ydb.QuerySession,
141+
session_pool: ydb.QuerySessionPool,
142142
tx_mode: ydb.BaseQueryTxMode,
143143
tx_context: ydb.QueryTxContext | None = None,
144144
table_path_prefix: str = "",
145145
) -> None:
146146
super().__init__()
147-
self._session = session
147+
self._session_pool = session_pool
148148
self._tx_mode = tx_mode
149149
self._tx_context = tx_context
150150
self._table_path_prefix = table_path_prefix
@@ -165,19 +165,32 @@ def fetchall(self) -> list:
165165
def _execute_generic_query(
166166
self, query: str, parameters: ParametersType | None = None
167167
) -> Iterator[ydb.convert.ResultSet]:
168-
return self._session.execute(query=query, parameters=parameters)
168+
def callee(
169+
session: ydb.QuerySession,
170+
) -> Iterator[ydb.convert.ResultSet]:
171+
return session.execute(
172+
query=query,
173+
parameters=parameters,
174+
)
175+
176+
return self._session_pool.retry_operation_sync(callee)
169177

170178
@handle_ydb_errors
171179
def _execute_session_query(
172180
self,
173181
query: str,
174182
parameters: ParametersType | None = None,
175183
) -> Iterator[ydb.convert.ResultSet]:
176-
return self._session.transaction(self._tx_mode).execute(
177-
query=query,
178-
parameters=parameters,
179-
commit_tx=True,
180-
)
184+
def callee(
185+
session: ydb.QuerySession,
186+
) -> Iterator[ydb.convert.ResultSet]:
187+
return session.transaction(self._tx_mode).execute(
188+
query=query,
189+
parameters=parameters,
190+
commit_tx=True,
191+
)
192+
193+
return self._session_pool.retry_operation_sync(callee)
181194

182195
@handle_ydb_errors
183196
def _execute_transactional_query(
@@ -276,13 +289,13 @@ def __exit__(
276289
class AsyncCursor(BufferedCursor):
277290
def __init__(
278291
self,
279-
session: ydb.aio.QuerySession,
292+
session_pool: ydb.aio.QuerySessionPool,
280293
tx_mode: ydb.BaseQueryTxMode,
281294
tx_context: ydb.aio.QueryTxContext | None = None,
282295
table_path_prefix: str = "",
283296
) -> None:
284297
super().__init__()
285-
self._session = session
298+
self._session_pool = session_pool
286299
self._tx_mode = tx_mode
287300
self._tx_context = tx_context
288301
self._table_path_prefix = table_path_prefix
@@ -303,19 +316,32 @@ async def fetchall(self) -> list:
303316
async def _execute_generic_query(
304317
self, query: str, parameters: ParametersType | None = None
305318
) -> AsyncIterator[ydb.convert.ResultSet]:
306-
return await self._session.execute(query=query, parameters=parameters)
319+
async def callee(
320+
session: ydb.aio.QuerySession,
321+
) -> AsyncIterator[ydb.convert.ResultSet]:
322+
return await session.execute(
323+
query=query,
324+
parameters=parameters,
325+
)
326+
327+
return await self._session_pool.retry_operation_async(callee)
307328

308329
@handle_ydb_errors
309330
async def _execute_session_query(
310331
self,
311332
query: str,
312333
parameters: ParametersType | None = None,
313334
) -> AsyncIterator[ydb.convert.ResultSet]:
314-
return await self._session.transaction(self._tx_mode).execute(
315-
query=query,
316-
parameters=parameters,
317-
commit_tx=True,
318-
)
335+
async def callee(
336+
session: ydb.aio.QuerySession,
337+
) -> AsyncIterator[ydb.convert.ResultSet]:
338+
return await session.transaction(self._tx_mode).execute(
339+
query=query,
340+
parameters=parameters,
341+
commit_tx=True,
342+
)
343+
344+
return await self._session_pool.retry_operation_async(callee)
319345

320346
@handle_ydb_errors
321347
async def _execute_transactional_query(

0 commit comments

Comments
 (0)