
Commit 3035eba

[BRC-1778] Add mechanism to compute_ctl to pull a new config
## Problem

We have been dealing with a number of issues with the SC compute notification mechanism. Various race conditions exist in the PG/HCC/cplane/PS distributed system, and relying on the SC to send notifications to the compute node to notify it of PS changes is not robust. We decided to pursue a more robust option where the compute node itself discovers whether it may be pointing to the incorrect PSs and proactively reconfigures itself if issues are suspected.

## Summary of changes

To support this self-healing reconfiguration mechanism, several pieces are needed. This PR adds a mechanism to `compute_ctl` called "refresh configuration", where the compute node reaches out to the control plane to pull a new config and reconfigures PG using the new config, instead of listening for a notification message containing a config to arrive from the control plane.

Main changes to compute_ctl:

1. The `compute_ctl` state machine now has a new state, `RefreshConfigurationPending`. The compute node may enter this state upon receiving a signal that it may be using the incorrect page servers.
2. Upon entering the `RefreshConfigurationPending` state, the background configurator thread in `compute_ctl` wakes up, pulls a new config from the control plane, and reconfigures PG (with `pg_ctl reload`) according to the new config.
3. The compute node may enter the new `RefreshConfigurationPending` state from the `Running` or `Failed` states. If the configurator manages to configure the compute node successfully, the node enters the `Running` state; otherwise, it stays in `RefreshConfigurationPending` and the configurator thread waits for the next notification if an incorrect config is still suspected.
4. Added various plumbing in `compute_ctl` data structures to allow the configurator thread to perform the config fetch.

The "incorrect config suspected" notification is delivered using an HTTP endpoint, `/refresh_configuration`, on `compute_ctl`.
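The transitions listed above can be sketched as a small state function. This is only an illustration under stated assumptions: the `Status` enum is a cut-down stand-in for `ComputeStatus`, and `on_refresh_signal` and `after_reconfigure` are hypothetical helpers that do not appear in the commit. Re-signalling while already pending is accepted here because the `compute.rs` change in this commit lists `RefreshConfigurationPending` among the states allowing a refresh.

```rust
/// Cut-down stand-in for the compute status enum; only the states relevant
/// to the refresh flow are modelled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
    Init,
    Running,
    Failed,
    RefreshConfigurationPending,
}

/// Hypothetical helper: status after a refresh signal arrives, or an error
/// if the current state does not allow a refresh.
fn on_refresh_signal(current: Status) -> Result<Status, String> {
    match current {
        Status::Running | Status::Failed | Status::RefreshConfigurationPending => {
            Ok(Status::RefreshConfigurationPending)
        }
        other => Err(format!("cannot refresh configuration in state {other:?}")),
    }
}

/// Hypothetical helper: the configurator's outcome decides whether the node
/// leaves the pending state.
fn after_reconfigure(reconfigured: bool) -> Status {
    if reconfigured {
        Status::Running
    } else {
        // Stay pending and wait for the next notification.
        Status::RefreshConfigurationPending
    }
}

fn main() {
    println!("{:?}", on_refresh_signal(Status::Running));
    println!("{:?}", on_refresh_signal(Status::Init));
    println!("{:?}", after_reconfigure(false));
}
```

Keeping a failed refresh in the pending state means recovery relies on the detecting side sending the signal again, which matches item 3.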
The `/refresh_configuration` endpoint is currently not called by anyone other than the tests. In a follow-up PR I will set up some code in the PG extension/libpagestore to call this HTTP endpoint whenever PG suspects that it is pointing to the wrong page servers.

## How is this tested?

Modified `test_runner/regress/test_change_pageserver.py` to add a scenario where we use the new `/refresh_configuration` mechanism instead of the existing `/configure` mechanism (which requires sending a full config to compute_ctl) to have the compute node reload and reconfigure its pageservers.

I took one shortcut to reduce the scope of this change when it comes to testing: the compute node uses a local config file instead of pulling a config over the network from the HCC. This simplifies the test setup in the following ways:

* The existing test framework is set up to use local config files for compute nodes only, so it's convenient if I just stick with it.
* The HCC today generates a compute config with production settings (e.g., assuming 4 CPUs, 16GB RAM, with local file caches), which is probably not suitable in tests. We may need to add another test-only endpoint config to the control plane to make this work.

The config-fetch part of the code is relatively straightforward (and well-covered in both production and the KIND test), so it is probably fine to replace it with loading from the local config file for these integration tests.

In addition to making sure that the tests pass, I also manually inspected the logs to make sure that the compute node is indeed reloading the config using the new mechanism instead of going down the old `/configure` path (it turns out the test has bugs which cause compute `/configure` messages to be sent despite the test intending to disable/blackhole them).
```test
2024-09-24T18:53:29.573650Z INFO http request{otel.name=/refresh_configuration http.method=POST}: serving /refresh_configuration POST request
2024-09-24T18:53:29.573689Z INFO configurator_main_loop: compute node suspects its configuration is out of date, now refreshing configuration
2024-09-24T18:53:29.573706Z INFO configurator_main_loop: reloading config.json from path: /workspaces/hadron/test_output/test_change_pageserver_using_refresh[release-pg16]/repo/endpoints/ep-1/spec.json
PG:2024-09-24 18:53:29.574 GMT [52799] LOG: received SIGHUP, reloading configuration files
PG:2024-09-24 18:53:29.575 GMT [52799] LOG: parameter "neon.extension_server_port" cannot be changed without restarting the server
PG:2024-09-24 18:53:29.575 GMT [52799] LOG: parameter "neon.pageserver_connstring" changed to "postgresql://no_user@localhost:15008"
...
```

Co-authored-by: Tristan Partin <tristan.partin@databricks.com>
1 parent a56afee commit 3035eba

12 files changed, +299 -26 lines changed


compute_tools/README.md

Lines changed: 6 additions & 0 deletions
@@ -52,8 +52,14 @@ stateDiagram-v2
     Init --> Running : Started Postgres
     Running --> TerminationPendingFast : Requested termination
     Running --> TerminationPendingImmediate : Requested termination
+    Running --> ConfigurationPending : Received a /configure request with spec
+    Running --> RefreshConfigurationPending : Received a /refresh_configuration request, compute node will pull a new spec and reconfigure
+    RefreshConfigurationPending --> Running : Compute has been re-configured
     TerminationPendingFast --> Terminated : Terminated compute with 30s delay for cplane to inspect status
     TerminationPendingImmediate --> Terminated : Terminated compute immediately
+    Running --> TerminationPending : Requested termination
+    TerminationPending --> Terminated : Terminated compute
+    Failed --> RefreshConfigurationPending : Received a /refresh_configuration request
     Failed --> [*] : Compute exited
     Terminated --> [*] : Compute exited
 ```

compute_tools/src/bin/compute_ctl.rs

Lines changed: 3 additions & 0 deletions
@@ -235,6 +235,9 @@ fn main() -> Result<()> {
             pg_isready_bin: get_pg_isready_bin(&cli.pgbin),
             instance_id: std::env::var("INSTANCE_ID").ok(),
             lakebase_mode: cli.lakebase_mode,
+            build_tag: BUILD_TAG.to_string(),
+            control_plane_uri: cli.control_plane_uri,
+            config_path_test_only: cli.config,
         },
         config,
     )?;

compute_tools/src/compute.rs

Lines changed: 9 additions & 3 deletions
@@ -21,6 +21,7 @@ use postgres::NoTls;
 use postgres::error::SqlState;
 use remote_storage::{DownloadError, RemotePath};
 use std::collections::{HashMap, HashSet};
+use std::ffi::OsString;
 use std::os::unix::fs::{PermissionsExt, symlink};
 use std::path::Path;
 use std::process::{Command, Stdio};
@@ -120,6 +121,10 @@ pub struct ComputeNodeParams {
     // Path to the `pg_isready` binary.
     pub pg_isready_bin: String,
     pub lakebase_mode: bool,
+
+    pub build_tag: String,
+    pub control_plane_uri: Option<String>,
+    pub config_path_test_only: Option<OsString>,
 }

 type TaskHandle = Mutex<Option<JoinHandle<()>>>;
@@ -1796,12 +1801,12 @@ impl ComputeNode {
         let states_allowing_configuration_refresh = [
             ComputeStatus::Running,
             ComputeStatus::Failed,
-            // ComputeStatus::RefreshConfigurationPending,
+            ComputeStatus::RefreshConfigurationPending,
         ];

-        let state = self.state.lock().expect("state lock poisoned");
+        let mut state = self.state.lock().expect("state lock poisoned");
         if states_allowing_configuration_refresh.contains(&state.status) {
-            // state.status = ComputeStatus::RefreshConfigurationPending;
+            state.status = ComputeStatus::RefreshConfigurationPending;
             self.state_changed.notify_all();
             Ok(())
         } else if state.status == ComputeStatus::Init {
@@ -1988,6 +1993,7 @@ impl ComputeNode {
                 // wait
                 ComputeStatus::Init
                 | ComputeStatus::Configuration
+                | ComputeStatus::RefreshConfigurationPending
                 | ComputeStatus::Empty => {
                     state = self.state_changed.wait(state).unwrap();
                 }
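
The signalling pattern in the `compute.rs` hunks (take the state lock mutably, set the status, then `notify_all` on the condition variable) can be shown in isolation. The sketch below is not the `ComputeNode` implementation: `Node`, `signal_refresh`, `wait_for_refresh_request`, and `run_demo` are hypothetical names, and the state is reduced to a single status value.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
    Running,
    RefreshConfigurationPending,
}

/// Hypothetical, reduced stand-in for the shared compute state.
struct Node {
    status: Mutex<Status>,
    state_changed: Condvar,
}

impl Node {
    /// Set the pending status under the lock, then wake every waiter.
    fn signal_refresh(&self) {
        let mut status = self.status.lock().expect("state lock poisoned");
        *status = Status::RefreshConfigurationPending;
        self.state_changed.notify_all();
    }

    /// Block until a refresh has been requested. The loop re-checks the
    /// predicate, so spurious wakeups and early signals are both handled.
    fn wait_for_refresh_request(&self) -> Status {
        let mut status = self.status.lock().expect("state lock poisoned");
        while *status != Status::RefreshConfigurationPending {
            status = self
                .state_changed
                .wait(status)
                .expect("state lock poisoned");
        }
        *status
    }
}

/// Spawn a waiter thread, signal it, and return the status it observed.
fn run_demo() -> Status {
    let node = Arc::new(Node {
        status: Mutex::new(Status::Running),
        state_changed: Condvar::new(),
    });
    let waiter = {
        let node = Arc::clone(&node);
        thread::spawn(move || node.wait_for_refresh_request())
    };
    node.signal_refresh();
    waiter.join().expect("waiter thread panicked")
}

fn main() {
    println!("{:?}", run_demo());
}
```

The guard has to be bound as `mut` for the status assignment to compile, which is why the diff changes `let state` to `let mut state` when it uncomments the assignment.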

compute_tools/src/configurator.rs

Lines changed: 79 additions & 3 deletions
@@ -1,10 +1,12 @@
-use std::sync::Arc;
+use std::fs::File;
 use std::thread;
+use std::{path::Path, sync::Arc};

-use compute_api::responses::ComputeStatus;
+use compute_api::responses::{ComputeConfig, ComputeStatus};
 use tracing::{error, info, instrument};

-use crate::compute::ComputeNode;
+use crate::compute::{ComputeNode, ParsedSpec};
+use crate::spec::get_config_from_control_plane;

 #[instrument(skip_all)]
 fn configurator_main_loop(compute: &Arc<ComputeNode>) {
@@ -38,6 +40,80 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
             // std::thread::sleep(std::time::Duration::from_millis(10000));

             compute.set_status(new_status);
+        } else if state.status == ComputeStatus::RefreshConfigurationPending {
+            info!(
+                "compute node suspects its configuration is out of date, now refreshing configuration"
+            );
+            // Drop the lock guard here to avoid holding the lock while downloading spec from the control plane / HCC.
+            // This is the only thread that can move compute_ctl out of the `RefreshConfigurationPending` state, so it
+            // is safe to drop the lock like this.
+            drop(state);
+
+            let spec = if let Some(config_path) = &compute.params.config_path_test_only {
+                // This path is only to make testing easier. In production we always get the spec from the HCC.
+                info!(
+                    "reloading config.json from path: {}",
+                    config_path.to_string_lossy()
+                );
+                let path = Path::new(config_path);
+                if let Ok(file) = File::open(path) {
+                    match serde_json::from_reader::<File, ComputeConfig>(file) {
+                        Ok(config) => config.spec,
+                        Err(e) => {
+                            error!("could not parse spec file: {}", e);
+                            None
+                        }
+                    }
+                } else {
+                    error!(
+                        "could not open config file at path: {}",
+                        config_path.to_string_lossy()
+                    );
+                    None
+                }
+            } else if let Some(control_plane_uri) = &compute.params.control_plane_uri {
+                match get_config_from_control_plane(control_plane_uri, &compute.params.compute_id) {
+                    Ok(config) => config.spec,
+                    Err(e) => {
+                        error!("could not get config from control plane: {}", e);
+                        None
+                    }
+                }
+            } else {
+                None
+            };
+
+            if let Some(spec) = spec {
+                if let Ok(pspec) = ParsedSpec::try_from(spec) {
+                    {
+                        let mut state = compute.state.lock().unwrap();
+                        // Defensive programming to make sure this thread is indeed the only one that can move the compute
+                        // node out of the `RefreshConfigurationPending` state. Would be nice if we can encode this invariant
+                        // into the type system.
+                        assert_eq!(state.status, ComputeStatus::RefreshConfigurationPending);
+                        // state.pspec is consumed by compute.reconfigure() below. Note that compute.reconfigure() will acquire
+                        // the compute.state lock again so we need to have the lock guard go out of scope here. We could add a
+                        // "locked" variant of compute.reconfigure() that takes the lock guard as an argument to make this cleaner,
+                        // but it's not worth forking the codebase too much for this minor point alone right now.
+                        state.pspec = Some(pspec);
+                    }
+                    match compute.reconfigure() {
+                        Ok(_) => {
+                            info!("Refresh configuration: compute node configured");
+                            compute.set_status(ComputeStatus::Running);
+                        }
+                        Err(e) => {
+                            error!(
+                                "Refresh configuration: could not configure compute node: {}",
+                                e
+                            );
+                            // Leave the compute node in the `RefreshConfigurationPending` state if the configuration
+                            // was not successful. It should be okay to treat this situation the same as if the loop
+                            // hasn't executed yet as long as the detection side keeps notifying.
+                        }
+                    }
+                }
+            }
         } else if state.status == ComputeStatus::Failed {
             info!("compute node is now in Failed state, exiting");
             break;
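
The order in which the configurator picks a config source (test-only local file first, then the control plane, otherwise nothing) can be isolated into a small pure function. This is a sketch under assumptions rather than the code in the commit: `ConfigSource`, `choose_source`, and `load_raw_config` are hypothetical names, the control-plane fetch is passed in as a closure so the example runs without a network, and JSON parsing is left out to keep it dependency-free.

```rust
use std::fs;
use std::path::PathBuf;

/// Hypothetical description of where a refreshed config would come from.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ConfigSource {
    LocalFile(PathBuf),
    ControlPlane(String),
    Unavailable,
}

/// The test-only path wins over the control plane URI, as in the diff.
fn choose_source(test_path: Option<PathBuf>, control_plane_uri: Option<String>) -> ConfigSource {
    match (test_path, control_plane_uri) {
        (Some(path), _) => ConfigSource::LocalFile(path),
        (None, Some(uri)) => ConfigSource::ControlPlane(uri),
        (None, None) => ConfigSource::Unavailable,
    }
}

/// Returns the raw config text, or `None` on any failure so that the caller
/// can simply stay in the pending state and wait for the next signal.
fn load_raw_config<F>(source: &ConfigSource, fetch: F) -> Option<String>
where
    F: Fn(&str) -> Result<String, String>,
{
    match source {
        ConfigSource::LocalFile(path) => match fs::read_to_string(path) {
            Ok(text) => Some(text),
            Err(e) => {
                eprintln!("could not open config file at path {}: {e}", path.display());
                None
            }
        },
        ConfigSource::ControlPlane(uri) => match fetch(uri) {
            Ok(text) => Some(text),
            Err(e) => {
                eprintln!("could not get config from control plane: {e}");
                None
            }
        },
        ConfigSource::Unavailable => None,
    }
}

fn main() {
    // The URI and the returned body are placeholders for illustration only.
    let source = choose_source(None, Some("http://cplane.invalid".to_string()));
    let fetched = load_raw_config(&source, |_uri| Ok("{\"spec\": null}".to_string()));
    println!("{source:?} -> {fetched:?}");
}
```

Collapsing every failure to `None` mirrors the hunk above, where errors are logged and the node is left in `RefreshConfigurationPending`.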

compute_tools/src/http/routes/refresh_configuration.rs

Lines changed: 7 additions & 13 deletions
@@ -7,28 +7,22 @@ use axum::{
     response::{IntoResponse, Response},
 };
 use http::StatusCode;
-use tracing::debug;

 use crate::compute::ComputeNode;
 // use crate::hadron_metrics::POSTGRES_PAGESTREAM_REQUEST_ERRORS;
 use crate::http::JsonResponse;

-// The /refresh_configuration POST method is used to nudge compute_ctl to pull a new spec
-// from the HCC and attempt to reconfigure Postgres with the new spec. The method does not wait
-// for the reconfiguration to complete. Rather, it simply delivers a signal that will cause
-// configuration to be reloaded in a best effort manner. Invocation of this method does not
-// guarantee that a reconfiguration will occur. The caller should consider keep sending this
-// request while it believes that the compute configuration is out of date.
+/// The /refresh_configuration POST method is used to nudge compute_ctl to pull a new spec
+/// from the HCC and attempt to reconfigure Postgres with the new spec. The method does not wait
+/// for the reconfiguration to complete. Rather, it simply delivers a signal that will cause
+/// configuration to be reloaded in a best effort manner. Invocation of this method does not
+/// guarantee that a reconfiguration will occur. The caller should consider resending this
+/// request while it believes that the compute configuration is out of date.
 pub(in crate::http) async fn refresh_configuration(
     State(compute): State<Arc<ComputeNode>>,
 ) -> Response {
-    debug!("serving /refresh_configuration POST request");
-    // POSTGRES_PAGESTREAM_REQUEST_ERRORS.inc();
     match compute.signal_refresh_configuration().await {
         Ok(_) => StatusCode::OK.into_response(),
-        Err(e) => {
-            tracing::error!("error handling /refresh_configuration request: {}", e);
-            JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e)
-        }
+        Err(e) => JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e),
     }
 }
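
Because the endpoint only delivers a best-effort signal, a caller is expected to repeat the request while it still believes the configuration is stale. A minimal sketch of such a caller loop follows, assuming a bounded number of attempts and a fixed pause. `nudge_until_fresh` and its parameters are hypothetical, and both the HTTP call and the staleness check are passed in as closures so the example runs standalone; the commit itself does not add any caller besides the tests.

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical caller loop: keep sending the refresh signal while the
/// configuration still looks stale, up to `max_attempts` requests.
/// Returns how many requests were sent.
fn nudge_until_fresh<S, C>(
    mut send_refresh: S,
    mut looks_stale: C,
    max_attempts: u32,
    pause: Duration,
) -> u32
where
    S: FnMut() -> Result<(), String>,
    C: FnMut() -> bool,
{
    let mut sent = 0;
    while sent < max_attempts && looks_stale() {
        // A failed request is not fatal: the signal is best effort anyway.
        if let Err(e) = send_refresh() {
            eprintln!("refresh request failed, will retry: {e}");
        }
        sent += 1;
        thread::sleep(pause);
    }
    sent
}

fn main() {
    // Pretend the configuration looks stale for the first three checks.
    let mut checks = 0;
    let sent = nudge_until_fresh(
        || Ok(()),
        || {
            checks += 1;
            checks <= 3
        },
        10,
        Duration::from_millis(1),
    );
    println!("sent {sent} refresh requests");
}
```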

compute_tools/src/http/server.rs

Lines changed: 2 additions & 2 deletions
@@ -23,11 +23,11 @@ use super::{
     middleware::authorize::Authorize,
     routes::{
         check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions,
-        grants, insights, lfc, metrics, metrics_json, promote, status, terminate,
+        grants, hadron_liveness_probe, insights, lfc, metrics, metrics_json, promote,
+        refresh_configuration, status, terminate,
     },
 };
 use crate::compute::ComputeNode;
-use crate::http::routes::{hadron_liveness_probe, refresh_configuration};

 /// `compute_ctl` has two servers: internal and external. The internal server
 /// binds to the loopback interface and handles communication from clients on

control_plane/src/bin/neon_local.rs

Lines changed: 65 additions & 0 deletions
@@ -560,7 +560,9 @@ enum EndpointCmd {
     Create(EndpointCreateCmdArgs),
     Start(EndpointStartCmdArgs),
     Reconfigure(EndpointReconfigureCmdArgs),
+    RefreshConfiguration(EndpointRefreshConfigurationArgs),
     Stop(EndpointStopCmdArgs),
+    UpdatePageservers(EndpointUpdatePageserversCmdArgs),
     GenerateJwt(EndpointGenerateJwtCmdArgs),
 }

@@ -721,6 +723,13 @@ struct EndpointReconfigureCmdArgs {
     safekeepers: Option<String>,
 }

+#[derive(clap::Args)]
+#[clap(about = "Refresh the endpoint's configuration by forcing it to reload its spec")]
+struct EndpointRefreshConfigurationArgs {
+    #[clap(help = "Postgres endpoint id")]
+    endpoint_id: String,
+}
+
 #[derive(clap::Args)]
 #[clap(about = "Stop an endpoint")]
 struct EndpointStopCmdArgs {
@@ -738,6 +747,16 @@ struct EndpointStopCmdArgs {
     mode: EndpointTerminateMode,
 }

+#[derive(clap::Args)]
+#[clap(about = "Update the pageservers in the spec file of the compute endpoint")]
+struct EndpointUpdatePageserversCmdArgs {
+    #[clap(help = "Postgres endpoint id")]
+    endpoint_id: String,
+
+    #[clap(short = 'p', long, help = "Specified pageserver id")]
+    pageserver_id: Option<NodeId>,
+}
+
 #[derive(clap::Args)]
 #[clap(about = "Generate a JWT for an endpoint")]
 struct EndpointGenerateJwtCmdArgs {
@@ -1625,6 +1644,44 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
             println!("Starting existing endpoint {endpoint_id}...");
             endpoint.start(args).await?;
         }
+        EndpointCmd::UpdatePageservers(args) => {
+            let endpoint_id = &args.endpoint_id;
+            let endpoint = cplane
+                .endpoints
+                .get(endpoint_id.as_str())
+                .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
+            let pageservers = match args.pageserver_id {
+                Some(pageserver_id) => {
+                    let pageserver =
+                        PageServerNode::from_env(env, env.get_pageserver_conf(pageserver_id)?);
+
+                    vec![(
+                        PageserverProtocol::Libpq,
+                        pageserver.pg_connection_config.host().clone(),
+                        pageserver.pg_connection_config.port(),
+                    )]
+                }
+                None => {
+                    let storage_controller = StorageController::from_env(env);
+                    storage_controller
+                        .tenant_locate(endpoint.tenant_id)
+                        .await?
+                        .shards
+                        .into_iter()
+                        .map(|shard| {
+                            (
+                                PageserverProtocol::Libpq,
+                                Host::parse(&shard.listen_pg_addr)
+                                    .expect("Storage controller reported malformed host"),
+                                shard.listen_pg_port,
+                            )
+                        })
+                        .collect::<Vec<_>>()
+                }
+            };
+
+            endpoint.update_pageservers_in_config(pageservers).await?;
+        }
         EndpointCmd::Reconfigure(args) => {
             let endpoint_id = &args.endpoint_id;
             let endpoint = cplane
@@ -1678,6 +1735,14 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
                 .reconfigure(Some(pageservers), None, safekeepers, None)
                 .await?;
         }
+        EndpointCmd::RefreshConfiguration(args) => {
+            let endpoint_id = &args.endpoint_id;
+            let endpoint = cplane
+                .endpoints
+                .get(endpoint_id.as_str())
+                .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
+            endpoint.refresh_configuration().await?;
+        }
         EndpointCmd::Stop(args) => {
             let endpoint_id = &args.endpoint_id;
             let endpoint = cplane

control_plane/src/endpoint.rs

Lines changed: 52 additions & 1 deletion
@@ -937,7 +937,8 @@ impl Endpoint {
                     | ComputeStatus::Configuration
                     | ComputeStatus::TerminationPendingFast
                     | ComputeStatus::TerminationPendingImmediate
-                    | ComputeStatus::Terminated => {
+                    | ComputeStatus::Terminated
+                    | ComputeStatus::RefreshConfigurationPending => {
                         bail!("unexpected compute status: {:?}", state.status)
                     }
                 }
@@ -960,6 +961,29 @@ impl Endpoint {
         Ok(())
     }

+    // Update the pageservers in the spec file of the endpoint. This is useful to test the spec refresh scenario.
+    pub async fn update_pageservers_in_config(
+        &self,
+        pageservers: Vec<(PageserverProtocol, Host, u16)>,
+    ) -> Result<()> {
+        let config_path = self.endpoint_path().join("config.json");
+        let mut config: ComputeConfig = {
+            let file = std::fs::File::open(&config_path)?;
+            serde_json::from_reader(file)?
+        };
+
+        let pageserver_connstring = Self::build_pageserver_connstr(&pageservers);
+        assert!(!pageserver_connstring.is_empty());
+        let mut spec = config.spec.unwrap();
+        spec.pageserver_connstring = Some(pageserver_connstring);
+        config.spec = Some(spec);
+
+        let file = std::fs::File::create(&config_path)?;
+        serde_json::to_writer_pretty(file, &config)?;
+
+        Ok(())
+    }
+
     // Call the /status HTTP API
     pub async fn get_status(&self) -> Result<ComputeStatusResponse> {
         let client = reqwest::Client::new();
@@ -1125,6 +1149,33 @@ impl Endpoint {
         Ok(response)
     }

+    pub async fn refresh_configuration(&self) -> Result<()> {
+        let client = reqwest::Client::builder()
+            .timeout(Duration::from_secs(30))
+            .build()
+            .unwrap();
+        let response = client
+            .post(format!(
+                "http://{}:{}/refresh_configuration",
+                self.internal_http_address.ip(),
+                self.internal_http_address.port()
+            ))
+            .send()
+            .await?;
+
+        let status = response.status();
+        if !(status.is_client_error() || status.is_server_error()) {
+            Ok(())
+        } else {
+            let url = response.url().to_owned();
+            let msg = match response.text().await {
+                Ok(err_body) => format!("Error: {err_body}"),
+                Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
+            };
+            Err(anyhow::anyhow!(msg))
+        }
+    }
+
     pub fn connstr(&self, user: &str, db_name: &str) -> String {
         format!(
             "postgresql://{}@{}:{}/{}",

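The request URL and the success check used by `refresh_configuration` can be exercised without an HTTP client. The following is a sketch under assumptions: `refresh_url` and `counts_as_success` are hypothetical helpers that mirror the format string and the `!(is_client_error() || is_server_error())` condition on a raw status code, using the standard 4xx and 5xx ranges.

```rust
use std::net::SocketAddr;

/// Hypothetical helper mirroring the URL format used in the diff.
fn refresh_url(addr: SocketAddr) -> String {
    format!("http://{}:{}/refresh_configuration", addr.ip(), addr.port())
}

/// Mirrors `!(is_client_error() || is_server_error())` on a raw status code:
/// anything outside 4xx and 5xx is treated as success.
fn counts_as_success(status: u16) -> bool {
    let client_error = (400..500).contains(&status);
    let server_error = (500..600).contains(&status);
    !(client_error || server_error)
}

fn main() {
    let addr: SocketAddr = "127.0.0.1:3080".parse().expect("valid socket address");
    println!("{}", refresh_url(addr));
    for status in [200, 302, 404, 500] {
        println!("{status}: {}", counts_as_success(status));
    }
}
```

Note that this condition accepts redirects and informational responses as success. Formatting `ip()` and `port()` separately also leaves an IPv6 address without brackets, which is fine for a loopback IPv4 address but worth keeping in mind if the internal address could be IPv6.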