Skip to content

Add the trigger tables creation on scale provider initialization #1186

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions src/SqlBindingExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Azure.WebJobs.Host.Scale;
using System.Threading.Tasks;

namespace Microsoft.Azure.WebJobs.Extensions.Sql
{
Expand Down Expand Up @@ -33,17 +34,23 @@ public static IWebJobsBuilder AddSql(this IWebJobsBuilder builder, Action<SqlOpt
internal static IWebJobsBuilder AddSqlScaleForTrigger(this IWebJobsBuilder builder, TriggerMetadata triggerMetadata)
{
IServiceProvider serviceProvider = null;
var scalerProvider = new Lazy<SqlScalerProvider>(() => new SqlScalerProvider(serviceProvider, triggerMetadata));
builder.Services.AddSingleton((Func<IServiceProvider, IScaleMonitorProvider>)delegate (IServiceProvider resolvedServiceProvider)
var scalerProviderTask = new Lazy<Task<SqlScalerProvider>>(() =>
SqlScalerProvider.CreateAsync(serviceProvider, triggerMetadata));

builder.Services.AddSingleton((Func<IServiceProvider, IScaleMonitorProvider>)(resolvedServiceProvider =>
{
serviceProvider = serviceProvider ?? resolvedServiceProvider;
return scalerProvider.Value;
});
builder.Services.AddSingleton((Func<IServiceProvider, ITargetScalerProvider>)delegate (IServiceProvider resolvedServiceProvider)
// Wait for the async initialization to complete
return scalerProviderTask.Value.GetAwaiter().GetResult();
}));

builder.Services.AddSingleton((Func<IServiceProvider, ITargetScalerProvider>)(resolvedServiceProvider =>
{
serviceProvider = serviceProvider ?? resolvedServiceProvider;
return scalerProvider.Value;
});
// Wait for the async initialization to complete
return scalerProviderTask.Value.GetAwaiter().GetResult();
}));
Comment on lines +37 to +52
Copy link
Preview

Copilot AI Jun 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Blocking on an asynchronous call using GetAwaiter().GetResult() can introduce potential deadlocks; consider restructuring the service registration to use an async factory or alternative async initialization pattern if possible.

Copilot uses AI. Check for mistakes.


return builder;
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/Telemetry/Telemetry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ public enum TelemetryEventName
TriggerMonitorStart,
Upsert,
InsertGlobalStateTableRow,
BuildRenewLeasesWithEmptyMatchCondtion
BuildRenewLeasesWithEmptyMatchCondtion,
InitializeScaleProvider
}

/// <summary>
Expand Down
83 changes: 78 additions & 5 deletions src/TriggerBinding/SqlScalerProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,38 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Extensions.Sql.Telemetry;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Azure.WebJobs.Logging;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using static Microsoft.Azure.WebJobs.Extensions.Sql.SqlTriggerUtils;
using static Microsoft.Azure.WebJobs.Extensions.Sql.SqlBindingUtilities;
using static Microsoft.Azure.WebJobs.Extensions.Sql.Telemetry.Telemetry;
using System.Diagnostics;


