@@ -25,7 +25,63 @@ class IsolationLevel:
25
25
AUTOCOMMIT = "AUTOCOMMIT"
26
26
27
27
28
- class Connection :
28
+ class BaseConnection :
29
+ _tx_mode : ydb .BaseQueryTxMode = ydb .QuerySerializableReadWrite ()
30
+ _tx_context : ydb .QueryTxContext | ydb .aio .QueryTxContext | None = None
31
+ interactive_transaction : bool = False
32
+
33
+ def set_isolation_level (self , isolation_level : str ) -> None :
34
+ class IsolationSettings (NamedTuple ):
35
+ ydb_mode : ydb .BaseQueryTxMode
36
+ interactive : bool
37
+
38
+ ydb_isolation_settings_map = {
39
+ IsolationLevel .AUTOCOMMIT : IsolationSettings (
40
+ ydb .QuerySerializableReadWrite (), interactive = False
41
+ ),
42
+ IsolationLevel .SERIALIZABLE : IsolationSettings (
43
+ ydb .QuerySerializableReadWrite (), interactive = True
44
+ ),
45
+ IsolationLevel .ONLINE_READONLY : IsolationSettings (
46
+ ydb .QueryOnlineReadOnly (), interactive = True
47
+ ),
48
+ IsolationLevel .ONLINE_READONLY_INCONSISTENT : IsolationSettings (
49
+ ydb .QueryOnlineReadOnly ().with_allow_inconsistent_reads (),
50
+ interactive = True ,
51
+ ),
52
+ IsolationLevel .STALE_READONLY : IsolationSettings (
53
+ ydb .QueryStaleReadOnly (), interactive = True
54
+ ),
55
+ IsolationLevel .SNAPSHOT_READONLY : IsolationSettings (
56
+ ydb .QuerySnapshotReadOnly (), interactive = True
57
+ ),
58
+ }
59
+ ydb_isolation_settings = ydb_isolation_settings_map [isolation_level ]
60
+ if self ._tx_context and self ._tx_context .tx_id :
61
+ raise InternalError (
62
+ "Failed to set transaction mode: transaction is already began"
63
+ )
64
+ self ._tx_mode = ydb_isolation_settings .ydb_mode
65
+ self .interactive_transaction = ydb_isolation_settings .interactive
66
+
67
+ def get_isolation_level (self ) -> str :
68
+ if self ._tx_mode .name == ydb .QuerySerializableReadWrite ().name :
69
+ if self .interactive_transaction :
70
+ return IsolationLevel .SERIALIZABLE
71
+ return IsolationLevel .AUTOCOMMIT
72
+ if self ._tx_mode .name == ydb .QueryOnlineReadOnly ().name :
73
+ if self ._tx_mode .settings .allow_inconsistent_reads :
74
+ return IsolationLevel .ONLINE_READONLY_INCONSISTENT
75
+ return IsolationLevel .ONLINE_READONLY
76
+ if self ._tx_mode .name == ydb .QueryStaleReadOnly ().name :
77
+ return IsolationLevel .STALE_READONLY
78
+ if self ._tx_mode .name == ydb .QuerySnapshotReadOnly ().name :
79
+ return IsolationLevel .SNAPSHOT_READONLY
80
+ msg = f"{ self ._tx_mode .name } is not supported"
81
+ raise NotSupportedError (msg )
82
+
83
+
84
+ class Connection (BaseConnection ):
29
85
def __init__ (
30
86
self ,
31
87
host : str = "" ,
@@ -80,56 +136,6 @@ def wait_ready(self, timeout: int = 10) -> None:
80
136
81
137
self ._session = self ._session_pool .acquire ()
82
138
83
- def set_isolation_level (self , isolation_level : str ) -> None :
84
- class IsolationSettings (NamedTuple ):
85
- ydb_mode : ydb .BaseQueryTxMode
86
- interactive : bool
87
-
88
- ydb_isolation_settings_map = {
89
- IsolationLevel .AUTOCOMMIT : IsolationSettings (
90
- ydb .QuerySerializableReadWrite (), interactive = False
91
- ),
92
- IsolationLevel .SERIALIZABLE : IsolationSettings (
93
- ydb .QuerySerializableReadWrite (), interactive = True
94
- ),
95
- IsolationLevel .ONLINE_READONLY : IsolationSettings (
96
- ydb .QueryOnlineReadOnly (), interactive = True
97
- ),
98
- IsolationLevel .ONLINE_READONLY_INCONSISTENT : IsolationSettings (
99
- ydb .QueryOnlineReadOnly ().with_allow_inconsistent_reads (),
100
- interactive = True ,
101
- ),
102
- IsolationLevel .STALE_READONLY : IsolationSettings (
103
- ydb .QueryStaleReadOnly (), interactive = True
104
- ),
105
- IsolationLevel .SNAPSHOT_READONLY : IsolationSettings (
106
- ydb .QuerySnapshotReadOnly (), interactive = True
107
- ),
108
- }
109
- ydb_isolation_settings = ydb_isolation_settings_map [isolation_level ]
110
- if self ._tx_context and self ._tx_context .tx_id :
111
- raise InternalError (
112
- "Failed to set transaction mode: transaction is already began"
113
- )
114
- self ._tx_mode = ydb_isolation_settings .ydb_mode
115
- self .interactive_transaction = ydb_isolation_settings .interactive
116
-
117
- def get_isolation_level (self ) -> str :
118
- if self ._tx_mode .name == ydb .QuerySerializableReadWrite ().name :
119
- if self .interactive_transaction :
120
- return IsolationLevel .SERIALIZABLE
121
- return IsolationLevel .AUTOCOMMIT
122
- if self ._tx_mode .name == ydb .QueryOnlineReadOnly ().name :
123
- if self ._tx_mode .settings .allow_inconsistent_reads :
124
- return IsolationLevel .ONLINE_READONLY_INCONSISTENT
125
- return IsolationLevel .ONLINE_READONLY
126
- if self ._tx_mode .name == ydb .QueryStaleReadOnly ().name :
127
- return IsolationLevel .STALE_READONLY
128
- if self ._tx_mode .name == ydb .QuerySnapshotReadOnly ().name :
129
- return IsolationLevel .SNAPSHOT_READONLY
130
- msg = f"{ self ._tx_mode .name } is not supported"
131
- raise NotSupportedError (msg )
132
-
133
139
def cursor (self ) -> Cursor :
134
140
if self ._session is None :
135
141
raise RuntimeError ("Connection is not ready, use wait_ready." )
@@ -220,7 +226,7 @@ def callee() -> ydb.Directory:
220
226
return result
221
227
222
228
223
- class AsyncConnection :
229
+ class AsyncConnection ( BaseConnection ) :
224
230
def __init__ (
225
231
self ,
226
232
host : str = "" ,
@@ -275,56 +281,6 @@ async def wait_ready(self, timeout: int = 10) -> None:
275
281
276
282
self ._session = await self ._session_pool .acquire ()
277
283
278
- def set_isolation_level (self , isolation_level : str ) -> None :
279
- class IsolationSettings (NamedTuple ):
280
- ydb_mode : ydb .BaseQueryTxMode
281
- interactive : bool
282
-
283
- ydb_isolation_settings_map = {
284
- IsolationLevel .AUTOCOMMIT : IsolationSettings (
285
- ydb .QuerySerializableReadWrite (), interactive = False
286
- ),
287
- IsolationLevel .SERIALIZABLE : IsolationSettings (
288
- ydb .QuerySerializableReadWrite (), interactive = True
289
- ),
290
- IsolationLevel .ONLINE_READONLY : IsolationSettings (
291
- ydb .QueryOnlineReadOnly (), interactive = True
292
- ),
293
- IsolationLevel .ONLINE_READONLY_INCONSISTENT : IsolationSettings (
294
- ydb .QueryOnlineReadOnly ().with_allow_inconsistent_reads (),
295
- interactive = True ,
296
- ),
297
- IsolationLevel .STALE_READONLY : IsolationSettings (
298
- ydb .QueryStaleReadOnly (), interactive = True
299
- ),
300
- IsolationLevel .SNAPSHOT_READONLY : IsolationSettings (
301
- ydb .QuerySnapshotReadOnly (), interactive = True
302
- ),
303
- }
304
- ydb_isolation_settings = ydb_isolation_settings_map [isolation_level ]
305
- if self ._tx_context and self ._tx_context .tx_id :
306
- raise InternalError (
307
- "Failed to set transaction mode: transaction is already began"
308
- )
309
- self ._tx_mode = ydb_isolation_settings .ydb_mode
310
- self .interactive_transaction = ydb_isolation_settings .interactive
311
-
312
- def get_isolation_level (self ) -> str :
313
- if self ._tx_mode .name == ydb .QuerySerializableReadWrite ().name :
314
- if self .interactive_transaction :
315
- return IsolationLevel .SERIALIZABLE
316
- return IsolationLevel .AUTOCOMMIT
317
- if self ._tx_mode .name == ydb .QueryOnlineReadOnly ().name :
318
- if self ._tx_mode .settings .allow_inconsistent_reads :
319
- return IsolationLevel .ONLINE_READONLY_INCONSISTENT
320
- return IsolationLevel .ONLINE_READONLY
321
- if self ._tx_mode .name == ydb .QueryStaleReadOnly ().name :
322
- return IsolationLevel .STALE_READONLY
323
- if self ._tx_mode .name == ydb .QuerySnapshotReadOnly ().name :
324
- return IsolationLevel .SNAPSHOT_READONLY
325
- msg = f"{ self ._tx_mode .name } is not supported"
326
- raise NotSupportedError (msg )
327
-
328
284
def cursor (self ) -> AsyncCursor :
329
285
if self ._session is None :
330
286
raise RuntimeError ("Connection is not ready, use wait_ready." )
0 commit comments