Skip to content

Commit 72ec659

Browse files
committed
cursor methods
1 parent 5e6b396 commit 72ec659

File tree

8 files changed

+312
-193
lines changed

8 files changed

+312
-193
lines changed

.github/workflows/tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,4 @@ jobs:
4444
4545
- name: Run tests
4646
run: |
47-
poetry run -- pytest tests
47+
poetry run pytest tests

poetry.lock

Lines changed: 148 additions & 148 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ ruff = "ruff check"
3636
lint = ["mypy", "ruff"]
3737
format-check = "ruff format --check"
3838
format = "ruff format"
39+
tests = "pytest -v --docker-compose-remove-volumes --docker-compose=docker-compose.yml"
3940

4041
[tool.ruff]
4142
exclude = [".venv", ".git", "__pycache__", "build", "dist", "venv"]
@@ -48,6 +49,7 @@ select = ["F", "E"]
4849

4950
[tool.pytest.ini_options]
5051
asyncio_mode = "auto"
52+
addopts = "-p no:warnings"
5153

5254
[[tool.mypy.overrides]]
5355
module = "ydb.*"

tests/conftest.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,3 +55,23 @@ async def driver(endpoint, database, event_loop):
5555
yield driver
5656

5757
await driver.stop(timeout=10)
58+
59+
60+
@pytest.fixture
61+
async def session_pool(driver: ydb.aio.Driver, event_loop):
62+
session_pool = ydb.aio.QuerySessionPool(driver)
63+
async with session_pool:
64+
await session_pool.execute_with_retries(
65+
"""DROP TABLE IF EXISTS table"""
66+
)
67+
await session_pool.execute_with_retries(
68+
"""
69+
CREATE TABLE table (
70+
id Int64 NOT NULL,
71+
val Int64,
72+
PRIMARY KEY(id)
73+
)
74+
"""
75+
)
76+
77+
yield session_pool

tests/test_cursor.py

Lines changed: 114 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,125 @@
11
import pytest
2-
3-
import ydb
4-
52
import ydb_dbapi
63
import ydb_dbapi.cursors
74

85

9-
@pytest.fixture
10-
async def cursor(driver: ydb.aio.Driver):
11-
session_pool = ydb.aio.QuerySessionPool(driver)
12-
session = await session_pool.acquire()
13-
cursor = ydb_dbapi.cursors.Cursor(session_pool, session)
14-
15-
yield cursor
16-
17-
await session_pool.release(session)
18-
19-
206
@pytest.mark.asyncio
21-
async def test_cursor_ddl(cursor):
22-
op = ydb_dbapi.cursors.YdbQuery(
23-
yql_text="""
7+
async def test_cursor_ddl(session_pool):
8+
cursor = ydb_dbapi.cursors.Cursor(session_pool=session_pool)
9+
10+
yql = """
2411
CREATE TABLE table (
2512
id Int64 NOT NULL,
26-
i64Val Int64,
13+
val Int64,
2714
PRIMARY KEY(id)
2815
)
29-
""",
30-
is_ddl=True,
31-
)
16+
"""
17+
18+
with pytest.raises(ydb_dbapi.Error):
19+
await cursor.execute(query=yql)
20+
21+
yql = """
22+
DROP TABLE table
23+
"""
24+
25+
await cursor.execute(query=yql)
26+
27+
assert await cursor.fetchone() is None
28+
29+
30+
@pytest.mark.asyncio
31+
async def test_cursor_dml(session_pool):
32+
cursor = ydb_dbapi.cursors.Cursor(session_pool=session_pool)
33+
yql_text = """
34+
INSERT INTO table (id, val) VALUES
35+
(1, 1),
36+
(2, 2),
37+
(3, 3)
38+
"""
39+
40+
await cursor.execute(query=yql_text)
41+
assert await cursor.fetchone() is None
42+
43+
cursor = ydb_dbapi.cursors.Cursor(session_pool=session_pool)
44+
45+
yql_text = """
46+
SELECT COUNT(*) FROM table as sum
47+
"""
48+
49+
await cursor.execute(query=yql_text)
50+
51+
res = await cursor.fetchone()
52+
assert len(res) == 1
53+
assert res[0] == 3
54+
55+
56+
@pytest.mark.asyncio
57+
async def test_cursor_fetch_one(session_pool):
58+
cursor = ydb_dbapi.cursors.Cursor(session_pool=session_pool)
59+
yql_text = """
60+
INSERT INTO table (id, val) VALUES
61+
(1, 1),
62+
(2, 2)
63+
"""
64+
65+
await cursor.execute(query=yql_text)
66+
assert await cursor.fetchone() is None
67+
68+
cursor = ydb_dbapi.cursors.Cursor(session_pool=session_pool)
69+
70+
yql_text = """
71+
SELECT id, val FROM table
72+
"""
73+
74+
await cursor.execute(query=yql_text)
75+
76+
res = await cursor.fetchone()
77+
assert res[0] == 1
78+
79+
res = await cursor.fetchone()
80+
assert res[0] == 2
81+
82+
assert await cursor.fetchone() is None
83+
84+
85+
@pytest.mark.asyncio
86+
async def test_cursor_fetch_many(session_pool):
87+
cursor = ydb_dbapi.cursors.Cursor(session_pool=session_pool)
88+
yql_text = """
89+
INSERT INTO table (id, val) VALUES
90+
(1, 1),
91+
(2, 2),
92+
(3, 3),
93+
(4, 4)
94+
"""
3295

33-
await cursor.execute(op)
96+
await cursor.execute(query=yql_text)
97+
assert await cursor.fetchone() is None
98+
99+
cursor = ydb_dbapi.cursors.Cursor(session_pool=session_pool)
100+
101+
yql_text = """
102+
SELECT id, val FROM table
103+
"""
104+
105+
await cursor.execute(query=yql_text)
106+
107+
res = await cursor.fetchmany()
108+
assert len(res) == 1
109+
assert res[0][0] == 1
110+
111+
res = await cursor.fetchmany(size=2)
112+
assert len(res) == 2
113+
assert res[0][0] == 2
114+
assert res[1][0] == 3
115+
116+
res = await cursor.fetchmany(size=2)
117+
assert len(res) == 1
118+
assert res[0][0] == 4
119+
120+
assert await cursor.fetchmany(size=2) is None
121+
122+
123+
@pytest.mark.asyncio
124+
async def test_cursor_fetch_all(session_pool):
125+
pass

ydb_dbapi/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .errors import * # noqa

ydb_dbapi/cursors.py

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
)
1313

