cpegeric
Contributor

@cpegeric cpegeric commented Oct 9, 2025

User description

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #21835

What this PR does / why we need it:

  1. HNSW update improvement: HNSW updates now call the HNSW APIs instead of running SQL. With the HNSW APIs, we can
    load the models once, process all batches from DataRetriever, and finally save the model files into the database.
    This change saves the time previously spent uploading and downloading the model for every 8192-vector block.
  2. A common interface, SqlProcess, lets the vector index and fulltext APIs run in both frontend mode (process.Process) and background mode (when process.Process is absent).
  3. ISCP integration.
  4. Fully async ivfflat index update: K-means clustering is done in a separate thread in async mode.
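
To illustrate item 1, here is a minimal sketch that only counts model round trips. The `modelStore` and `hnswModel` types and both sync functions are hypothetical stand-ins written for this example, not the code in this PR.

```go
package main

import "fmt"

// modelStore is a hypothetical stand-in for the database that holds the
// serialized HNSW model; it tracks how often the model is moved.
type modelStore struct {
	saved              int // number of vectors in the persisted model
	downloads, uploads int
}

// hnswModel is a hypothetical in-memory index; a real one would hold a graph.
type hnswModel struct {
	vectors int
}

func (s *modelStore) download() *hnswModel {
	s.downloads++
	return &hnswModel{vectors: s.saved}
}

func (s *modelStore) upload(m *hnswModel) {
	s.uploads++
	s.saved = m.vectors
}

// syncPerBatch mimics the old flow: every batch pays a download and an upload.
func syncPerBatch(s *modelStore, batches [][]int64) {
	for _, b := range batches {
		m := s.download()
		m.vectors += len(b)
		s.upload(m)
	}
}

// syncOnce mimics the new flow: load once, apply all batches, save once.
func syncOnce(s *modelStore, batches [][]int64) {
	m := s.download()
	for _, b := range batches {
		m.vectors += len(b)
	}
	s.upload(m)
}

func main() {
	// Four batches of 8192 vector ids each.
	batches := make([][]int64, 4)
	for i := range batches {
		batches[i] = make([]int64, 8192)
	}
	perBatch, once := &modelStore{}, &modelStore{}
	syncPerBatch(perBatch, batches)
	syncOnce(once, batches)
	fmt.Println("per-batch round trips:", perBatch.downloads+perBatch.uploads)
	fmt.Println("load-once round trips:", once.downloads+once.uploads)
}
```

Both paths end with the same persisted vector count; only the number of model transfers differs.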

PR Type

Enhancement, Tests


Description

• ISCP Integration: Complete integration of Index Sync Change Processing (ISCP) for asynchronous vector and fulltext index operations
• SqlProcess Abstraction: Introduced new SqlProcess interface to abstract both frontend (process.Process) and background (SqlContext) execution contexts across all vector index operations
• HNSW Performance Improvements: Refactored HNSW update operations to use direct APIs instead of SQL, so the model is loaded once and all batches are processed before it is saved
• Async Index Support: Added support for asynchronous IVF, HNSW, and FullText indexes with CDC task management
• Transaction Event System: Enhanced transaction event callback system with context support and structured TxnEventCallback wrapper
• DDL Integration: Integrated ISCP job management with DDL operations (create, drop, alter table) for proper index lifecycle management
• Test Coverage: Added test suites for async vector operations and updated existing tests for new interfaces


Diagram Walkthrough

flowchart LR
  A["process.Process"] --> B["SqlProcess Abstraction"]
  B --> C["Frontend Execution"]
  B --> D["Background Execution"]
  E["HNSW Operations"] --> F["Direct API Calls"]
  F --> G["Batch Processing"]
  H["DDL Operations"] --> I["ISCP Job Management"]
  I --> J["CDC Task Creation"]
  K["Vector Indexes"] --> L["Async Support"]
  L --> M["IVF/HNSW/FullText"]
  N["Transaction Events"] --> O["TxnEventCallback"]
  O --> P["Context Support"]

File Walkthrough

Relevant files
Enhancement
37 files
sync.go
Refactor HNSW sync to use SqlProcess abstraction                 

pkg/vectorindex/hnsw/sync.go

• Refactored CdcSync function to use new HnswSync struct with separate
initialization, update, and save phases
• Replaced process.Process parameter with sqlexec.SqlProcess for better
abstraction
• Added new methods NewHnswSync, RunOnce, Update, Save, and DownloadAll
to the HnswSync struct
• Modified function signatures to accept SqlProcess instead of
process.Process throughout the file

+120/-76
sqlexec.go
Create SqlProcess abstraction for frontend and background execution

pkg/vectorindex/sqlexec/sqlexec.go

• Introduced SqlProcess struct to abstract both frontend
(process.Process) and background (SqlContext) execution contexts
• Added SqlContext for background SQL execution with required context,
UUID, transaction operator, and account information
• Implemented RunTxnWithSqlContext function for running transactions in
background mode
• Modified existing functions to work with SqlProcess abstraction
instead of direct process.Process

+273/-61
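
As an illustration of the idea described for sqlexec.go, the following sketch wraps either a frontend process or a background context behind one type. All type, field, and function names here are hypothetical stand-ins; the real `sqlexec.SqlProcess` and `SqlContext` may be shaped differently, and the transaction operator is left out for brevity.

```go
package main

import (
	"context"
	"fmt"
)

// frontendProc is a hypothetical stand-in for process.Process.
type frontendProc struct {
	ctx context.Context
}

// sqlContext is a hypothetical stand-in for the background SqlContext.
type sqlContext struct {
	ctx       context.Context
	cnUUID    string
	accountID uint32
}

// sqlProcess holds exactly one of the two execution contexts.
type sqlProcess struct {
	proc   *frontendProc
	sqlCtx *sqlContext
}

func newSqlProcess(p *frontendProc) *sqlProcess {
	return &sqlProcess{proc: p}
}

func newSqlProcessWithContext(c *sqlContext) *sqlProcess {
	return &sqlProcess{sqlCtx: c}
}

// Context returns the context of whichever side is present.
func (s *sqlProcess) Context() context.Context {
	if s.proc != nil {
		return s.proc.ctx
	}
	return s.sqlCtx.ctx
}

// IsBackground reports whether no frontend process is attached.
func (s *sqlProcess) IsBackground() bool {
	return s.proc == nil
}

// describe stands for index code that is written once against the wrapper.
func describe(s *sqlProcess) string {
	if s.IsBackground() {
		return "background"
	}
	return "frontend"
}

func main() {
	ctx := context.Background()
	fmt.Println(describe(newSqlProcess(&frontendProc{ctx: ctx})))
	fmt.Println(describe(newSqlProcessWithContext(&sqlContext{ctx: ctx, cnUUID: "cn-0"})))
}
```

Callers such as search or build code then take the wrapper and no longer need to know which mode they run in.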
operator.go
Add context support to transaction event handling               

pkg/txn/client/operator.go

• Added context parameter to various transaction event handling
methods
• Updated triggerEvent and related methods to accept context.Context
parameter
• Modified transaction lifecycle methods (Commit, Rollback,
closeLocked) to pass context to event handlers
• Enhanced error handling by joining multiple errors in transaction
operations

+40/-17 
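
The error-joining behaviour described above can be sketched with the standard library's `errors.Join`. The `closeTxn` helper, the handler type, and the error values are hypothetical and exist only for this example; the operator's actual methods are not shown in this description.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var (
	errCommit = errors.New("commit failed")
	errHook   = errors.New("closed-event handler failed")
)

// eventHandler is a hypothetical handler shape: it receives a context and may fail.
type eventHandler func(ctx context.Context) error

// closeTxn runs the main operation, then every handler, and joins all failures
// so that no error is silently dropped.
func closeTxn(ctx context.Context, op func() error, handlers ...eventHandler) error {
	err := op()
	for _, h := range handlers {
		err = errors.Join(err, h(ctx))
	}
	return err
}

// runDemo drives closeTxn with an operation and a handler that both fail
// when fail is true and both succeed otherwise.
func runDemo(fail bool) error {
	commit := func() error {
		if fail {
			return errCommit
		}
		return nil
	}
	hook := func(ctx context.Context) error {
		if fail {
			return errHook
		}
		return nil
	}
	return closeTxn(context.Background(), commit, hook)
}

func main() {
	err := runDemo(true)
	// Both causes remain visible to errors.Is after joining.
	fmt.Println(errors.Is(err, errCommit), errors.Is(err, errHook))
	fmt.Println(runDemo(false) == nil)
}
```

`errors.Join` returns nil when every argument is nil, so the success path needs no special case.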
mock_consumer.go
Update mock consumer for new transaction interface             

pkg/iscp/mock_consumer.go

• Updated NewInteralSqlConsumer to accept cnEngine and cnTxnClient
parameters
• Modified transaction handling to use direct transaction operations
instead of executor interface
• Updated consumeData method signature to work with client.TxnOperator
• Enhanced error handling and transaction lifecycle management in
consumer operations

+53/-29 
build_dml_util.go
Add async index support to DML operations                               

pkg/sql/plan/build_dml_util.go

• Added async index detection logic to skip synchronous operations for
async fulltext and vector indexes
• Enhanced DML plan building to handle async indexes appropriately
• Updated index processing logic in insert, delete, and update
operations to check for async configuration
• Improved error handling for index async parameter parsing

+65/-18 
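
A rough sketch of async detection during DML planning follows. It assumes index parameters are stored as a JSON object of string values with an `async` key; that storage format, the key name, and the `indexDef` type are assumptions made for this example and are not confirmed by the description.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// indexDef is a hypothetical, trimmed-down index definition.
type indexDef struct {
	name   string
	params string // assumed: JSON object of string values
}

// isAsync reports whether the parameters request asynchronous maintenance.
// A missing key or empty parameter string means synchronous.
func isAsync(params string) (bool, error) {
	if params == "" {
		return false, nil
	}
	var m map[string]string
	if err := json.Unmarshal([]byte(params), &m); err != nil {
		return false, fmt.Errorf("parse index params: %w", err)
	}
	v, ok := m["async"]
	if !ok {
		return false, nil
	}
	async, err := strconv.ParseBool(v)
	if err != nil {
		return false, fmt.Errorf("invalid async value %q: %w", v, err)
	}
	return async, nil
}

// syncIndexes returns the indexes the DML plan must maintain inline;
// async indexes are skipped because a background job updates them.
func syncIndexes(defs []indexDef) ([]string, error) {
	var out []string
	for _, d := range defs {
		async, err := isAsync(d.params)
		if err != nil {
			return nil, err
		}
		if !async {
			out = append(out, d.name)
		}
	}
	return out, nil
}

func main() {
	defs := []indexDef{
		{name: "idx_ivf", params: `{"lists":"16"}`},
		{name: "idx_hnsw", params: `{"async":"true"}`},
	}
	names, err := syncIndexes(defs)
	fmt.Println(names, err)
}
```

Returning the parse error instead of defaulting to synchronous mode keeps a malformed parameter from silently changing how the index is maintained.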
fulltext.go
Update fulltext functions for SqlProcess interface             

pkg/sql/colexec/table_function/fulltext.go

• Updated function calls to use sqlexec.NewSqlProcess(proc) wrapper
• Modified runWordStats and runCountStar functions to work with new
SqlProcess interface
• Maintained existing functionality while adapting to new SQL execution
abstraction

+2/-2     
cache_test.go
Update vector index cache tests for SQL process interface

pkg/vectorindex/cache/cache_test.go

• Replaced process.Process with sqlexec.SqlProcess in mock search
interfaces
• Updated test functions to create and use SqlProcess instances
• Modified all Search and Load method signatures to use the new SQL
process interface

+24/-17 
model.go
Refactor HNSW model to use SQL process interface                 

pkg/vectorindex/hnsw/model.go

• Replaced process.Process with sqlexec.SqlProcess throughout the file
• Added NThread field to HnswModel struct for thread configuration
• Enhanced error handling and initialization logic in initIndex method
• Updated streaming SQL execution to use new SQL process interface

+70/-26 
search.go
Refactor IVF flat search to use SQL process interface       

pkg/vectorindex/ivfflat/search.go

• Replaced process.Process with sqlexec.SqlProcess in all method
signatures
• Updated context handling to use SQL process context
• Modified streaming operations to work with new SQL process interface

+22/-19 
service_txn_event.go
Update transaction trace service for new event callback system

pkg/txn/trace/service_txn_event.go

• Updated transaction event callback registration to use new callback
wrapper
• Modified event handler signatures to include context and return error
• Enhanced event handling with proper error propagation

+26/-20 
search.go
Refactor HNSW search to use SQL process interface               

pkg/vectorindex/hnsw/search.go

• Replaced process.Process with sqlexec.SqlProcess in all method
signatures
• Updated context handling and error messages to use SQL process
context
• Modified metadata loading and index operations for new interface

+9/-10   
cache.go
Update vector index cache for SQL process interface           

pkg/vectorindex/cache/cache.go

• Updated VectorIndexSearchIf interface to use sqlexec.SqlProcess
• Modified cache search and load operations for new SQL process
interface
• Updated all method signatures and implementations consistently

+10/-10 
client.go
Implement new transaction event callback system                   

pkg/txn/client/client.go

• Updated transaction event callback system with new wrapper structure
• Modified callback registration to use TxnEventCallback wrapper
• Enhanced event handlers with context and error return parameters

+11/-5   
index_sqlwriter.go
Refactor HNSW SQL writer for CDC integration                         

pkg/iscp/index_sqlwriter.go

• Modified HNSW SQL writer to output JSON instead of SQL statements
• Added NewSync method for creating HNSW synchronization objects
• Updated CDC writer capacity configuration for better performance

+9/-4     
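
To make the "JSON instead of SQL statements" change concrete, here is a minimal sketch of a change buffer with a fixed capacity that flushes to one JSON document. The `cdcWriter` type, its methods, and the JSON field names are hypothetical, and the sketch uses the standard `encoding/json` package for self-containment rather than the sonic library mentioned elsewhere in this PR.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// cdcEntry uses hypothetical field names; the real layout is not given here.
type cdcEntry struct {
	Op  string    `json:"op"` // "insert" or "delete"
	PK  int64     `json:"pk"`
	Vec []float32 `json:"vec,omitempty"`
}

// cdcWriter buffers changes up to a configurable capacity.
type cdcWriter struct {
	capacity int
	entries  []cdcEntry
}

func newCdcWriter(capacity int) *cdcWriter {
	return &cdcWriter{capacity: capacity, entries: make([]cdcEntry, 0, capacity)}
}

// Full reports whether the buffer has reached its capacity and should be flushed.
func (w *cdcWriter) Full() bool {
	return len(w.entries) >= w.capacity
}

func (w *cdcWriter) Insert(pk int64, vec []float32) {
	w.entries = append(w.entries, cdcEntry{Op: "insert", PK: pk, Vec: vec})
}

func (w *cdcWriter) Delete(pk int64) {
	w.entries = append(w.entries, cdcEntry{Op: "delete", PK: pk})
}

// Flush serializes the pending entries as one JSON array and clears the buffer.
func (w *cdcWriter) Flush() (string, error) {
	b, err := json.Marshal(w.entries)
	if err != nil {
		return "", err
	}
	w.entries = w.entries[:0]
	return string(b), nil
}

func main() {
	w := newCdcWriter(2)
	w.Insert(1, []float32{0.5, 1})
	w.Delete(2)
	if w.Full() {
		out, err := w.Flush()
		fmt.Println(out, err)
	}
}
```

A consumer can decode such a document and apply the entries to an in-memory model directly, without parsing SQL text.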
ivf_search.go
Update IVF search table function for SQL process interface

pkg/sql/colexec/table_function/ivf_search.go

• Updated IVF search table function to use sqlexec.SqlProcess
• Modified version retrieval and cache search operations
• Wrapped process instances with SQL process interface

+3/-2     
service.go
Update shard service for new transaction callback system 

pkg/shardservice/service.go

• Updated transaction event callback registration with new wrapper
system
• Modified callback functions to include context and error return
• Enhanced error handling in shard service operations

+26/-22 
operator_events.go
Implement enhanced transaction event callback system         

pkg/txn/client/operator_events.go

• Introduced TxnEventCallback wrapper structure for callbacks
• Updated callback registration and triggering mechanisms
• Enhanced event system with context and error handling support

+11/-6   
build.go
Update HNSW build operations for SQL process interface     

pkg/vectorindex/hnsw/build.go

• Updated HNSW build operations to use sqlexec.SqlProcess
• Modified context handling in worker threads
• Updated method signatures for consistency with new interface

+7/-6     
storage_test.go
Update transaction event callback interface in partition storage tests

pkg/partitionservice/storage_test.go

• Updated transaction event callback to use new TxnEventCallback
structure
• Changed callback function signature to include context, operator,
event and callback data parameters
• Added error return type to callback function

+24/-20 
build_test.go
Integrate SqlProcess wrapper for HNSW build operations     

pkg/vectorindex/hnsw/build_test.go

• Added import for sqlexec package
• Updated NewHnswBuild calls to use sqlexec.NewSqlProcess(proc) instead
of proc directly

+5/-2     
hnsw_search.go
Integrate SqlProcess wrapper for HNSW search operations   

pkg/sql/colexec/table_function/hnsw_search.go

• Added import for sqlexec package
• Updated vector cache search call to use sqlexec.NewSqlProcess(proc)
wrapper

+2/-1     
data_retriever.go
Refactor watermark update to use transaction operator interface

pkg/iscp/data_retriever.go

• Updated UpdateWatermark method signature to use context and
transaction operator instead of executor
• Added system account context setup with timeout
• Replaced SQL execution with ExecWithResult function call

+18/-4   
service.go
Update transaction event handling in increment service     

pkg/incrservice/service.go

• Updated transaction event callback registration to use new
TxnEventCallback structure
• Modified txnClosed method signature to match new callback interface

+4/-3     
storage_test.go
Update transaction event callback interface in shard storage tests

pkg/shardservice/storage_test.go

• Updated transaction event callbacks to use new TxnEventCallback
structure
• Changed callback function signatures to include context, operator,
event and callback data parameters
• Added error return type to callback functions

+20/-16 
hnsw_create.go
Integrate SqlProcess wrapper for HNSW creation operations

pkg/sql/colexec/table_function/hnsw_create.go

• Updated SQL execution calls to use sqlexec.NewSqlProcess(proc)
wrapper
• Modified HNSW build creation calls to use the new SQL process
interface

+3/-3     
consumer.go
Extend consumer creation with engine and transaction client

pkg/iscp/consumer.go

• Added engine and transaction client parameters to NewConsumer
function
• Updated consumer creation calls to pass additional parameters

+9/-3     
types.go
Introduce structured transaction event callback system     

pkg/txn/client/types.go

• Updated AppendEventCallback interface to use TxnEventCallback
instead of function type
• Added new TxnEventCallback struct with function and value fields
• Added constructor functions for creating callback instances

+18/-1   
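
The callback wrapper described above can be pictured roughly as follows. This is a sketch under assumptions: the lower-case type names, the exact parameter list, the constructor names, and the `fire` helper are hypothetical and only mirror the description (a function field, a value field, context and error support).

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// txnOperator and txnEvent are hypothetical stand-ins for the real types.
type txnOperator struct{ id string }

type txnEvent struct{ name string }

// eventFunc receives the context, operator, event, and the callback's own value.
type eventFunc func(ctx context.Context, op *txnOperator, ev txnEvent, value any) error

// txnEventCallback pairs a function with a value handed back on every call.
type txnEventCallback struct {
	fn    eventFunc
	value any
}

func newTxnEventCallback(fn eventFunc) txnEventCallback {
	return txnEventCallback{fn: fn}
}

func newTxnEventCallbackWithValue(fn eventFunc, value any) txnEventCallback {
	return txnEventCallback{fn: fn, value: value}
}

// countEvent is a sample callback that expects value to be a *int counter.
func countEvent(ctx context.Context, op *txnOperator, ev txnEvent, value any) error {
	n, ok := value.(*int)
	if !ok {
		return errors.New("callback value is not *int")
	}
	*n++
	return nil
}

// fire invokes the callbacks in order and stops at the first error.
func fire(op *txnOperator, ev txnEvent, cbs ...txnEventCallback) error {
	ctx := context.Background()
	for _, cb := range cbs {
		if err := cb.fn(ctx, op, ev, cb.value); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	closed := new(int)
	cb := newTxnEventCallbackWithValue(countEvent, closed)
	err := fire(&txnOperator{id: "txn-1"}, txnEvent{name: "closed"}, cb, cb)
	fmt.Println(*closed, err)
}
```

Carrying the value inside the struct lets one function serve several registrations without capturing state in closures.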
types.go
Update DataRetriever interface for transaction operator usage

pkg/iscp/types.go

• Updated DataRetriever interface method signature for UpdateWatermark
• Removed executor import and updated to use context and transaction
operator

+1/-2     
metadata_scan.go
Integrate SqlProcess wrapper for metadata scan operations

pkg/sql/colexec/table_function/metadata_scan.go

• Added import for sqlexec package and reorganized imports
• Updated SQL execution call to use sqlexec.NewSqlProcess(proc) wrapper

+4/-2     
store_mem.go
Update transaction event callback in memory store               

pkg/incrservice/store_mem.go

• Updated transaction event callback to use new TxnEventCallback
structure
• Modified callback function signature to include context, operator,
event and callback data parameters

+12/-10 
sql.go
Integrate SqlProcess wrapper for IVF flat SQL operations 

pkg/vectorindex/ivfflat/sql.go

• Updated GetVersion function to use SqlProcess instead of
process.Process
• Modified SQL execution and error context to use the new interface

+4/-4     
types.go
Enhance vector index CDC with configurable capacity and sonic JSON

pkg/vectorindex/types.go

• Added capacity parameter to NewVectorIndexCdc constructor function
• Replaced json package with sonic for JSON marshaling

+4/-4     
ivf_create.go
Integrate SqlProcess wrapper for IVF creation operations 

pkg/sql/colexec/table_function/ivf_create.go

• Updated version retrieval and SQL execution calls to use
sqlexec.NewSqlProcess(proc) wrapper

+2/-2     
service.go
Update transaction event callback in partition service     

pkg/partitionservice/service.go

• Updated transaction event callback to use new TxnEventCallback
structure
• Modified callback function signature to include context, operator,
event and callback data parameters

+7/-5     
iteration.go
Update consumer creation with additional parameters in iteration

pkg/iscp/iteration.go

• Updated NewConsumer call to include engine and transaction client
parameters

+1/-1     
storage_txn_client.go
Update memory storage transaction client for new callback interface

pkg/txn/storage/memorystorage/storage_txn_client.go

• Updated AppendEventCallback method signature to use TxnEventCallback
instead of function type

+1/-1     
watermark_updater.go
Make ExecWithResult function mockable for testing               

pkg/iscp/watermark_updater.go

• Made ExecWithResult function a variable to allow for testing/mocking

+1/-1     
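
Turning a function into a package-level variable is a common Go technique for test substitution. The sketch below shows the pattern in isolation; the function signature, the SQL text, the `watermark` table name, and the `stubExec` helper are hypothetical and do not reflect the real `ExecWithResult`.

```go
package main

import "fmt"

// execWithResult is a package-level function variable so tests can replace it.
// The default here fails because this sketch has no database attached.
var execWithResult = func(sql string) (int, error) {
	return 0, fmt.Errorf("no database attached: %q", sql)
}

// updateWatermark calls through the variable rather than a fixed function.
func updateWatermark(jobID string, ts int64) error {
	sql := fmt.Sprintf("UPDATE watermark SET ts = %d WHERE job = '%s'", ts, jobID)
	n, err := execWithResult(sql)
	if err != nil {
		return err
	}
	if n != 1 {
		return fmt.Errorf("expected 1 row updated, got %d", n)
	}
	return nil
}

// stubExec swaps in a fake and returns a function that restores the original.
func stubExec(fake func(string) (int, error)) (restore func()) {
	orig := execWithResult
	execWithResult = fake
	return func() { execWithResult = orig }
}

func main() {
	fmt.Println("without stub:", updateWatermark("job1", 42))

	restore := stubExec(func(sql string) (int, error) {
		fmt.Println("stub received:", sql)
		return 1, nil
	})
	defer restore()
	fmt.Println("with stub:", updateWatermark("job1", 42))
}
```

Restoring the original in a deferred call (or `t.Cleanup` inside a test) keeps one test's stub from leaking into the next.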
Tests
24 files
index_consumer_test.go
Update ISCP consumer tests for new interface                         

pkg/iscp/index_consumer_test.go

• Updated test functions to use new NewConsumer signature with
cnEngine and cnClient parameters
• Replaced mock SQL executor with ExecWithResult stub for cleaner
testing
• Added new test functions for IVF index handling (newTestIvfTableDef,
newTestIvfConsumerInfo)
• Modified existing tests to work with the new consumer interface and
removed complex mock implementations

+158/-106
sync_test.go
Update HNSW sync tests for SqlProcess interface                   

pkg/vectorindex/hnsw/sync_test.go

• Updated all test functions to use sqlexec.NewSqlProcess(proc)
instead of passing process.Process directly
• Modified test calls from CdcSync to NewHnswSync followed by RunOnce
pattern
• Added new test for continuous update operations with
TestSyncContinuousUpdateInsertShuffle2FilesF32WithSmallCap
• Updated mock function signatures to accept SqlProcess parameter

+136/-27
iscp_util_test.go
Add comprehensive tests for ISCP utility functions             

pkg/sql/compile/iscp_util_test.go

• Comprehensive test suite for ISCP utility functions
• Tests for index CDC validation, task creation/deletion
• Mock functions for testing error scenarios
• Coverage for async index parameter validation

+301/-0 
model_test.go
Update HNSW model tests for SQL process interface               

pkg/vectorindex/hnsw/model_test.go

• Updated test functions to use sqlexec.SqlProcess instead of
process.Process
• Modified mock streaming functions to accept new SQL process
interface
• Updated all test cases to create and use SqlProcess instances

+22/-18 
engine_mock.go
Update engine mock for interface compatibility                     

pkg/frontend/test/engine_mock.go

• Updated mock method signatures to match engine interface changes
• Added HasTempEngine method to mock engine
• Fixed parameter names and method signatures for consistency

+22/-20 
search_test.go
Update HNSW search tests for SQL process interface             

pkg/vectorindex/hnsw/search_test.go

• Updated mock functions to use sqlexec.SqlProcess interface
• Modified test setup to create and use SqlProcess instances
• Updated all search operations to use new SQL process interface

+15/-10 
search_test.go
Update IVF flat search tests for SQL process interface     

pkg/vectorindex/ivfflat/search_test.go

• Updated mock streaming functions to use sqlexec.SqlProcess
• Modified test setup to create and use SqlProcess instances
• Updated all search operations and mock interfaces consistently

+12/-7   
ivf_search_test.go
Update IVF search table function tests for SQL process interface

pkg/sql/colexec/table_function/ivf_search_test.go

• Updated mock functions to use sqlexec.SqlProcess interface
• Modified mock search implementations for new interface
• Updated test helper functions consistently

+6/-5     
build_dml_util_test.go
Add tests for async FullText index functionality                 

pkg/sql/plan/build_dml_util_test.go

• Added tests for async FullText index functionality
• Tests for parameter validation and error handling
• Coverage for both sync and async index modes

+49/-0   
fulltext_test.go
Integrate SqlProcess wrapper for fulltext test operations

pkg/sql/colexec/table_function/fulltext_test.go

• Added import for sqlexec package
• Updated fake SQL execution functions to accept SqlProcess instead of
process.Process
• Modified function signatures to use the new SQL process wrapper

+5/-3     
hnsw_search_test.go
Update mock search interface for SqlProcess integration   

pkg/sql/colexec/table_function/hnsw_search_test.go

• Added import for sqlexec package
• Updated mock search interface methods to use SqlProcess instead of
process.Process

+3/-2     
hnsw_create_test.go
Update HNSW creation test for SqlProcess integration         

pkg/sql/colexec/table_function/hnsw_create_test.go

• Added import for sqlexec package
• Updated mock SQL execution function to use SqlProcess instead of
process.Process

+3/-2     
sqlexec_test.go
Update SQL execution tests for SqlProcess integration       

pkg/vectorindex/sqlexec/sqlexec_test.go

• Updated test functions to create and use SqlProcess wrapper
• Modified RunTxn calls to use the new SQL process interface

+5/-2     
operator_events_test.go
Update transaction event callback tests for new interface

pkg/txn/client/operator_events_test.go

• Updated transaction event callback test to use new TxnEventCallback
structure
• Modified callback function signature to match new interface
requirements

+6/-3     
txn_mock.go
Update mock transaction operator for new callback interface

pkg/frontend/test/txn_mock.go

• Updated mock transaction operator interface to use TxnEventCallback
instead of function type

+1/-1     
store_sql_test.go
Update test transaction operator for new callback interface

pkg/incrservice/store_sql_test.go

• Updated test transaction operator interface to use TxnEventCallback
instead of function type

+1/-1     
service_test.go
Update bootstrap test transaction operator for new callback interface

pkg/bootstrap/service_test.go

• Updated test transaction operator interface to use TxnEventCallback
instead of function type

+1/-1     
txn_test.go
Update frontend test transaction operator for new callback interface

pkg/frontend/txn_test.go

• Updated test transaction operator interface to use TxnEventCallback
instead of function type

+1/-1     
entire_engine_test.go
Update engine test operator for new callback interface     

pkg/vm/engine/entire_engine_test.go

• Updated test operator interface to use TxnEventCallback instead of
function type

+1/-1     
types_test.go
Update vector index CDC test for new constructor signature

pkg/vectorindex/types_test.go

• Updated NewVectorIndexCdc call to include capacity parameter (8192)

+1/-1     
vector_ivf_async.result
Add test results for asynchronous IVF vector index functionality

test/distributed/cases/vector/vector_ivf_async.result

• Added test results for asynchronous IVF vector index operations
• Includes test cases for index creation, data loading, and vector
similarity searches

+61/-0   
vector_hnsw.result
Update HNSW vector test results for asynchronous operations

test/distributed/cases/vector/vector_hnsw.result

• Added sleep statement and updated expected query results
• Modified test results to account for asynchronous index updates

+10/-2   
vector_hnsw_f64_async.sql
Add asynchronous HNSW F64 vector index test suite               

test/distributed/cases/vector/vector_hnsw_f64_async.sql

• Added comprehensive test suite for asynchronous HNSW vector index
operations with F64 precision
• Includes tests for CRUD operations, index creation, and vector
similarity searches

+96/-0   
vector_hnsw.sql
Update HNSW vector test for asynchronous index operations

test/distributed/cases/vector/vector_hnsw.sql

• Added sleep statement to allow for asynchronous index updates
• Added table cleanup and updated test comments

+6/-1     
Feature
5 files
alter.go
Integrate ISCP with alter table operations                             

pkg/sql/compile/alter.go

• Added ISCP integration by dropping index CDC tasks for temporary
tables during alter table operations
• Implemented logic to handle unaffected indexes during table
alteration with ISCP job registration
• Added support for async index handling and improved index table
cloning logic
• Enhanced error handling and logging for ISCP-related operations
during alter table

+150/-47
ddl.go
Integrate ISCP job management with DDL operations               

pkg/sql/compile/ddl.go

• Added ISCP job management for database and table operations (create,
drop, rename)
• Integrated index CDC task creation and deletion for vector and
fulltext indexes
• Added async index handling logic to skip synchronous operations for
async indexes
• Enhanced alter table operations with ISCP job lifecycle management

+97/-9   
index_consumer.go
Implement ISCP integration with HNSW and SQL process refactoring

pkg/iscp/index_consumer.go

• Added new imports for sonic JSON, types, and vector index components
• Replaced SQL executor factory with direct engine and transaction
client usage
• Implemented separate execution paths for HNSW vs other index types
• Added runHnsw function for HNSW-specific CDC processing with model
synchronization

+171/-55
iscp_util.go
Add ISCP utility functions for CDC task management             

pkg/sql/compile/iscp_util.go

• New utility file for ISCP (Index Sync Change Processing) integration
• Functions for CDC task management (create, delete,
register/unregister jobs)
• Index validation logic for async vector indexes (HNSW, IVF, FullText)
• Transaction callback system for deferred ISCP job registration

+321/-0 
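
Deferred job registration can be pictured as queuing work on the transaction and running it only on commit. The sketch below is a simplified model of that idea; the `txn`, `jobRegistry`, and `createAsyncIndex` names are hypothetical and the real code hooks into the transaction event callbacks instead of a plain slice.

```go
package main

import "fmt"

// txn is a hypothetical transaction that collects actions to run on commit.
type txn struct {
	onCommit []func() error
}

func (t *txn) deferUntilCommit(f func() error) {
	t.onCommit = append(t.onCommit, f)
}

// commit runs the deferred actions in order and stops at the first failure.
func (t *txn) commit() error {
	for _, f := range t.onCommit {
		if err := f(); err != nil {
			return err
		}
	}
	t.onCommit = nil
	return nil
}

// rollback discards the deferred actions without running them.
func (t *txn) rollback() {
	t.onCommit = nil
}

// jobRegistry is a hypothetical stand-in for the ISCP job table.
type jobRegistry struct {
	jobs map[string]bool
}

// createAsyncIndex queues the job registration instead of performing it
// immediately, so a rolled-back DDL leaves no orphaned job behind.
func createAsyncIndex(t *txn, r *jobRegistry, index string) {
	t.deferUntilCommit(func() error {
		r.jobs[index] = true
		return nil
	})
}

func main() {
	reg := &jobRegistry{jobs: map[string]bool{}}

	aborted := &txn{}
	createAsyncIndex(aborted, reg, "idx_a")
	aborted.rollback()

	committed := &txn{}
	createAsyncIndex(committed, reg, "idx_b")
	if err := committed.commit(); err != nil {
		fmt.Println("commit failed:", err)
	}
	fmt.Println(reg.jobs["idx_a"], reg.jobs["idx_b"])
}
```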
ddl_index_algo.go
Implement async index support with ISCP integration           

pkg/sql/compile/ddl_index_algo.go

• Added async index support for FullText indexes with CDC task
creation
• Modified IVF index handling to support async mode with ISCP job
registration
• Updated HNSW index creation to handle both sync and async modes
• Integrated transaction callback system for deferred index operations

+119/-47
Miscellaneous
2 files
function_id.go
Remove HNSW CDC update function ID                                             

pkg/sql/plan/function/function_id.go

• Removed HNSW_CDC_UPDATE function ID constant
• Adjusted FUNCTION_END_NUMBER accordingly

+1/-7     
function_id_test.go
Remove HNSW CDC update function from predefined function IDs

pkg/sql/plan/function/function_id_test.go

• Removed HNSW_CDC_UPDATE function ID entry
• Updated FUNCTION_END_NUMBER from 351 to 350

+1/-3     
Error handling
1 file
util.go
Add error handling to fulltext index SQL generation           

pkg/sql/compile/util.go

• Added error return type to genInsertIndexTableSqlForFullTextIndex
function
• Updated function to return error alongside SQL strings

+2/-2     
Additional files
9 files
func_hnsw.go +0/-106 
func_hnsw_test.go +0/-191 
list_builtIn.go +0/-21   
fulltext_async.result +23/-0   
fulltext_async.sql +25/-0   
vector_hnsw_async.result +66/-0   
vector_hnsw_async.sql +96/-0   
vector_hnsw_f64_async.result +66/-0   
vector_ivf_async.sql +60/-0   

Labels

kind/feature Review effort 4/5 size/XXL Denotes a PR that changes 2000+ lines