namespace Microsoft.Azure.WebJobs.Extensions.Sql
{
internal class SqlScalerProvider : IScaleMonitorProvider, ITargetScalerProvider
{
private readonly SqlTriggerScaleMonitor _scaleMonitor;
private readonly SqlTriggerTargetScaler _targetScaler;
private readonly string _connectionString;
private readonly IDictionary<TelemetryPropertyName, string> _telemetryProps = new Dictionary<TelemetryPropertyName, string>();
private readonly SqlObject _userTable;
private readonly string _userFunctionId;
private readonly int _maxChangesPerWorker;
private readonly ILogger _logger;
private readonly bool _hasConfiguredMaxChangesPerWorker;

/// <summary>
/// Initializes a new instance of the <see cref="SqlScalerProvider"/> class.
Expand All @@ -31,18 +48,21 @@ public SqlScalerProvider(IServiceProvider serviceProvider, TriggerMetadata trigg
ILogger logger = loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("Sql"));
SqlMetaData sqlMetadata = JsonConvert.DeserializeObject<SqlMetaData>(triggerMetadata.Metadata.ToString());
sqlMetadata.ResolveProperties(serviceProvider.GetService<INameResolver>());
var userTable = new SqlObject(sqlMetadata.TableName);
string connectionString = SqlBindingUtilities.GetConnectionString(sqlMetadata.ConnectionStringSetting, config);
this._userTable = new SqlObject(sqlMetadata.TableName);
this._connectionString = GetConnectionString(sqlMetadata.ConnectionStringSetting, config);
IOptions<SqlOptions> options = serviceProvider.GetService<IOptions<SqlOptions>>();
int configOptionsMaxChangesPerWorker = options.Value.MaxChangesPerWorker;
this._hasConfiguredMaxChangesPerWorker = configOptionsMaxChangesPerWorker != 0;
int configAppSettingsMaxChangesPerWorker = config.GetValue<int>(SqlTriggerConstants.ConfigKey_SqlTrigger_MaxChangesPerWorker);
// Override the maxChangesPerWorker value from config if the value is set in the trigger appsettings
int maxChangesPerWorker = configAppSettingsMaxChangesPerWorker != 0 ? configAppSettingsMaxChangesPerWorker : configOptionsMaxChangesPerWorker != 0 ? configOptionsMaxChangesPerWorker : SqlOptions.DefaultMaxChangesPerWorker;
this._maxChangesPerWorker = maxChangesPerWorker;
this._logger = logger;
string userDefinedLeasesTableName = sqlMetadata.LeasesTableName;
string userFunctionId = sqlMetadata.UserFunctionId;
this._userFunctionId = sqlMetadata.UserFunctionId;

this._scaleMonitor = new SqlTriggerScaleMonitor(userFunctionId, userTable, userDefinedLeasesTableName, connectionString, maxChangesPerWorker, logger);
this._targetScaler = new SqlTriggerTargetScaler(userFunctionId, userTable, userDefinedLeasesTableName, connectionString, maxChangesPerWorker, logger);
this._scaleMonitor = new SqlTriggerScaleMonitor(this._userFunctionId, this._userTable, userDefinedLeasesTableName, this._connectionString, maxChangesPerWorker, logger);
this._targetScaler = new SqlTriggerTargetScaler(this._userFunctionId, this._userTable, userDefinedLeasesTableName, this._connectionString, maxChangesPerWorker, logger);
}

public IScaleMonitor GetMonitor()
Expand All @@ -55,6 +75,59 @@ public ITargetScaler GetTargetScaler()
return this._targetScaler;
}

public static async Task<SqlScalerProvider> CreateAsync(IServiceProvider serviceProvider, TriggerMetadata triggerMetadata)
{
var provider = new SqlScalerProvider(serviceProvider, triggerMetadata);
await provider.InitializeAsync();
return provider;
}


