Skip to content

Commit a63f648

Browse files
authored
Add OnBeforeRequest callback (#8541) (#8543)
1 parent 214de0f commit a63f648

File tree

7 files changed

+118
-84
lines changed

7 files changed

+118
-84
lines changed

src/Elastic.Clients.Elasticsearch/Elastic.Clients.Elasticsearch.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
</PropertyGroup>
3232

3333
<ItemGroup>
34-
<PackageReference Include="Elastic.Transport" Version="0.8.0" />
34+
<PackageReference Include="Elastic.Transport" Version="0.8.1" />
3535
<PackageReference Include="PolySharp" Version="1.15.0">
3636
<PrivateAssets>all</PrivateAssets>
3737
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>

src/Elastic.Clients.Elasticsearch/_Shared/Client/ElasticsearchClient.cs

Lines changed: 57 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,9 @@
66
using System.Collections.Generic;
77
using System.Diagnostics;
88
using System.Linq;
9-
using System.Runtime.CompilerServices;
10-
using System.Text.Json;
119
using System.Threading.Tasks;
1210
using System.Threading;
1311
using Elastic.Transport;
14-
using Elastic.Transport.Diagnostics;
1512

1613
using Elastic.Clients.Elasticsearch.Requests;
1714

@@ -28,18 +25,23 @@ public partial class ElasticsearchClient
2825
private const string OpenTelemetrySchemaVersion = "https://opentelemetry.io/schemas/1.21.0";
2926

3027
private readonly ITransport<IElasticsearchClientSettings> _transport;
31-
internal static ConditionalWeakTable<JsonSerializerOptions, IElasticsearchClientSettings> SettingsTable { get; } = new();
3228

3329
/// <summary>
3430
/// Creates a client configured to connect to http://localhost:9200.
3531
/// </summary>
36-
public ElasticsearchClient() : this(new ElasticsearchClientSettings(new Uri("http://localhost:9200"))) { }
32+
public ElasticsearchClient() :
33+
this(new ElasticsearchClientSettings(new Uri("http://localhost:9200")))
34+
{
35+
}
3736

3837
/// <summary>
3938
/// Creates a client configured to connect to a node reachable at the provided <paramref name="uri" />.
4039
/// </summary>
4140
/// <param name="uri">The <see cref="Uri" /> to connect to.</param>
42-
public ElasticsearchClient(Uri uri) : this(new ElasticsearchClientSettings(uri)) { }
41+
public ElasticsearchClient(Uri uri) :
42+
this(new ElasticsearchClientSettings(uri))
43+
{
44+
}
4345

4446
/// <summary>
4547
/// Creates a client configured to communicate with Elastic Cloud using the provided <paramref name="cloudId" />.
@@ -51,8 +53,8 @@ public ElasticsearchClient(Uri uri) : this(new ElasticsearchClientSettings(uri))
5153
/// </summary>
5254
/// <param name="cloudId">The Cloud ID of an Elastic Cloud deployment.</param>
5355
/// <param name="credentials">The credentials to use for the connection.</param>
54-
public ElasticsearchClient(string cloudId, AuthorizationHeader credentials) : this(
55-
new ElasticsearchClientSettings(cloudId, credentials))
56+
public ElasticsearchClient(string cloudId, AuthorizationHeader credentials) :
57+
this(new ElasticsearchClientSettings(cloudId, credentials))
5658
{
5759
}
5860

@@ -69,8 +71,7 @@ internal ElasticsearchClient(ITransport<IElasticsearchClientSettings> transport)
6971
{
7072
transport.ThrowIfNull(nameof(transport));
7173
transport.Configuration.ThrowIfNull(nameof(transport.Configuration));
72-
transport.Configuration.RequestResponseSerializer.ThrowIfNull(
73-
nameof(transport.Configuration.RequestResponseSerializer));
74+
transport.Configuration.RequestResponseSerializer.ThrowIfNull(nameof(transport.Configuration.RequestResponseSerializer));
7475
transport.Configuration.Inferrer.ThrowIfNull(nameof(transport.Configuration.Inferrer));
7576

7677
_transport = transport;
@@ -96,47 +97,38 @@ private enum ProductCheckStatus
9697

9798
private partial void SetupNamespaces();
9899

99-
internal TResponse DoRequest<TRequest, TResponse, TRequestParameters>(TRequest request)
100-
where TRequest : Request<TRequestParameters>
101-
where TResponse : TransportResponse, new()
102-
where TRequestParameters : RequestParameters, new() =>
103-
DoRequest<TRequest, TResponse, TRequestParameters>(request, null);
104-
105100
internal TResponse DoRequest<TRequest, TResponse, TRequestParameters>(
106-
TRequest request,
107-
Action<IRequestConfiguration>? forceConfiguration)
101+
TRequest request)
108102
where TRequest : Request<TRequestParameters>
109103
where TResponse : TransportResponse, new()
110104
where TRequestParameters : RequestParameters, new()
111-
=> DoRequestCoreAsync<TRequest, TResponse, TRequestParameters>(false, request, forceConfiguration).EnsureCompleted();
112-
113-
internal Task<TResponse> DoRequestAsync<TRequest, TResponse, TRequestParameters>(
114-
TRequest request,
115-
CancellationToken cancellationToken = default)
116-
where TRequest : Request<TRequestParameters>
117-
where TResponse : TransportResponse, new()
118-
where TRequestParameters : RequestParameters, new()
119-
=> DoRequestAsync<TRequest, TResponse, TRequestParameters>(request, null, cancellationToken);
105+
{
106+
return DoRequestCoreAsync<TRequest, TResponse, TRequestParameters>(false, request).EnsureCompleted();
107+
}
120108

121109
internal Task<TResponse> DoRequestAsync<TRequest, TResponse, TRequestParameters>(
122110
TRequest request,
123-
Action<IRequestConfiguration>? forceConfiguration,
124111
CancellationToken cancellationToken = default)
125112
where TRequest : Request<TRequestParameters>
126113
where TResponse : TransportResponse, new()
127114
where TRequestParameters : RequestParameters, new()
128-
=> DoRequestCoreAsync<TRequest, TResponse, TRequestParameters>(true, request, forceConfiguration, cancellationToken).AsTask();
115+
{
116+
return DoRequestCoreAsync<TRequest, TResponse, TRequestParameters>(true, request, cancellationToken).AsTask();
117+
}
129118

130119
private ValueTask<TResponse> DoRequestCoreAsync<TRequest, TResponse, TRequestParameters>(
131120
bool isAsync,
132121
TRequest request,
133-
Action<IRequestConfiguration>? forceConfiguration,
134122
CancellationToken cancellationToken = default)
135123
where TRequest : Request<TRequestParameters>
136124
where TResponse : TransportResponse, new()
137125
where TRequestParameters : RequestParameters, new()
138126
{
139-
// The product check modifies request parameters and therefore must not be executed concurrently.
127+
if (request is null)
128+
{
129+
throw new ArgumentNullException(nameof(request));
130+
}
131+
140132
// We use a lockless CAS approach to make sure that only a single product check request is executed at a time.
141133
// We do not guarantee that the product check is always performed on the first request.
142134

@@ -157,12 +149,12 @@ private ValueTask<TResponse> DoRequestCoreAsync<TRequest, TResponse, TRequestPar
157149

158150
ValueTask<TResponse> SendRequest()
159151
{
160-
var (endpointPath, resolvedRouteValues, postData) = PrepareRequest<TRequest, TRequestParameters>(request);
161-
var openTelemetryDataMutator = GetOpenTelemetryDataMutator<TRequest, TRequestParameters>(request, resolvedRouteValues);
152+
PrepareRequest<TRequest, TRequestParameters>(request, out var endpointPath, out var postData, out var requestConfiguration, out var routeValues);
153+
var openTelemetryDataMutator = GetOpenTelemetryDataMutator<TRequest, TRequestParameters>(request, routeValues);
162154

163155
return isAsync
164-
? new ValueTask<TResponse>(_transport.RequestAsync<TResponse>(endpointPath, postData, openTelemetryDataMutator, request.RequestConfiguration, cancellationToken))
165-
: new ValueTask<TResponse>(_transport.Request<TResponse>(endpointPath, postData, openTelemetryDataMutator, request.RequestConfiguration));
156+
? new ValueTask<TResponse>(_transport.RequestAsync<TResponse>(endpointPath, postData, openTelemetryDataMutator, requestConfiguration, cancellationToken))
157+
: new ValueTask<TResponse>(_transport.Request<TResponse>(endpointPath, postData, openTelemetryDataMutator, requestConfiguration));
166158
}
167159

168160
async ValueTask<TResponse> SendRequestWithProductCheck()
@@ -178,34 +170,35 @@ async ValueTask<TResponse> SendRequestWithProductCheck()
178170
// 32-bit read/write operations are atomic and due to the initial memory barrier, we can be sure that
179171
// no other thread executes the product check at the same time. Locked access is not required here.
180172
if (_productCheckStatus is (int)ProductCheckStatus.InProgress)
173+
{
181174
_productCheckStatus = (int)ProductCheckStatus.NotChecked;
175+
}
182176

183177
throw;
184178
}
185179
}
186180

