perf: implement chunked fulltext indexing for large datasets #256
Changes from all commits
@@ -341,7 +341,7 @@ def strip_white_space_iter():
 
     logger.info('Creating search index...')
     with engine.begin() as conn:
-        _populate_fulltext(conn, resource_id, fields=fields)
+        _populate_fulltext(conn, resource_id, fields=fields, logger=logger)
     logger.info('...search index created')
 
     return fields
@@ -659,30 +659,86 @@ def _enable_fulltext_trigger(connection, resource_id):
         .format(table=identifier(resource_id, True))))
 
 
-def _populate_fulltext(connection, resource_id, fields):
-    '''Populates the _full_text column. i.e. the same as datastore_run_triggers
-    but it runs in 1/9 of the time.
-
-    The downside is that it reimplements the code that calculates the text to
-    index, breaking DRY. And its annoying to pass in the column names.
-
-    fields: list of dicts giving the each column's 'id' (name) and 'type'
-        (text/numeric/timestamp)
-    '''
-    stmt = sa.update(sa.table(resource_id, sa.column("_full_text"))).values(
-        _full_text=sa.text("to_tsvector({})".format(
-            " || ' ' || ".join(
-                'coalesce({}, \'\')'.format(
-                    identifier(field['id'])
-                    + ('::text' if field['type'] != 'text' else '')
-                )
-                for field in fields
-                if not field['id'].startswith('_')
-            )
-        ))
-    )
-
-    connection.execute(stmt)
+def _get_rows_count_of_resource(connection, table):
+    count_query = ''' SELECT count(_id) from {table} '''.format(table=table)
+    results = connection.execute(count_query)
+    rows_count = int(results.first()[0])
+    return rows_count
+
+
+def _populate_fulltext(connection, resource_id, fields, logger):
+    '''Populates the _full_text column for full-text search functionality.
+
+    This function creates a PostgreSQL tsvector (text search vector) for each row
+    by concatenating all non-system columns. It's equivalent to datastore_run_triggers
+    but runs approximately 9x faster by using direct SQL updates.
+
+    To handle very large datasets (e.g., 4GB+ files with millions of rows), the update
+    operation is partitioned into chunks to prevent:
+    - Database statement timeouts
+    - Memory exhaustion
+    - Lock contention that could block other operations
+    - Transaction log overflow
+
+    The chunking mechanism processes rows in batches based on their _id values,
+    with chunk size configurable via 'ckanext.xloader.search_update_chunks'
+    (default: 100,000 rows per chunk).
+
+    Args:
+        connection: Database connection object
+        resource_id (str): The datastore table identifier
+        fields (list): List of dicts with column 'id' (name) and 'type'
+            (text/numeric/timestamp)
+        logger: Logger instance for progress tracking
+
+    Note:
+        This reimplements CKAN's text indexing logic for performance,
+        breaking DRY principle but providing significant speed improvements.
+    '''
+    try:
+        # Get total row count to determine chunking strategy
+        rows_count = _get_rows_count_of_resource(connection, identifier(resource_id))
+    except Exception as e:
+        rows_count = ''
+        logger.info("Failed to get resource rows count: {} ".format(str(e)))
+        raise
+
+    if rows_count:
+        # Configure chunk size - prevents timeouts and memory issues on large datasets
+        # Default 100,000 rows per chunk balances performance vs. resource usage
+        chunks = int(config.get('ckanext.xloader.search_update_chunks', 100000))
Reviewer comment on the chunk-size line above: please add this config item to https://github.com/ckan/ckanext-xloader/blob/master/ckanext/xloader/config_declaration.yaml with a description and example. You can also set the default value there so it is not hidden in the code.

Author reply: Done.
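As a side note on this review thread (not part of the diff): once the option is declared in config_declaration.yaml with its default value, the hard-coded 100000 fallback above becomes redundant. A minimal sketch, assuming CKAN's config-declaration machinery applies the declared default and that the module reads the toolkit's config object:

# Sketch only, not the PR's code: with the default declared in
# config_declaration.yaml, the inline fallback can be dropped.
from ckan.plugins.toolkit import config

chunks = int(config.get('ckanext.xloader.search_update_chunks'))

Deployment-wise the value is overridden like any other CKAN option, e.g. by setting ckanext.xloader.search_update_chunks in the site's ini file.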
+
+        # Process table in chunks using _id range queries
+        # This approach ensures consistent chunk sizes and allows resuming if interrupted
+        for start in range(0, rows_count, chunks):
+            try:
+                # Build SQL to update _full_text column with concatenated searchable content
+                sql = \
+                    '''
+                    UPDATE {table}
+                    SET _full_text = to_tsvector({cols}) WHERE _id BETWEEN {first} and {end};
+                    '''.format(
+                        table=identifier(resource_id),
+                        # Concatenate all user columns (excluding system columns starting with '_')
+                        # coalesce() handles NULL values by converting them to empty strings
+                        cols=" || ' ' || ".join(
+                            'coalesce({}, \'\')'.format(
+                                identifier(field['id'])
+                                + ('::text' if field['type'] != 'text' else '')  # Cast non-text types
+                            )
+                            for field in fields
+                            if not field['id'].startswith('_')  # Skip system columns like _id, _full_text
+                        ),
+                        first=start,
+                        end=start + chunks
+                    )
+                connection.execute(sql)
+                logger.info("Indexed rows {first} to {end} of {total}".format(
+                    first=start, end=min(start + chunks, rows_count), total=rows_count))
+            except Exception as e:
+                # Log chunk-specific errors but continue processing remaining chunks
+                logger.error("Failed to index rows {first}-{end}: {error}".format(
+                    first=start, end=start + chunks, error=str(e)))
 
 
 def calculate_record_count(resource_id, logger):
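To make the chunked UPDATE described in the docstring concrete, here is a small standalone sketch (not part of the PR) that reproduces the statements the loop would emit for a made-up field list, table name and row count; quoted_identifier is a simplified stand-in for xloader's identifier() helper:

# Standalone illustration of the chunked _full_text UPDATE statements.
# The field list, table name and row count are invented for the example.
fields = [
    {'id': 'title', 'type': 'text'},
    {'id': 'price', 'type': 'numeric'},
    {'id': '_id', 'type': 'int'},        # system column, skipped below
]

def quoted_identifier(name):
    # Simplified stand-in for xloader's identifier(): double-quote and escape.
    return '"' + name.replace('"', '""') + '"'

# Same column expression the loader builds: non-text columns are cast to
# ::text and NULLs are replaced with empty strings via coalesce().
cols = " || ' ' || ".join(
    "coalesce({}, '')".format(
        quoted_identifier(f['id']) + ('::text' if f['type'] != 'text' else '')
    )
    for f in fields
    if not f['id'].startswith('_')
)

rows_count = 250000   # pretend total from _get_rows_count_of_resource()
chunks = 100000       # the default chunk size

for start in range(0, rows_count, chunks):
    print(
        "UPDATE {table} SET _full_text = to_tsvector({cols}) "
        "WHERE _id BETWEEN {first} and {end};".format(
            table=quoted_identifier('my-resource-id'),
            cols=cols,
            first=start,
            end=start + chunks))

With these numbers the loop prints three UPDATE statements, bounded at _id 0 to 100000, 100000 to 200000 and 200000 to 300000, which is what keeps each individual statement small enough to avoid the timeouts and memory pressure the docstring describes.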