public async Task InitializeAsync(CancellationToken cancellationToken = default)
{
using (var connection = new SqlConnection(this._connectionString))
{
await connection.OpenAsyncWithSqlErrorHandling(cancellationToken);
ServerProperties serverProperties = await GetServerTelemetryProperties(connection, this._logger, cancellationToken);
this._telemetryProps.AddConnectionProps(connection, serverProperties);
await VerifyDatabaseSupported(connection, this._logger, cancellationToken);

int userTableId = await GetUserTableIdAsync(connection, this._userTable, this._logger, cancellationToken);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had discussed NOT doing this here and instead having the scaler return the total count of all changes if the tables don't exist. This would cause it to scale up, which would create the actual instance that would then create the tables as normal.

Is there a reason you decided not to do that? I'm not thrilled about having table creation happen in multiple places in our code. It makes it more likely that if we ever make any changes to how they're created that we'll miss updating one or the other and cause problems.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alrod Could you please explain why this is not possible? We discussed this and you suggested that this is the better route.

Copy link
Member

@alrod alrod Jun 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it would also work.

You need to catch an exception here (schema does not exist, GlobalStateTable does not exists or GlobalStateTableRow does not exists):
https://github.com/Azure/azure-functions-sql-extension/blob/main/src/TriggerBinding/SqlTriggerTargetScaler.cs#L29

and return:

            return new TargetScalerResult
            {
                TargetWorkerCount = 1
            };

Then a worker will be assigned, and listener will create required stuff.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One small change I'd suggest making though is that we actually have the T-SQL query check if there's any changes on the table in those cases, and if there aren't we shouldn't scale up. No reason to spin stuff up unless there's actually changes IMO.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it’s better for the SC to attempt creating the GlobalState, since we only try once.
Otherwise, we might end up with worker 1 being permanently assigned, as the SC will keep voting for it if GlobalState creation fails for any reason.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the worker gets in a bad state (e.g. if global state table creation fails in the worker it will throw an exception and exit) will the scaler make a new one? If so then keeping 1 worker around seems like the behavior we want then - since it'll just keep retrying to make the table until it succeeds - which we need for the trigger to work correctly.

Copy link
Member

@alrod alrod Jun 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do not want to keep a worker around just waiting until the issue with creation is fixed as it would affect customer's bill.
instead, we can try to create GlobalState from SC not on ScaleProver creation but each time when SC tries to get a vote

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would a worker just be "waiting around"? If it fails to create the global state table it will just throw and the worker will exit.

Having two separate pieces of functionality trying to work together to create the tables is a recipe for disaster and adds extra complication. What happens if we need to update how the table is created? Now we have to try and coordinate updating both the scale controller and the users app at the same time. And it severely limits our ability to make changes to the state tables.

How do other extensions handle this - do they already have the scale controller create necessary resources such as state tables? I believe the MySQL extension is doing something similar so presumably is in the same situation...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to scale out only once right after monitoring started for MSSQL trigger - yes, the bad worker will exit and will never add again until customer explicitly changed the function app.

EventHubs have the same scenario - creating checkpoint blob to store position client position in the stream. The blob is created on SDK level by EventProcessorClient. There is no such thing as SQL SDK for monitoring changes, so the code lives in MSSQL azure functions extension.

Copy link
Member

@alrod alrod Jun 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, I am ok with to scale out 1 instance but only once.

IReadOnlyList<(string name, string type)> primaryKeyColumns = GetPrimaryKeyColumns(connection, userTableId, this._logger, this._userTable.FullName, cancellationToken);

string bracketedLeasesTableName = GetBracketedLeasesTableName(null, this._userFunctionId, userTableId);
this._telemetryProps[TelemetryPropertyName.LeasesTableName] = bracketedLeasesTableName;

var transactionSw = Stopwatch.StartNew();
long createdSchemaDurationMs = 0L, createGlobalStateTableDurationMs = 0L, insertGlobalStateTableRowDurationMs = 0L, createLeasesTableDurationMs = 0L;
using (SqlTransaction transaction = connection.BeginTransaction(System.Data.IsolationLevel.RepeatableRead))
{
createdSchemaDurationMs = await CreateSchemaAsync(connection, transaction, this._telemetryProps, this._logger, cancellationToken);
createGlobalStateTableDurationMs = await CreateGlobalStateTableAsync(connection, transaction, this._telemetryProps, this._logger, cancellationToken);
insertGlobalStateTableRowDurationMs = await InsertGlobalStateTableRowAsync(connection, transaction, userTableId, this._userTable, null, this._userFunctionId, this._logger, cancellationToken);
createLeasesTableDurationMs = await CreateLeasesTableAsync(connection, transaction, bracketedLeasesTableName, primaryKeyColumns, null, this._userFunctionId, this._telemetryProps, this._logger, cancellationToken);
transaction.Commit();
}

var measures = new Dictionary<TelemetryMeasureName, double>
{
[TelemetryMeasureName.CreatedSchemaDurationMs] = createdSchemaDurationMs,
[TelemetryMeasureName.CreateGlobalStateTableDurationMs] = createGlobalStateTableDurationMs,
[TelemetryMeasureName.InsertGlobalStateTableRowDurationMs] = insertGlobalStateTableRowDurationMs,
[TelemetryMeasureName.CreateLeasesTableDurationMs] = createLeasesTableDurationMs,
[TelemetryMeasureName.TransactionDurationMs] = transactionSw.ElapsedMilliseconds,
[TelemetryMeasureName.MaxChangesPerWorker] = this._maxChangesPerWorker
};

TelemetryInstance.TrackEvent(
TelemetryEventName.InitializeScaleProvider,
new Dictionary<TelemetryPropertyName, string>(this._telemetryProps) {
{ TelemetryPropertyName.HasConfiguredMaxChangesPerWorker, this._hasConfiguredMaxChangesPerWorker.ToString() }
},
measures);
}
}

internal class SqlMetaData
{
[JsonProperty]
Expand Down
Loading