187181
async ValueTask<TResponse> SendRequestWithProductCheckCore()
188182
{
183+
PrepareRequest<TRequest, TRequestParameters>(request, out var endpointPath, out var postData, out var requestConfiguration, out var routeValues);
184+
var openTelemetryDataMutator = GetOpenTelemetryDataMutator<TRequest, TRequestParameters>(request, routeValues);
185+
189186
// Attach product check header
190187

191-
// TODO: The copy constructor should accept null values
192-
var requestConfig = (request.RequestConfiguration is null)
193-
? new RequestConfiguration()
188+
var requestConfig = (requestConfiguration is null)
189+
? new RequestConfiguration
194190
{
195191
ResponseHeadersToParse = new HeadersList("x-elastic-product")
196192
}
197-
: new RequestConfiguration(request.RequestConfiguration)
193+
: new RequestConfiguration(requestConfiguration)
198194
{
199-
ResponseHeadersToParse = (request.RequestConfiguration.ResponseHeadersToParse is { Count: > 0 })
200-
? new HeadersList(request.RequestConfiguration.ResponseHeadersToParse, "x-elastic-product")
195+
ResponseHeadersToParse = (requestConfiguration.ResponseHeadersToParse is { Count: > 0 })
196+
? new HeadersList(requestConfiguration.ResponseHeadersToParse, "x-elastic-product")
201197
: new HeadersList("x-elastic-product")
202198
};
203199

204200
// Send request
205201

206-
var (endpointPath, resolvedRouteValues, postData) = PrepareRequest<TRequest, TRequestParameters>(request);
207-
var openTelemetryDataMutator = GetOpenTelemetryDataMutator<TRequest, TRequestParameters>(request, resolvedRouteValues);
208-
209202
TResponse response;
210203

211204
if (isAsync)
@@ -239,7 +232,9 @@ async ValueTask<TResponse> SendRequestWithProductCheckCore()
239232
: (int)ProductCheckStatus.Failed;
240233

241234
if (_productCheckStatus == (int)ProductCheckStatus.Failed)
235+
{
242236
throw new UnsupportedProductException(UnsupportedProductException.InvalidProductError);
237+
}
243238

244239
return response;
245240
}
@@ -249,15 +244,17 @@ async ValueTask<TResponse> SendRequestWithProductCheckCore()
249244
where TRequest : Request<TRequestParameters>
250245
where TRequestParameters : RequestParameters, new()
251246
{
252-
// If there are no subscribed listeners, we avoid some work and allocations
247+
// If there are no subscribed listeners, we avoid some work and allocations.
253248
if (!Elastic.Transport.Diagnostics.OpenTelemetry.ElasticTransportActivitySourceHasListeners)
249+
{
254250
return null;
251+
}
255252

256253
return OpenTelemetryDataMutator;
257254

258255
void OpenTelemetryDataMutator(Activity activity)
259256
{
260-
// We fall back to a general operation name in cases where the derived request fails to override the property
257+
// We fall back to a general operation name in cases where the derived request fails to override the property.
261258
var operationName = !string.IsNullOrEmpty(request.OperationName) ? request.OperationName : request.HttpMethod.GetStringValue();
262259

263260
// TODO: Optimisation: We should consider caching these, either for cases where resolvedRouteValues is null, or
@@ -267,7 +264,7 @@ void OpenTelemetryDataMutator(Activity activity)
267264
// The latter may bloat the cache as some combinations of path parts may rarely re-occur.
268265

269266
activity.DisplayName = operationName;
270-
267+
271268
activity.SetTag(OpenTelemetry.SemanticConventions.DbOperation, !string.IsNullOrEmpty(request.OperationName) ? request.OperationName : "unknown");
272269
activity.SetTag($"{OpenTelemetrySpanAttributePrefix}schema_url", OpenTelemetrySchemaVersion);
273270

@@ -282,21 +279,26 @@ void OpenTelemetryDataMutator(Activity activity)
282279
}
283280
}
284281

