Skip to content

Commit 7b9539a

Browse files
markdroth authored and paulosjca committed
[xDS] add per-authority knob to fallback based on reachability only (grpc#39889)
First part of A95 (grpc/proposal#486). Closes grpc#39889 COPYBARA_INTEGRATE_REVIEW=grpc#39889 from markdroth:xds_fallback_criteria_knob 70702bf PiperOrigin-RevId: 777375509
1 parent 21df43f commit 7b9539a

File tree

5 files changed

+292
-5
lines changed

5 files changed

+292
-5
lines changed

src/core/xds/grpc/xds_bootstrap_grpc.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ const JsonLoaderInterface* GrpcXdsBootstrap::GrpcAuthority::JsonLoader(
8484
"client_listener_resource_name_template",
8585
&GrpcAuthority::client_listener_resource_name_template_)
8686
.OptionalField("xds_servers", &GrpcAuthority::servers_)
87+
.OptionalField("fallback_on_reachability_only",
88+
&GrpcAuthority::fallback_on_reachability_only_)
8789
.Finish();
8890
return loader;
8991
}

src/core/xds/grpc/xds_bootstrap_grpc.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ class GrpcXdsBootstrap final : public XdsBootstrap {
8585
return servers;
8686
}
8787

88+
// Returns the value loaded from this authority's optional
// "fallback_on_reachability_only" bootstrap field.
bool FallbackOnReachabilityOnly() const override {
89+
return fallback_on_reachability_only_;
90+
}
91+
8892
const std::string& client_listener_resource_name_template() const {
8993
return client_listener_resource_name_template_;
9094
}
@@ -94,6 +98,7 @@ class GrpcXdsBootstrap final : public XdsBootstrap {
9498
private:
9599
std::vector<GrpcXdsServer> servers_;
96100
std::string client_listener_resource_name_template_;
101+
// Populated from the optional "fallback_on_reachability_only" bootstrap
// field. Defaults to false so the value is well-defined when the field is
// absent, regardless of how the enclosing object was initialized.
bool fallback_on_reachability_only_ = false;
97102
};
98103

99104
// Creates bootstrap object from json_string.

src/core/xds/xds_client/xds_bootstrap.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ class XdsBootstrap {
8888
virtual ~Authority() = default;
8989

9090
virtual std::vector<const XdsServer*> servers() const = 0;
91+
92+
// Per-authority knob consulted by XdsClient when deciding whether to fall
// back to another server. When this returns true (and the
// GRPC_EXPERIMENTAL_XDS_ENDPOINT_FALLBACK env var is enabled), fallback is
// not gated on the authority having uncached resources.
virtual bool FallbackOnReachabilityOnly() const = 0;
9193
};
9294

9395
virtual ~XdsBootstrap() = default;

src/core/xds/xds_client/xds_client.cc

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,10 @@
4747
#include "src/core/lib/iomgr/exec_ctx.h"
4848
#include "src/core/util/backoff.h"
4949
#include "src/core/util/debug_location.h"
50+
#include "src/core/util/env.h"
5051
#include "src/core/util/orphanable.h"
5152
#include "src/core/util/ref_counted_ptr.h"
53+
#include "src/core/util/string.h"
5254
#include "src/core/util/sync.h"
5355
#include "src/core/util/upb_utils.h"
5456
#include "src/core/util/uri.h"
@@ -68,6 +70,18 @@
6870

6971
namespace grpc_core {
7072

73+
namespace {
74+
75+
// Returns true only if the GRPC_EXPERIMENTAL_XDS_ENDPOINT_FALLBACK
// environment variable is set and its value parses as a true boolean.
bool XdsEndpointFallbackEnabled() {
76+
auto value = GetEnv("GRPC_EXPERIMENTAL_XDS_ENDPOINT_FALLBACK");
77+
// An unset variable means the feature is disabled.
if (!value.has_value()) return false;
78+
bool parsed_value;
79+
bool parse_succeeded = gpr_parse_bool_value(value->c_str(), &parsed_value);
80+
// A value that fails to parse is treated the same as "false".
return parse_succeeded && parsed_value;
81+
}
82+
83+
} // namespace
84+
7185
using ::grpc_event_engine::experimental::EventEngine;
7286

7387
//
@@ -489,13 +503,20 @@ void XdsClient::XdsChannel::UnsubscribeLocked(const XdsResourceType* type,
489503

490504
bool XdsClient::XdsChannel::MaybeFallbackLocked(
491505
const std::string& authority, AuthorityState& authority_state) {
492-
if (!xds_client_->HasUncachedResources(authority_state)) {
493-
return false;
494-
}
495506
std::vector<const XdsBootstrap::XdsServer*> xds_servers;
507+
bool fallback_on_reachability_only = false;
496508
if (authority != kOldStyleAuthority) {
497-
xds_servers =
498-
xds_client_->bootstrap().LookupAuthority(authority)->servers();
509+
auto* bootstrap_authority =
510+
xds_client_->bootstrap().LookupAuthority(authority);
511+
xds_servers = bootstrap_authority->servers();
512+
if (XdsEndpointFallbackEnabled()) {
513+
fallback_on_reachability_only =
514+
bootstrap_authority->FallbackOnReachabilityOnly();
515+
}
516+
}
517+
if (!fallback_on_reachability_only &&
518+
!xds_client_->HasUncachedResources(authority_state)) {
519+
return false;
499520
}
500521
if (xds_servers.empty()) xds_servers = xds_client_->bootstrap().servers();
501522
for (size_t i = authority_state.xds_channels.size(); i < xds_servers.size();

test/core/xds/xds_client_test.cc

Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,8 +201,17 @@ class XdsClientTest : public ::testing::Test {
201201
server_ = std::move(server);
202202
}
203203

204+
// Reports whether this fake authority has been configured to fall back
// based on reachability only (false unless explicitly set).
bool FallbackOnReachabilityOnly() const override {
205+
return fallback_on_reachability_only_;
206+
}
207+
208+
// Turns on the reachability-only fallback flag for this fake authority.
// There is no way to turn it back off.
void SetFallbackOnReachabilityOnly() {
209+
fallback_on_reachability_only_ = true;
210+
}
211+
204212
private:
205213
std::optional<FakeXdsServer> server_;
214+
bool fallback_on_reachability_only_ = false;
206215
};
207216

208217
class Builder {
@@ -6263,6 +6272,254 @@ TEST_F(XdsClientTest, FallbackOnStartup) {
62636272
/*resource_names=*/{"foo1"});
62646273
}
62656274

6275+
TEST_F(XdsClientTest, FallbackOnReachabilityOnly) {
6276+
testing::ScopedExperimentalEnvVar env_var(
6277+
"GRPC_EXPERIMENTAL_XDS_ENDPOINT_FALLBACK");
6278+
constexpr char kAuthority[] = "xds.example.com";
6279+
const std::string kXdstpResourceName = absl::StrCat(
6280+
"xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo1");
6281+
FakeXdsBootstrap::FakeAuthority authority;
6282+
authority.SetFallbackOnReachabilityOnly();
6283+
FakeXdsBootstrap::FakeXdsServer primary_server(kDefaultXdsServerUrl);
6284+
FakeXdsBootstrap::FakeXdsServer fallback_server("fallback_xds_server");
6285+
InitXdsClient(FakeXdsBootstrap::Builder()
6286+
.AddAuthority(kAuthority, authority)
6287+
.SetServers({primary_server, fallback_server}));
6288+
// Start a watch.
6289+
auto watcher = StartFooWatch(kXdstpResourceName);
6290+
EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
6291+
kDefaultXdsServerUrl, true)));
6292+
EXPECT_THAT(
6293+
GetResourceCounts(),
6294+
::testing::ElementsAre(::testing::Pair(
6295+
ResourceCountLabelsEq(
6296+
kAuthority, XdsFooResourceType::Get()->type_url(), "requested"),
6297+
1)));
6298+
EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
6299+
::testing::IsEmpty(), ::testing::_, ::testing::ElementsAre()));
6300+
// CSDS should show that the resource has been requested.
6301+
ClientConfig csds = DumpCsds();
6302+
EXPECT_THAT(csds.generic_xds_configs(),
6303+
::testing::ElementsAre(CsdsResourceRequested(
6304+
XdsFooResourceType::Get()->type_url(), kXdstpResourceName)));
6305+
// XdsClient should have created an ADS stream to the primary server.
6306+
auto stream = WaitForAdsStream();
6307+
ASSERT_TRUE(stream != nullptr);
6308+
// XdsClient should have sent a subscription request on the ADS stream.
6309+
auto request = WaitForRequest(stream.get());
6310+
ASSERT_TRUE(request.has_value());
6311+
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
6312+
/*version_info=*/"", /*response_nonce=*/"",
6313+
/*error_detail=*/absl::OkStatus(),
6314+
/*resource_names=*/{kXdstpResourceName});
6315+
// Primary server sends initial response.
6316+
stream->SendMessageToClient(
6317+
ResponseBuilder(XdsFooResourceType::Get()->type_url())
6318+
.set_version_info("20")
6319+
.set_nonce("O")
6320+
.AddFooResource(XdsFooResource(kXdstpResourceName, 6))
6321+
.Serialize());
6322+
// Resource should be delivered to watcher.
6323+
auto resource = watcher->WaitForNextResource();
6324+
ASSERT_NE(resource, nullptr);
6325+
EXPECT_EQ(resource->name, kXdstpResourceName);
6326+
EXPECT_EQ(resource->value, 6);
6327+
// Metrics should show 1 resource update and 1 cached resource.
6328+
EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
6329+
::testing::ElementsAre(::testing::Pair(
6330+
::testing::Pair(kDefaultXdsServerUrl,
6331+
XdsFooResourceType::Get()->type_url()),
6332+
1)),
6333+
::testing::_, ::testing::_));
6334+
EXPECT_THAT(
6335+
GetResourceCounts(),
6336+
::testing::ElementsAre(::testing::Pair(
6337+
ResourceCountLabelsEq(kAuthority,
6338+
XdsFooResourceType::Get()->type_url(), "acked"),
6339+
1)));
6340+
// Check CSDS data.
6341+
csds = DumpCsds();
6342+
EXPECT_THAT(csds.generic_xds_configs(),
6343+
::testing::UnorderedElementsAre(CsdsResourceAcked(
6344+
XdsFooResourceType::Get()->type_url(), kXdstpResourceName,
6345+
resource->AsJsonString(), "20", TimestampProtoEq(kTime0))));
6346+
// Client should send ACK to server.
6347+
request = WaitForRequest(stream.get());
6348+
ASSERT_TRUE(request.has_value());
6349+
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
6350+
/*version_info=*/"20", /*response_nonce=*/"O",
6351+
/*error_detail=*/absl::OkStatus(),
6352+
/*resource_names=*/{kXdstpResourceName});
6353+
// Trigger connection failure to primary.
6354+
TriggerConnectionFailure(primary_server,
6355+
absl::UnavailableError("Server down"));
6356+
// This should trigger fallback.
6357+
auto fallback_stream = WaitForAdsStream(fallback_server);
6358+
ASSERT_NE(fallback_stream, nullptr);
6359+
request = WaitForRequest(fallback_stream.get());
6360+
ASSERT_TRUE(request.has_value());
6361+
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
6362+
/*version_info=*/"", /*response_nonce=*/"",
6363+
/*error_detail=*/absl::OkStatus(),
6364+
/*resource_names=*/{kXdstpResourceName});
6365+
// Metrics should show primary channel failing and fallback channel working.
6366+
EXPECT_THAT(
6367+
GetServerConnections(),
6368+
::testing::ElementsAre(
6369+
::testing::Pair(kDefaultXdsServerUrl, false),
6370+
::testing::Pair(fallback_server.target()->server_uri(), true)));
6371+
EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
6372+
::testing::_, ::testing::_,
6373+
::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1))));
6374+
// Fallback server sends a response.
6375+
fallback_stream->SendMessageToClient(
6376+
ResponseBuilder(XdsFooResourceType::Get()->type_url())
6377+
.set_version_info("5")
6378+
.set_nonce("A")
6379+
.AddFooResource(XdsFooResource(kXdstpResourceName, 30))
6380+
.Serialize());
6381+
// Resource is delivered to watcher.
6382+
resource = watcher->WaitForNextResource();
6383+
ASSERT_NE(resource, nullptr);
6384+
EXPECT_EQ(resource->name, kXdstpResourceName);
6385+
EXPECT_EQ(resource->value, 30);
6386+
// Metrics show update.
6387+
EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
6388+
::testing::ElementsAre(
6389+
::testing::Pair(
6390+
::testing::Pair(kDefaultXdsServerUrl,
6391+
XdsFooResourceType::Get()->type_url()),
6392+
1),
6393+
::testing::Pair(
6394+
::testing::Pair(fallback_server.target()->server_uri(),
6395+
XdsFooResourceType::Get()->type_url()),
6396+
1)),
6397+
::testing::_, ::testing::_));
6398+
EXPECT_THAT(
6399+
GetResourceCounts(),
6400+
::testing::ElementsAre(::testing::Pair(
6401+
ResourceCountLabelsEq(kAuthority,
6402+
XdsFooResourceType::Get()->type_url(), "acked"),
6403+
1)));
6404+
// Check CSDS data.
6405+
csds = DumpCsds();
6406+
EXPECT_THAT(csds.generic_xds_configs(),
6407+
::testing::UnorderedElementsAre(CsdsResourceAcked(
6408+
XdsFooResourceType::Get()->type_url(), kXdstpResourceName,
6409+
resource->AsJsonString(), "5", TimestampProtoEq(kTime0))));
6410+
// Client should send ACK to fallback server.
6411+
request = WaitForRequest(fallback_stream.get());
6412+
ASSERT_TRUE(request.has_value());
6413+
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
6414+
/*version_info=*/"5", /*response_nonce=*/"A",
6415+
/*error_detail=*/absl::OkStatus(),
6416+
/*resource_names=*/{kXdstpResourceName});
6417+
// Clean up.
6418+
CancelFooWatch(watcher.get(), kXdstpResourceName);
6419+
EXPECT_TRUE(stream->IsOrphaned());
6420+
EXPECT_TRUE(fallback_stream->IsOrphaned());
6421+
}
6422+
6423+
TEST_F(XdsClientTest, FallbackOnReachabilityOnlyNotEnabled) {
6424+
constexpr char kAuthority[] = "xds.example.com";
6425+
const std::string kXdstpResourceName = absl::StrCat(
6426+
"xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo1");
6427+
FakeXdsBootstrap::FakeAuthority authority;
6428+
authority.SetFallbackOnReachabilityOnly();
6429+
FakeXdsBootstrap::FakeXdsServer primary_server(kDefaultXdsServerUrl);
6430+
FakeXdsBootstrap::FakeXdsServer fallback_server("fallback_xds_server");
6431+
InitXdsClient(FakeXdsBootstrap::Builder()
6432+
.AddAuthority(kAuthority, authority)
6433+
.SetServers({primary_server, fallback_server}));
6434+
// Start a watch.
6435+
auto watcher = StartFooWatch(kXdstpResourceName);
6436+
EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
6437+
kDefaultXdsServerUrl, true)));
6438+
EXPECT_THAT(
6439+
GetResourceCounts(),
6440+
::testing::ElementsAre(::testing::Pair(
6441+
ResourceCountLabelsEq(
6442+
kAuthority, XdsFooResourceType::Get()->type_url(), "requested"),
6443+
1)));
6444+
EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
6445+
::testing::IsEmpty(), ::testing::_, ::testing::ElementsAre()));
6446+
// CSDS should show that the resource has been requested.
6447+
ClientConfig csds = DumpCsds();
6448+
EXPECT_THAT(csds.generic_xds_configs(),
6449+
::testing::ElementsAre(CsdsResourceRequested(
6450+
XdsFooResourceType::Get()->type_url(), kXdstpResourceName)));
6451+
// XdsClient should have created an ADS stream.
6452+
auto stream = WaitForAdsStream();
6453+
ASSERT_TRUE(stream != nullptr);
6454+
// XdsClient should have sent a subscription request on the ADS stream.
6455+
auto request = WaitForRequest(stream.get());
6456+
ASSERT_TRUE(request.has_value());
6457+
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
6458+
/*version_info=*/"", /*response_nonce=*/"",
6459+
/*error_detail=*/absl::OkStatus(),
6460+
/*resource_names=*/{kXdstpResourceName});
6461+
// Server sends resource.
6462+
stream->SendMessageToClient(
6463+
ResponseBuilder(XdsFooResourceType::Get()->type_url())
6464+
.set_version_info("20")
6465+
.set_nonce("O")
6466+
.AddFooResource(XdsFooResource(kXdstpResourceName, 6))
6467+
.Serialize());
6468+
// Resource should be delivered to watcher.
6469+
auto resource = watcher->WaitForNextResource();
6470+
ASSERT_NE(resource, nullptr);
6471+
EXPECT_EQ(resource->name, kXdstpResourceName);
6472+
EXPECT_EQ(resource->value, 6);
6473+
// Metrics should show 1 resource update and 1 cached resource.
6474+
EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
6475+
::testing::ElementsAre(::testing::Pair(
6476+
::testing::Pair(kDefaultXdsServerUrl,
6477+
XdsFooResourceType::Get()->type_url()),
6478+
1)),
6479+
::testing::_, ::testing::_));
6480+
EXPECT_THAT(
6481+
GetResourceCounts(),
6482+
::testing::ElementsAre(::testing::Pair(
6483+
ResourceCountLabelsEq(kAuthority,
6484+
XdsFooResourceType::Get()->type_url(), "acked"),
6485+
1)));
6486+
// Check CSDS data.
6487+
csds = DumpCsds();
6488+
EXPECT_THAT(csds.generic_xds_configs(),
6489+
::testing::UnorderedElementsAre(CsdsResourceAcked(
6490+
XdsFooResourceType::Get()->type_url(), kXdstpResourceName,
6491+
resource->AsJsonString(), "20", TimestampProtoEq(kTime0))));
6492+
// Client should send ACK to server.
6493+
request = WaitForRequest(stream.get());
6494+
ASSERT_TRUE(request.has_value());
6495+
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
6496+
/*version_info=*/"20", /*response_nonce=*/"O",
6497+
/*error_detail=*/absl::OkStatus(),
6498+
/*resource_names=*/{kXdstpResourceName});
6499+
// Trigger connection failure to primary.
6500+
TriggerConnectionFailure(primary_server,
6501+
absl::UnavailableError("Server down"));
6502+
// Error should be reported to the watcher.
6503+
auto error = watcher->WaitForNextAmbientError();
6504+
ASSERT_TRUE(error.has_value());
6505+
EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
6506+
EXPECT_EQ(error->message(),
6507+
"xDS channel for server default_xds_server: Server down (node "
6508+
"ID:xds_client_test)");
6509+
// This should NOT trigger fallback.
6510+
auto fallback_stream = WaitForAdsStream(fallback_server);
6511+
ASSERT_EQ(fallback_stream, nullptr);
6512+
// Metrics should show primary channel failing, but no fallback channel.
6513+
EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
6514+
kDefaultXdsServerUrl, false)));
6515+
EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
6516+
::testing::_, ::testing::_,
6517+
::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1))));
6518+
// Clean up.
6519+
CancelFooWatch(watcher.get(), kXdstpResourceName);
6520+
EXPECT_TRUE(stream->IsOrphaned());
6521+
}
6522+
62666523
} // namespace
62676524
} // namespace testing
62686525
} // namespace grpc_core

0 commit comments

Comments
 (0)