Skip to content

Commit bf5746e

Browse files
hughcapetavandras
authored andcommitted
Implement kubernetes.bootstrap_labels (patroni#3257)
Allow to define labels that will be assigned to a postgres instance pod when in 'initializing new cluster', 'running custom bootstrap script', 'starting after custom bootstrap', or 'creating replica' state
1 parent 6ed7554 commit bf5746e

20 files changed

+9203
-30
lines changed

docs/ENVIRONMENT.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ Kubernetes
113113
- **PATRONI\_KUBERNETES\_NAMESPACE**: (optional) Kubernetes namespace where the Patroni pod is running. Default value is `default`.
114114
- **PATRONI\_KUBERNETES\_LABELS**: Labels in format ``{label1: value1, label2: value2}``. These labels will be used to find existing objects (Pods and either Endpoints or ConfigMaps) associated with the current cluster. Also Patroni will set them on every object (Endpoint or ConfigMap) it creates.
115115
- **PATRONI\_KUBERNETES\_SCOPE\_LABEL**: (optional) name of the label containing cluster name. Default value is `cluster-name`.
116+
- **PATRONI\_KUBERNETES\_BOOTSTRAP\_LABELS**: (optional) Labels in format ``{label1: value1, label2: value2}``. These labels will be assigned to a Patroni pod when its state is either ``initializing new cluster``, ``running custom bootstrap script``, ``starting after custom bootstrap`` or ``creating replica``.
116117
- **PATRONI\_KUBERNETES\_ROLE\_LABEL**: (optional) name of the label containing role (`primary`, `replica` or other custom value). Patroni will set this label on the pod it runs in. Default value is ``role``.
117118
- **PATRONI\_KUBERNETES\_LEADER\_LABEL\_VALUE**: (optional) value of the pod label when Postgres role is `primary`. Default value is `primary`.
118119
- **PATRONI\_KUBERNETES\_FOLLOWER\_LABEL\_VALUE**: (optional) value of the pod label when Postgres role is `replica`. Default value is `replica`.

docs/yaml_configuration.rst

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,11 +178,12 @@ Kubernetes
178178
- **namespace**: (optional) Kubernetes namespace where Patroni pod is running. Default value is `default`.
179179
- **labels**: Labels in format ``{label1: value1, label2: value2}``. These labels will be used to find existing objects (Pods and either Endpoints or ConfigMaps) associated with the current cluster. Also Patroni will set them on every object (Endpoint or ConfigMap) it creates.
180180
- **scope\_label**: (optional) name of the label containing cluster name. Default value is `cluster-name`.
181+
- **bootstrap\_labels**: (optional) Labels in format ``{label1: value1, label2: value2}``. These labels will be assigned to a Patroni pod when its state is either ``initializing new cluster``, ``running custom bootstrap script``, ``starting after custom bootstrap`` or ``creating replica``.
181182
- **role\_label**: (optional) name of the label containing role (`primary`, `replica`, or other custom value). Patroni will set this label on the pod it runs in. Default value is ``role``.
182183
- **leader\_label\_value**: (optional) value of the pod label when Postgres role is ``primary``. Default value is ``primary``.
183184
- **follower\_label\_value**: (optional) value of the pod label when Postgres role is ``replica``. Default value is ``replica``.
184185
- **standby\_leader\_label\_value**: (optional) value of the pod label when Postgres role is ``standby_leader``. Default value is ``primary``.
185-
- **tmp_\role\_label**: (optional) name of the temporary label containing role (`primary` or `replica`). Value of this label will always use the default of corresponding role. Set only when necessary.
186+
- **tmp\_role\_label**: (optional) name of the temporary label containing role (`primary` or `replica`). Value of this label will always use the default of corresponding role. Set only when necessary.
186187
- **use\_endpoints**: (optional) if set to true, Patroni will use Endpoints instead of ConfigMaps to run leader elections and keep cluster state.
187188
- **pod\_ip**: (optional) IP address of the pod Patroni is running in. This value is required when `use_endpoints` is enabled and is used to populate the leader endpoint subsets when the pod's PostgreSQL is promoted.
188189
- **ports**: (optional) if the Service object has the name for the port, the same name must appear in the Endpoint object, otherwise service won't work. For example, if your service is defined as ``{Kind: Service, spec: {ports: [{name: postgresql, port: 5432, targetPort: 5432}]}}``, then you have to set ``kubernetes.ports: [{"name": "postgresql", "port": 5432}]`` and Patroni will use it for updating subsets of the leader Endpoint. This parameter is used only if `kubernetes.use_endpoints` is set.

features/backup_create.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,18 @@
33
import subprocess
44
import sys
55

6+
from time import sleep
7+
68
if __name__ == "__main__":
79
parser = argparse.ArgumentParser()
810
parser.add_argument("--datadir", required=True)
911
parser.add_argument("--dbname", required=True)
1012
parser.add_argument("--walmethod", required=True, choices=("fetch", "stream", "none"))
13+
parser.add_argument("--sleep", required=False, type=int)
1114
args, _ = parser.parse_known_args()
1215

16+
if args.sleep:
17+
sleep(args.sleep)
18+
1319
walmethod = ["-X", args.walmethod] if args.walmethod != "none" else []
1420
sys.exit(subprocess.call(["pg_basebackup", "-D", args.datadir, "-c", "fast", "-d", args.dbname] + walmethod))

features/backup_restore.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,17 @@
22
import argparse
33
import shutil
44

5+
from time import sleep
6+
57
if __name__ == "__main__":
68
parser = argparse.ArgumentParser()
79
parser.add_argument("--datadir", required=True)
810
parser.add_argument("--sourcedir", required=True)
911
parser.add_argument("--test-argument", required=True)
12+
parser.add_argument("--sleep", required=False, type=int)
1013
args, _ = parser.parse_known_args()
1114

15+
if args.sleep:
16+
sleep(args.sleep)
17+
1218
shutil.copytree(args.sourcedir, args.datadir)

features/bootstrap_labels.feature

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
Feature: bootstrap labels
2+
Check that user-configurable bootstrap labels are set and removed with state change
3+
4+
Scenario: check label for cluster bootstrap
5+
When I start postgres-0
6+
Then postgres-0 is a leader after 10 seconds
7+
When I start postgres-1 in a cluster batman1 as a long-running clone of postgres-0
8+
Then "members/postgres-1" key in DCS has state=running custom bootstrap script after 20 seconds
9+
And postgres-1 is labeled with "foo"
10+
And postgres-1 is a leader of batman1 after 20 seconds
11+
12+
Scenario: check label for replica bootstrap
13+
When I do a backup of postgres-1
14+
And I start postgres-2 in cluster batman1 using long-running backup_restore
15+
Then "members/postgres-2" key in DCS has state=creating replica after 20 seconds
16+
And postgres-2 is labeled with "foo"
17+
18+
Scenario: check bootstrap label is removed
19+
Given "members/postgres-1" key in DCS has state=running after 2 seconds
20+
And "members/postgres-2" key in DCS has state=running after 20 seconds
21+
Then postgres-1 is not labeled with "foo"
22+
And postgres-2 is not labeled with "foo"

features/environment.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ def start(self, max_wait_limit=5):
5454
self._log = open(os.path.join(self._output_dir, self._name + '.log'), 'a')
5555
self._handle = self._start()
5656

57+
if max_wait_limit < 0:
58+
return
5759
max_wait_limit *= self._context.timeout_multiplier
5860
for _ in range(max_wait_limit):
5961
assert self._has_started(), "Process {0} is not running after being started".format(self._name)
@@ -218,6 +220,8 @@ def _make_patroni_test_config(self, name, custom_config):
218220
'host replication replicator all md5',
219221
'host all all all md5'
220222
]
223+
if isinstance(self._context.dcs_ctl, KubernetesController):
224+
config['kubernetes'] = {'bootstrap_labels': {'foo': 'bar'}}
221225

