Skip to content

Commit a771128

Browse files
authored
Merge pull request #225 from ckan/sqlalchemy2
SQLAlchemy v2 support
2 parents a85bc19 + 873a7f6 commit a771128

File tree

6 files changed

+75
-70
lines changed

6 files changed

+75
-70
lines changed

.github/workflows/publish.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,6 @@ jobs:
5353
ckan-image: "2.11-py3.10"
5454
- ckan-version: "2.10"
5555
ckan-image: "2.10-py3.10"
56-
- ckan-version: "2.9"
57-
ckan-image: "2.9-py3.9"
5856
#- ckan-version: "master" Publish does not care about master
5957
# ckan-image: "master"
6058
fail-fast: false

.github/workflows/test.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,6 @@ jobs:
3030
- ckan-version: "2.10"
3131
ckan-image: "2.10-py3.10"
3232
experimental: false
33-
- ckan-version: "2.9"
34-
ckan-image: "2.9-py3.9"
35-
experimental: false
3633
- ckan-version: "master"
3734
ckan-image: "master"
3835
experimental: true # master is unstable, good to know if we are compatible or not

README.md

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -112,13 +112,13 @@ type -offers to contribute this are welcomed.
112112

113113
Compatibility with core CKAN versions:
114114

115-
| CKAN version | Compatibility |
116-
| -------------- | -------------------------------------------------------------------------------------------------------------- |
117-
| 2.7 | no longer supported (last supported version: 0.12.2) |
118-
| 2.8 | no longer supported (last supported version: 0.12.2) |
119-
| 2.9 | yes (Python3) (last supported version for Python 2.7: 0.12.2)), Must: `pip install "setuptools>=44.1.0,<71"` |
120-
| 2.10 | yes |
121-
| 2.11 | yes |
115+
| CKAN version | Compatibility |
116+
| -------------- |-------------------------------------------------------|
117+
| 2.7 | no longer supported (last supported version: 0.12.2) |
118+
| 2.8 | no longer supported (last supported version: 0.12.2) |
119+
| 2.9 | no longer supported (last supported version: 1.2.x) |
120+
| 2.10 | yes |
121+
| 2.11 | yes |
122122

123123
## Installation
124124

ckanext/xloader/db.py

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ def init(config, echo=False):
3939
global ENGINE, _METADATA, JOBS_TABLE, METADATA_TABLE, LOGS_TABLE
4040
db_uri = config.get('ckanext.xloader.jobs_db.uri',
4141
'sqlite:////tmp/xloader_jobs.db')
42-
ENGINE = sqlalchemy.create_engine(db_uri, echo=echo, convert_unicode=True)
43-
_METADATA = sqlalchemy.MetaData(ENGINE)
42+
ENGINE = sqlalchemy.create_engine(db_uri, echo=echo)
43+
_METADATA = sqlalchemy.MetaData()
4444
JOBS_TABLE = _init_jobs_table()
4545
METADATA_TABLE = _init_metadata_table()
4646
LOGS_TABLE = _init_logs_table()
@@ -111,8 +111,10 @@ def get_job(job_id):
111111
if job_id:
112112
job_id = six.text_type(job_id)
113113

114-
result = ENGINE.execute(
115-
JOBS_TABLE.select().where(JOBS_TABLE.c.job_id == job_id)).first()
114+
with ENGINE.connect() as conn:
115+
result = conn.execute(
116+
JOBS_TABLE.select().where(JOBS_TABLE.c.job_id == job_id)
117+
).first()
116118

117119
if not result:
118120
return None
@@ -298,10 +300,11 @@ def _update_job(job_id, job_dict):
298300
if "data" in job_dict:
299301
job_dict["data"] = six.text_type(job_dict["data"])
300302

301-
ENGINE.execute(
302-
JOBS_TABLE.update()
303-
.where(JOBS_TABLE.c.job_id == job_id)
304-
.values(**job_dict))
303+
with ENGINE.begin() as conn:
304+
conn.execute(
305+
JOBS_TABLE.update()
306+
.where(JOBS_TABLE.c.job_id == job_id)
307+
.values(**job_dict))
305308

306309

307310
def mark_job_as_completed(job_id, data=None):
@@ -443,9 +446,10 @@ def _get_metadata(job_id):
443446
# warnings.
444447
job_id = six.text_type(job_id)
445448

446-
results = ENGINE.execute(
447-
METADATA_TABLE.select().where(
448-
METADATA_TABLE.c.job_id == job_id)).fetchall()
449+
with ENGINE.connect() as conn:
450+
results = conn.execute(
451+
METADATA_TABLE.select().where(
452+
METADATA_TABLE.c.job_id == job_id)).fetchall()
449453
metadata = {}
450454
for row in results:
451455
value = row['value']
@@ -461,8 +465,9 @@ def _get_logs(job_id):
461465
# warnings.
462466
job_id = six.text_type(job_id)
463467