1414
import ydb
15-
from .errors import DatabaseError
15+
from .errors import Error, DatabaseError
1616
from .utils import handle_ydb_errors, AsyncFromSyncIterator
1717

1818

@@ -40,37 +40,37 @@ class Cursor:
4040
def __init__(
4141
self,
4242
session_pool: ydb.aio.QuerySessionPool,
43-
session: ydb.aio.QuerySession,
44-
tx_mode: Optional[ydb.aio.QueryTxContext] = None,
4543
tx_context: Optional[ydb.aio.QueryTxContext] = None,
4644
table_path_prefix: str = "",
4745
autocommit: bool = True,
4846
):
4947
self.arraysize: int = 1
5048
self._description: Optional[List[Tuple]] = None
5149

52-
self._pool = session_pool
53-
self._session = session
54-
self._tx_mode = tx_mode
55-
self._tx_context: ydb.aio.QueryTxContext = tx_context
50+
self._session_pool = session_pool
51+
self._tx_context = tx_context
5652
self._table_path_prefix = table_path_prefix
5753
self._autocommit = autocommit
5854

5955
self._stream: Optional[AsyncIterator] = None
6056
self._rows: Optional[Iterator[Dict]] = None
6157

6258
@handle_ydb_errors
63-
async def _execute_ddl_query(
59+
async def _execute_generic_query(
6460
self, query: str, parameters: Optional[ParametersType] = None
6561
) -> List[ydb.convert.ResultSet]:
66-
return await self._pool.execute_with_retries(
62+
return await self._session_pool.execute_with_retries(
6763
query=query, parameters=parameters
6864
)
6965

7066
@handle_ydb_errors
71-
async def _execute_dml_query(
67+
async def _execute_transactional_query(
7268
self, query: str, parameters: Optional[ParametersType] = None
7369
) -> AsyncIterator:
70+
if self._tx_context is None:
71+
raise Error(
72+
"Unable to execute tx based queries without transaction."
73+
)
7474
return await self._tx_context.execute(
7575
query=query,
7676
parameters=parameters,
@@ -79,17 +79,17 @@ async def _execute_dml_query(
7979

8080
@handle_ydb_errors
8181
async def execute(
82-
self, operation: YdbQuery, parameters: Optional[ParametersType] = None
82+
self, query: str, parameters: Optional[ParametersType] = None
8383
):
84-
if operation.is_ddl:
85-
result_sets = await self._execute_ddl_query(
86-
query=operation.yql_text, parameters=parameters
84+
if self._tx_context is not None:
85+
self._stream = await self._execute_transactional_query(
86+
query=query, parameters=parameters
8787
)
88-
self._stream = AsyncFromSyncIterator(iter(result_sets))
8988
else:
90-
self._stream = await self._execute_dml_query(
91-
query=operation.yql_text, parameters=parameters
89+
result_sets = await self._execute_generic_query(
90+
query=query, parameters=parameters
9291
)
92+
self._stream = AsyncFromSyncIterator(iter(result_sets))
9393

9494
if self._stream is None:
9595
return
@@ -98,7 +98,6 @@ async def execute(
9898
self._update_result_set(result_set)
9999

100100
def _update_result_set(self, result_set: ydb.convert.ResultSet):
101-
# self._result_set = result_set
102101
self._update_description(result_set)
103102
self._rows = self._rows_iterable(result_set)
104103

@@ -133,8 +132,13 @@ async def fetchone(self):
133132
return next(self._rows or iter([]), None)
134133

135134
async def fetchmany(self, size: Optional[int] = None):
136-
return list(
137-
itertools.islice(self._rows or iter([]), size or self.arraysize)
135+
return (
136+
list(
137+
itertools.islice(
138+
self._rows or iter([]), size or self.arraysize
139+
)
140+
)
141+
or None
138142
)
139143

140144
async def fetchall(self):

ydb_dbapi/utils.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515

1616
def handle_ydb_errors(func):
1717
@functools.wraps(func)
18-
def wrapper(*args, **kwargs):
18+
async def wrapper(*args, **kwargs):
1919
try:
20-
return func(*args, **kwargs)
20+
return await func(*args, **kwargs)
2121
except (ydb.issues.AlreadyExists, ydb.issues.PreconditionFailed) as e:
2222
raise IntegrityError(e.message, original_error=e) from e
2323
except (ydb.issues.Unsupported, ydb.issues.Unimplemented) as e:

0 commit comments

Comments
 (0)