From 9ef7b360c836ab2e3b5d334e2b61dd24e591f31e Mon Sep 17 00:00:00 2001 From: MaddyDev Date: Mon, 16 Jun 2025 15:43:32 -0700 Subject: [PATCH 1/5] add table creation to scale provider --- src/TriggerBinding/SqlScalerProvider.cs | 61 +++- src/TriggerBinding/SqlTriggerListener.cs | 250 +---------------- src/TriggerBinding/SqlTriggerUtils.cs | 261 ++++++++++++++++++ .../TriggerBinding/SqlTriggerListenerTests.cs | 99 +++++++ 4 files changed, 422 insertions(+), 249 deletions(-) create mode 100644 test/Unit/TriggerBinding/SqlTriggerListenerTests.cs diff --git a/src/TriggerBinding/SqlScalerProvider.cs b/src/TriggerBinding/SqlScalerProvider.cs index 3040bd3ba..9c1541e29 100644 --- a/src/TriggerBinding/SqlScalerProvider.cs +++ b/src/TriggerBinding/SqlScalerProvider.cs @@ -2,14 +2,24 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.WebJobs.Extensions.Sql.Telemetry; using Microsoft.Azure.WebJobs.Host; using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Azure.WebJobs.Logging; +using Microsoft.Data.SqlClient; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Newtonsoft.Json; +using static Microsoft.Azure.WebJobs.Extensions.Sql.SqlTriggerUtils; +using static Microsoft.Azure.WebJobs.Extensions.Sql.SqlBindingUtilities; +using static Microsoft.Azure.WebJobs.Extensions.Sql.Telemetry.Telemetry; +using System.Diagnostics; + namespace Microsoft.Azure.WebJobs.Extensions.Sql { @@ -17,6 +27,8 @@ internal class SqlScalerProvider : IScaleMonitorProvider, ITargetScalerProvider { private readonly SqlTriggerScaleMonitor _scaleMonitor; private readonly SqlTriggerTargetScaler _targetScaler; + private readonly string _connectionString; + private readonly IDictionary _telemetryProps = new Dictionary(); /// /// Initializes a new instance of the class. @@ -32,7 +44,7 @@ public SqlScalerProvider(IServiceProvider serviceProvider, TriggerMetadata trigg SqlMetaData sqlMetadata = JsonConvert.DeserializeObject(triggerMetadata.Metadata.ToString()); sqlMetadata.ResolveProperties(serviceProvider.GetService()); var userTable = new SqlObject(sqlMetadata.TableName); - string connectionString = SqlBindingUtilities.GetConnectionString(sqlMetadata.ConnectionStringSetting, config); + this._connectionString = GetConnectionString(sqlMetadata.ConnectionStringSetting, config); IOptions options = serviceProvider.GetService>(); int configOptionsMaxChangesPerWorker = options.Value.MaxChangesPerWorker; int configAppSettingsMaxChangesPerWorker = config.GetValue(SqlTriggerConstants.ConfigKey_SqlTrigger_MaxChangesPerWorker); @@ -41,8 +53,8 @@ public SqlScalerProvider(IServiceProvider serviceProvider, TriggerMetadata trigg string userDefinedLeasesTableName = sqlMetadata.LeasesTableName; string userFunctionId = sqlMetadata.UserFunctionId; - this._scaleMonitor = new SqlTriggerScaleMonitor(userFunctionId, userTable, userDefinedLeasesTableName, connectionString, maxChangesPerWorker, logger); - this._targetScaler = new SqlTriggerTargetScaler(userFunctionId, userTable, userDefinedLeasesTableName, connectionString, maxChangesPerWorker, logger); + this._scaleMonitor = new SqlTriggerScaleMonitor(userFunctionId, userTable, userDefinedLeasesTableName, this._connectionString, maxChangesPerWorker, logger); + this._targetScaler = new SqlTriggerTargetScaler(userFunctionId, userTable, userDefinedLeasesTableName, this._connectionString, maxChangesPerWorker, logger); } public IScaleMonitor GetMonitor() @@ -54,6 +66,49 @@ public ITargetScaler GetTargetScaler() { return this._targetScaler; } + internal async Task CreateTriggerTables(SqlObject userTable, string oldUserFunctionId, string userFunctionId, int maxChangesPerWorker, bool hasConfiguredMaxChangesPerWorker, ILogger logger, CancellationToken cancellationToken = default) + { + using (var connection = new SqlConnection(this._connectionString)) + { + ServerProperties serverProperties = await GetServerTelemetryProperties(connection, logger, cancellationToken); + this._telemetryProps.AddConnectionProps(connection, serverProperties); + await VerifyDatabaseSupported(connection, logger, cancellationToken); + + int userTableId = await GetUserTableIdAsync(connection, userTable, logger, cancellationToken); + IReadOnlyList<(string name, string type)> primaryKeyColumns = GetPrimaryKeyColumnsAsync(connection, userTableId, logger, userTable.FullName, cancellationToken); + + string bracketedLeasesTableName = GetBracketedLeasesTableName(null, userFunctionId, userTableId); + this._telemetryProps[TelemetryPropertyName.LeasesTableName] = bracketedLeasesTableName; + + var transactionSw = Stopwatch.StartNew(); + long createdSchemaDurationMs = 0L, createGlobalStateTableDurationMs = 0L, insertGlobalStateTableRowDurationMs = 0L, createLeasesTableDurationMs = 0L; + using (SqlTransaction transaction = connection.BeginTransaction(System.Data.IsolationLevel.RepeatableRead)) + { + createdSchemaDurationMs = await CreateSchemaAsync(connection, transaction, this._telemetryProps, logger, cancellationToken); + createGlobalStateTableDurationMs = await CreateGlobalStateTableAsync(connection, transaction, this._telemetryProps, logger, cancellationToken); + insertGlobalStateTableRowDurationMs = await InsertGlobalStateTableRowAsync(connection, transaction, userTableId, userTable, oldUserFunctionId, userFunctionId, logger, cancellationToken); + createLeasesTableDurationMs = await CreateLeasesTableAsync(connection, transaction, bracketedLeasesTableName, primaryKeyColumns, oldUserFunctionId, userFunctionId, this._telemetryProps, logger, cancellationToken); + transaction.Commit(); + } + + var measures = new Dictionary + { + [TelemetryMeasureName.CreatedSchemaDurationMs] = createdSchemaDurationMs, + [TelemetryMeasureName.CreateGlobalStateTableDurationMs] = createGlobalStateTableDurationMs, + [TelemetryMeasureName.InsertGlobalStateTableRowDurationMs] = insertGlobalStateTableRowDurationMs, + [TelemetryMeasureName.CreateLeasesTableDurationMs] = createLeasesTableDurationMs, + [TelemetryMeasureName.TransactionDurationMs] = transactionSw.ElapsedMilliseconds, + [TelemetryMeasureName.MaxChangesPerWorker] = maxChangesPerWorker + }; + + TelemetryInstance.TrackEvent( + TelemetryEventName.StartListener, + new Dictionary(this._telemetryProps) { + { TelemetryPropertyName.HasConfiguredMaxChangesPerWorker, hasConfiguredMaxChangesPerWorker.ToString() } + }, + measures); + } + } internal class SqlMetaData { diff --git a/src/TriggerBinding/SqlTriggerListener.cs b/src/TriggerBinding/SqlTriggerListener.cs index 114ad67e2..b6c6579c4 100644 --- a/src/TriggerBinding/SqlTriggerListener.cs +++ b/src/TriggerBinding/SqlTriggerListener.cs @@ -135,10 +135,10 @@ public async Task StartAsync(CancellationToken cancellationToken) using (SqlTransaction transaction = connection.BeginTransaction(System.Data.IsolationLevel.RepeatableRead)) { - createdSchemaDurationMs = await this.CreateSchemaAsync(connection, transaction, cancellationToken); - createGlobalStateTableDurationMs = await this.CreateGlobalStateTableAsync(connection, transaction, cancellationToken); - insertGlobalStateTableRowDurationMs = await this.InsertGlobalStateTableRowAsync(connection, transaction, userTableId, cancellationToken); - createLeasesTableDurationMs = await this.CreateLeasesTableAsync(connection, transaction, bracketedLeasesTableName, primaryKeyColumns, cancellationToken); + createdSchemaDurationMs = await CreateSchemaAsync(connection, transaction, this._telemetryProps, this._logger, cancellationToken); + createGlobalStateTableDurationMs = await CreateGlobalStateTableAsync(connection, transaction, this._telemetryProps, this._logger, cancellationToken); + insertGlobalStateTableRowDurationMs = await InsertGlobalStateTableRowAsync(connection, transaction, userTableId, this._userTable, this._oldUserFunctionId, this._userFunctionId, this._logger, cancellationToken); + createLeasesTableDurationMs = await CreateLeasesTableAsync(connection, transaction, bracketedLeasesTableName, primaryKeyColumns, this._oldUserFunctionId, this._userFunctionId, this._telemetryProps, this._logger, cancellationToken); transaction.Commit(); } @@ -265,248 +265,6 @@ FROM sys.columns AS c } } - /// - /// Creates the schema for global state table and leases tables, if it does not already exist. - /// - /// The already-opened connection to use for executing the command - /// The transaction wrapping this command - /// Cancellation token to pass to the command - /// The time taken in ms to execute the command - private async Task CreateSchemaAsync(SqlConnection connection, SqlTransaction transaction, CancellationToken cancellationToken) - { - string createSchemaQuery = $@" - {AppLockStatements} - - IF SCHEMA_ID(N'{SchemaName}') IS NULL - EXEC ('CREATE SCHEMA {SchemaName}'); - "; - - using (var createSchemaCommand = new SqlCommand(createSchemaQuery, connection, transaction)) - { - var stopwatch = Stopwatch.StartNew(); - - try - { - await createSchemaCommand.ExecuteNonQueryAsyncWithLogging(this._logger, cancellationToken); - } - catch (Exception ex) - { - TelemetryInstance.TrackException(TelemetryErrorName.CreateSchema, ex, this._telemetryProps); - var sqlEx = ex as SqlException; - if (sqlEx?.Number == ObjectAlreadyExistsErrorNumber) - { - // This generally shouldn't happen since we check for its existence in the statement but occasionally - // a race condition can make it so that multiple instances will try and create the schema at once. - // In that case we can just ignore the error since all we care about is that the schema exists at all. - this._logger.LogWarning($"Failed to create schema '{SchemaName}'. Exception message: {ex.Message} This is informational only, function startup will continue as normal."); - } - else - { - throw; - } - } - - return stopwatch.ElapsedMilliseconds; - } - } - - /// - /// Creates the global state table if it does not already exist. - /// - /// The already-opened connection to use for executing the command - /// The transaction wrapping this command - /// Cancellation token to pass to the command - /// The time taken in ms to execute the command - private async Task CreateGlobalStateTableAsync(SqlConnection connection, SqlTransaction transaction, CancellationToken cancellationToken) - { - string createGlobalStateTableQuery = $@" - {AppLockStatements} - - IF OBJECT_ID(N'{GlobalStateTableName}', 'U') IS NULL - CREATE TABLE {GlobalStateTableName} ( - UserFunctionID char(16) NOT NULL, - UserTableID int NOT NULL, - LastSyncVersion bigint NOT NULL, - LastAccessTime Datetime NOT NULL DEFAULT GETUTCDATE(), - PRIMARY KEY (UserFunctionID, UserTableID) - ); - ELSE IF NOT EXISTS(SELECT 1 FROM sys.columns WHERE Name = N'LastAccessTime' - AND Object_ID = Object_ID(N'{GlobalStateTableName}')) - ALTER TABLE {GlobalStateTableName} ADD LastAccessTime Datetime NOT NULL DEFAULT GETUTCDATE(); - "; - - using (var createGlobalStateTableCommand = new SqlCommand(createGlobalStateTableQuery, connection, transaction)) - { - var stopwatch = Stopwatch.StartNew(); - try - { - await createGlobalStateTableCommand.ExecuteNonQueryAsyncWithLogging(this._logger, cancellationToken); - } - catch (Exception ex) - { - TelemetryInstance.TrackException(TelemetryErrorName.CreateGlobalStateTable, ex, this._telemetryProps); - var sqlEx = ex as SqlException; - if (sqlEx?.Number == ObjectAlreadyExistsErrorNumber) - { - // This generally shouldn't happen since we check for its existence in the statement but occasionally - // a race condition can make it so that multiple instances will try and create the schema at once. - // In that case we can just ignore the error since all we care about is that the schema exists at all. - this._logger.LogWarning($"Failed to create global state table '{GlobalStateTableName}'. Exception message: {ex.Message} This is informational only, function startup will continue as normal."); - } - else - { - throw; - } - } - return stopwatch.ElapsedMilliseconds; - } - } - - /// - /// Inserts row for the 'user function and table' inside the global state table, if one does not already exist. - /// - /// The already-opened connection to use for executing the command - /// The transaction wrapping this command - /// The ID of the table being watched - /// Cancellation token to pass to the command - /// The time taken in ms to execute the command - private async Task InsertGlobalStateTableRowAsync(SqlConnection connection, SqlTransaction transaction, int userTableId, CancellationToken cancellationToken) - { - object minValidVersion; - - string getMinValidVersionQuery = $"SELECT CHANGE_TRACKING_MIN_VALID_VERSION({userTableId});"; - - using (var getMinValidVersionCommand = new SqlCommand(getMinValidVersionQuery, connection, transaction)) - using (SqlDataReader reader = getMinValidVersionCommand.ExecuteReaderWithLogging(this._logger)) - { - if (!await reader.ReadAsync(cancellationToken)) - { - throw new InvalidOperationException($"Received empty response when querying the 'change tracking min valid version' for table: '{this._userTable.FullName}'."); - } - - minValidVersion = reader.GetValue(0); - - if (minValidVersion is DBNull) - { - throw new InvalidOperationException($"Could not find change tracking enabled for table: '{this._userTable.FullName}'."); - } - } - - string insertRowGlobalStateTableQuery = $@" - {AppLockStatements} - -- For back compatibility copy the lastSyncVersion from _oldUserFunctionId if it exists. - IF NOT EXISTS ( - SELECT * FROM {GlobalStateTableName} - WHERE UserFunctionID = '{this._userFunctionId}' AND UserTableID = {userTableId} - ) - BEGIN - -- Migrate LastSyncVersion from oldUserFunctionId if it exists and delete the record - DECLARE @lastSyncVersion bigint; - SELECT @lastSyncVersion = LastSyncVersion from az_func.GlobalState where UserFunctionID = '{this._oldUserFunctionId}' AND UserTableID = {userTableId} - IF @lastSyncVersion IS NULL - SET @lastSyncVersion = {(long)minValidVersion}; - ELSE - DELETE FROM az_func.GlobalState WHERE UserFunctionID = '{this._oldUserFunctionId}' AND UserTableID = {userTableId} - - INSERT INTO {GlobalStateTableName} - VALUES ('{this._userFunctionId}', {userTableId}, @lastSyncVersion, GETUTCDATE()); - END - "; - - using (var insertRowGlobalStateTableCommand = new SqlCommand(insertRowGlobalStateTableQuery, connection, transaction)) - { - var stopwatch = Stopwatch.StartNew(); - int rowsInserted = await insertRowGlobalStateTableCommand.ExecuteNonQueryAsyncWithLogging(this._logger, cancellationToken); - if (rowsInserted > 0) - { - TelemetryInstance.TrackEvent(TelemetryEventName.InsertGlobalStateTableRow); - } - return stopwatch.ElapsedMilliseconds; - } - } - - /// - /// Creates the leases table for the 'user function and table', if one does not already exist. - /// - /// The already-opened connection to use for executing the command - /// The transaction wrapping this command - /// The name of the leases table to create - /// The primary keys of the user table this leases table is for - /// Cancellation token to pass to the command - /// The time taken in ms to execute the command - private async Task CreateLeasesTableAsync( - SqlConnection connection, - SqlTransaction transaction, - string leasesTableName, - IReadOnlyList<(string name, string type)> primaryKeyColumns, - CancellationToken cancellationToken) - { - string primaryKeysWithTypes = string.Join(", ", primaryKeyColumns.Select(col => $"{col.name.AsBracketQuotedString()} {col.type}")); - string primaryKeys = string.Join(", ", primaryKeyColumns.Select(col => col.name.AsBracketQuotedString())); - string oldLeasesTableName = leasesTableName.Contains(this._userFunctionId) ? leasesTableName.Replace(this._userFunctionId, this._oldUserFunctionId) : string.Empty; - - string createLeasesTableQuery = string.IsNullOrEmpty(oldLeasesTableName) ? $@" - {AppLockStatements} - - IF OBJECT_ID(N'{leasesTableName}', 'U') IS NULL - CREATE TABLE {leasesTableName} ( - {primaryKeysWithTypes}, - {LeasesTableChangeVersionColumnName} bigint NOT NULL, - {LeasesTableAttemptCountColumnName} int NOT NULL, - {LeasesTableLeaseExpirationTimeColumnName} datetime2, - PRIMARY KEY ({primaryKeys}) - ); - " : $@" - {AppLockStatements} - - IF OBJECT_ID(N'{leasesTableName}', 'U') IS NULL - BEGIN - CREATE TABLE {leasesTableName} ( - {primaryKeysWithTypes}, - {LeasesTableChangeVersionColumnName} bigint NOT NULL, - {LeasesTableAttemptCountColumnName} int NOT NULL, - {LeasesTableLeaseExpirationTimeColumnName} datetime2, - PRIMARY KEY ({primaryKeys}) - ); - - -- Migrate all data from OldLeasesTable and delete it. - IF OBJECT_ID(N'{oldLeasesTableName}', 'U') IS NOT NULL - BEGIN - INSERT INTO {leasesTableName} - SELECT * FROM {oldLeasesTableName}; - - DROP TABLE {oldLeasesTableName}; - END - End - "; - - using (var createLeasesTableCommand = new SqlCommand(createLeasesTableQuery, connection, transaction)) - { - var stopwatch = Stopwatch.StartNew(); - try - { - await createLeasesTableCommand.ExecuteNonQueryAsyncWithLogging(this._logger, cancellationToken); - } - catch (Exception ex) - { - TelemetryInstance.TrackException(TelemetryErrorName.CreateLeasesTable, ex, this._telemetryProps); - var sqlEx = ex as SqlException; - if (sqlEx?.Number == ObjectAlreadyExistsErrorNumber) - { - // This generally shouldn't happen since we check for its existence in the statement but occasionally - // a race condition can make it so that multiple instances will try and create the schema at once. - // In that case we can just ignore the error since all we care about is that the schema exists at all. - this._logger.LogWarning($"Failed to create leases table '{leasesTableName}'. Exception message: {ex.Message} This is informational only, function startup will continue as normal."); - } - else - { - throw; - } - } - long durationMs = stopwatch.ElapsedMilliseconds; - return durationMs; - } - } public IScaleMonitor GetMonitor() { return this._scaleMonitor; diff --git a/src/TriggerBinding/SqlTriggerUtils.cs b/src/TriggerBinding/SqlTriggerUtils.cs index 98637bd44..693d2c60b 100644 --- a/src/TriggerBinding/SqlTriggerUtils.cs +++ b/src/TriggerBinding/SqlTriggerUtils.cs @@ -3,13 +3,16 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Globalization; using System.Linq; using System.Threading; using System.Threading.Tasks; +using Microsoft.Azure.WebJobs.Extensions.Sql.Telemetry; using Microsoft.Data.SqlClient; using Microsoft.Extensions.Logging; using static Microsoft.Azure.WebJobs.Extensions.Sql.SqlTriggerConstants; +using static Microsoft.Azure.WebJobs.Extensions.Sql.Telemetry.Telemetry; namespace Microsoft.Azure.WebJobs.Extensions.Sql { @@ -126,5 +129,263 @@ internal static string GetBracketedLeasesTableName(string userDefinedLeasesTable return string.IsNullOrEmpty(userDefinedLeasesTableName) ? string.Format(CultureInfo.InvariantCulture, LeasesTableNameFormat, $"{userFunctionId}_{userTableId}") : string.Format(CultureInfo.InvariantCulture, UserDefinedLeasesTableNameFormat, $"{userDefinedLeasesTableName.AsBracketQuotedString()}"); } + + /// + /// Creates the schema for global state table and leases tables, if it does not already exist. + /// + /// The already-opened connection to use for executing the command + /// The transaction wrapping this command + /// The property bag for telemetry + /// Facilitates logging of messages + /// Cancellation token to pass to the command + /// The time taken in ms to execute the command + internal static async Task CreateSchemaAsync(SqlConnection connection, SqlTransaction transaction, IDictionary telemetryProps, ILogger logger, CancellationToken cancellationToken) + { + string createSchemaQuery = $@" + {AppLockStatements} + + IF SCHEMA_ID(N'{SchemaName}') IS NULL + EXEC ('CREATE SCHEMA {SchemaName}'); + "; + + using (var createSchemaCommand = new SqlCommand(createSchemaQuery, connection, transaction)) + { + var stopwatch = Stopwatch.StartNew(); + + try + { + await createSchemaCommand.ExecuteNonQueryAsyncWithLogging(logger, cancellationToken); + } + catch (Exception ex) + { + TelemetryInstance.TrackException(TelemetryErrorName.CreateSchema, ex, telemetryProps); + var sqlEx = ex as SqlException; + if (sqlEx?.Number == ObjectAlreadyExistsErrorNumber) + { + // This generally shouldn't happen since we check for its existence in the statement but occasionally + // a race condition can make it so that multiple instances will try and create the schema at once. + // In that case we can just ignore the error since all we care about is that the schema exists at all. + logger.LogWarning($"Failed to create schema '{SchemaName}'. Exception message: {ex.Message} This is informational only, function startup will continue as normal."); + } + else + { + throw; + } + } + + return stopwatch.ElapsedMilliseconds; + } + } + + /// + /// Creates the global state table if it does not already exist. + /// + /// The already-opened connection to use for executing the command + /// The transaction wrapping this command + /// The property bag for telemetry + /// Facilitates logging of messages + /// Cancellation token to pass to the command + /// The time taken in ms to execute the command + internal static async Task CreateGlobalStateTableAsync(SqlConnection connection, SqlTransaction transaction, IDictionary telemetryProps, ILogger logger, CancellationToken cancellationToken) + { + string createGlobalStateTableQuery = $@" + {AppLockStatements} + + IF OBJECT_ID(N'{GlobalStateTableName}', 'U') IS NULL + CREATE TABLE {GlobalStateTableName} ( + UserFunctionID char(16) NOT NULL, + UserTableID int NOT NULL, + LastSyncVersion bigint NOT NULL, + LastAccessTime Datetime NOT NULL DEFAULT GETUTCDATE(), + PRIMARY KEY (UserFunctionID, UserTableID) + ); + ELSE IF NOT EXISTS(SELECT 1 FROM sys.columns WHERE Name = N'LastAccessTime' + AND Object_ID = Object_ID(N'{GlobalStateTableName}')) + ALTER TABLE {GlobalStateTableName} ADD LastAccessTime Datetime NOT NULL DEFAULT GETUTCDATE(); + "; + + using (var createGlobalStateTableCommand = new SqlCommand(createGlobalStateTableQuery, connection, transaction)) + { + var stopwatch = Stopwatch.StartNew(); + try + { + await createGlobalStateTableCommand.ExecuteNonQueryAsyncWithLogging(logger, cancellationToken); + } + catch (Exception ex) + { + TelemetryInstance.TrackException(TelemetryErrorName.CreateGlobalStateTable, ex, telemetryProps); + var sqlEx = ex as SqlException; + if (sqlEx?.Number == ObjectAlreadyExistsErrorNumber) + { + // This generally shouldn't happen since we check for its existence in the statement but occasionally + // a race condition can make it so that multiple instances will try and create the schema at once. + // In that case we can just ignore the error since all we care about is that the schema exists at all. + logger.LogWarning($"Failed to create global state table '{GlobalStateTableName}'. Exception message: {ex.Message} This is informational only, function startup will continue as normal."); + } + else + { + throw; + } + } + return stopwatch.ElapsedMilliseconds; + } + } + + /// + /// Inserts row for the 'user function and table' inside the global state table, if one does not already exist. + /// + /// The already-opened connection to use for executing the command + /// The transaction wrapping this command + /// The ID of the table being watched + /// The User table being watched for Trigger function + /// deprecated user function id value created using hostId for the user function + /// Unique identifier for the user function + /// Facilitates logging of messages + /// Cancellation token to pass to the command + /// The time taken in ms to execute the command + internal static async Task InsertGlobalStateTableRowAsync(SqlConnection connection, SqlTransaction transaction, int userTableId, SqlObject userTable, string oldUserFunctionId, string userFunctionId, ILogger logger, CancellationToken cancellationToken) + { + object minValidVersion; + string getMinValidVersionQuery = $"SELECT CHANGE_TRACKING_MIN_VALID_VERSION({userTableId});"; + + using (var getMinValidVersionCommand = new SqlCommand(getMinValidVersionQuery, connection, transaction)) + using (SqlDataReader reader = getMinValidVersionCommand.ExecuteReaderWithLogging(logger)) + { + if (!await reader.ReadAsync(cancellationToken)) + { + throw new InvalidOperationException($"Received empty response when querying the 'change tracking min valid version' for table: '{userTable.FullName}'."); + } + + minValidVersion = reader.GetValue(0); + + if (minValidVersion is DBNull) + { + throw new InvalidOperationException($"Could not find change tracking enabled for table: '{userTable.FullName}'."); + } + } + + string insertRowGlobalStateTableQuery = $@" + {AppLockStatements} + -- For back compatibility copy the lastSyncVersion from _oldUserFunctionId if it exists. + IF NOT EXISTS ( + SELECT * FROM {GlobalStateTableName} + WHERE UserFunctionID = '{userFunctionId}' AND UserTableID = {userTableId} + ) + BEGIN + -- Migrate LastSyncVersion from oldUserFunctionId if it exists and delete the record + DECLARE @lastSyncVersion bigint; + SELECT @lastSyncVersion = LastSyncVersion from az_func.GlobalState where UserFunctionID = '{oldUserFunctionId}' AND UserTableID = {userTableId} + IF @lastSyncVersion IS NULL + SET @lastSyncVersion = {(long)minValidVersion}; + ELSE + DELETE FROM az_func.GlobalState WHERE UserFunctionID = '{oldUserFunctionId}' AND UserTableID = {userTableId} + + INSERT INTO {GlobalStateTableName} + VALUES ('{userFunctionId}', {userTableId}, @lastSyncVersion, GETUTCDATE()); + END + "; + + using (var insertRowGlobalStateTableCommand = new SqlCommand(insertRowGlobalStateTableQuery, connection, transaction)) + { + var stopwatch = Stopwatch.StartNew(); + int rowsInserted = await insertRowGlobalStateTableCommand.ExecuteNonQueryAsyncWithLogging(logger, cancellationToken); + if (rowsInserted > 0) + { + TelemetryInstance.TrackEvent(TelemetryEventName.InsertGlobalStateTableRow); + } + return stopwatch.ElapsedMilliseconds; + } + } + + /// + /// Creates the leases table for the 'user function and table', if one does not already exist. + /// + /// The already-opened connection to use for executing the command + /// The transaction wrapping this command + /// The name of the leases table to create + /// The primary keys of the user table this leases table is for + /// deprecated user function id value created using hostId for the user function + /// Unique identifier for the user function + /// + /// Facilitates logging of messages + /// Cancellation token to pass to the command + /// The time taken in ms to execute the command + internal static async Task CreateLeasesTableAsync( + SqlConnection connection, + SqlTransaction transaction, + string leasesTableName, + IReadOnlyList<(string name, string type)> primaryKeyColumns, + string oldUserFunctionId, + string userFunctionId, + IDictionary telemetryProps, + ILogger logger, + CancellationToken cancellationToken) + { + string primaryKeysWithTypes = string.Join(", ", primaryKeyColumns.Select(col => $"{col.name.AsBracketQuotedString()} {col.type}")); + string primaryKeys = string.Join(", ", primaryKeyColumns.Select(col => col.name.AsBracketQuotedString())); + string oldLeasesTableName = leasesTableName.Contains(userFunctionId) ? leasesTableName.Replace(userFunctionId, oldUserFunctionId) : string.Empty; + + string createLeasesTableQuery = string.IsNullOrEmpty(oldLeasesTableName) ? $@" + {AppLockStatements} + + IF OBJECT_ID(N'{leasesTableName}', 'U') IS NULL + CREATE TABLE {leasesTableName} ( + {primaryKeysWithTypes}, + {LeasesTableChangeVersionColumnName} bigint NOT NULL, + {LeasesTableAttemptCountColumnName} int NOT NULL, + {LeasesTableLeaseExpirationTimeColumnName} datetime2, + PRIMARY KEY ({primaryKeys}) + ); + " : $@" + {AppLockStatements} + + IF OBJECT_ID(N'{leasesTableName}', 'U') IS NULL + BEGIN + CREATE TABLE {leasesTableName} ( + {primaryKeysWithTypes}, + {LeasesTableChangeVersionColumnName} bigint NOT NULL, + {LeasesTableAttemptCountColumnName} int NOT NULL, + {LeasesTableLeaseExpirationTimeColumnName} datetime2, + PRIMARY KEY ({primaryKeys}) + ); + + -- Migrate all data from OldLeasesTable and delete it. + IF OBJECT_ID(N'{oldLeasesTableName}', 'U') IS NOT NULL + BEGIN + INSERT INTO {leasesTableName} + SELECT * FROM {oldLeasesTableName}; + + DROP TABLE {oldLeasesTableName}; + END + End + "; + + using (var createLeasesTableCommand = new SqlCommand(createLeasesTableQuery, connection, transaction)) + { + var stopwatch = Stopwatch.StartNew(); + try + { + await createLeasesTableCommand.ExecuteNonQueryAsyncWithLogging(logger, cancellationToken); + } + catch (Exception ex) + { + TelemetryInstance.TrackException(TelemetryErrorName.CreateLeasesTable, ex, telemetryProps); + var sqlEx = ex as SqlException; + if (sqlEx?.Number == ObjectAlreadyExistsErrorNumber) + { + // This generally shouldn't happen since we check for its existence in the statement but occasionally + // a race condition can make it so that multiple instances will try and create the schema at once. + // In that case we can just ignore the error since all we care about is that the schema exists at all. + logger.LogWarning($"Failed to create leases table '{leasesTableName}'. Exception message: {ex.Message} This is informational only, function startup will continue as normal."); + } + else + { + throw; + } + } + long durationMs = stopwatch.ElapsedMilliseconds; + return durationMs; + } + } } } \ No newline at end of file diff --git a/test/Unit/TriggerBinding/SqlTriggerListenerTests.cs b/test/Unit/TriggerBinding/SqlTriggerListenerTests.cs new file mode 100644 index 000000000..0b81aacbf --- /dev/null +++ b/test/Unit/TriggerBinding/SqlTriggerListenerTests.cs @@ -0,0 +1,99 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Data; +using System.Reflection; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.WebJobs.Host.Executors; +using Microsoft.Data.SqlClient; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; +using Moq; +using Xunit; + +namespace Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Unit +{ + public class SqlTriggerListenerTests + { + [Fact] + public async Task StartAsync_ThrowsIfAlreadyStarted() + { + SqlTriggerListener listener = CreateListener(); + // Simulate already started + typeof(SqlTriggerListener) + .GetField("_listenerState", BindingFlags.NonPublic | BindingFlags.Instance) + .SetValue(listener, 2); + + await Assert.ThrowsAsync(() => listener.StartAsync(CancellationToken.None)); + } + + [Fact] + public void GetUserTableColumns_ThrowsOnUserDefinedType() + { + var mockLogger = new Mock(); + SqlTriggerListener listener = CreateListener(logger: mockLogger.Object); + + var sqlConnection = new SqlConnection(); + var mockReader = new Mock(); + mockReader.SetupSequence(r => r.Read()) + .Returns(true) + .Returns(false); + mockReader.Setup(r => r.GetString(0)).Returns("MyColumn"); + mockReader.Setup(r => r.GetString(1)).Returns("MyType"); + mockReader.Setup(r => r.GetBoolean(2)).Returns(true); + + var mockCommand = new Mock(); + mockCommand.Setup(c => c.ExecuteReader()).Returns(mockReader.Object); + + // Use reflection to call the private method + MethodInfo method = typeof(SqlTriggerListener).GetMethod("GetUserTableColumns", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance); + + Assert.Throws(() => + method.Invoke(listener, new object[] { sqlConnection, 1, CancellationToken.None }) + ); + } + + [Fact] + public void GetUserTableColumns_ThrowsOnReservedColumnName() + { + var mockLogger = new Mock(); + SqlTriggerListener listener = CreateListener(logger: mockLogger.Object); + + var sqlConnection = new SqlConnection(); + var mockReader = new Mock(); + mockReader.SetupSequence(r => r.Read()) + .Returns(true) + .Returns(false); + mockReader.Setup(r => r.GetString(0)).Returns("SYS_CHANGE_VERSION"); // Reserved name + mockReader.Setup(r => r.GetString(1)).Returns("int"); + mockReader.Setup(r => r.GetBoolean(2)).Returns(false); + + var mockCommand = new Mock(); + mockCommand.Setup(c => c.ExecuteReader()).Returns(mockReader.Object); + + // Use reflection to call the private method + MethodInfo method = typeof(SqlTriggerListener).GetMethod("GetUserTableColumns", BindingFlags.NonPublic | BindingFlags.Instance); + + Assert.Throws(() => + method.Invoke(listener, new object[] { sqlConnection, 1, CancellationToken.None }) + ); + } + + private static SqlTriggerListener CreateListener(ILogger logger = null) + { + return new SqlTriggerListener( + "Server=.;Database=Test;Trusted_Connection=True;", + "TestTable", + null, + "funcId", + "oldFuncId", + Mock.Of(), + new SqlOptions(), + logger ?? Mock.Of(), + new ConfigurationBuilder().SetBasePath(AppContext.BaseDirectory).Build() + ); + } + } +} \ No newline at end of file From 74b6b3be5075d6daa80a52ff9b2d9dbcc18962aa Mon Sep 17 00:00:00 2001 From: MaddyDev Date: Tue, 17 Jun 2025 12:33:44 -0700 Subject: [PATCH 2/5] call the CreateAsync --- src/SqlBindingExtension.cs | 21 ++++++---- src/Telemetry/Telemetry.cs | 3 +- src/TriggerBinding/SqlScalerProvider.cs | 51 ++++++++++++++++--------- 3 files changed, 50 insertions(+), 25 deletions(-) diff --git a/src/SqlBindingExtension.cs b/src/SqlBindingExtension.cs index e7648ffac..a59d33eb2 100644 --- a/src/SqlBindingExtension.cs +++ b/src/SqlBindingExtension.cs @@ -5,6 +5,7 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Azure.WebJobs.Host.Scale; +using System.Threading.Tasks; namespace Microsoft.Azure.WebJobs.Extensions.Sql { @@ -33,17 +34,23 @@ public static IWebJobsBuilder AddSql(this IWebJobsBuilder builder, Action(() => new SqlScalerProvider(serviceProvider, triggerMetadata)); - builder.Services.AddSingleton((Func)delegate (IServiceProvider resolvedServiceProvider) + var scalerProviderTask = new Lazy>(() => + SqlScalerProvider.CreateAsync(serviceProvider, triggerMetadata)); + + builder.Services.AddSingleton((Func)(resolvedServiceProvider => { serviceProvider = serviceProvider ?? resolvedServiceProvider; - return scalerProvider.Value; - }); - builder.Services.AddSingleton((Func)delegate (IServiceProvider resolvedServiceProvider) + // Wait for the async initialization to complete + return scalerProviderTask.Value.GetAwaiter().GetResult(); + })); + + builder.Services.AddSingleton((Func)(resolvedServiceProvider => { serviceProvider = serviceProvider ?? resolvedServiceProvider; - return scalerProvider.Value; - }); + // Wait for the async initialization to complete + return scalerProviderTask.Value.GetAwaiter().GetResult(); + })); + return builder; } } diff --git a/src/Telemetry/Telemetry.cs b/src/Telemetry/Telemetry.cs index 2150e8812..9dfda8cf8 100644 --- a/src/Telemetry/Telemetry.cs +++ b/src/Telemetry/Telemetry.cs @@ -350,7 +350,8 @@ public enum TelemetryEventName TriggerMonitorStart, Upsert, InsertGlobalStateTableRow, - BuildRenewLeasesWithEmptyMatchCondtion + BuildRenewLeasesWithEmptyMatchCondtion, + InitializeScaleProvider } /// diff --git a/src/TriggerBinding/SqlScalerProvider.cs b/src/TriggerBinding/SqlScalerProvider.cs index 9c1541e29..645161404 100644 --- a/src/TriggerBinding/SqlScalerProvider.cs +++ b/src/TriggerBinding/SqlScalerProvider.cs @@ -29,6 +29,11 @@ internal class SqlScalerProvider : IScaleMonitorProvider, ITargetScalerProvider private readonly SqlTriggerTargetScaler _targetScaler; private readonly string _connectionString; private readonly IDictionary _telemetryProps = new Dictionary(); + private readonly SqlObject _userTable; + private readonly string _userFunctionId; + private readonly int _maxChangesPerWorker; + private readonly ILogger _logger; + private readonly bool _hasConfiguredMaxChangesPerWorker; /// /// Initializes a new instance of the class. @@ -43,18 +48,21 @@ public SqlScalerProvider(IServiceProvider serviceProvider, TriggerMetadata trigg ILogger logger = loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("Sql")); SqlMetaData sqlMetadata = JsonConvert.DeserializeObject(triggerMetadata.Metadata.ToString()); sqlMetadata.ResolveProperties(serviceProvider.GetService()); - var userTable = new SqlObject(sqlMetadata.TableName); + this._userTable = new SqlObject(sqlMetadata.TableName); this._connectionString = GetConnectionString(sqlMetadata.ConnectionStringSetting, config); IOptions options = serviceProvider.GetService>(); int configOptionsMaxChangesPerWorker = options.Value.MaxChangesPerWorker; + this._hasConfiguredMaxChangesPerWorker = configOptionsMaxChangesPerWorker != 0; int configAppSettingsMaxChangesPerWorker = config.GetValue(SqlTriggerConstants.ConfigKey_SqlTrigger_MaxChangesPerWorker); // Override the maxChangesPerWorker value from config if the value is set in the trigger appsettings int maxChangesPerWorker = configAppSettingsMaxChangesPerWorker != 0 ? configAppSettingsMaxChangesPerWorker : configOptionsMaxChangesPerWorker != 0 ? configOptionsMaxChangesPerWorker : SqlOptions.DefaultMaxChangesPerWorker; + this._maxChangesPerWorker = maxChangesPerWorker; + this._logger = logger; string userDefinedLeasesTableName = sqlMetadata.LeasesTableName; - string userFunctionId = sqlMetadata.UserFunctionId; + this._userFunctionId = sqlMetadata.UserFunctionId; - this._scaleMonitor = new SqlTriggerScaleMonitor(userFunctionId, userTable, userDefinedLeasesTableName, this._connectionString, maxChangesPerWorker, logger); - this._targetScaler = new SqlTriggerTargetScaler(userFunctionId, userTable, userDefinedLeasesTableName, this._connectionString, maxChangesPerWorker, logger); + this._scaleMonitor = new SqlTriggerScaleMonitor(this._userFunctionId, this._userTable, userDefinedLeasesTableName, this._connectionString, maxChangesPerWorker, logger); + this._targetScaler = new SqlTriggerTargetScaler(this._userFunctionId, this._userTable, userDefinedLeasesTableName, this._connectionString, maxChangesPerWorker, logger); } public IScaleMonitor GetMonitor() @@ -66,28 +74,37 @@ public ITargetScaler GetTargetScaler() { return this._targetScaler; } - internal async Task CreateTriggerTables(SqlObject userTable, string oldUserFunctionId, string userFunctionId, int maxChangesPerWorker, bool hasConfiguredMaxChangesPerWorker, ILogger logger, CancellationToken cancellationToken = default) + + public static async Task CreateAsync(IServiceProvider serviceProvider, TriggerMetadata triggerMetadata) + { + var provider = new SqlScalerProvider(serviceProvider, triggerMetadata); + await provider.InitializeAsync(); + return provider; + } + + + public async Task InitializeAsync(CancellationToken cancellationToken = default) { using (var connection = new SqlConnection(this._connectionString)) { - ServerProperties serverProperties = await GetServerTelemetryProperties(connection, logger, cancellationToken); + ServerProperties serverProperties = await GetServerTelemetryProperties(connection, this._logger, cancellationToken); this._telemetryProps.AddConnectionProps(connection, serverProperties); - await VerifyDatabaseSupported(connection, logger, cancellationToken); + await VerifyDatabaseSupported(connection, this._logger, cancellationToken); - int userTableId = await GetUserTableIdAsync(connection, userTable, logger, cancellationToken); - IReadOnlyList<(string name, string type)> primaryKeyColumns = GetPrimaryKeyColumnsAsync(connection, userTableId, logger, userTable.FullName, cancellationToken); + int userTableId = await GetUserTableIdAsync(connection, this._userTable, this._logger, cancellationToken); + IReadOnlyList<(string name, string type)> primaryKeyColumns = GetPrimaryKeyColumnsAsync(connection, userTableId, this._logger, this._userTable.FullName, cancellationToken); - string bracketedLeasesTableName = GetBracketedLeasesTableName(null, userFunctionId, userTableId); + string bracketedLeasesTableName = GetBracketedLeasesTableName(null, this._userFunctionId, userTableId); this._telemetryProps[TelemetryPropertyName.LeasesTableName] = bracketedLeasesTableName; var transactionSw = Stopwatch.StartNew(); long createdSchemaDurationMs = 0L, createGlobalStateTableDurationMs = 0L, insertGlobalStateTableRowDurationMs = 0L, createLeasesTableDurationMs = 0L; using (SqlTransaction transaction = connection.BeginTransaction(System.Data.IsolationLevel.RepeatableRead)) { - createdSchemaDurationMs = await CreateSchemaAsync(connection, transaction, this._telemetryProps, logger, cancellationToken); - createGlobalStateTableDurationMs = await CreateGlobalStateTableAsync(connection, transaction, this._telemetryProps, logger, cancellationToken); - insertGlobalStateTableRowDurationMs = await InsertGlobalStateTableRowAsync(connection, transaction, userTableId, userTable, oldUserFunctionId, userFunctionId, logger, cancellationToken); - createLeasesTableDurationMs = await CreateLeasesTableAsync(connection, transaction, bracketedLeasesTableName, primaryKeyColumns, oldUserFunctionId, userFunctionId, this._telemetryProps, logger, cancellationToken); + createdSchemaDurationMs = await CreateSchemaAsync(connection, transaction, this._telemetryProps, this._logger, cancellationToken); + createGlobalStateTableDurationMs = await CreateGlobalStateTableAsync(connection, transaction, this._telemetryProps, this._logger, cancellationToken); + insertGlobalStateTableRowDurationMs = await InsertGlobalStateTableRowAsync(connection, transaction, userTableId, this._userTable, null, this._userFunctionId, this._logger, cancellationToken); + createLeasesTableDurationMs = await CreateLeasesTableAsync(connection, transaction, bracketedLeasesTableName, primaryKeyColumns, null, this._userFunctionId, this._telemetryProps, this._logger, cancellationToken); transaction.Commit(); } @@ -98,13 +115,13 @@ internal async Task CreateTriggerTables(SqlObject userTable, string oldUserFunct [TelemetryMeasureName.InsertGlobalStateTableRowDurationMs] = insertGlobalStateTableRowDurationMs, [TelemetryMeasureName.CreateLeasesTableDurationMs] = createLeasesTableDurationMs, [TelemetryMeasureName.TransactionDurationMs] = transactionSw.ElapsedMilliseconds, - [TelemetryMeasureName.MaxChangesPerWorker] = maxChangesPerWorker + [TelemetryMeasureName.MaxChangesPerWorker] = this._maxChangesPerWorker }; TelemetryInstance.TrackEvent( - TelemetryEventName.StartListener, + TelemetryEventName.InitializeScaleProvider, new Dictionary(this._telemetryProps) { - { TelemetryPropertyName.HasConfiguredMaxChangesPerWorker, hasConfiguredMaxChangesPerWorker.ToString() } + { TelemetryPropertyName.HasConfiguredMaxChangesPerWorker, this._hasConfiguredMaxChangesPerWorker.ToString() } }, measures); } From d1c2add4adc8813cbdbcb5edf4494b5bdc80561b Mon Sep 17 00:00:00 2001 From: MaddyDev Date: Tue, 17 Jun 2025 13:35:30 -0700 Subject: [PATCH 3/5] open connection --- src/TriggerBinding/SqlScalerProvider.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/TriggerBinding/SqlScalerProvider.cs b/src/TriggerBinding/SqlScalerProvider.cs index 645161404..b5480dc24 100644 --- a/src/TriggerBinding/SqlScalerProvider.cs +++ b/src/TriggerBinding/SqlScalerProvider.cs @@ -87,6 +87,7 @@ public async Task InitializeAsync(CancellationToken cancellationToken = default) { using (var connection = new SqlConnection(this._connectionString)) { + await connection.OpenAsyncWithSqlErrorHandling(cancellationToken); ServerProperties serverProperties = await GetServerTelemetryProperties(connection, this._logger, cancellationToken); this._telemetryProps.AddConnectionProps(connection, serverProperties); await VerifyDatabaseSupported(connection, this._logger, cancellationToken); From 3255e899c2509a2d326c511ce89a2933f24c3aad Mon Sep 17 00:00:00 2001 From: MaddyDev Date: Tue, 17 Jun 2025 15:11:10 -0700 Subject: [PATCH 4/5] remove listener start --- test/Integration/SqlTriggerBindingIntegrationTests.cs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/test/Integration/SqlTriggerBindingIntegrationTests.cs b/test/Integration/SqlTriggerBindingIntegrationTests.cs index df0350456..bca0939f1 100644 --- a/test/Integration/SqlTriggerBindingIntegrationTests.cs +++ b/test/Integration/SqlTriggerBindingIntegrationTests.cs @@ -609,13 +609,6 @@ public async Task ScaleHostEndToEndTest() this.SetChangeTrackingForTable("Products"); - // Initializing the listener is needed to create relevant lease table to get unprocessed changes. - // We would be using the scale host methods to get the scale status so the configuration values are not needed here. - var listener = new SqlTriggerListener(this.DbConnectionString, "dbo.Products", "", "testFunctionId", "testOldFunctionId", Mock.Of(), Mock.Of(), Mock.Of(), configuration); - await listener.StartAsync(CancellationToken.None); - // Cancel immediately so the listener doesn't start processing the changes - await listener.StopAsync(CancellationToken.None); - IHost host = new HostBuilder().ConfigureServices(services => services.AddAzureClientsCore()).Build(); AzureComponentFactory defaultAzureComponentFactory = host.Services.GetService(); From c926fb97513acc9fa242fb0bd615a8e5221c7cc7 Mon Sep 17 00:00:00 2001 From: MaddyDev Date: Wed, 18 Jun 2025 11:04:53 -0700 Subject: [PATCH 5/5] fix merge errors --- src/TriggerBinding/SqlScalerProvider.cs | 2 +- src/TriggerBinding/SqlTriggerListener.cs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/TriggerBinding/SqlScalerProvider.cs b/src/TriggerBinding/SqlScalerProvider.cs index b5480dc24..e045ba659 100644 --- a/src/TriggerBinding/SqlScalerProvider.cs +++ b/src/TriggerBinding/SqlScalerProvider.cs @@ -93,7 +93,7 @@ public async Task InitializeAsync(CancellationToken cancellationToken = default) await VerifyDatabaseSupported(connection, this._logger, cancellationToken); int userTableId = await GetUserTableIdAsync(connection, this._userTable, this._logger, cancellationToken); - IReadOnlyList<(string name, string type)> primaryKeyColumns = GetPrimaryKeyColumnsAsync(connection, userTableId, this._logger, this._userTable.FullName, cancellationToken); + IReadOnlyList<(string name, string type)> primaryKeyColumns = GetPrimaryKeyColumns(connection, userTableId, this._logger, this._userTable.FullName, cancellationToken); string bracketedLeasesTableName = GetBracketedLeasesTableName(null, this._userFunctionId, userTableId); this._telemetryProps[TelemetryPropertyName.LeasesTableName] = bracketedLeasesTableName; diff --git a/src/TriggerBinding/SqlTriggerListener.cs b/src/TriggerBinding/SqlTriggerListener.cs index aae45c51b..5aeaa49d3 100644 --- a/src/TriggerBinding/SqlTriggerListener.cs +++ b/src/TriggerBinding/SqlTriggerListener.cs @@ -145,8 +145,8 @@ public async Task StartAsync(CancellationToken cancellationToken) { createdSchemaDurationMs = await CreateSchemaAsync(connection, transaction, this._telemetryProps, this._logger, cancellationToken); createGlobalStateTableDurationMs = await CreateGlobalStateTableAsync(connection, transaction, this._telemetryProps, this._logger, cancellationToken); - insertGlobalStateTableRowDurationMs = await InsertGlobalStateTableRowAsync(connection, transaction, userTableId, this._userTable, this._oldUserFunctionId, this._userFunctionId, this._logger, cancellationToken); - createLeasesTableDurationMs = await CreateLeasesTableAsync(connection, transaction, bracketedLeasesTableName, primaryKeyColumns, this._oldUserFunctionId, this._userFunctionId, this._telemetryProps, this._logger, cancellationToken); + insertGlobalStateTableRowDurationMs = await InsertGlobalStateTableRowAsync(connection, transaction, userTableId, this._userTable, this._hostIdFunctionId, this._userFunctionId, this._logger, cancellationToken); + createLeasesTableDurationMs = await CreateLeasesTableAsync(connection, transaction, bracketedLeasesTableName, primaryKeyColumns, this._hostIdFunctionId, this._userFunctionId, this._telemetryProps, this._logger, cancellationToken); transaction.Commit(); }