285-
private (EndpointPath endpointPath, Dictionary<string, string>? resolvedRouteValues, PostData data) PrepareRequest<TRequest, TRequestParameters>(TRequest request)
282+
private void PrepareRequest<TRequest, TRequestParameters>(
283+
TRequest request,
284+
out EndpointPath endpointPath,
285+
out PostData? postData,
286+
out IRequestConfiguration? requestConfiguration,
287+
out Dictionary<string, string>? routeValues)
286288
where TRequest : Request<TRequestParameters>
287289
where TRequestParameters : RequestParameters, new()
288290
{
289-
request.ThrowIfNull(nameof(request), "A request is required.");
290-
291-
var (resolvedUrl, _, routeValues) = request.GetUrl(ElasticsearchClientSettings);
291+
var (resolvedUrl, _, resolvedRouteValues) = request.GetUrl(ElasticsearchClientSettings);
292292
var pathAndQuery = request.RequestParameters.CreatePathWithQueryStrings(resolvedUrl, ElasticsearchClientSettings);
293293

294-
var postData =
295-
request.HttpMethod == HttpMethod.GET ||
296-
request.HttpMethod == HttpMethod.HEAD || !request.SupportsBody
294+
routeValues = resolvedRouteValues;
295+
endpointPath = new EndpointPath(request.HttpMethod, pathAndQuery);
296+
postData =
297+
request.HttpMethod is HttpMethod.GET or HttpMethod.HEAD || !request.SupportsBody
297298
? null
298299
: PostData.Serializable(request);
299300

300-
return (new EndpointPath(request.HttpMethod, pathAndQuery), routeValues, postData);
301+
requestConfiguration = request.RequestConfiguration;
302+
ElasticsearchClientSettings.OnBeforeRequest?.Invoke(this, request, endpointPath, ref postData, ref requestConfiguration);
301303
}
302304
}