464-
results = ENGINE.execute(
465-
LOGS_TABLE.select().where(LOGS_TABLE.c.job_id == job_id)).fetchall()
468+
with ENGINE.connect() as conn:
469+
results = conn.execute(
470+
LOGS_TABLE.select().where(LOGS_TABLE.c.job_id == job_id)).fetchall()
466471

467472
results = [dict(result) for result in results]
468473

ckanext/xloader/loader.py

Lines changed: 39 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from six.moves import zip
1515
from tabulator import config as tabulator_config, EncodingError, Stream, TabulatorException
1616
from unidecode import unidecode
17+
import sqlalchemy as sa
1718

1819
import ckan.plugins as p
1920

@@ -118,8 +119,8 @@ def _clear_datastore_resource(resource_id):
118119
'''
119120
engine = get_write_engine()
120121
with engine.begin() as conn:
121-
conn.execute("SET LOCAL lock_timeout = '15s'")
122-
conn.execute('TRUNCATE TABLE "{}" RESTART IDENTITY'.format(resource_id))
122+
conn.execute(sa.text("SET LOCAL lock_timeout = '15s'"))
123+
conn.execute(sa.text('TRUNCATE TABLE "{}" RESTART IDENTITY'.format(resource_id)))
123124

124125

125126
def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None):
@@ -282,12 +283,17 @@ def strip_white_space_iter():
282283
except Exception as e:
283284
raise LoaderError('Could not create the database table: {}'
284285
.format(e))
285-
connection = context['connection'] = engine.connect()
286+
286287

287288
# datastore_active is switched on by datastore_create - TODO temporarily
288289
# disable it until the load is complete
289-
_disable_fulltext_trigger(connection, resource_id)
290-
_drop_indexes(context, data_dict, False)
290+
291+
with engine.begin() as conn:
292+
_disable_fulltext_trigger(conn, resource_id)
293+
294+
with engine.begin() as conn:
295+
context['connection'] = conn
296+
_drop_indexes(context, data_dict, False)
291297

292298
logger.info('Copying to database...')
293299

@@ -305,9 +311,8 @@ def strip_white_space_iter():
305311
# 4. COPY FROM STDIN - not quite as fast as COPY from a file, but avoids
306312
# the superuser issue. <-- picked
307313

308-
raw_connection = engine.raw_connection()
309-
try:
310-
cur = raw_connection.cursor()
314+
with engine.begin() as conn:
315+
cur = conn.connection.cursor()
311316
try:
312317
with open(csv_filepath, 'rb') as f:
313318
# can't use :param for table name because params are only
@@ -337,15 +342,14 @@ def strip_white_space_iter():
337342

338343
finally:
339344
cur.close()
340-
finally:
341-
raw_connection.commit()
342345
finally:
343346
os.remove(csv_filepath) # i.e. the tempfile
344347

345348
logger.info('...copying done')
346349

347350
logger.info('Creating search index...')
348-
_populate_fulltext(connection, resource_id, fields=fields)
351+
with engine.begin() as conn:
352+
_populate_fulltext(conn, resource_id, fields=fields)
349353
logger.info('...search index created')
350354

351355
return fields
@@ -631,9 +635,9 @@ def fulltext_function_exists(connection):
631635
https://github.com/ckan/ckan/pull/3786
632636
or otherwise it is checked on startup of this plugin.
633637
'''
634-
res = connection.execute('''
638+
res = connection.execute(sa.text('''
635639
select * from pg_proc where proname = 'populate_full_text_trigger';
636-
''')
640+
'''))
637641
return bool(res.rowcount)
638642

639643

@@ -642,24 +646,25 @@ def fulltext_trigger_exists(connection, resource_id):
642646
This will only be the case if your CKAN is new enough to have:
643647
https://github.com/ckan/ckan/pull/3786
644648
'''
645-
res = connection.execute('''
649+
res = connection.execute(sa.text('''
646650
SELECT pg_trigger.tgname FROM pg_class
647651
JOIN pg_trigger ON pg_class.oid=pg_trigger.tgrelid
648652
WHERE pg_class.relname={table}
649653
AND pg_trigger.tgname='zfulltext';
650654
'''.format(
651-
table=literal_string(resource_id)))
655+
table=literal_string(resource_id))))
652656
return bool(res.rowcount)
653657

654658

