diff --git a/src/ra_server.erl b/src/ra_server.erl index 5d820d5b..c2478da4 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -99,7 +99,8 @@ queries_waiting_heartbeats := queue:queue({non_neg_integer(), consistent_query_ref()}), pending_consistent_queries := [consistent_query_ref()], - commit_latency => option(non_neg_integer()) + commit_latency => option(non_neg_integer()), + meta_fd => option(file:fd()) }. -type state() :: ra_server_state(). @@ -349,9 +350,11 @@ init(#{id := Id, end, MetaName = meta_name(SystemConfig), - CurrentTerm = ra_log_meta:fetch(MetaName, UId, current_term, 0), - LastApplied = ra_log_meta:fetch(MetaName, UId, last_applied, 0), - VotedFor = ra_log_meta:fetch(MetaName, UId, voted_for, undefined), + DataDir = maps:get(data_dir, SystemConfig), + MetaFile = ra_server_meta:path(DataDir, UId), + + {ok, {VotedFor, CurrentTerm, LastApplied}} = ra_server_meta:fetch(MetaFile, MetaName, UId), + {ok, MetaFd} = file:open(MetaFile, [read, write, raw, binary]), LatestMacVer = ra_machine:version(Machine), InitialMachineVersion = min(LatestMacVer, @@ -420,7 +423,8 @@ init(#{id := Id, aux_state => ra_machine:init_aux(MacMod, Name), query_index => 0, queries_waiting_heartbeats => queue:new(), - pending_consistent_queries => []}. + pending_consistent_queries => [], + meta_fd => MetaFd}. recover(#{cfg := #cfg{log_id = LogId, machine_version = MacVer, @@ -2238,8 +2242,8 @@ persist_last_applied(#{persisted_last_applied := PLA, % if last applied is less than PL for some reason do nothing State; persist_last_applied(#{last_applied := LastApplied, - cfg := #cfg{uid = UId} = Cfg} = State) -> - ok = ra_log_meta:store(meta_name(Cfg), UId, last_applied, LastApplied), + meta_fd := MetaFd} = State) -> + ok = ra_server_meta:update_last_applied(MetaFd, LastApplied), State#{persisted_last_applied => LastApplied}. @@ -2355,13 +2359,17 @@ handle_node_status(RaftState, Type, Node, Status, _Info, -spec terminate(ra_server_state(), Reason :: {shutdown, delete} | term()) -> ok. terminate(#{log := Log, + meta_fd := MetaFd, cfg := #cfg{log_id = LogId}} = _State, {shutdown, delete}) -> ?NOTICE("~ts: terminating with reason 'delete'", [LogId]), + _ = file:close(MetaFd), catch ra_log:delete_everything(Log), ok; -terminate(#{cfg := #cfg{log_id = LogId}} = State, Reason) -> +terminate(#{cfg := #cfg{log_id = LogId}, meta_fd := MetaFd} = State, Reason) -> ?DEBUG("~ts: terminating with reason '~w'", [LogId, Reason]), #{log := Log} = persist_last_applied(State), + _ = file:sync(MetaFd), + _ = file:close(MetaFd), catch ra_log:close(Log), ok. @@ -2563,18 +2571,17 @@ peer(PeerId, #{cluster := Nodes}) -> put_peer(PeerId, Peer, #{cluster := Peers} = State) -> State#{cluster => Peers#{PeerId => Peer}}. -update_term_and_voted_for(Term, VotedFor, #{cfg := #cfg{uid = UId} = Cfg, - current_term := CurTerm} = State) -> +update_term_and_voted_for(Term, VotedFor, #{cfg := Cfg, + current_term := CurTerm, + meta_fd := MetaFd} = State) -> CurVotedFor = maps:get(voted_for, State, undefined), case Term =:= CurTerm andalso VotedFor =:= CurVotedFor of true -> %% no update needed State; false -> - MetaName = meta_name(Cfg), - %% as this is a rare event it is ok to go sync here - ok = ra_log_meta:store(MetaName, UId, current_term, Term), - ok = ra_log_meta:store_sync(MetaName, UId, voted_for, VotedFor), + LastApplied = maps:get(last_applied, State, 0), + ok = ra_server_meta:store_sync(MetaFd, VotedFor, Term, LastApplied), incr_counter(Cfg, ?C_RA_SRV_TERM_AND_VOTED_FOR_UPDATES, 1), put_counter(Cfg, ?C_RA_SVR_METRIC_TERM, Term), %% this is probably not necessary @@ -3413,8 +3420,6 @@ put_counter(#cfg{counter = Cnt}, Ix, N) when Cnt =/= undefined -> put_counter(#cfg{counter = undefined}, _Ix, _N) -> ok. -meta_name(#cfg{system_config = #{names := #{log_meta := Name}}}) -> - Name; meta_name(#{names := #{log_meta := Name}}) -> Name. diff --git a/src/ra_server_meta.erl b/src/ra_server_meta.erl new file mode 100644 index 00000000..62df171e --- /dev/null +++ b/src/ra_server_meta.erl @@ -0,0 +1,213 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. +-module(ra_server_meta). + +-include_lib("stdlib/include/assert.hrl"). + +-export([ + path/2, + fetch/3, + fetch_from_file/1, + store_sync/4, + update_last_applied/2 + ]). + +%% This module implements persistance for server metadata +%% Before Ra 3.0, metadata was stored in a DETS file, shared +%% by all Ra servers in a Ra system. +%% Now, we store each server's metadata in a separate file, +%% in the server's data directory. +%% The structure of the metadata file is as follows: +%% - 4 bytes magic header (RAM1) +%% - 1004 bytes VotedFor field, which is a binary +%% - 2 bytes for the size of the first atom (server name) +%% - first atom (server name) as a binary +%% - 2 bytes for the size of the second atom (node name) +%% - second atom (node name) as a binary +%% - padding (zeroed) +%% - 8 bytes CurrentTerm (unsigned 64-bit integer) +%% - 8 bytes LastApplied (unsigned 64-bit integer) +%% for a total of 1024 bytes +%% +%% When VotedFor/Term change, the file is updated and fsynced. +%% If only the LastApplied changes, we update but do not fsync, +%% since this would be prohibitively slow. + +-define(FILENAME, "server.meta"). +-define(MAGIC, "RAM1"). +-define(TOTAL_SIZE, 1024). +-define(VOTED_FOR_MAX_SIZE, 1004). %% TOTAL_SIZE minus everything else +-define(LAST_APPLIED_POSITION, ?TOTAL_SIZE - 8). + +path(DataDir, UId) -> + ServerDir = filename:join(DataDir, UId), + filename:join(ServerDir, ?FILENAME). + +fetch(Path, MetaName, UId) -> + case fetch_from_file(Path) of + {ok, Metadata} when is_tuple(Metadata) -> + {ok, Metadata}; + {error, _} -> + %% metadata migration case: + %% fetch from ra_log_meta and store in a file + {VotedFor, CurrentTerm, LastApplied} = fetch_from_ra_log_meta(MetaName, UId), + case store_sync(Path, VotedFor, CurrentTerm, LastApplied) of + ok -> + ra_log_meta:delete(MetaName, UId), + {ok, {VotedFor, CurrentTerm, LastApplied}}; + Err -> + Err + end + end. + +fetch_from_file(Path) -> + case file:read_file(Path) of + {ok, <>} -> + VotedFor = try + parse_voted_for(VotedForBin) + catch + _:_ -> undefined + end, + {ok, {VotedFor, CurrentTerm, LastApplied}}; + {ok, _} -> + {error, invalid_format}; + Err -> + Err + end. + +fetch_from_ra_log_meta(MetaName, UId) -> + VotedFor = ra_log_meta:fetch(MetaName, UId, voted_for, undefined), + CurrentTerm = ra_log_meta:fetch(MetaName, UId, current_term, 0), + LastApplied = ra_log_meta:fetch(MetaName, UId, last_applied, 0), + {VotedFor, CurrentTerm, LastApplied}. + +store_sync(MetaFile, VotedFor, CurrentTerm, LastApplied) when is_binary(MetaFile) -> + {ok, MetaFd} = file:open(MetaFile, [write, binary, raw]), + store_sync(MetaFd, VotedFor, CurrentTerm, LastApplied), + file:close(MetaFd); +store_sync(MetaFd, VotedFor, CurrentTerm, LastApplied) -> + Data = encode_metadata(VotedFor, CurrentTerm, LastApplied), + ok = file:pwrite(MetaFd, 0, Data), + ok = file:sync(MetaFd). + +update_last_applied(MetaFd, LastApplied) -> + ok = file:pwrite(MetaFd, ?LAST_APPLIED_POSITION, <>). + +encode_metadata(VotedFor, CurrentTerm, LastApplied) -> + VotedForBin = case VotedFor of + undefined -> + <<>>; + {NameAtom, NodeAtom} -> + NameAtomBin = atom_to_binary(NameAtom, utf8), + NodeAtomBin = atom_to_binary(NodeAtom, utf8), + NameSize = byte_size(NameAtomBin), + NodeSize = byte_size(NodeAtomBin), + <> + end, + + VotedForSize = byte_size(VotedForBin), + PaddingSize = ?VOTED_FOR_MAX_SIZE - VotedForSize, + case PaddingSize >= 0 of + true -> + Padding = <<0:PaddingSize/unit:8>>, + <>; + false -> + vote_for_binary_too_long + end. + +parse_voted_for(<>) when NameAtomSize > 0 -> + case Rest of + <> + when NodeAtomSize > 0 -> + {binary_to_atom(NameAtom, utf8), binary_to_atom(NodeAtom, utf8)}; + _ -> + undefined + end; +parse_voted_for(_) -> + undefined. + +%%% =================== +%%% Internal unit tests +%%% =================== + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +v1_format_test() -> + CurrentTerm = rand:uniform(10000), + LastApplied = rand:uniform(100000), + VotedFor = {somename, somenode}, + + % we always encode into a 1024-byte binary + Data = encode_metadata(VotedFor, CurrentTerm, LastApplied), + ?assertEqual(1024, byte_size(Data)), + + % we can reconstruct the VotedFor from the binary + <<"RAM1", VotedForBin/binary>> = Data, + ?assertEqual({somename, somenode}, parse_voted_for(VotedForBin)), + + % we can extract term and last applied from fixed positions + <<_:1008/binary, ParsedTerm:64/unsigned, ParsedLastApplied:64/unsigned>> = Data, + ?assertEqual(CurrentTerm, ParsedTerm), + ?assertEqual(LastApplied, ParsedLastApplied), + + % "empty" metadata + EmptyData = encode_metadata(undefined, 0, 0), + ?assertEqual(1024, byte_size(EmptyData)), + <<"RAM1", VotedForDataUndef/binary>> = EmptyData, + ?assertEqual(undefined, parse_voted_for(VotedForDataUndef)), + <<_:1008/binary, ZeroTerm:64/unsigned, ZeroLastApplied:64/unsigned>> = EmptyData, + ?assertEqual(ZeroTerm, 0), + ?assertEqual(ZeroLastApplied, 0), + + % end-to-end test + TempFile = "test_new_meta", %% TODO - put in the right place + file:write_file(TempFile, Data), + {ok, {E2EVotedFor, E2ECurrentTerm, E2ELastApplied}} = fetch_from_file(TempFile), + file:delete(TempFile), + ?assertEqual(VotedFor, E2EVotedFor), + ?assertEqual(CurrentTerm, E2ECurrentTerm), + ?assertEqual(LastApplied, E2ELastApplied), + + % Test edge cases + + % very long atom names, including UTF-8 in on of the atoms + LongName = list_to_atom([$Σ || _ <- lists:seq(1, 255)]), + LongNode = list_to_atom([$a || _ <- lists:seq(1, 255)]), + LongVotedFor = {LongName, LongNode}, + DataLong = encode_metadata(LongVotedFor, 999999, 888888), + ?assertEqual(1024, byte_size(DataLong)), + <<"RAM1", VotedForDataLong/binary>> = DataLong, + ?assertEqual(LongVotedFor, parse_voted_for(VotedForDataLong)), + + % single character atoms + ShortVotedFor = {a, b}, + DataShort = encode_metadata(ShortVotedFor, 1, 2), + ?assertEqual(1024, byte_size(DataShort)), + <<"RAM1", VotedForDataShort/binary>> = DataShort, + ?assertEqual(ShortVotedFor, parse_voted_for(VotedForDataShort)), + + % max values are handled + MaxTerm = 18446744073709551615, % 2^64 - 1 + MaxApplied = 18446744073709551615, + DataMax = encode_metadata(VotedFor, MaxTerm, MaxApplied), + ?assertEqual(1024, byte_size(DataMax)), + <<_:1008/binary, ParsedMaxTerm:64/unsigned, ParsedMaxApplied:64/unsigned>> = DataMax, + ?assertEqual(MaxTerm, ParsedMaxTerm), + ?assertEqual(MaxApplied, ParsedMaxApplied), + + % invalid magic header + BadHeaderData = <<"ACME", VotedForBin/binary>>, + TempFileBadHeader = "test_bad_header", %% TODO path + file:write_file(TempFileBadHeader, BadHeaderData), + ?assertEqual({error, invalid_format}, fetch_from_file(TempFileBadHeader)), + file:delete(TempFileBadHeader), + + ok. + +-endif. diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl index f811fb38..920ffd05 100644 --- a/test/ra_server_SUITE.erl +++ b/test/ra_server_SUITE.erl @@ -159,6 +159,25 @@ setup_log() -> fun(_Data, _OutOf, _Flag, SS) -> {ok, SS} end), + meck:expect(ra_server_meta, path, fun(_, U) -> U end), + meck:expect(ra_server_meta, fetch, fun(P, _, _) -> + case get(P) of + undefined -> + {ok, {undefined, 0, 0}}; + Metadata -> + {ok, Metadata} + end + end), + meck:expect(ra_server_meta, store_sync, fun (P, V, T, L) -> + put(P, {V, T, L}), ok + end), + meck:expect(ra_server_meta, update_last_applied, fun + (undefined, _) -> + ok; + (P, L) -> + {V, T, _} = get(P), + put(P, {V, T, L}), ok + end), meck:expect(ra_snapshot, abort_accept, fun(SS) -> SS end), meck:expect(ra_snapshot, accepting, fun(_SS) -> undefined end), meck:expect(ra_log, snapshot_state, fun ra_log_memory:snapshot_state/1), @@ -215,6 +234,7 @@ init_test(_Config) -> ok = ra_log_meta:store(ra_log_meta, UId, voted_for, some_server), ok = ra_log_meta:store(ra_log_meta, UId, current_term, CurrentTerm), meck:expect(ra_log, init, fun (_) -> Log0 end), + meck:expect(ra_server_meta, fetch, fun(_, _, _) -> {ok, {some_server, 5, 0}} end), #{current_term := 5, voted_for := some_server} = ra_server_init(InitConf), % snapshot @@ -1981,10 +2001,9 @@ is_new(_Config) -> log_init_args => #{uid => <<>>}, machine => {simple, fun erlang:'+'/2, 0}}, NewState = ra_server:init(Args), + true = ra_server:is_new(NewState), {leader, State, _} = ra_server:handle_leader(usr_cmd(1), NewState), false = ra_server:is_new(State), - NewState = ra_server:init(Args), - true = ra_server:is_new(NewState), ok. command(_Config) -> @@ -3175,6 +3194,7 @@ base_state(NumServers, MacMod) -> }, #{cfg => Cfg, leader_id => ?N1, + meta_fd => undefined, cluster => Servers, cluster_index_term => {0, 0}, cluster_change_permitted => true,