src/Elastic.Clients.Elasticsearch/_Shared/Client/NamespacedClientProxy.cs

Lines changed: 15 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,64 +5,55 @@
55
using System;
66
using System.Threading;
77
using System.Threading.Tasks;
8+
89
using Elastic.Clients.Elasticsearch.Requests;
910
using Elastic.Transport;
10-
using Elastic.Transport.Products.Elasticsearch;
1111

1212
namespace Elastic.Clients.Elasticsearch;
1313

1414
public abstract class NamespacedClientProxy
1515
{
16-
private const string InvalidOperation = "The client has not been initialised for proper usage as may have been partially mocked. Ensure you are using a " +
16+
private const string InvalidOperation =
17+
"The client has not been initialised for proper usage as may have been partially mocked. Ensure you are using a " +
1718
"new instance of ElasticsearchClient to perform requests over a network to Elasticsearch.";
1819

1920
protected ElasticsearchClient Client { get; }
2021

2122
/// <summary>
2223
/// Initializes a new instance for mocking.
2324
/// </summary>
24-
protected NamespacedClientProxy() { }
25+
protected NamespacedClientProxy()
26+
{
27+
}
2528

2629
internal NamespacedClientProxy(ElasticsearchClient client) => Client = client;
2730

28-
internal TResponse DoRequest<TRequest, TResponse, TRequestParameters>(TRequest request)
29-
where TRequest : Request<TRequestParameters>
30-
where TResponse : TransportResponse, new()
31-
where TRequestParameters : RequestParameters, new()
32-
=> DoRequest<TRequest, TResponse, TRequestParameters>(request, null);
33-
3431
internal TResponse DoRequest<TRequest, TResponse, TRequestParameters>(
35-
TRequest request,
36-
Action<IRequestConfiguration>? forceConfiguration)
32+
TRequest request)
3733
where TRequest : Request<TRequestParameters>
3834
where TResponse : TransportResponse, new()
3935
where TRequestParameters : RequestParameters, new()
4036
{
4137
if (Client is null)
42-
ThrowHelper.ThrowInvalidOperationException(InvalidOperation);
38+
{
39+
throw new InvalidOperationException(InvalidOperation);
40+
}
4341

44-
return Client.DoRequest<TRequest, TResponse, TRequestParameters>(request, forceConfiguration);
42+
return Client.DoRequest<TRequest, TResponse, TRequestParameters>(request);
4543
}
4644

