Skip to content

POC #131168

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 33 commits into
base: main
Choose a base branch
from
Draft

POC #131168

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch;

import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.core.Nullable;

import java.util.List;

public interface CrossProjectAwareRequest extends IndicesRequest {
/**
* Can be used to determine if this should be processed in cross-project mode vs. stateful CCS.
*/
boolean crossProjectModeEnabled();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will be using this method? The javadoc describes what it does but it's unclear to me which parts needs to know that.


/**
* Only called if cross-project rewriting (flat-world, linked project filtering) was applied
*/
void qualified(List<QualifiedExpression> qualifiedExpressions);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we rename this method? This is a mutator, and QualifiedExpression below has qualified() which is a getter, and it creates some congnitive load to try and figure out which one is meant each time. Maybe something like updateIndices or something if we expect it to be used to update indices?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah we for sure want a better name -- I'm also wondering if this should go in the IndicesRequest.Replaceable interface which has an indices(...) method on it

Copy link
Contributor Author

@n1v0lg n1v0lg Jul 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would require a new method like isCrossProjectAware() on the IndicesRequest.Replaceable interface -- not sure yet which approach I prefer


@Nullable
String queryRouting();

/**
* Used to track a mapping from original expression (potentially flat-world) to canonicalized CCS expressions.
* e.g. for an original index expression `logs-*`, this would be:
* original=logs-*
* qualified=[(logs-*, _local), (my-remote:logs-*, my-remote)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we're storing the remote name twice here, as I understand? What's the reason for that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh ya, we don't really need to -- I added it to clarify that that info will be available but in a production ready version of this, we wouldn't want to duplicate that info, you're right

*/
record QualifiedExpression(String original, List<ExpressionWithProject> qualified) {
public boolean hasFlatOriginalExpression() {
return true;
}
}

record ExpressionWithProject(String expression, String project) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.action.fieldcaps;

import org.elasticsearch.CrossProjectAwareRequest;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
Expand Down Expand Up @@ -36,11 +37,16 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public final class FieldCapabilitiesRequest extends LegacyActionRequest implements IndicesRequest.Replaceable, ToXContentObject {
public final class FieldCapabilitiesRequest extends LegacyActionRequest
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need some way to tell field caps not to do the rewrite, because when processing e.g. lookups we probably don't need that. Alternatively, we'd need to change lookup code so it qualifies all the indices.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I think that's true but I want to think through how that actually looks, I don't know if we want just a boolean flag

implements
CrossProjectAwareRequest,
IndicesRequest.Replaceable,
ToXContentObject {
public static final String NAME = "field_caps_request";
public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosed();

Expand All @@ -58,6 +64,7 @@ public final class FieldCapabilitiesRequest extends LegacyActionRequest implemen
private QueryBuilder indexFilter;
private Map<String, Object> runtimeFields = Collections.emptyMap();
private Long nowInMillis;
private List<QualifiedExpression> qualifiedExpressions;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How this is meant to be used? We store the result of flat resolution in the indices anyway, so we essentially storing it twice. Not sure I understand why?


public FieldCapabilitiesRequest(StreamInput in) throws IOException {
super(in);
Expand Down Expand Up @@ -373,4 +380,25 @@ public String getDescription() {
}
};
}

@Override
public boolean crossProjectModeEnabled() {
return qualifiedExpressions != null;
}

@Override
public void qualified(List<QualifiedExpression> qualifiedExpressions) {
this.qualifiedExpressions = qualifiedExpressions;
indices(
qualifiedExpressions.stream()
.flatMap(indexExpression -> indexExpression.qualified().stream().map(ExpressionWithProject::expression))
.toArray(String[]::new)
);
}

@Override
public String queryRouting() {
// TODO how would this look in ES|QL?
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.action.search;

import org.elasticsearch.CrossProjectAwareRequest;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
Expand Down Expand Up @@ -53,7 +54,11 @@
* @see Client#search(SearchRequest)
* @see SearchResponse
*/
public class SearchRequest extends LegacyActionRequest implements IndicesRequest.Replaceable, Rewriteable<SearchRequest> {
public class SearchRequest extends LegacyActionRequest
implements
CrossProjectAwareRequest,
IndicesRequest.Replaceable,
Rewriteable<SearchRequest> {

public static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));

Expand All @@ -70,6 +75,12 @@ public class SearchRequest extends LegacyActionRequest implements IndicesRequest

private String[] indices = Strings.EMPTY_ARRAY;

@Nullable
private String queryRouting = null;

@Nullable
private List<QualifiedExpression> qualifiedExpressions;

@Nullable
private String routing;
@Nullable
Expand Down Expand Up @@ -400,6 +411,11 @@ public SearchRequest indices(String... indices) {
return this;
}

public SearchRequest queryRouting(String queryRouting) {
this.queryRouting = queryRouting;
return this;
}

private static void validateIndices(String... indices) {
Objects.requireNonNull(indices, "indices must not be null");
for (String index : indices) {
Expand Down Expand Up @@ -853,4 +869,24 @@ public String toString() {
+ source
+ '}';
}

@Override
public boolean crossProjectModeEnabled() {
return qualifiedExpressions != null;
}

@Override
public void qualified(List<QualifiedExpression> qualifiedExpressions) {
this.qualifiedExpressions = qualifiedExpressions;
indices(
qualifiedExpressions.stream()
.flatMap(indexExpression -> indexExpression.qualified().stream().map(ExpressionWithProject::expression))
.toArray(String[]::new)
);
}

@Override
public String queryRouting() {
return queryRouting;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ public void apply(Settings value, Settings current, Settings previous) {
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
TransportSearchAction.DEFAULT_PRE_FILTER_SHARD_SIZE,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
RemoteClusterService.REMOTE_CLUSTER_TAGS,
SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER,
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING,
RemoteClusterService.REMOTE_NODE_ATTRIBUTE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

package org.elasticsearch.rest.action.search;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.search.SearchRequest;
Expand Down Expand Up @@ -63,6 +65,7 @@ public class RestSearchAction extends BaseRestHandler {
public static final String TYPED_KEYS_PARAM = "typed_keys";
public static final String INCLUDE_NAMED_QUERIES_SCORE_PARAM = "include_named_queries_score";
public static final Set<String> RESPONSE_PARAMS = Set.of(TYPED_KEYS_PARAM, TOTAL_HITS_AS_INT_PARAM, INCLUDE_NAMED_QUERIES_SCORE_PARAM);
private static final Logger log = LogManager.getLogger(RestSearchAction.class);

private final SearchUsageHolder searchUsageHolder;
private final Predicate<NodeFeature> clusterSupportsFeature;
Expand Down Expand Up @@ -98,6 +101,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
client.threadPool().getThreadContext().setErrorTraceTransportHeader(request);
}
SearchRequest searchRequest = new SearchRequest();

// access the BwC param, but just drop it
// this might be set by old clients
request.param("min_compatible_shard_node");
Expand Down Expand Up @@ -167,6 +171,14 @@ public static void parseSearchRequest(
searchRequest.source(new SearchSourceBuilder());
}
searchRequest.indices(Strings.splitStringByCommaToArray(request.param("index")));

String queryRouting = request.param("query_routing", null);
if (queryRouting != null) {
searchRequest.queryRouting(queryRouting);
} else {
log.info("No query routing defined");
}

if (requestContentParser != null) {
if (searchUsageHolder == null) {
searchRequest.source().parseXContent(requestContentParser, true, clusterSupportsFeature);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -97,6 +98,33 @@ public final class RemoteClusterService extends RemoteClusterAware
(ns, key) -> boolSetting(key, true, new RemoteConnectionEnabled<>(ns, key), Setting.Property.Dynamic, Setting.Property.NodeScope)
);

public record Metadata(String key, String value) {
public static Metadata fromString(String tag) {
if (tag == null || tag.isEmpty()) {
throw new IllegalArgumentException("Remote tag must not be null or empty");
}
// - as a separator to simplify search path param parsing; won't be like this in the real implementation
int idx = tag.indexOf('-');
if (idx < 0) {
return new Metadata(tag, "");
} else {
return new Metadata(tag.substring(0, idx), tag.substring(idx + 1));
}
}
}

public static final Setting.AffixSetting<List<Metadata>> REMOTE_CLUSTER_TAGS = Setting.affixKeySetting(
"cluster.remote.",
"tags",
(ns, key) -> Setting.listSetting(
key,
Collections.emptyList(),
Metadata::fromString,
Setting.Property.Dynamic,
Setting.Property.NodeScope
)
);

public static final Setting.AffixSetting<TimeValue> REMOTE_CLUSTER_PING_SCHEDULE = Setting.affixKeySetting(
"cluster.remote.",
"transport.ping_schedule",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.security;

import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;

public interface CrossProjectRemoteServerTransportInterceptor {
// TODO probably don't want this
boolean enabled();

// TODO this should be a wrapper around TransportInterceptor.AsyncSender instead
<T extends TransportResponse> void sendRequest(
TransportInterceptor.AsyncSender sender,
Transport.Connection connection,
String action,
TransportRequest request,
TransportRequestOptions options,
TransportResponseHandler<T> handler
);

CustomServerTransportFilter getFilter();

class Default implements CrossProjectRemoteServerTransportInterceptor {
@Override
public boolean enabled() {
return false;
}

@Override
public <T extends TransportResponse> void sendRequest(
TransportInterceptor.AsyncSender sender,
Transport.Connection connection,
String action,
TransportRequest request,
TransportRequestOptions options,
TransportResponseHandler<T> handler
) {
sender.sendRequest(connection, action, request, options, handler);
}

@Override
public CustomServerTransportFilter getFilter() {
return new CustomServerTransportFilter.Default();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.security;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.transport.TransportRequest;

public interface CustomServerTransportFilter {
void filter(String securityAction, TransportRequest request, ActionListener<Void> authenticationListener);

class Default implements CustomServerTransportFilter {
@Override
public void filter(String securityAction, TransportRequest request, ActionListener<Void> listener) {
listener.onResponse(null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.xpack.core.security.authc.service.ServiceAccountTokenStore;
import org.elasticsearch.xpack.core.security.authc.support.UserRoleMapper;
import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine;
import org.elasticsearch.xpack.core.security.authz.CrossProjectTargetResolver;
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
import org.elasticsearch.xpack.core.security.authz.store.RoleRetrievalResult;

Expand Down Expand Up @@ -133,6 +134,14 @@ default CustomApiKeyAuthenticator getCustomApiKeyAuthenticator(SecurityComponent
return null;
}

default CrossProjectTargetResolver getCrossProjectTargetResolver(SecurityComponents components) {
return null;
}

default CrossProjectRemoteServerTransportInterceptor getCustomRemoteServerTransportInterceptor(SecurityComponents components) {
return null;
}

/**
* Returns a authorization engine for authorizing requests, or null to use the default authorization mechanism.
*
Expand Down
Loading