diff --git a/ckanext/xloader/config_declaration.yaml b/ckanext/xloader/config_declaration.yaml
index 3327c84b..4b529839 100644
--- a/ckanext/xloader/config_declaration.yaml
+++ b/ckanext/xloader/config_declaration.yaml
@@ -187,3 +187,14 @@ groups:
           they will also display "complete", "active", "inactive", and "unknown".
         type: bool
         required: false
+      - key: ckanext.xloader.search_update_chunks
+        default: 100000
+        example: 50000
+        description: |
+          The number of rows to process in each batch when populating the full-text
+          search index. Chunked processing prevents database timeouts and memory
+          exhaustion when indexing very large datasets (4GB+ files with millions of rows).
+          Smaller values reduce memory usage but increase processing time. Larger values
+          improve performance but may cause timeouts on very large tables.
+        type: int
+        required: false
diff --git a/ckanext/xloader/loader.py b/ckanext/xloader/loader.py
index f45b579c..23f9ea09 100644
--- a/ckanext/xloader/loader.py
+++ b/ckanext/xloader/loader.py
@@ -341,7 +341,7 @@ def strip_white_space_iter():

     logger.info('Creating search index...')
     with engine.begin() as conn:
-        _populate_fulltext(conn, resource_id, fields=fields)
+        _populate_fulltext(conn, resource_id, fields=fields, logger=logger)
     logger.info('...search index created')

     return fields
@@ -659,30 +659,85 @@ def _enable_fulltext_trigger(connection, resource_id):
         .format(table=identifier(resource_id, True))))


-def _populate_fulltext(connection, resource_id, fields):
-    '''Populates the _full_text column. i.e. the same as datastore_run_triggers
-    but it runs in 1/9 of the time.
-
-    The downside is that it reimplements the code that calculates the text to
-    index, breaking DRY. And its annoying to pass in the column names.
-
-    fields: list of dicts giving the each column's 'id' (name) and 'type'
-    (text/numeric/timestamp)
+def _get_rows_count_of_resource(connection, table):
+    '''Return the total number of rows in the given datastore table.'''
+    count_query = 'SELECT count(_id) FROM {table}'.format(table=table)
+    results = connection.execute(sa.text(count_query))
+    return int(results.first()[0])
+
+
+def _populate_fulltext(connection, resource_id, fields, logger):
+    '''Populates the _full_text column for full-text search functionality.
+
+    This function creates a PostgreSQL tsvector (text search vector) for each row
+    by concatenating all non-system columns. It is equivalent to datastore_run_triggers
+    but runs roughly 9x faster by using direct SQL updates.
+
+    To handle very large datasets (e.g. 4GB+ files with millions of rows), the update
+    operation is partitioned into chunks to prevent:
+    - database statement timeouts
+    - memory exhaustion
+    - lock contention that could block other operations
+    - transaction log overflow
+
+    Rows are processed in batches based on their _id values, with the chunk size
+    configurable via 'ckanext.xloader.search_update_chunks'
+    (default: 100,000 rows per chunk).
+
+    Args:
+        connection: Database connection object
+        resource_id (str): The datastore table identifier
+        fields (list): List of dicts with column 'id' (name) and 'type'
+            (text/numeric/timestamp)
+        logger: Logger instance for progress tracking
+
+    Note:
+        This reimplements CKAN's text indexing logic for performance, breaking
+        the DRY principle in exchange for significant speed improvements.
     '''
-    stmt = sa.update(sa.table(resource_id, sa.column("_full_text"))).values(
-        _full_text=sa.text("to_tsvector({})".format(
-            " || ' ' || ".join(
-                'coalesce({}, \'\')'.format(
-                    identifier(field['id'])
-                    + ('::text' if field['type'] != 'text' else '')
-                )
-                for field in fields
-                if not field['id'].startswith('_')
-            )
-        ))
-    )
-
-    connection.execute(stmt)
+    try:
+        # Get total row count to determine the chunking strategy
+        rows_count = _get_rows_count_of_resource(connection, identifier(resource_id))
+    except Exception as e:
+        logger.error("Failed to get resource rows count: {}".format(str(e)))
+        raise
+
+    if rows_count:
+        # Configure chunk size - prevents timeouts and memory issues on large datasets.
+        # The default of 100,000 rows per chunk balances performance against resource usage.
+        chunks = int(config.get('ckanext.xloader.search_update_chunks', 100000))
+
+        # Process the table in chunks using _id range queries.
+        # A chunk that fails is logged and skipped, so the remaining batches still run.
+        for start in range(0, rows_count, chunks):
+            try:
+                # Build SQL to update _full_text with the concatenated searchable content
+                sql = \
+                    '''
+                    UPDATE {table}
+                    SET _full_text = to_tsvector({cols}) WHERE _id BETWEEN {first} AND {end};
+                    '''.format(
+                        table=identifier(resource_id),
+                        # Concatenate all user columns (excluding system columns starting with '_').
+                        # coalesce() converts NULL values to empty strings.
+                        cols=" || ' ' || ".join(
+                            'coalesce({}, \'\')'.format(
+                                identifier(field['id'])
+                                + ('::text' if field['type'] != 'text' else '')  # cast non-text types
+                            )
+                            for field in fields
+                            if not field['id'].startswith('_')  # skip system columns like _id, _full_text
+                        ),
+                        first=start,
+                        end=start + chunks
+                    )
+                connection.execute(sa.text(sql))
+                logger.info("Indexed rows {first} to {end} of {total}".format(
+                    first=start, end=min(start + chunks, rows_count), total=rows_count))
+            except Exception as e:
+                # Log chunk-specific errors but continue processing the remaining chunks
+                logger.error("Failed to index rows {first}-{end}: {error}".format(
+                    first=start, end=min(start + chunks, rows_count), error=str(e)))


 def calculate_record_count(resource_id, logger):
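
Reviewer note (not part of the patch): a minimal standalone sketch of the chunking arithmetic used by _populate_fulltext, assuming the default chunk size of 100,000; the helper name chunk_ranges is hypothetical. Operators can tune the batch size by setting ckanext.xloader.search_update_chunks in the CKAN ini file.

    # Hypothetical sketch, not part of the patch: the _id windows produced by
    # range(0, rows_count, chunks) combined with "BETWEEN start AND start + chunks".
    def chunk_ranges(rows_count, chunk_size=100000):
        for start in range(0, rows_count, chunk_size):
            yield start, start + chunk_size

    # For 250,000 rows this prints: [(0, 100000), (100000, 200000), (200000, 300000)].
    # Boundary _ids fall into two adjacent windows, which is harmless because
    # rewriting a row's _full_text value is idempotent.
    print(list(chunk_ranges(250000)))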