4745
internal Task<TResponse> DoRequestAsync<TRequest, TResponse, TRequestParameters>(
4846
TRequest request,
4947
CancellationToken cancellationToken = default)
5048
where TRequest : Request<TRequestParameters>
5149
where TResponse : TransportResponse, new()
52-
where TRequestParameters : RequestParameters, new()
53-
=> DoRequestAsync<TRequest, TResponse, TRequestParameters>(request, null, cancellationToken);
54-
55-
internal Task<TResponse> DoRequestAsync<TRequest, TResponse, TRequestParameters>(
56-
TRequest request,
57-
Action<IRequestConfiguration>? forceConfiguration,
58-
CancellationToken cancellationToken = default)
59-
where TRequest : Request<TRequestParameters>
60-
where TResponse : TransportResponse, new()
6150
where TRequestParameters : RequestParameters, new()
6251
{
6352
if (Client is null)
64-
ThrowHelper.ThrowInvalidOperationException(InvalidOperation);
53+
{
54+
throw new InvalidOperationException(InvalidOperation);
55+
}
6556

66-
return Client.DoRequestAsync<TRequest, TResponse, TRequestParameters>(request, forceConfiguration, cancellationToken);
57+
return Client.DoRequestAsync<TRequest, TResponse, TRequestParameters>(request, cancellationToken);
6758
}
6859
}

src/Elastic.Clients.Elasticsearch/_Shared/Core/Configuration/ElasticsearchClientSettings.cs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@
88
using System.Linq;
99
using System.Linq.Expressions;
1010
using System.Reflection;
11+
using System.Runtime.InteropServices;
1112

1213
using Elastic.Clients.Elasticsearch.Esql;
13-
14+
using Elastic.Clients.Elasticsearch.Requests;
1415
using Elastic.Clients.Elasticsearch.Serialization;
1516

1617
using Elastic.Transport;
@@ -110,6 +111,7 @@ public abstract class ElasticsearchClientSettingsBase<TConnectionSettings> :
110111
private readonly FluentDictionary<MemberInfo, PropertyMapping> _propertyMappings = new();
111112
private readonly FluentDictionary<Type, string> _routeProperties = new();
112113
private readonly Serializer _sourceSerializer;
114+
private BeforeRequestEvent? _onBeforeRequest;
113115
private bool _experimentalEnableSerializeNullInferredValues;
114116
private ExperimentalSettings _experimentalSettings = new();
115117

@@ -158,7 +160,7 @@ protected ElasticsearchClientSettingsBase(
158160

159161
FluentDictionary<Type, string> IElasticsearchClientSettings.RouteProperties => _routeProperties;
160162
Serializer IElasticsearchClientSettings.SourceSerializer => _sourceSerializer;
161-
163+
BeforeRequestEvent? IElasticsearchClientSettings.OnBeforeRequest => _onBeforeRequest;
162164
ExperimentalSettings IElasticsearchClientSettings.Experimental => _experimentalSettings;
163165

164166
bool IElasticsearchClientSettings.ExperimentalEnableSerializeNullInferredValues => _experimentalEnableSerializeNullInferredValues;
@@ -322,6 +324,20 @@ public TConnectionSettings DefaultMappingFor(IEnumerable<ClrTypeMapping> typeMap
322324

323325
return (TConnectionSettings)this;
324326
}
327+
328+
/// <inheritdoc cref="IElasticsearchClientSettings.OnBeforeRequest"/>
329+
public TConnectionSettings OnBeforeRequest(BeforeRequestEvent handler)
330+
{
331+
return Assign(handler, static (a, v) => a._onBeforeRequest += v ?? DefaultBeforeRequestHandler);
332+
}
333+
334+
private static void DefaultBeforeRequestHandler(ElasticsearchClient client,
335+
Request request,
336+
EndpointPath endpointPath,
337+
ref PostData? postData,
338+
ref IRequestConfiguration? requestConfiguration)
339+
{
340+
}
325341
}
326342

327343
/// <inheritdoc cref="TransportClientConfigurationValues" />

0 commit comments

Comments
 (0)