Merged
11 changes: 11 additions & 0 deletions ckanext/xloader/config_declaration.yaml
@@ -187,3 +187,14 @@ groups:
they will also display "complete", "active", "inactive", and "unknown".
type: bool
required: false
- key: ckanext.xloader.search_update_chunks
default: 100000
example: 50000
description: |
The number of rows to process in each batch when populating the full-text
search index. Chunked processing prevents database timeouts and memory
exhaustion when indexing very large datasets (4GB+ files with millions of rows).
Smaller values reduce memory usage but increase processing time. Larger values
improve performance but may cause timeouts on very large tables.
type: int
required: false
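
For example, a deployment that hits statement timeouts on a constrained database server could lower the batch size in its CKAN config file (the value below is illustrative, not a recommended setting):

    # ckan.ini -- smaller batches trade indexing speed for lower memory and lock pressure
    ckanext.xloader.search_update_chunks = 50000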
104 changes: 80 additions & 24 deletions ckanext/xloader/loader.py
@@ -341,7 +341,7 @@ def strip_white_space_iter():

    logger.info('Creating search index...')
    with engine.begin() as conn:
        _populate_fulltext(conn, resource_id, fields=fields)
        _populate_fulltext(conn, resource_id, fields=fields, logger=logger)
    logger.info('...search index created')

    return fields
@@ -659,30 +659,86 @@ def _enable_fulltext_trigger(connection, resource_id):
.format(table=identifier(resource_id, True))))


def _populate_fulltext(connection, resource_id, fields):
    '''Populates the _full_text column. i.e. the same as datastore_run_triggers
    but it runs in 1/9 of the time.

    The downside is that it reimplements the code that calculates the text to
    index, breaking DRY. And it's annoying to pass in the column names.

    fields: list of dicts giving each column's 'id' (name) and 'type'
    (text/numeric/timestamp)
def _get_rows_count_of_resource(connection, table):
    count_query = 'SELECT count(_id) FROM {table}'.format(table=table)
    results = connection.execute(sa.text(count_query))
    rows_count = int(results.first()[0])
    return rows_count


def _populate_fulltext(connection, resource_id, fields, logger):
    '''Populates the _full_text column for full-text search functionality.

    This function creates a PostgreSQL tsvector (text search vector) for each
    row by concatenating all non-system columns. It is equivalent to
    datastore_run_triggers but runs approximately 9x faster by using direct
    SQL updates.

    To handle very large datasets (e.g., 4GB+ files with millions of rows),
    the update operation is partitioned into chunks to prevent:
    - Database statement timeouts
    - Memory exhaustion
    - Lock contention that could block other operations
    - Transaction log overflow

    The chunking mechanism processes rows in batches based on their _id values,
    with the chunk size configurable via 'ckanext.xloader.search_update_chunks'
    (default: 100,000 rows per chunk).

    Args:
        connection: Database connection object
        resource_id (str): The datastore table identifier
        fields (list): List of dicts with column 'id' (name) and 'type'
            (text/numeric/timestamp)
        logger: Logger instance for progress tracking

    Note:
        This reimplements CKAN's text indexing logic for performance,
        breaking the DRY principle but providing a significant speedup.
    '''
    stmt = sa.update(sa.table(resource_id, sa.column("_full_text"))).values(
        _full_text=sa.text("to_tsvector({})".format(
            " || ' ' || ".join(
                'coalesce({}, \'\')'.format(
                    identifier(field['id'])
                    + ('::text' if field['type'] != 'text' else '')
                )
                for field in fields
                if not field['id'].startswith('_')
            )
        ))
    )

    connection.execute(stmt)
    try:
        # Get total row count to determine the chunking strategy
        rows_count = _get_rows_count_of_resource(connection, identifier(resource_id))
    except Exception as e:
        logger.error("Failed to get resource row count: {}".format(str(e)))
        raise

    if rows_count:
        # Configure chunk size - prevents timeouts and memory issues on large datasets
        # The default of 100,000 rows per chunk balances performance vs. resource usage
        chunks = int(config.get('ckanext.xloader.search_update_chunks', 100000))
Collaborator:

    Please add this config item to https://github.com/ckan/ckanext-xloader/blob/master/ckanext/xloader/config_declaration.yaml with a description and example. You can also set the default value there so it's not hidden in the code.

Author:

    Done.

    # Process table in chunks using _id range queries
    # This keeps each statement's scope bounded and allows resuming if interrupted
    for start in range(0, rows_count, chunks):
        try:
            # Build SQL to update _full_text with the concatenated searchable content
            sql = '''
                UPDATE {table}
                SET _full_text = to_tsvector({cols})
                WHERE _id > {first} AND _id <= {end};
                '''.format(
                table=identifier(resource_id),
                # Concatenate all user columns (excluding system columns starting with '_')
                # coalesce() handles NULL values by converting them to empty strings
                cols=" || ' ' || ".join(
                    'coalesce({}, \'\')'.format(
                        identifier(field['id'])
                        + ('::text' if field['type'] != 'text' else '')  # Cast non-text types
                    )
                    for field in fields
                    if not field['id'].startswith('_')  # Skip system columns like _id, _full_text
                ),
                # Half-open range (start, start + chunks]: an inclusive BETWEEN would
                # process the boundary row twice in consecutive chunks
                first=start,
                end=start + chunks
            )
            connection.execute(sa.text(sql))
            logger.info("Indexed rows {first} to {end} of {total}".format(
                first=start + 1, end=min(start + chunks, rows_count), total=rows_count))
        except Exception as e:
            # Log chunk-specific errors but continue processing remaining chunks
            logger.error("Failed to index rows {first}-{end}: {error}".format(
                first=start + 1, end=start + chunks, error=str(e)))
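
To make the chunked statements concrete, here is roughly what one iteration of the loop above would render for a hypothetical resource with a text column "name" and a numeric column "price" (the resource id and chunk size are made up; identifier() supplies the double quotes):

    UPDATE "deadbeef-0000-0000-0000-000000000000"
    SET _full_text = to_tsvector(
        coalesce("name", '') || ' ' || coalesce("price"::text, '')
    )
    WHERE _id > 0 AND _id <= 100000;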


def calculate_record_count(resource_id, logger):