Skip to content

Commit 2e93a0c

Browse files
cjen1-msftCopilotachamayou
authored
Assert ledger chunking (#7637)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Amaury Chamayou <amchamay@microsoft.com>
1 parent 0c3ae93 commit 2e93a0c

File tree

11 files changed

+171
-38
lines changed

11 files changed

+171
-38
lines changed

python/src/ccf/ledger.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
import struct
55
import os
6-
from enum import Enum
6+
from enum import Enum, Flag, auto
77

88
from typing import NamedTuple, Optional, Tuple, Dict, List
99

@@ -79,6 +79,11 @@ def is_deprecated(self):
7979
)
8080

8181

82+
class TransactionFlags(Flag):
83+
FORCE_CHUNK_AFTER = auto()
84+
FORCE_CHUNK_BEFORE = auto()
85+
86+
8287
def to_uint_64(buffer):
8388
return struct.unpack("@Q", buffer)[0]
8489

tests/e2e_operations.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -782,6 +782,7 @@ def test_ledger_chunk_redirect_recent(network, args):
782782
f"Dropping last ledger chunk {chunks[-1]} from late backup main ledger directory {main_ledger_dir}"
783783
)
784784
os.remove(os.path.join(main_ledger_dir, chunks[-1]))
785+
network.skip_verify_chunking = True
785786
with late_backup.client(
786787
interface_name=infra.interfaces.FILE_SERVING_RPC_INTERFACE
787788
) as c:
@@ -1464,6 +1465,7 @@ def run_empty_ledger_dir_check(args):
14641465
# Now write a file in the directory
14651466
with open(os.path.join(tmp_dir, "ledger_1000_1500.committed"), "wb") as f:
14661467
f.write(b"bar")
1468+
network.skip_verify_chunking = True
14671469

14681470
# Start new network, this should fail
14691471
try:

tests/governance.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import infra.path
88
import infra.proc
99
import infra.net
10+
from infra.node import CCFVersion
1011
import infra.e2e_args
1112
import infra.proposal
1213
import suite.test_requirements as reqs
@@ -508,7 +509,7 @@ def test_all_nodes_cert_renewal(network, args, valid_from=None):
508509
self_signed_node_certs_before = {}
509510
for node in network.get_joined_nodes():
510511
# Note: GET /node/self_signed_certificate endpoint was added after 2.0.0-r6
511-
if node.version_after("ccf-2.0.0-rc6"):
512+
if CCFVersion(node.version) > CCFVersion("ccf-2.0.0-rc6"):
512513
self_signed_node_certs_before[node.local_node_id] = (
513514
node.retrieve_self_signed_cert()
514515
)
@@ -524,7 +525,7 @@ def test_all_nodes_cert_renewal(network, args, valid_from=None):
524525

