From 2236ba943bc58ec708d52c85647d67091c6d0f1f Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Thu, 12 Jun 2025 15:14:55 +0200 Subject: [PATCH 1/4] New server metadata persistenace Until now, we persisted metadata for all servers in a given Ra system in a single DETS file, owned by ra_log_meta gen batch server. This was a source of bottlenecks, so we move each servers metadata to a simple binary file. If the file doesn't exist, we try to fetch metadata from ra_log_meta and immediately persist it, so that we don't need ra_log_meta going forward. --- src/ra_server.erl | 37 ++++--- src/ra_server_meta.erl | 211 +++++++++++++++++++++++++++++++++++++++ test/ra_server_SUITE.erl | 18 ++++ 3 files changed, 250 insertions(+), 16 deletions(-) create mode 100644 src/ra_server_meta.erl diff --git a/src/ra_server.erl b/src/ra_server.erl index 5d820d5b9..c2478da4a 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -99,7 +99,8 @@ queries_waiting_heartbeats := queue:queue({non_neg_integer(), consistent_query_ref()}), pending_consistent_queries := [consistent_query_ref()], - commit_latency => option(non_neg_integer()) + commit_latency => option(non_neg_integer()), + meta_fd => option(file:fd()) }. -type state() :: ra_server_state(). @@ -349,9 +350,11 @@ init(#{id := Id, end, MetaName = meta_name(SystemConfig), - CurrentTerm = ra_log_meta:fetch(MetaName, UId, current_term, 0), - LastApplied = ra_log_meta:fetch(MetaName, UId, last_applied, 0), - VotedFor = ra_log_meta:fetch(MetaName, UId, voted_for, undefined), + DataDir = maps:get(data_dir, SystemConfig), + MetaFile = ra_server_meta:path(DataDir, UId), + + {ok, {VotedFor, CurrentTerm, LastApplied}} = ra_server_meta:fetch(MetaFile, MetaName, UId), + {ok, MetaFd} = file:open(MetaFile, [read, write, raw, binary]), LatestMacVer = ra_machine:version(Machine), InitialMachineVersion = min(LatestMacVer, @@ -420,7 +423,8 @@ init(#{id := Id, aux_state => ra_machine:init_aux(MacMod, Name), query_index => 0, queries_waiting_heartbeats => queue:new(), - pending_consistent_queries => []}. + pending_consistent_queries => [], + meta_fd => MetaFd}. recover(#{cfg := #cfg{log_id = LogId, machine_version = MacVer, @@ -2238,8 +2242,8 @@ persist_last_applied(#{persisted_last_applied := PLA, % if last applied is less than PL for some reason do nothing State; persist_last_applied(#{last_applied := LastApplied, - cfg := #cfg{uid = UId} = Cfg} = State) -> - ok = ra_log_meta:store(meta_name(Cfg), UId, last_applied, LastApplied), + meta_fd := MetaFd} = State) -> + ok = ra_server_meta:update_last_applied(MetaFd, LastApplied), State#{persisted_last_applied => LastApplied}. @@ -2355,13 +2359,17 @@ handle_node_status(RaftState, Type, Node, Status, _Info, -spec terminate(ra_server_state(), Reason :: {shutdown, delete} | term()) -> ok. terminate(#{log := Log, + meta_fd := MetaFd, cfg := #cfg{log_id = LogId}} = _State, {shutdown, delete}) -> ?NOTICE("~ts: terminating with reason 'delete'", [LogId]), + _ = file:close(MetaFd), catch ra_log:delete_everything(Log), ok; -terminate(#{cfg := #cfg{log_id = LogId}} = State, Reason) -> +terminate(#{cfg := #cfg{log_id = LogId}, meta_fd := MetaFd} = State, Reason) -> ?DEBUG("~ts: terminating with reason '~w'", [LogId, Reason]), #{log := Log} = persist_last_applied(State), + _ = file:sync(MetaFd), + _ = file:close(MetaFd), catch ra_log:close(Log), ok. @@ -2563,18 +2571,17 @@ peer(PeerId, #{cluster := Nodes}) -> put_peer(PeerId, Peer, #{cluster := Peers} = State) -> State#{cluster => Peers#{PeerId => Peer}}. -update_term_and_voted_for(Term, VotedFor, #{cfg := #cfg{uid = UId} = Cfg, - current_term := CurTerm} = State) -> +update_term_and_voted_for(Term, VotedFor, #{cfg := Cfg, + current_term := CurTerm, + meta_fd := MetaFd} = State) -> CurVotedFor = maps:get(voted_for, State, undefined), case Term =:= CurTerm andalso VotedFor =:= CurVotedFor of true -> %% no update needed State; false -> - MetaName = meta_name(Cfg), - %% as this is a rare event it is ok to go sync here - ok = ra_log_meta:store(MetaName, UId, current_term, Term), - ok = ra_log_meta:store_sync(MetaName, UId, voted_for, VotedFor), + LastApplied = maps:get(last_applied, State, 0), + ok = ra_server_meta:store_sync(MetaFd, VotedFor, Term, LastApplied), incr_counter(Cfg, ?C_RA_SRV_TERM_AND_VOTED_FOR_UPDATES, 1), put_counter(Cfg, ?C_RA_SVR_METRIC_TERM, Term), %% this is probably not necessary @@ -3413,8 +3420,6 @@ put_counter(#cfg{counter = Cnt}, Ix, N) when Cnt =/= undefined -> put_counter(#cfg{counter = undefined}, _Ix, _N) -> ok. -meta_name(#cfg{system_config = #{names := #{log_meta := Name}}}) -> - Name; meta_name(#{names := #{log_meta := Name}}) -> Name. diff --git a/src/ra_server_meta.erl b/src/ra_server_meta.erl new file mode 100644 index 000000000..ec2cbc451 --- /dev/null +++ b/src/ra_server_meta.erl @@ -0,0 +1,211 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. +-module(ra_server_meta). + +-include_lib("stdlib/include/assert.hrl"). + +-export([ + path/2, + fetch/3, + fetch_from_file/1, + store_sync/4, + update_last_applied/2 + ]). + +%% This module implements persistance for server metadata +%% Before Ra 3.0, metadata was stored in a DETS file, shared +%% by all Ra servers in a Ra system. +%% Now, we store each server's metadata in a separate file, +%% in the server's data directory. +%% The structure of the metadata file is as follows: +%% - 4 bytes magic header (RAM1) +%% - 1004 bytes VotedFor field, which is a binary +%% - 1 byte for the size of the first atom (server name) +%% - first atom (server name) as a binary +%% - 1 byte for the size of the second atom (node name) +%% - second atom (node name) as a binary +%% - padding (zeroed) +%% - 8 bytes CurrentTerm (unsigned 64-bit integer) +%% - 8 bytes LastApplied (unsigned 64-bit integer) +%% for a total of 1024 bytes +%% +%% When VotedFor/Term change, the file is updated and fsynced. +%% If only the LastApplied changes, we update but do not fsync, +%% since this would be prohibitively slow. + +-define(FILENAME, "server.meta"). +-define(MAGIC, "RAM1"). +-define(TOTAL_SIZE, 1024). +-define(LAST_APPLIED_POSITION, ?TOTAL_SIZE - 8). +-define(TERM_POSITION, ?TOTAL_SIZE - ?LAST_APPLIED_POSITION - 8). + +path(DataDir, UId) -> + ServerDir = filename:join(DataDir, UId), + filename:join(ServerDir, ?FILENAME). + +fetch(Path, MetaName, UId) -> + case fetch_from_file(Path) of + {ok, Metadata} when is_tuple(Metadata) -> + {ok, Metadata}; + {error, _} -> + %% metadata migration case: + %% fetch from ra_log_meta and store in a file + {VotedFor, CurrentTerm, LastApplied} = fetch_from_ra_log_meta(MetaName, UId), + case store_sync(Path, VotedFor, CurrentTerm, LastApplied) of + ok -> + ra_log_meta:delete(MetaName, UId), + {ok, {VotedFor, CurrentTerm, LastApplied}}; + Err -> + Err + end + end. + +fetch_from_file(Path) -> + case file:read_file(Path) of + {ok, <>} -> + VotedFor = try + parse_voted_for(VotedForBin) + catch + _:_ -> undefined + end, + {ok, {VotedFor, CurrentTerm, LastApplied}}; + {ok, _} -> + {error, invalid_format}; + Err -> + Err + end. + +fetch_from_ra_log_meta(MetaName, UId) -> + VotedFor = ra_log_meta:fetch(MetaName, UId, voted_for, undefined), + CurrentTerm = ra_log_meta:fetch(MetaName, UId, current_term, 0), + LastApplied = ra_log_meta:fetch(MetaName, UId, last_applied, 0), + {VotedFor, CurrentTerm, LastApplied}. + +store_sync(MetaFile, VotedFor, CurrentTerm, LastApplied) when is_binary(MetaFile) -> + {ok, MetaFd} = file:open(MetaFile, [write, binary, raw]), + store_sync(MetaFd, VotedFor, CurrentTerm, LastApplied), + file:close(MetaFd); +store_sync(MetaFd, VotedFor, CurrentTerm, LastApplied) -> + Data = encode_metadata(VotedFor, CurrentTerm, LastApplied), + ok = file:pwrite(MetaFd, 0, Data), + ok = file:sync(MetaFd). + +update_last_applied(MetaFd, LastApplied) -> + ok = file:pwrite(MetaFd, ?LAST_APPLIED_POSITION, <>). + +encode_metadata(VotedFor, CurrentTerm, LastApplied) -> + VotedForBin = case VotedFor of + undefined -> + <<0, 0>>; + {NameAtom, NodeAtom} -> + NameAtomBin = atom_to_binary(NameAtom, utf8), + NodeAtomBin = atom_to_binary(NodeAtom, utf8), + NameSize = byte_size(NameAtomBin), + NodeSize = byte_size(NodeAtomBin), + <> + end, + + HeaderSize = length(?MAGIC), + VotedForSize = byte_size(VotedForBin), + UsedSize = HeaderSize + VotedForSize, + PaddingSize = 1008 - UsedSize, + Padding = <<0:PaddingSize/unit:8>>, + + <>. + +parse_voted_for(<>) when NameAtomSize > 0 -> + case Rest of + <> + when NodeAtomSize > 0 -> + {binary_to_atom(NameAtom, utf8), binary_to_atom(NodeAtom, utf8)}; + _ -> + undefined + end; +parse_voted_for(_) -> + undefined. + +%%% =================== +%%% Internal unit tests +%%% =================== + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +v1_format_test() -> + CurrentTerm = rand:uniform(10000), + LastApplied = rand:uniform(100000), + VotedFor = {somename, somenode}, + + % we always encode into a 1024-byte binary + Data = encode_metadata(VotedFor, CurrentTerm, LastApplied), + ?assertEqual(1024, byte_size(Data)), + + % we can reconstruct the VotedFor from the binary + <<"RAM1", VotedForBin/binary>> = Data, + ?assertEqual({somename, somenode}, parse_voted_for(VotedForBin)), + + % we can extract term and last applied from fixed positions + <<_:1008/binary, ParsedTerm:64/unsigned, ParsedLastApplied:64/unsigned>> = Data, + ?assertEqual(CurrentTerm, ParsedTerm), + ?assertEqual(LastApplied, ParsedLastApplied), + + % "empty" metadata + EmptyData = encode_metadata(undefined, 0, 0), + ?assertEqual(1024, byte_size(EmptyData)), + <<"RAM1", VotedForDataUndef/binary>> = EmptyData, + ?assertEqual(undefined, parse_voted_for(VotedForDataUndef)), + <<_:1008/binary, ZeroTerm:64/unsigned, ZeroLastApplied:64/unsigned>> = EmptyData, + ?assertEqual(ZeroTerm, 0), + ?assertEqual(ZeroLastApplied, 0), + + % end-to-end test + TempFile = "test_new_meta", %% TODO - put in the right place + file:write_file(TempFile, Data), + {ok, {E2EVotedFor, E2ECurrentTerm, E2ELastApplied}} = fetch_from_file(TempFile), + file:delete(TempFile), + ?assertEqual(VotedFor, E2EVotedFor), + ?assertEqual(CurrentTerm, E2ECurrentTerm), + ?assertEqual(LastApplied, E2ELastApplied), + + % Test edge cases + + % very long atom names + LongName = list_to_atom([$a || _ <- lists:seq(1, 255)]), + LongNode = list_to_atom([$b || _ <- lists:seq(1, 255)]), + LongVotedFor = {LongName, LongNode}, + DataLong = encode_metadata(LongVotedFor, 999999, 888888), + ?assertEqual(1024, byte_size(DataLong)), + <<"RAM1", VotedForDataLong/binary>> = DataLong, + ?assertEqual(LongVotedFor, parse_voted_for(VotedForDataLong)), + + % single character atoms + ShortVotedFor = {a, b}, + DataShort = encode_metadata(ShortVotedFor, 1, 2), + ?assertEqual(1024, byte_size(DataShort)), + <<"RAM1", VotedForDataShort/binary>> = DataShort, + ?assertEqual(ShortVotedFor, parse_voted_for(VotedForDataShort)), + + % max values are handled + MaxTerm = 18446744073709551615, % 2^64 - 1 + MaxApplied = 18446744073709551615, + DataMax = encode_metadata(VotedFor, MaxTerm, MaxApplied), + ?assertEqual(1024, byte_size(DataMax)), + <<_:1008/binary, ParsedMaxTerm:64/unsigned, ParsedMaxApplied:64/unsigned>> = DataMax, + ?assertEqual(MaxTerm, ParsedMaxTerm), + ?assertEqual(MaxApplied, ParsedMaxApplied), + + % invalid magic header + BadHeaderData = <<"ACME", VotedForBin/binary>>, + TempFileBadHeader = "test_bad_header", %% TODO path + file:write_file(TempFileBadHeader, BadHeaderData), + ?assertEqual({error, invalid_format}, fetch_from_file(TempFileBadHeader)), + file:delete(TempFileBadHeader), + + ok. + +-endif. diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl index f811fb384..8a736388c 100644 --- a/test/ra_server_SUITE.erl +++ b/test/ra_server_SUITE.erl @@ -159,6 +159,22 @@ setup_log() -> fun(_Data, _OutOf, _Flag, SS) -> {ok, SS} end), + meck:expect(ra_server_meta, path, fun(_, U) -> U end), + meck:expect(ra_server_meta, fetch, fun(P, _, _) -> + case get(P) of + undefined -> + {ok, {undefined, 0, 0}}; + Metadata -> + {ok, Metadata} + end + end), + meck:expect(ra_server_meta, store_sync, fun (P, V, T, L) -> + put(P, {V, T, L}), ok + end), + meck:expect(ra_server_meta, update_last_applied, fun (P, L) -> + {V, T, _} = get(P), + put(P, {V, T, L}), ok + end), meck:expect(ra_snapshot, abort_accept, fun(SS) -> SS end), meck:expect(ra_snapshot, accepting, fun(_SS) -> undefined end), meck:expect(ra_log, snapshot_state, fun ra_log_memory:snapshot_state/1), @@ -215,6 +231,7 @@ init_test(_Config) -> ok = ra_log_meta:store(ra_log_meta, UId, voted_for, some_server), ok = ra_log_meta:store(ra_log_meta, UId, current_term, CurrentTerm), meck:expect(ra_log, init, fun (_) -> Log0 end), + meck:expect(ra_server_meta, fetch, fun(_, _, _) -> {ok, {some_server, 5, 0}} end), #{current_term := 5, voted_for := some_server} = ra_server_init(InitConf), % snapshot @@ -3175,6 +3192,7 @@ base_state(NumServers, MacMod) -> }, #{cfg => Cfg, leader_id => ?N1, + meta_fd => fake_fd, cluster => Servers, cluster_index_term => {0, 0}, cluster_change_permitted => true, From 8d5851aca0437ac863ccb111e8d6b4a0b2f7936e Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Fri, 13 Jun 2025 13:38:10 +0200 Subject: [PATCH 2/4] Refactor is_new test Now that meta_fd is a part of the state, repeated calls to ra_server:init() lead to different initial states, so the test is adjusted not to expect the same result from both calls. --- test/ra_server_SUITE.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl index 8a736388c..43dcffe1c 100644 --- a/test/ra_server_SUITE.erl +++ b/test/ra_server_SUITE.erl @@ -1998,10 +1998,9 @@ is_new(_Config) -> log_init_args => #{uid => <<>>}, machine => {simple, fun erlang:'+'/2, 0}}, NewState = ra_server:init(Args), + true = ra_server:is_new(NewState), {leader, State, _} = ra_server:handle_leader(usr_cmd(1), NewState), false = ra_server:is_new(State), - NewState = ra_server:init(Args), - true = ra_server:is_new(NewState), ok. command(_Config) -> From 186d9f29cf776225c4f5bdb076d33f2b7a5e8bbb Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Fri, 13 Jun 2025 14:19:50 +0200 Subject: [PATCH 3/4] Fix ra_server:follower_install_snapshot_machine_version In this test we end up in update_last_applied with an undefined meta_fd set in base_state, so let's mock it as a no-op. --- test/ra_server_SUITE.erl | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl index 43dcffe1c..920ffd05c 100644 --- a/test/ra_server_SUITE.erl +++ b/test/ra_server_SUITE.erl @@ -171,7 +171,10 @@ setup_log() -> meck:expect(ra_server_meta, store_sync, fun (P, V, T, L) -> put(P, {V, T, L}), ok end), - meck:expect(ra_server_meta, update_last_applied, fun (P, L) -> + meck:expect(ra_server_meta, update_last_applied, fun + (undefined, _) -> + ok; + (P, L) -> {V, T, _} = get(P), put(P, {V, T, L}), ok end), @@ -3191,7 +3194,7 @@ base_state(NumServers, MacMod) -> }, #{cfg => Cfg, leader_id => ?N1, - meta_fd => fake_fd, + meta_fd => undefined, cluster => Servers, cluster_index_term => {0, 0}, cluster_change_permitted => true, From 9b3ac6099029d04c964073098ff62b1c882867da Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Wed, 18 Jun 2025 14:31:28 +0200 Subject: [PATCH 4/4] Handle UTF-8 in atoms While an atom cannot be longer than 255 characters, its binary representation can be longer, if UTF-8 characters are present. --- src/ra_server_meta.erl | 40 +++++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/src/ra_server_meta.erl b/src/ra_server_meta.erl index ec2cbc451..62df171e6 100644 --- a/src/ra_server_meta.erl +++ b/src/ra_server_meta.erl @@ -23,9 +23,9 @@ %% The structure of the metadata file is as follows: %% - 4 bytes magic header (RAM1) %% - 1004 bytes VotedFor field, which is a binary -%% - 1 byte for the size of the first atom (server name) +%% - 2 bytes for the size of the first atom (server name) %% - first atom (server name) as a binary -%% - 1 byte for the size of the second atom (node name) +%% - 2 bytes for the size of the second atom (node name) %% - second atom (node name) as a binary %% - padding (zeroed) %% - 8 bytes CurrentTerm (unsigned 64-bit integer) @@ -39,8 +39,8 @@ -define(FILENAME, "server.meta"). -define(MAGIC, "RAM1"). -define(TOTAL_SIZE, 1024). +-define(VOTED_FOR_MAX_SIZE, 1004). %% TOTAL_SIZE minus everything else -define(LAST_APPLIED_POSITION, ?TOTAL_SIZE - 8). --define(TERM_POSITION, ?TOTAL_SIZE - ?LAST_APPLIED_POSITION - 8). path(DataDir, UId) -> ServerDir = filename:join(DataDir, UId), @@ -99,28 +99,30 @@ update_last_applied(MetaFd, LastApplied) -> encode_metadata(VotedFor, CurrentTerm, LastApplied) -> VotedForBin = case VotedFor of undefined -> - <<0, 0>>; + <<>>; {NameAtom, NodeAtom} -> NameAtomBin = atom_to_binary(NameAtom, utf8), NodeAtomBin = atom_to_binary(NodeAtom, utf8), NameSize = byte_size(NameAtomBin), NodeSize = byte_size(NodeAtomBin), - <> + <> end, - HeaderSize = length(?MAGIC), VotedForSize = byte_size(VotedForBin), - UsedSize = HeaderSize + VotedForSize, - PaddingSize = 1008 - UsedSize, - Padding = <<0:PaddingSize/unit:8>>, - - <>. - -parse_voted_for(<>) when NameAtomSize > 0 -> + PaddingSize = ?VOTED_FOR_MAX_SIZE - VotedForSize, + case PaddingSize >= 0 of + true -> + Padding = <<0:PaddingSize/unit:8>>, + <>; + false -> + vote_for_binary_too_long + end. + +parse_voted_for(<>) when NameAtomSize > 0 -> case Rest of - <> + <> when NodeAtomSize > 0 -> {binary_to_atom(NameAtom, utf8), binary_to_atom(NodeAtom, utf8)}; _ -> @@ -174,9 +176,9 @@ v1_format_test() -> % Test edge cases - % very long atom names - LongName = list_to_atom([$a || _ <- lists:seq(1, 255)]), - LongNode = list_to_atom([$b || _ <- lists:seq(1, 255)]), + % very long atom names, including UTF-8 in on of the atoms + LongName = list_to_atom([$Σ || _ <- lists:seq(1, 255)]), + LongNode = list_to_atom([$a || _ <- lists:seq(1, 255)]), LongVotedFor = {LongName, LongNode}, DataLong = encode_metadata(LongVotedFor, 999999, 888888), ?assertEqual(1024, byte_size(DataLong)),