222226
if self._context.postgres_supports_ssl and self._context.certfile:
223227
config['postgresql']['parameters'].update({
@@ -657,6 +661,10 @@ def delete_pod(self, name):
657661
except Exception:
658662
break
659663

664+
def pod_labels(self, name):
665+
pod = self._api.read_namespaced_pod(name, self._namespace)
666+
return pod.metadata.labels or {}
667+
660668
def query(self, key, scope='batman', group=None):
661669
if key.startswith('members/'):
662670
pod = self._api.read_namespaced_pod(key[8:], self._namespace)
@@ -870,15 +878,16 @@ def create_and_set_output_directory(self, feature_name):
870878
os.makedirs(feature_dir)
871879
self._output_dir = feature_dir
872880

873-
def clone(self, from_name, cluster_name, to_name):
881+
def clone(self, from_name, cluster_name, to_name, long_running=False):
874882
f = self._processes[from_name]
875883
custom_config = {
876884
'scope': cluster_name,
877885
'bootstrap': {
878886
'method': 'pg_basebackup',
879887
'pg_basebackup': {
880888
'command': " ".join(self.BACKUP_SCRIPT
881-
+ ['--walmethod=stream', '--dbname="{0}"'.format(f.backup_source)])
889+
+ ['--walmethod=stream', f'--dbname="{f.backup_source}"',
890+
f'--sleep {5 if long_running else 0}'])
882891
},
883892
'dcs': {
884893
'postgresql': {
@@ -901,12 +910,16 @@ def clone(self, from_name, cluster_name, to_name):
901910
}
902911
}
903912
}
904-
self.start(to_name, custom_config=custom_config)
913+
kwargs = {'custom_config': custom_config}
914+
if long_running:
915+
kwargs['max_wait_limit'] = -1
916+
self.start(to_name, **kwargs)
905917

906-
def backup_restore_config(self, params=None):
918+
def backup_restore_config(self, params=None, long_running=False):
907919
return {
908920
'command': (self.BACKUP_RESTORE_SCRIPT
909-
+ ' --sourcedir=' + os.path.join(self.patroni_path, 'data', 'basebackup')).replace('\\', '/'),
921+
+ ' --sourcedir=' + os.path.join(self.patroni_path, 'data', 'basebackup')
922+
+ f' --sleep {5 if long_running else 0}').replace('\\', '/'),
910923
'test-argument': 'test-value', # test config mapping approach on custom bootstrap/replica creation
911924
**(params or {}),
912925
}
@@ -1126,6 +1139,8 @@ def before_feature(context, feature):
11261139
lib = subprocess.check_output(['pg_config', '--pkglibdir']).decode('utf-8').strip()
11271140
if not os.path.exists(os.path.join(lib, 'citus.so')):
11281141
return feature.skip("Citus extension isn't available")
1142+
elif feature.name == 'bootstrap labels' and context.dcs_ctl.name() != 'kubernetes':
1143+
feature.skip("Tested only on Kubernetes")
11291144
context.pctl.create_and_set_output_directory(feature.name)
11301145

11311146

features/steps/bootstrap_labels.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from behave import step, then
2+
3+
4+
@step('I start {name:name} in a cluster {cluster_name:w} as a long-running clone of {name2:name}')
5+
def start_cluster_clone(context, name, cluster_name, name2):
6+
context.pctl.clone(name2, cluster_name, name, True)
7+
8+
9+
@step('I start {name:name} in cluster {cluster_name:w} using long-running backup_restore')
10+
def start_patroni(context, name, cluster_name):
11+
return context.pctl.start(name, custom_config={
12+
"scope": cluster_name,
13+
"postgresql": {
14+
'create_replica_methods': ['backup_restore'],
15+
"backup_restore": context.pctl.backup_restore_config(long_running=True),
16+
'authentication': {
17+
'superuser': {'password': 'patroni1'},
18+
'replication': {'password': 'rep-pass1'}
19+
}
20+
}
21+
}, max_wait_limit=-1)
22+
23+
24+
@then('{name:name} is labeled with "{label:w}"')
25+
def pod_labeled(context, name, label):
26+
assert label in context.dcs_ctl.pod_labels(name), f'pod {name} is not labeled with {label}'
27+
28+
29+
@then('{name:name} is not labeled with "{label:w}"')
30+
def pod_not_labeled(context, name, label):
31+
assert label not in context.dcs_ctl.pod_labels(name), f'pod {name} is still labeled with {label}'

patroni/api.py

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -310,12 +310,13 @@ def do_GET(self, write_status_code_only: bool = False) -> None:
310310
"""
311311
path = '/primary' if self.path == '/' else self.path
312312
response = self.get_postgresql_status()
313+
latest_end_lsn = response.pop('latest_end_lsn', 0)
313314

314315
patroni = self.server.patroni
315316
cluster = patroni.dcs.cluster
316317
config = global_config.from_cluster(cluster)
317318

318-
leader_optime = cluster and cluster.status.last_lsn
319+
leader_optime = max(cluster and cluster.status.last_lsn or 0, latest_end_lsn)
319320
replayed_location = response.get('xlog', {}).get('replayed_location', 0)
320321
max_replica_lag = parse_int(self.path_query.get('lag', [sys.maxsize])[0], 'B')
321322
if max_replica_lag is None:
@@ -1298,11 +1299,14 @@ def get_postgresql_status(self, retry: bool = False) -> Dict[str, Any]:
12981299
12991300
:returns: a dict with the status of Postgres/Patroni. The keys are:
13001301
1301-
* ``state``: one of :class:`~patroni.postgresql.misc.PostgresqlState` or ``unknown``;
1302+
* ``state``: Postgres state among ``stopping``, ``stopped``, ``stop failed``, ``crashed``, ``running``,
1303+
``starting``, ``start failed``, ``restarting``, ``restart failed``, ``initializing new cluster``,
1304+
``initdb failed``, ``running custom bootstrap script``, ``starting after custom bootstrap``,
1305+
``custom bootstrap failed``, ``creating replica``, or ``unknown``;
13021306
* ``postmaster_start_time``: ``pg_postmaster_start_time()``;
1303-
* ``role``: :class:`~patroni.postgresql.misc.PostgresqlRole.REPLICA` or
1304-
:class:`~patroni.postgresql.misc.PostgresqlRole.PRIMARY` based on ``pg_is_in_recovery()`` output;
1307+
* ``role``: ``replica`` or ``primary`` based on ``pg_is_in_recovery()`` output;
13051308
* ``server_version``: Postgres version without periods, e.g. ``150002`` for Postgres ``15.2``;
1309+
* ``latest_end_lsn``: latest_end_lsn value from ``pg_stat_get_wal_receiver()``, only on replica nodes;
13061310
* ``xlog``: dictionary. Its structure depends on ``role``:
13071311
13081312
* If :class:`~patroni.postgresql.misc.PostgresqlRole.PRIMARY`:
@@ -1343,15 +1347,18 @@ def get_postgresql_status(self, retry: bool = False) -> Dict[str, Any]:
13431347
if postgresql.state not in (PostgresqlState.RUNNING, PostgresqlState.RESTARTING,
13441348
PostgresqlState.STARTING):
13451349
raise RetryFailedError('')
1346-
replication_state = ('(pg_catalog.pg_stat_get_wal_receiver()).status'
1347-
if postgresql.major_version >= 90600 else 'NULL') + ", " +\
1348-
("pg_catalog.current_setting('restore_command')" if postgresql.major_version >= 120000 else "NULL")
1350+
replication_state = ("pg_catalog.pg_{0}_{1}_diff(wr.latest_end_lsn, '0/0')::bigint, wr.status"
1351+
if postgresql.major_version >= 90600 else "NULL, NULL") + ", " +\
1352+
("pg_catalog.current_setting('restore_command')" if postgresql.major_version >= 120000 else "NULL") +\
1353+
", " + ("pg_catalog.pg_wal_lsn_diff(wr.written_lsn, '0/0')::bigint"
1354+
if postgresql.major_version >= 130000 else "NULL")
13491355
stmt = ("SELECT " + postgresql.POSTMASTER_START_TIME + ", " + postgresql.TL_LSN + ","
13501356
" pg_catalog.pg_last_xact_replay_timestamp(), " + replication_state + ","
1351-
" pg_catalog.array_to_json(pg_catalog.array_agg(pg_catalog.row_to_json(ri))) "
1357+
" (SELECT pg_catalog.array_to_json(pg_catalog.array_agg(pg_catalog.row_to_json(ri))) "
13521358
"FROM (SELECT (SELECT rolname FROM pg_catalog.pg_authid WHERE oid = usesysid) AS usename,"
13531359
" application_name, client_addr, w.state, sync_state, sync_priority"
1354-
" FROM pg_catalog.pg_stat_get_wal_senders() w, pg_catalog.pg_stat_get_activity(pid)) AS ri")
1360+
" FROM pg_catalog.pg_stat_get_wal_senders() w, pg_catalog.pg_stat_get_activity(pid)) AS ri)") +\
1361+
(" FROM pg_catalog.pg_stat_get_wal_receiver() AS wr" if postgresql.major_version >= 90600 else "")
13551362

13561363
row = self.query(stmt.format(postgresql.wal_name, postgresql.lsn_name,
13571364
postgresql.wal_flush), retry=retry)[0]
@@ -1361,7 +1368,7 @@ def get_postgresql_status(self, retry: bool = False) -> Dict[str, Any]:
13611368
'role': PostgresqlRole.REPLICA if row[1] == 0 else PostgresqlRole.PRIMARY,
13621369
'server_version': postgresql.server_version,
13631370
'xlog': ({
1364-
'received_location': row[4] or row[3],
1371+
'received_location': row[10] or row[4] or row[3],
13651372
'replayed_location': row[3],
13661373
'replayed_timestamp': row[6],
13671374
'paused': row[5]} if row[1] == 0 else {
@@ -1383,12 +1390,15 @@ def get_postgresql_status(self, retry: bool = False) -> Dict[str, Any]:
13831390
if not cluster or cluster.is_unlocked() or not cluster.leader else cluster.leader.timeline
13841391
result['timeline'] = postgresql.replica_cached_timeline(leader_timeline)
13851392

1386-
replication_state = postgresql.replication_state_from_parameters(row[1] > 0, row[7], row[8])
1393+
if row[7]:
1394+
result['latest_end_lsn'] = row[7]
1395+
1396+
replication_state = postgresql.replication_state_from_parameters(row[1] > 0, row[8], row[9])
13871397
if replication_state:
13881398
result['replication_state'] = replication_state
13891399

1390-
if row[9]:
1391-
result['replication'] = row[9]
1400+
if row[11]:
1401+
result['replication'] = row[11]
13921402

13931403
except (psycopg.Error, RetryFailedError, PostgresConnectionException):
13941404
state = postgresql.state

0 commit comments

Comments
 (0)