525526
for node in network.get_joined_nodes():
526527
node.set_certificate_validity_period(valid_from, validity_period_days)
527-
if node.version_after("ccf-2.0.0-rc6"):
528+
if CCFVersion(node.version) > CCFVersion("ccf-2.0.0-rc6"):
528529
assert (
529530
self_signed_node_certs_before[node.local_node_id]
530531
!= node.retrieve_self_signed_cert()

tests/infra/consortium.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import infra.proc
1010
import infra.checker
1111
import infra.node
12+
from infra.node import CCFVersion
1213
import infra.crypto
1314
import infra.member
1415
from infra.proposal import ProposalState
@@ -436,7 +437,7 @@ def add_users_and_transition_service_to_open(self, remote_node, users):
436437
proposal["actions"].append({"name": "set_user", "args": {"cert": cert}})
437438

438439
args = {}
439-
if remote_node.version_after("ccf-2.0.0-rc3"):
440+
if CCFVersion(remote_node.version) > CCFVersion("ccf-2.0.0-rc3"):
440441
args = {"args": {"next_service_identity": self.get_service_identity()}}
441442
proposal["actions"].append({"name": "transition_service_to_open", **args})
442443

@@ -664,7 +665,7 @@ def transition_service_to_open(self, remote_node, previous_service_identity=None
664665
is_recovery = False
665666

666667
args = {}
667-
if remote_node.version_after("ccf-2.0.0-rc3"):
668+
if CCFVersion(remote_node.version) > CCFVersion("ccf-2.0.0-rc3"):
668669
args = {
669670
"previous_service_identity": previous_service_identity,
670671
"next_service_identity": self.get_service_identity(),
@@ -955,7 +956,7 @@ def check_for_service(self, remote_node, status, recovery_count=None):
955956
r = c.get("/node/network").body.json()
956957
current_status = r["service_status"]
957958
current_cert = r["service_certificate"]
958-
if remote_node.version_after("ccf-2.0.3"):
959+
if CCFVersion(remote_node.version) > CCFVersion("ccf-2.0.3"):
959960
current_recovery_count = r["recovery_count"]
960961
else:
961962
assert "recovery_count" not in r
@@ -974,7 +975,7 @@ def check_for_service(self, remote_node, status, recovery_count=None):
974975
assert (
975976
current_status == status.value
976977
), f"Service status {current_status} (expected {status.value})"
977-
if remote_node.version_after("ccf-2.0.3"):
978+
if CCFVersion(remote_node.version) > CCFVersion("ccf-2.0.3"):
978979
assert (
979980
recovery_count is None or current_recovery_count == recovery_count
980981
), f"Current recovery count {current_recovery_count} is not expected {recovery_count}"

tests/infra/jwt_issuer.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import uuid
1414

1515
from infra.log_capture import flush_info
16+
from infra.node import CCFVersion
1617
from loguru import logger as LOG
1718
from enum import Enum
1819
from cryptography.x509 import load_pem_x509_certificate
@@ -308,7 +309,7 @@ def wait_for_refresh(self, network, args, kid=None):
308309
kid_ = kid or self.default_kid
309310
primary, _ = network.find_nodes()
310311
end_time = time.time() + timeout
311-
if primary.version_after("ccf-5.0.0-rc3"):
312+
if CCFVersion(primary.version) > CCFVersion("ccf-5.0.0-rc3"):
312313
with primary.api_versioned_client(
313314
network.consortium.get_any_active_member().local_id,
314315
api_version=args.gov_api_version,
@@ -343,7 +344,7 @@ def wait_for_refresh(self, network, args, kid=None):
343344
keys = r.body.json()
344345
if kid_ in keys:
345346
kid_vals = keys[kid_]
346-
if primary.version_after("ccf-5.0.0-dev17"):
347+
if CCFVersion(primary.version) > CCFVersion("ccf-5.0.0-dev17"):
347348
assert len(kid_vals) == 1
348349
stored_cert = kid_vals[0]["cert"]
349350
else:

tests/infra/member.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import infra.proposal
77
import infra.crypto
88
import infra.clients
9+
from infra.node import CCFVersion
910
import http
1011
import os
1112
import base64
@@ -152,7 +153,7 @@ def __init__(self):
152153

153154
def _by_node_version(self, remote_node):
154155
min_version = "4.0.0"
155-
if remote_node.version_after(min_version):
156+
if CCFVersion(remote_node.version) > CCFVersion(min_version):
156157
return self._preview_v1
157158
else:
158159
raise ValueError(

tests/infra/network.py

Lines changed: 80 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import infra.proc
1212
import infra.service_load
1313
import infra.node
14+
from infra.node import CCFVersion
1415
import infra.consortium
1516
import infra.e2e_args
1617
import ccf.ledger
@@ -23,6 +24,8 @@
2324
import functools
2425
import re
2526
import hashlib
27+
import json
28+
2629
from datetime import datetime, timedelta, timezone
2730
from infra.consortium import slurp_file
2831
from collections import deque
@@ -233,6 +236,7 @@ def __init__(
233236
service_load=None,
234237
node_data_json_file=None,
235238
next_node_id=0,
239+
skip_verify_chunking=False,
236240
):
237241
# Map of node id to dict of node arg to override value
238242
# for example, to set the election timeout to 2s for node 3:
@@ -249,6 +253,7 @@ def __init__(
249253
self.service_load = service_load
250254
self.recovery_count = 0
251255
self.common_dir = None
256+
self.skip_verify_chunking = skip_verify_chunking
252257
else:
253258
self.consortium = existing_network.consortium
254259
self.users = existing_network.users
@@ -264,6 +269,7 @@ def __init__(
264269
self.service_load.set_network(self)
265270
self.recovery_count = existing_network.recovery_count
266271
self.common_dir = existing_network.common_dir
272+
self.skip_verify_chunking = existing_network.skip_verify_chunking
267273

268274
self.ignoring_shutdown_errors = False
269275
self.ignore_error_patterns = []
@@ -383,7 +389,10 @@ def _setup_node(
383389
current_ledger_dir, committed_ledger_dirs = target_node.get_ledger()
384390

385391
# Note: temporary fix until second snapshot directory is ported to 2.x branch
386-
if not node.version_after("ccf-2.0.3") and read_only_snapshots_dir is not None:
392+
if (
393+
not CCFVersion(node.version) > CCFVersion("ccf-2.0.3")
394+
and read_only_snapshots_dir is not None
395+
):
387396
snapshots_dir = read_only_snapshots_dir
388397

389398
node.prepare_join(
@@ -940,7 +949,7 @@ def recover(
940949
self.consortium.activate(random_node)
941950
expected_status = (
942951
ServiceStatus.RECOVERING
943-
if random_node.version_after("ccf-2.0.0-rc3")
952+
if CCFVersion(random_node.version) > CCFVersion("ccf-2.0.0-rc3")
944953
else ServiceStatus.OPENING
945954
)
946955
self.consortium.check_for_service(
@@ -1043,27 +1052,80 @@ def list_files_in_dirs_with_checksums(dirs):
10431052
if last_ledger_seqno > longest_ledger_seqno:
10441053
assert longest_ledger_files is None or longest_ledger_files.issubset(
10451054
ledger_files
1046-
), f"Ledger files on node {longest_ledger_node.local_node_id} do not match files on node {node.local_node_id}: {longest_ledger_files}, expected subset of {ledger_files}, diff: {ledger_files - longest_ledger_files}"
1055+
), f"Ledger files on node {longest_ledger_node.local_node_id} do not match files on node {node.local_node_id}: {longest_ledger_files}, expected subset of {ledger_files}, diff: (Only on {node.local_node_id}: {ledger_files - longest_ledger_files}, Only on {longest_ledger_node.local_node_id}: {longest_ledger_files - ledger_files})"
10471056
longest_ledger_files = ledger_files
10481057
longest_ledger_node = node
10491058
longest_ledger_seqno = last_ledger_seqno
10501059
else:
10511060
assert ledger_files.issubset(
10521061
longest_ledger_files
1053-
), f"Ledger files on node {node.local_node_id} do not match files on node {longest_ledger_node.local_node_id}: {ledger_files}, expected subset of {longest_ledger_files}, diff: {longest_ledger_files - ledger_files}"
1062+
), f"Ledger files on node {node.local_node_id} do not match files on node {longest_ledger_node.local_node_id}: {ledger_files}, expected subset of {longest_ledger_files}, diff: (Only on {longest_ledger_node.local_node_id}: {longest_ledger_files - ledger_files}, Only on {node.local_node_id}: {ledger_files - longest_ledger_files})"
10541063

10551064
if longest_ledger_files:
10561065
LOG.info(
10571066
f"Verified {len(longest_ledger_files)} ledger files consistency on all {len(self.nodes)} stopped nodes"
10581067
)
10591068

1069+
def check_ledger_files_chunk_flags(self):
1070+
for node in self.nodes:
1071+
if node.remote is None:
1072+
continue
1073+
ledger_paths = node.remote.ledger_paths()
1074+
for path in ledger_paths:
1075+
ledger = ccf.ledger.Ledger([path])
1076+
chunks = list(ledger)
1077+
for cur, nxt in zip([None] + chunks, chunks + [None]):
1078+
if cur is None:
1079+
continue
1080+
1081+
if nxt is None:
1082+
# Assume that the next chunk would emit chunk_before
1083+
flag_force_chunk_before = True
1084+
1085+
else:
1086+
nxt_tx = nxt[0]
1087+
flags = ccf.ledger.TransactionFlags(
1088+
nxt_tx.get_transaction_header().flags
1089+
)
1090+
flag_force_chunk_before = (
1091+
ccf.ledger.TransactionFlags.FORCE_CHUNK_BEFORE in flags
1092+
)
1093+
if flag_force_chunk_before:
1094+
# We should only ever emit force_chunk_before if this is the genesis transaction of a recovering service
1095+
# Otherwise this tx could be rolled back, breaking the consistency of chunking across the network
1096+
tables = nxt_tx.get_public_domain().get_tables()
1097+
assert "public:ccf.gov.service.info" in tables
1098+
service_info = json.loads(
1099+
tables["public:ccf.gov.service.info"][
1100+
b"\x00\x00\x00\x00\x00\x00\x00\x00"
1101+
]
1102+
)
1103+
assert (
1104+
"status" in service_info
1105+
and service_info["status"] == "Recovering"
1106+
), f"Node {node.local_node_id} has a chunk which forces chunking before but does not recover the service: {nxt.filename()}"
1107+
1108+
last_tx = cur[-1]
1109+
flags = ccf.ledger.TransactionFlags(
1110+
last_tx.get_transaction_header().flags
1111+
)
1112+
flag_force_chunk_after = (
1113+
ccf.ledger.TransactionFlags.FORCE_CHUNK_AFTER in flags
1114+
)
1115+
1116+
assert (
1117+
flag_force_chunk_after or flag_force_chunk_before
1118+
), f"Node {node.local_node_id} has chunks which do not force chunking correctly: {cur.filename()} -> {nxt.filename() if nxt is not None else 'NA'}"
1119+
10601120
def stop_all_nodes(
10611121
self,
10621122
skip_verification=False,
10631123
verbose_verification=False,
10641124
accept_ledger_diff=False,
1125+
skip_verify_chunking=None,
10651126
**kwargs,
10661127
):
1128+
skip_verify_chunking = skip_verify_chunking or self.skip_verify_chunking
10671129
if not skip_verification and self.txs is not None:
10681130
LOG.info("Verifying that all committed txs can be read before shutdown")
10691131
log_capture = []
@@ -1102,6 +1164,10 @@ def stop_all_nodes(
11021164
if not accept_ledger_diff:
11031165
self.check_ledger_files_identical(**kwargs)
11041166

1167+
if not skip_verify_chunking:
1168+
LOG.info("Verifying ledger chunk flags before shutdown")
1169+
self.check_ledger_files_chunk_flags()
1170+
11051171
if fatal_error_found:
11061172
if self.ignoring_shutdown_errors:
11071173
LOG.warning("Ignoring shutdown errors")
@@ -1254,7 +1320,7 @@ def retire_node(self, remote_node, node_to_retire, timeout=10):
12541320
)
12551321
if remote_node == node_to_retire:
12561322
remote_node, _ = self.wait_for_new_primary(remote_node)
1257-
if remote_node.version_after("ccf-2.0.4") and not pending:
1323+
if CCFVersion(remote_node.version) > CCFVersion("ccf-2.0.4") and not pending:
12581324
end_time = time.time() + timeout
12591325
r = None
12601326
while time.time() < end_time:
@@ -1996,7 +2062,9 @@ def close_on_error(net, pdb=False):
19962062
pdb.post_mortem()
19972063

19982064
LOG.info("Stopping network")
1999-
net.stop_all_nodes(skip_verification=True, accept_ledger_diff=True)
2065+
net.stop_all_nodes(
2066+
skip_verification=True, accept_ledger_diff=True, skip_verify_chunking=True
2067+
)
20002068

20012069
raise
20022070

@@ -2014,6 +2082,7 @@ def network(
20142082
version=None,
20152083
service_load=None,
20162084
node_data_json_file=None,
2085+
skip_verify_chunking=False,
20172086
**kwargs,
20182087
):
20192088
"""
@@ -2042,11 +2111,15 @@ def network(
20422111
version=version,
20432112
service_load=service_load,
20442113
node_data_json_file=node_data_json_file,
2114+
skip_verify_chunking=skip_verify_chunking,
20452115
**kwargs,
20462116
)
20472117
with close_on_error(net, pdb=pdb):
20482118
yield net
20492119
LOG.info("Stopping network")
2050-
net.stop_all_nodes(skip_verification=True, accept_ledger_diff=True)
2120+
net.stop_all_nodes(
2121+
skip_verification=True,
2122+
accept_ledger_diff=True,
2123+
)
20512124
if init_partitioner:
20522125
net.partitioner.cleanup()

tests/infra/node.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from contextlib import contextmanager, closing
55
from enum import Enum, auto
6+
import functools
67
import infra.crypto
78
import infra.remote
89
from datetime import datetime, timedelta, timezone
@@ -107,6 +108,28 @@ def version_after(version, cmp_version):
107108
) > ccf._versionifier.to_python_version(cmp_version)
108109

109110

111+
@functools.total_ordering
112+
class CCFVersion:
113+
# None is assumed to be the latest development version
114+
# so None > any specific version, and None == None
115+
def __init__(self, version_str):
116+
self.version_str = version_str
117+
if version_str is not None:
118+
self.parsed_version = ccf._versionifier.to_python_version(version_str)
119+
else:
120+
self.parsed_version = None
121+
122+
def __eq__(self, other):
123+
return self.parsed_version == other.parsed_version
124+
125+
def __lt__(self, other):
126+
if self.parsed_version is None:
127+
return False
128+
if other.parsed_version is None:
129+
return True
130+
return self.parsed_version < other.parsed_version
131+
132+
110133
class Node:
111134
def __init__(
112135
self,
@@ -823,7 +846,7 @@ def check_log_for_error_message(self, msg):
823846
return False
824847

825848
def version_after(self, version):
826-
return version_after(self.version, version)
849+
return CCFVersion(self.version) > CCFVersion(version)
827850

828851
def get_receipt(self, view, seqno, timeout=3):
829852
found = False

0 commit comments

Comments
 (0)