655659
def _disable_fulltext_trigger(connection, resource_id):
656-
connection.execute('ALTER TABLE {table} DISABLE TRIGGER zfulltext;'
657-
.format(table=identifier(resource_id)))
660+
connection.execute(sa.text('ALTER TABLE {table} DISABLE TRIGGER zfulltext;'
661+
.format(table=identifier(resource_id, True))))
658662

659663

660664
def _enable_fulltext_trigger(connection, resource_id):
661-
connection.execute('ALTER TABLE {table} ENABLE TRIGGER zfulltext;'
662-
.format(table=identifier(resource_id)))
665+
connection.execute(sa.text(
666+
'ALTER TABLE {table} ENABLE TRIGGER zfulltext;'
667+
.format(table=identifier(resource_id, True))))
663668

664669

665670
def _populate_fulltext(connection, resource_id, fields):
@@ -672,23 +677,20 @@ def _populate_fulltext(connection, resource_id, fields):
672677
fields: list of dicts giving each column's 'id' (name) and 'type'
673678
(text/numeric/timestamp)
674679
'''
675-
sql = \
676-
u'''
677-
UPDATE {table}
678-
SET _full_text = to_tsvector({cols});
679-
'''.format(
680-
# coalesce copes with blank cells
681-
table=identifier(resource_id),
682-
cols=" || ' ' || ".join(
680+
stmt = sa.update(sa.table(resource_id, sa.column("_full_text"))).values(
681+
_full_text=sa.text("to_tsvector({})".format(
682+
" || ' ' || ".join(
683683
'coalesce({}, \'\')'.format(
684684
identifier(field['id'])
685685
+ ('::text' if field['type'] != 'text' else '')
686686
)
687687
for field in fields
688688
if not field['id'].startswith('_')
689689
)
690-
)
691-
connection.execute(sql)
690+
))
691+
)
692+
693+
connection.execute(stmt)
692694

693695

694696
def calculate_record_count(resource_id, logger):
@@ -700,15 +702,18 @@ def calculate_record_count(resource_id, logger):
700702
logger.info('Calculating record count (running ANALYZE on the table)')
701703
engine = get_write_engine()
702704
conn = engine.connect()
703-
conn.execute("ANALYZE \"{resource_id}\";"
704-
.format(resource_id=resource_id))
705+
conn.execute(sa.text("ANALYZE \"{resource_id}\";"
706+
.format(resource_id=resource_id)))
705707

706708

707-
def identifier(s):
709+
def identifier(s, escape_binds=False):
708710
# "%" needs to be escaped, otherwise connection.execute thinks it is for
709711
# substituting a bind parameter
710-
return u'"' + s.replace(u'"', u'""').replace(u'\0', '').replace('%', '%%')\
711-
+ u'"'
712+
escaped = s.replace(u'"', u'""').replace(u'\0', '')
713+
if escape_binds:
714+
escaped = escaped.replace('%', '%%')
715+
716+
return u'"' + escaped + u'"'
712717

713718

714719
def literal_string(s):

ckanext/xloader/tests/test_loader.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import os
55
import pytest
66
import six
7+
import sqlalchemy as sa
78
import sqlalchemy.orm as orm
89
import datetime
910
import logging
@@ -47,17 +48,16 @@ def _get_records(
4748
c = Session.connection()
4849
if exclude_full_text_column:
4950
cols = self._get_column_names(Session, table_name)
50-
cols = ", ".join(
51-
loader.identifier(col) for col in cols if col != "_full_text"
52-
)
51+
cols = [
52+
sa.column(col) for col in cols if col != "_full_text"
53+
]
5354
else:
54-
cols = "*"
55-
sql = 'SELECT {cols} FROM "{table_name}"'.format(
56-
cols=cols, table_name=table_name
57-
)
55+
cols = [sa.text("*")]
56+
stmt = sa.select(*cols).select_from(sa.table(table_name))
57+
5858
if limit is not None:
59-
sql += " LIMIT {}".format(limit)
60-
results = c.execute(sql)
59+
stmt = stmt.limit(limit)
60+
results = c.execute(stmt)
6161
return results.fetchall()
6262

6363
def _get_column_names(self, Session, table_name):
@@ -71,7 +71,7 @@ def _get_column_names(self, Session, table_name):
7171
ORDER BY ordinal_position;
7272
""".format(table_name)
7373
)
74-
results = c.execute(sql)
74+
results = c.execute(sa.text(sql))
7575
records = results.fetchall()
7676
return [r[0] for r in records]
7777

@@ -85,7 +85,7 @@ def _get_column_types(self, Session, table_name):
8585
ORDER BY ordinal_position;
8686
""".format(table_name)
8787
)
88-
results = c.execute(sql)
88+
results = c.execute(sa.text(sql))
8989
records = results.fetchall()
9090
return [r[0] for r in records]
9191

0 commit comments

Comments
 (0)