Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions compute_tools/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,14 @@ stateDiagram-v2
Init --> Running : Started Postgres
Running --> TerminationPendingFast : Requested termination
Running --> TerminationPendingImmediate : Requested termination
Running --> ConfigurationPending : Received a /configure request with spec
Running --> RefreshConfigurationPending : Received a /refresh_configuration request, compute node will pull a new spec and reconfigure
RefreshConfigurationPending --> Running : Compute has been re-configured
TerminationPendingFast --> Terminated compute with 30s delay for cplane to inspect status
TerminationPendingImmediate --> Terminated : Terminated compute immediately
Running --> TerminationPending : Requested termination
TerminationPending --> Terminated : Terminated compute
Failed --> RefreshConfigurationPending : Received a /refresh_configuration request
Failed --> [*] : Compute exited
Terminated --> [*] : Compute exited
```
Expand Down
3 changes: 3 additions & 0 deletions compute_tools/src/bin/compute_ctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ fn main() -> Result<()> {
pg_isready_bin: get_pg_isready_bin(&cli.pgbin),
instance_id: std::env::var("INSTANCE_ID").ok(),
lakebase_mode: cli.lakebase_mode,
build_tag: BUILD_TAG.to_string(),
control_plane_uri: cli.control_plane_uri,
config_path_test_only: cli.config,
},
config,
)?;
Expand Down
12 changes: 9 additions & 3 deletions compute_tools/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use postgres::NoTls;
use postgres::error::SqlState;
use remote_storage::{DownloadError, RemotePath};
use std::collections::{HashMap, HashSet};
use std::ffi::OsString;
use std::os::unix::fs::{PermissionsExt, symlink};
use std::path::Path;
use std::process::{Command, Stdio};
Expand Down Expand Up @@ -120,6 +121,10 @@ pub struct ComputeNodeParams {
// Path to the `pg_isready` binary.
pub pg_isready_bin: String,
pub lakebase_mode: bool,

pub build_tag: String,
pub control_plane_uri: Option<String>,
pub config_path_test_only: Option<OsString>,
}

type TaskHandle = Mutex<Option<JoinHandle<()>>>;
Expand Down Expand Up @@ -1796,12 +1801,12 @@ impl ComputeNode {
let states_allowing_configuration_refresh = [
ComputeStatus::Running,
ComputeStatus::Failed,
// ComputeStatus::RefreshConfigurationPending,
ComputeStatus::RefreshConfigurationPending,
];

let state = self.state.lock().expect("state lock poisoned");
let mut state = self.state.lock().expect("state lock poisoned");
if states_allowing_configuration_refresh.contains(&state.status) {
// state.status = ComputeStatus::RefreshConfigurationPending;
state.status = ComputeStatus::RefreshConfigurationPending;
self.state_changed.notify_all();
Ok(())
} else if state.status == ComputeStatus::Init {
Expand Down Expand Up @@ -1988,6 +1993,7 @@ impl ComputeNode {
// wait
ComputeStatus::Init
| ComputeStatus::Configuration
| ComputeStatus::RefreshConfigurationPending
| ComputeStatus::Empty => {
state = self.state_changed.wait(state).unwrap();
}
Expand Down
104 changes: 95 additions & 9 deletions compute_tools/src/configurator.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,35 @@
use std::sync::Arc;
use std::fs::File;
use std::thread;
use std::{path::Path, sync::Arc};

use compute_api::responses::ComputeStatus;
use compute_api::responses::{ComputeConfig, ComputeStatus};
use tracing::{error, info, instrument};

use crate::compute::ComputeNode;
use crate::compute::{ComputeNode, ParsedSpec};
use crate::spec::get_config_from_control_plane;

#[instrument(skip_all)]
fn configurator_main_loop(compute: &Arc<ComputeNode>) {
info!("waiting for reconfiguration requests");
loop {
let mut state = compute.state.lock().unwrap();

// We have to re-check the status after re-acquiring the lock because it could be that
// the status has changed while we were waiting for the lock, and we might not need to
// wait on the condition variable. Otherwise, we might end up in some soft-/deadlock, i.e.
// we are waiting for a condition variable that will never be signaled.
if state.status != ComputeStatus::ConfigurationPending {
state = compute.state_changed.wait(state).unwrap();
if compute.params.lakebase_mode {
while state.status != ComputeStatus::ConfigurationPending
&& state.status != ComputeStatus::RefreshConfigurationPending
&& state.status != ComputeStatus::Failed
{
info!("configurator: compute status: {:?}, sleeping", state.status);
state = compute.state_changed.wait(state).unwrap();
}
} else {
// We have to re-check the status after re-acquiring the lock because it could be that
// the status has changed while we were waiting for the lock, and we might not need to
// wait on the condition variable. Otherwise, we might end up in some soft-/deadlock, i.e.
// we are waiting for a condition variable that will never be signaled.
if state.status != ComputeStatus::ConfigurationPending {
state = compute.state_changed.wait(state).unwrap();
}
}

// Re-check the status after waking up
Expand All @@ -38,6 +50,80 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
// std::thread::sleep(std::time::Duration::from_millis(10000));

compute.set_status(new_status);
} else if state.status == ComputeStatus::RefreshConfigurationPending {
info!(
"compute node suspects its configuration is out of date, now refreshing configuration"
);
// Drop the lock guard here to avoid holding the lock while downloading spec from the control plane / HCC.
// This is the only thread that can move compute_ctl out of the `RefreshConfigurationPending` state, so it
// is safe to drop the lock like this.
drop(state);

let spec = if let Some(config_path) = &compute.params.config_path_test_only {
// This path is only to make testing easier. In production we always get the spec from the HCC.
info!(
"reloading config.json from path: {}",
config_path.to_string_lossy()
);
let path = Path::new(config_path);
if let Ok(file) = File::open(path) {
match serde_json::from_reader::<File, ComputeConfig>(file) {
Ok(config) => config.spec,
Err(e) => {
error!("could not parse spec file: {}", e);
None
}
}
} else {
error!(
"could not open config file at path: {}",
config_path.to_string_lossy()
);
None
}
} else if let Some(control_plane_uri) = &compute.params.control_plane_uri {
match get_config_from_control_plane(control_plane_uri, &compute.params.compute_id) {
Ok(config) => config.spec,
Err(e) => {
error!("could not get config from control plane: {}", e);
None
}
}
} else {
None
};

if let Some(spec) = spec {
if let Ok(pspec) = ParsedSpec::try_from(spec) {
{
let mut state = compute.state.lock().unwrap();
// Defensive programming to make sure this thread is indeed the only one that can move the compute
// node out of the `RefreshConfigurationPending` state. Would be nice if we can encode this invariant
// into the type system.
assert_eq!(state.status, ComputeStatus::RefreshConfigurationPending);
// state.pspec is consumed by compute.reconfigure() below. Note that compute.reconfigure() will acquire
// the compute.state lock again so we need to have the lock guard go out of scope here. We could add a
// "locked" variant of compute.reconfigure() that takes the lock guard as an argument to make this cleaner,
// but it's not worth forking the codebase too much for this minor point alone right now.
state.pspec = Some(pspec);
}
match compute.reconfigure() {
Ok(_) => {
info!("Refresh configuration: compute node configured");
compute.set_status(ComputeStatus::Running);
}
Err(e) => {
error!(
"Refresh configuration: could not configure compute node: {}",
e
);
// Leave the compute node in the `RefreshConfigurationPending` state if the configuration
// was not successful. It should be okay to treat this situation the same as if the loop
// hasn't executed yet as long as the detection side keeps notifying.
}
}
}
}
} else if state.status == ComputeStatus::Failed {
info!("compute node is now in Failed state, exiting");
break;
Expand Down
20 changes: 7 additions & 13 deletions compute_tools/src/http/routes/refresh_configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,22 @@ use axum::{
response::{IntoResponse, Response},
};
use http::StatusCode;
use tracing::debug;

use crate::compute::ComputeNode;
// use crate::hadron_metrics::POSTGRES_PAGESTREAM_REQUEST_ERRORS;
use crate::http::JsonResponse;

// The /refresh_configuration POST method is used to nudge compute_ctl to pull a new spec
// from the HCC and attempt to reconfigure Postgres with the new spec. The method does not wait
// for the reconfiguration to complete. Rather, it simply delivers a signal that will cause
// configuration to be reloaded in a best effort manner. Invocation of this method does not
// guarantee that a reconfiguration will occur. The caller should consider keep sending this
// request while it believes that the compute configuration is out of date.
/// The /refresh_configuration POST method is used to nudge compute_ctl to pull a new spec
/// from the HCC and attempt to reconfigure Postgres with the new spec. The method does not wait
/// for the reconfiguration to complete. Rather, it simply delivers a signal that will cause
/// configuration to be reloaded in a best effort manner. Invocation of this method does not
/// guarantee that a reconfiguration will occur. The caller should consider keep sending this
/// request while it believes that the compute configuration is out of date.
pub(in crate::http) async fn refresh_configuration(
State(compute): State<Arc<ComputeNode>>,
) -> Response {
debug!("serving /refresh_configuration POST request");
// POSTGRES_PAGESTREAM_REQUEST_ERRORS.inc();
match compute.signal_refresh_configuration().await {
Ok(_) => StatusCode::OK.into_response(),
Err(e) => {
tracing::error!("error handling /refresh_configuration request: {}", e);
JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e)
}
Err(e) => JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e),
}
}
4 changes: 2 additions & 2 deletions compute_tools/src/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ use super::{
middleware::authorize::Authorize,
routes::{
check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions,
grants, insights, lfc, metrics, metrics_json, promote, status, terminate,
grants, hadron_liveness_probe, insights, lfc, metrics, metrics_json, promote,
refresh_configuration, status, terminate,
},
};
use crate::compute::ComputeNode;
use crate::http::routes::{hadron_liveness_probe, refresh_configuration};

/// `compute_ctl` has two servers: internal and external. The internal server
/// binds to the loopback interface and handles communication from clients on
Expand Down
65 changes: 65 additions & 0 deletions control_plane/src/bin/neon_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,9 @@ enum EndpointCmd {
Create(EndpointCreateCmdArgs),
Start(EndpointStartCmdArgs),
Reconfigure(EndpointReconfigureCmdArgs),
RefreshConfiguration(EndpointRefreshConfigurationArgs),
Stop(EndpointStopCmdArgs),
UpdatePageservers(EndpointUpdatePageserversCmdArgs),
GenerateJwt(EndpointGenerateJwtCmdArgs),
}

Expand Down Expand Up @@ -721,6 +723,13 @@ struct EndpointReconfigureCmdArgs {
safekeepers: Option<String>,
}

#[derive(clap::Args)]
#[clap(about = "Refresh the endpoint's configuration by forcing it reload it's spec")]
struct EndpointRefreshConfigurationArgs {
#[clap(help = "Postgres endpoint id")]
endpoint_id: String,
}

#[derive(clap::Args)]
#[clap(about = "Stop an endpoint")]
struct EndpointStopCmdArgs {
Expand All @@ -738,6 +747,16 @@ struct EndpointStopCmdArgs {
mode: EndpointTerminateMode,
}

#[derive(clap::Args)]
#[clap(about = "Update the pageservers in the spec file of the compute endpoint")]
struct EndpointUpdatePageserversCmdArgs {
#[clap(help = "Postgres endpoint id")]
endpoint_id: String,

#[clap(short = 'p', long, help = "Specified pageserver id")]
pageserver_id: Option<NodeId>,
}

#[derive(clap::Args)]
#[clap(about = "Generate a JWT for an endpoint")]
struct EndpointGenerateJwtCmdArgs {
Expand Down Expand Up @@ -1625,6 +1644,44 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
println!("Starting existing endpoint {endpoint_id}...");
endpoint.start(args).await?;
}
EndpointCmd::UpdatePageservers(args) => {
let endpoint_id = &args.endpoint_id;
let endpoint = cplane
.endpoints
.get(endpoint_id.as_str())
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
let pageservers = match args.pageserver_id {
Some(pageserver_id) => {
let pageserver =
PageServerNode::from_env(env, env.get_pageserver_conf(pageserver_id)?);

vec![(
PageserverProtocol::Libpq,
pageserver.pg_connection_config.host().clone(),
pageserver.pg_connection_config.port(),
)]
}
None => {
let storage_controller = StorageController::from_env(env);
storage_controller
.tenant_locate(endpoint.tenant_id)
.await?
.shards
.into_iter()
.map(|shard| {
(
PageserverProtocol::Libpq,
Host::parse(&shard.listen_pg_addr)
.expect("Storage controller reported malformed host"),
shard.listen_pg_port,
)
})
.collect::<Vec<_>>()
}
};

endpoint.update_pageservers_in_config(pageservers).await?;
}
EndpointCmd::Reconfigure(args) => {
let endpoint_id = &args.endpoint_id;
let endpoint = cplane
Expand Down Expand Up @@ -1678,6 +1735,14 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.reconfigure(Some(pageservers), None, safekeepers, None)
.await?;
}
EndpointCmd::RefreshConfiguration(args) => {
let endpoint_id = &args.endpoint_id;
let endpoint = cplane
.endpoints
.get(endpoint_id.as_str())
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
endpoint.refresh_configuration().await?;
}
EndpointCmd::Stop(args) => {
let endpoint_id = &args.endpoint_id;
let endpoint = cplane
Expand Down
Loading
Loading