-
Notifications
You must be signed in to change notification settings - Fork 65
Add the trigger tables creation on scale provider initialization #1186
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
9ef7b36
74b6b3b
d1c2add
3255e89
95d534a
c926fb9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,21 +2,38 @@ | |
// Licensed under the MIT License. See License.txt in the project root for license information. | ||
|
||
using System; | ||
using System.Collections.Generic; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using Microsoft.Azure.WebJobs.Extensions.Sql.Telemetry; | ||
using Microsoft.Azure.WebJobs.Host; | ||
using Microsoft.Azure.WebJobs.Host.Scale; | ||
using Microsoft.Azure.WebJobs.Logging; | ||
using Microsoft.Data.SqlClient; | ||
using Microsoft.Extensions.Configuration; | ||
using Microsoft.Extensions.DependencyInjection; | ||
using Microsoft.Extensions.Logging; | ||
using Microsoft.Extensions.Options; | ||
using Newtonsoft.Json; | ||
using static Microsoft.Azure.WebJobs.Extensions.Sql.SqlTriggerUtils; | ||
using static Microsoft.Azure.WebJobs.Extensions.Sql.SqlBindingUtilities; | ||
using static Microsoft.Azure.WebJobs.Extensions.Sql.Telemetry.Telemetry; | ||
using System.Diagnostics; | ||
|
||
|
||
namespace Microsoft.Azure.WebJobs.Extensions.Sql | ||
{ | ||
internal class SqlScalerProvider : IScaleMonitorProvider, ITargetScalerProvider | ||
{ | ||
private readonly SqlTriggerScaleMonitor _scaleMonitor; | ||
private readonly SqlTriggerTargetScaler _targetScaler; | ||
private readonly string _connectionString; | ||
private readonly IDictionary<TelemetryPropertyName, string> _telemetryProps = new Dictionary<TelemetryPropertyName, string>(); | ||
private readonly SqlObject _userTable; | ||
private readonly string _userFunctionId; | ||
private readonly int _maxChangesPerWorker; | ||
private readonly ILogger _logger; | ||
private readonly bool _hasConfiguredMaxChangesPerWorker; | ||
|
||
/// <summary> | ||
/// Initializes a new instance of the <see cref="SqlScalerProvider"/> class. | ||
|
@@ -31,18 +48,21 @@ public SqlScalerProvider(IServiceProvider serviceProvider, TriggerMetadata trigg | |
ILogger logger = loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("Sql")); | ||
SqlMetaData sqlMetadata = JsonConvert.DeserializeObject<SqlMetaData>(triggerMetadata.Metadata.ToString()); | ||
sqlMetadata.ResolveProperties(serviceProvider.GetService<INameResolver>()); | ||
var userTable = new SqlObject(sqlMetadata.TableName); | ||
string connectionString = SqlBindingUtilities.GetConnectionString(sqlMetadata.ConnectionStringSetting, config); | ||
this._userTable = new SqlObject(sqlMetadata.TableName); | ||
this._connectionString = GetConnectionString(sqlMetadata.ConnectionStringSetting, config); | ||
IOptions<SqlOptions> options = serviceProvider.GetService<IOptions<SqlOptions>>(); | ||
int configOptionsMaxChangesPerWorker = options.Value.MaxChangesPerWorker; | ||
this._hasConfiguredMaxChangesPerWorker = configOptionsMaxChangesPerWorker != 0; | ||
int configAppSettingsMaxChangesPerWorker = config.GetValue<int>(SqlTriggerConstants.ConfigKey_SqlTrigger_MaxChangesPerWorker); | ||
// Override the maxChangesPerWorker value from config if the value is set in the trigger appsettings | ||
int maxChangesPerWorker = configAppSettingsMaxChangesPerWorker != 0 ? configAppSettingsMaxChangesPerWorker : configOptionsMaxChangesPerWorker != 0 ? configOptionsMaxChangesPerWorker : SqlOptions.DefaultMaxChangesPerWorker; | ||
this._maxChangesPerWorker = maxChangesPerWorker; | ||
this._logger = logger; | ||
string userDefinedLeasesTableName = sqlMetadata.LeasesTableName; | ||
string userFunctionId = sqlMetadata.UserFunctionId; | ||
this._userFunctionId = sqlMetadata.UserFunctionId; | ||
|
||
this._scaleMonitor = new SqlTriggerScaleMonitor(userFunctionId, userTable, userDefinedLeasesTableName, connectionString, maxChangesPerWorker, logger); | ||
this._targetScaler = new SqlTriggerTargetScaler(userFunctionId, userTable, userDefinedLeasesTableName, connectionString, maxChangesPerWorker, logger); | ||
this._scaleMonitor = new SqlTriggerScaleMonitor(this._userFunctionId, this._userTable, userDefinedLeasesTableName, this._connectionString, maxChangesPerWorker, logger); | ||
this._targetScaler = new SqlTriggerTargetScaler(this._userFunctionId, this._userTable, userDefinedLeasesTableName, this._connectionString, maxChangesPerWorker, logger); | ||
} | ||
|
||
public IScaleMonitor GetMonitor() | ||
|
@@ -55,6 +75,59 @@ public ITargetScaler GetTargetScaler() | |
return this._targetScaler; | ||
} | ||
|
||
public static async Task<SqlScalerProvider> CreateAsync(IServiceProvider serviceProvider, TriggerMetadata triggerMetadata) | ||
{ | ||
var provider = new SqlScalerProvider(serviceProvider, triggerMetadata); | ||
await provider.InitializeAsync(); | ||
return provider; | ||
} | ||
|
||
|
||
public async Task InitializeAsync(CancellationToken cancellationToken = default) | ||
{ | ||
using (var connection = new SqlConnection(this._connectionString)) | ||
{ | ||
await connection.OpenAsyncWithSqlErrorHandling(cancellationToken); | ||
ServerProperties serverProperties = await GetServerTelemetryProperties(connection, this._logger, cancellationToken); | ||
this._telemetryProps.AddConnectionProps(connection, serverProperties); | ||
await VerifyDatabaseSupported(connection, this._logger, cancellationToken); | ||
|
||
int userTableId = await GetUserTableIdAsync(connection, this._userTable, this._logger, cancellationToken); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We had discussed NOT doing this here and instead having the scaler return the total count of all changes if the tables don't exist. This would cause it to scale up, which would create the actual instance that would then create the tables as normal. Is there a reason you decided not to do that? I'm not thrilled about having table creation happen in multiple places in our code. It makes it more likely that if we ever make any changes to how they're created that we'll miss updating one or the other and cause problems. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @alrod Could you please explain why this is not possible? We discussed this and you suggested that this is the better route. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, it would also work. You need to catch an exception here (schema does not exist, GlobalStateTable does not exists or GlobalStateTableRow does not exists): and return:
Then a worker will be assigned, and listener will create required stuff. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One small change I'd suggest making though is that we actually have the T-SQL query check if there's any changes on the table in those cases, and if there aren't we shouldn't scale up. No reason to spin stuff up unless there's actually changes IMO. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe it’s better for the SC to attempt creating the GlobalState, since we only try once. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the worker gets in a bad state (e.g. if global state table creation fails in the worker it will throw an exception and exit) will the scaler make a new one? If so then keeping 1 worker around seems like the behavior we want then - since it'll just keep retrying to make the table until it succeeds - which we need for the trigger to work correctly. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we do not want to keep a worker around just waiting until the issue with creation is fixed as it would affect customer's bill. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why would a worker just be "waiting around"? If it fails to create the global state table it will just throw and the worker will exit. Having two separate pieces of functionality trying to work together to create the tables is a recipe for disaster and adds extra complication. What happens if we need to update how the table is created? Now we have to try and coordinate updating both the scale controller and the users app at the same time. And it severely limits our ability to make changes to the state tables. How do other extensions handle this - do they already have the scale controller create necessary resources such as state tables? I believe the MySQL extension is doing something similar so presumably is in the same situation... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we want to scale out only once right after monitoring started for MSSQL trigger - yes, the bad worker will exit and will never add again until customer explicitly changed the function app. EventHubs have the same scenario - creating checkpoint blob to store position client position in the stream. The blob is created on SDK level by There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In general, I am ok with to scale out 1 instance but only once. |
||
IReadOnlyList<(string name, string type)> primaryKeyColumns = GetPrimaryKeyColumns(connection, userTableId, this._logger, this._userTable.FullName, cancellationToken); | ||
|
||
string bracketedLeasesTableName = GetBracketedLeasesTableName(null, this._userFunctionId, userTableId); | ||
this._telemetryProps[TelemetryPropertyName.LeasesTableName] = bracketedLeasesTableName; | ||
|
||
var transactionSw = Stopwatch.StartNew(); | ||
long createdSchemaDurationMs = 0L, createGlobalStateTableDurationMs = 0L, insertGlobalStateTableRowDurationMs = 0L, createLeasesTableDurationMs = 0L; | ||
using (SqlTransaction transaction = connection.BeginTransaction(System.Data.IsolationLevel.RepeatableRead)) | ||
{ | ||
createdSchemaDurationMs = await CreateSchemaAsync(connection, transaction, this._telemetryProps, this._logger, cancellationToken); | ||
createGlobalStateTableDurationMs = await CreateGlobalStateTableAsync(connection, transaction, this._telemetryProps, this._logger, cancellationToken); | ||
insertGlobalStateTableRowDurationMs = await InsertGlobalStateTableRowAsync(connection, transaction, userTableId, this._userTable, null, this._userFunctionId, this._logger, cancellationToken); | ||
createLeasesTableDurationMs = await CreateLeasesTableAsync(connection, transaction, bracketedLeasesTableName, primaryKeyColumns, null, this._userFunctionId, this._telemetryProps, this._logger, cancellationToken); | ||
transaction.Commit(); | ||
} | ||
|
||
var measures = new Dictionary<TelemetryMeasureName, double> | ||
{ | ||
[TelemetryMeasureName.CreatedSchemaDurationMs] = createdSchemaDurationMs, | ||
[TelemetryMeasureName.CreateGlobalStateTableDurationMs] = createGlobalStateTableDurationMs, | ||
[TelemetryMeasureName.InsertGlobalStateTableRowDurationMs] = insertGlobalStateTableRowDurationMs, | ||
[TelemetryMeasureName.CreateLeasesTableDurationMs] = createLeasesTableDurationMs, | ||
[TelemetryMeasureName.TransactionDurationMs] = transactionSw.ElapsedMilliseconds, | ||
[TelemetryMeasureName.MaxChangesPerWorker] = this._maxChangesPerWorker | ||
}; | ||
|
||
TelemetryInstance.TrackEvent( | ||
TelemetryEventName.InitializeScaleProvider, | ||
new Dictionary<TelemetryPropertyName, string>(this._telemetryProps) { | ||
{ TelemetryPropertyName.HasConfiguredMaxChangesPerWorker, this._hasConfiguredMaxChangesPerWorker.ToString() } | ||
}, | ||
measures); | ||
} | ||
} | ||
|
||
internal class SqlMetaData | ||
{ | ||
[JsonProperty] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Blocking on an asynchronous call using GetAwaiter().GetResult() can introduce potential deadlocks; consider restructuring the service registration to use an async factory or alternative async initialization pattern if possible.
Copilot uses AI. Check for mistakes.