Skip to content

Meta without dets #548

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 21 additions & 16 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@
queries_waiting_heartbeats := queue:queue({non_neg_integer(),
consistent_query_ref()}),
pending_consistent_queries := [consistent_query_ref()],
commit_latency => option(non_neg_integer())
commit_latency => option(non_neg_integer()),
meta_fd => option(file:fd())
}.

-type state() :: ra_server_state().
Expand Down Expand Up @@ -349,9 +350,11 @@ init(#{id := Id,
end,

MetaName = meta_name(SystemConfig),
CurrentTerm = ra_log_meta:fetch(MetaName, UId, current_term, 0),
LastApplied = ra_log_meta:fetch(MetaName, UId, last_applied, 0),
VotedFor = ra_log_meta:fetch(MetaName, UId, voted_for, undefined),
DataDir = maps:get(data_dir, SystemConfig),
MetaFile = ra_server_meta:path(DataDir, UId),

{ok, {VotedFor, CurrentTerm, LastApplied}} = ra_server_meta:fetch(MetaFile, MetaName, UId),
{ok, MetaFd} = file:open(MetaFile, [read, write, raw, binary]),

LatestMacVer = ra_machine:version(Machine),
InitialMachineVersion = min(LatestMacVer,
Expand Down Expand Up @@ -420,7 +423,8 @@ init(#{id := Id,
aux_state => ra_machine:init_aux(MacMod, Name),
query_index => 0,
queries_waiting_heartbeats => queue:new(),
pending_consistent_queries => []}.
pending_consistent_queries => [],
meta_fd => MetaFd}.

recover(#{cfg := #cfg{log_id = LogId,
machine_version = MacVer,
Expand Down Expand Up @@ -2238,8 +2242,8 @@ persist_last_applied(#{persisted_last_applied := PLA,
% if last applied is less than PL for some reason do nothing
State;
persist_last_applied(#{last_applied := LastApplied,
cfg := #cfg{uid = UId} = Cfg} = State) ->
ok = ra_log_meta:store(meta_name(Cfg), UId, last_applied, LastApplied),
meta_fd := MetaFd} = State) ->
ok = ra_server_meta:update_last_applied(MetaFd, LastApplied),
State#{persisted_last_applied => LastApplied}.


Expand Down Expand Up @@ -2355,13 +2359,17 @@ handle_node_status(RaftState, Type, Node, Status, _Info,

-spec terminate(ra_server_state(), Reason :: {shutdown, delete} | term()) -> ok.
terminate(#{log := Log,
meta_fd := MetaFd,
cfg := #cfg{log_id = LogId}} = _State, {shutdown, delete}) ->
?NOTICE("~ts: terminating with reason 'delete'", [LogId]),
_ = file:close(MetaFd),
catch ra_log:delete_everything(Log),
ok;
terminate(#{cfg := #cfg{log_id = LogId}} = State, Reason) ->
terminate(#{cfg := #cfg{log_id = LogId}, meta_fd := MetaFd} = State, Reason) ->
?DEBUG("~ts: terminating with reason '~w'", [LogId, Reason]),
#{log := Log} = persist_last_applied(State),
_ = file:sync(MetaFd),
_ = file:close(MetaFd),
catch ra_log:close(Log),
ok.

Expand Down Expand Up @@ -2563,18 +2571,17 @@ peer(PeerId, #{cluster := Nodes}) ->
put_peer(PeerId, Peer, #{cluster := Peers} = State) ->
State#{cluster => Peers#{PeerId => Peer}}.

update_term_and_voted_for(Term, VotedFor, #{cfg := #cfg{uid = UId} = Cfg,
current_term := CurTerm} = State) ->
update_term_and_voted_for(Term, VotedFor, #{cfg := Cfg,
current_term := CurTerm,
meta_fd := MetaFd} = State) ->
CurVotedFor = maps:get(voted_for, State, undefined),
case Term =:= CurTerm andalso VotedFor =:= CurVotedFor of
true ->
%% no update needed
State;
false ->
MetaName = meta_name(Cfg),
%% as this is a rare event it is ok to go sync here
ok = ra_log_meta:store(MetaName, UId, current_term, Term),
ok = ra_log_meta:store_sync(MetaName, UId, voted_for, VotedFor),
LastApplied = maps:get(last_applied, State, 0),
ok = ra_server_meta:store_sync(MetaFd, VotedFor, Term, LastApplied),
incr_counter(Cfg, ?C_RA_SRV_TERM_AND_VOTED_FOR_UPDATES, 1),
put_counter(Cfg, ?C_RA_SVR_METRIC_TERM, Term),
%% this is probably not necessary
Expand Down Expand Up @@ -3413,8 +3420,6 @@ put_counter(#cfg{counter = Cnt}, Ix, N) when Cnt =/= undefined ->
put_counter(#cfg{counter = undefined}, _Ix, _N) ->
ok.

meta_name(#cfg{system_config = #{names := #{log_meta := Name}}}) ->
Name;
meta_name(#{names := #{log_meta := Name}}) ->
Name.

Expand Down
213 changes: 213 additions & 0 deletions src/ra_server_meta.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.
-module(ra_server_meta).

-include_lib("stdlib/include/assert.hrl").

-export([
path/2,
fetch/3,
fetch_from_file/1,
store_sync/4,
update_last_applied/2
]).

%% This module implements persistence for server metadata.
%% Before Ra 3.0, metadata was stored in a DETS file, shared
%% by all Ra servers in a Ra system.
%% Now, we store each server's metadata in a separate file,
%% in the server's data directory.
%% The structure of the metadata file is as follows:
%% - 4 bytes magic header (RAM1)
%% - 1004 bytes VotedFor field, which is a binary
%% - 2 bytes for the size of the first atom (server name)
%% - first atom (server name) as a binary
%% - 2 bytes for the size of the second atom (node name)
%% - second atom (node name) as a binary
%% - padding (zeroed)
%% - 8 bytes CurrentTerm (unsigned 64-bit integer)
%% - 8 bytes LastApplied (unsigned 64-bit integer)
%% for a total of 1024 bytes
%%
%% When VotedFor/Term change, the file is updated and fsynced.
%% If only the LastApplied changes, we update but do not fsync,
%% since this would be prohibitively slow.

%% Name of the per-server metadata file inside the server's directory.
-define(FILENAME, "server.meta").
%% Magic header identifying the file format described above.
-define(MAGIC, "RAM1").
%% Fixed size of the metadata file, in bytes.
-define(TOTAL_SIZE, 1024).
-define(VOTED_FOR_MAX_SIZE, 1004). %% TOTAL_SIZE minus everything else
%% Byte offset of the LastApplied field (the last 8 bytes of the file).
%% Parenthesised so that the expansion remains a single operand wherever
%% the macro is used inside a larger expression.
-define(LAST_APPLIED_POSITION, (?TOTAL_SIZE - 8)).

%% Builds the location of a server's metadata file: the ?FILENAME entry
%% inside the `UId' sub-directory of `DataDir'.
path(DataDir, UId) ->
    filename:join([DataDir, UId, ?FILENAME]).

%% Returns `{ok, {VotedFor, CurrentTerm, LastApplied}}' for the server.
%% The metadata file at `Path' is tried first; if it cannot be read or
%% decoded, the values are migrated from ra_log_meta instead.
fetch(Path, MetaName, UId) ->
    case fetch_from_file(Path) of
        {ok, FromFile} when is_tuple(FromFile) ->
            {ok, FromFile};
        {error, _} ->
            migrate_from_ra_log_meta(Path, MetaName, UId)
    end.

%% Metadata migration case: fetch from ra_log_meta and store in a file.
%% The ra_log_meta entry is only deleted once the file has been written
%% and synced successfully.
migrate_from_ra_log_meta(Path, MetaName, UId) ->
    Migrated = fetch_from_ra_log_meta(MetaName, UId),
    {VotedFor, CurrentTerm, LastApplied} = Migrated,
    case store_sync(Path, VotedFor, CurrentTerm, LastApplied) of
        ok ->
            ra_log_meta:delete(MetaName, UId),
            {ok, Migrated};
        Err ->
            Err
    end.

%% Reads and decodes the metadata file at `Path'.
%%
%% Returns:
%%  - `{ok, {VotedFor, CurrentTerm, LastApplied}}' when the file content
%%    matches the expected layout (magic header, VotedFor field, two
%%    unsigned 64-bit integers),
%%  - `{error, invalid_format}' when the file can be read but its content
%%    does not match that layout,
%%  - whatever error file:read_file/1 returned otherwise.
fetch_from_file(Path) ->
    case file:read_file(Path) of
        {ok, <<?MAGIC, VotedForBin:?VOTED_FOR_MAX_SIZE/binary,
               CurrentTerm:64/unsigned, LastApplied:64/unsigned>>} ->
            %% The VotedFor field is decoded on a best-effort basis: if the
            %% stored bytes cannot be turned back into atoms the vote is
            %% reported as `undefined' rather than failing the whole read.
            VotedFor = try
                           parse_voted_for(VotedForBin)
                       catch
                           error:_ -> undefined
                       end,
            {ok, {VotedFor, CurrentTerm, LastApplied}};
        {ok, _} ->
            {error, invalid_format};
        Err ->
            Err
    end.

%% Reads the three metadata fields from ra_log_meta, falling back to
%% `undefined' for voted_for and 0 for current_term / last_applied.
fetch_from_ra_log_meta(MetaName, UId) ->
    KeysWithDefaults = [{voted_for, undefined},
                        {current_term, 0},
                        {last_applied, 0}],
    [VotedFor, CurrentTerm, LastApplied] =
        [ra_log_meta:fetch(MetaName, UId, Key, Default)
         || {Key, Default} <- KeysWithDefaults],
    {VotedFor, CurrentTerm, LastApplied}.

%% Writes the complete metadata record and fsyncs it.
%%
%% The first argument is either a file name (binary or string), in which
%% case the file is opened, written, synced and closed again, or an
%% already opened file descriptor, which is left open.
%% Returns `ok'; any failure to open, write or sync crashes the caller.
store_sync(MetaFile, VotedFor, CurrentTerm, LastApplied)
  when is_binary(MetaFile); is_list(MetaFile) ->
    {ok, MetaFd} = file:open(MetaFile, [write, binary, raw]),
    try
        store_sync(MetaFd, VotedFor, CurrentTerm, LastApplied)
    after
        %% always release the descriptor, even if the write or sync failed;
        %% the data has already been synced when we get here on success
        _ = file:close(MetaFd)
    end;
store_sync(MetaFd, VotedFor, CurrentTerm, LastApplied) ->
    Data = encode_metadata(VotedFor, CurrentTerm, LastApplied),
    ok = file:pwrite(MetaFd, 0, Data),
    ok = file:sync(MetaFd).

%% Overwrites only the LastApplied field (an unsigned 64-bit integer at
%% ?LAST_APPLIED_POSITION) of an open metadata file. No fsync is issued
%% here.
update_last_applied(MetaFd, LastApplied) ->
    Encoded = <<LastApplied:64/unsigned-integer>>,
    ok = file:pwrite(MetaFd, ?LAST_APPLIED_POSITION, Encoded).

%% Encodes the metadata into the fixed-size on-disk layout:
%% magic header, zero-padded VotedFor field, CurrentTerm and LastApplied
%% (both unsigned 64-bit integers).
%%
%% Raises `{vote_for_binary_too_long, Size}' when the encoded VotedFor does
%% not fit in ?VOTED_FOR_MAX_SIZE bytes, instead of returning a non-binary
%% value that would only fail later when handed to file:pwrite/3.
encode_metadata(VotedFor, CurrentTerm, LastApplied) ->
    VotedForBin = encode_voted_for(VotedFor),
    VotedForSize = byte_size(VotedForBin),
    case ?VOTED_FOR_MAX_SIZE - VotedForSize of
        PaddingSize when PaddingSize >= 0 ->
            <<?MAGIC, VotedForBin/binary, 0:PaddingSize/unit:8,
              CurrentTerm:64/unsigned, LastApplied:64/unsigned>>;
        _ ->
            erlang:error({vote_for_binary_too_long, VotedForSize})
    end.

%% Encodes VotedFor as two length-prefixed (16-bit) UTF-8 atom names;
%% `undefined' is encoded as an empty binary.
encode_voted_for(undefined) ->
    <<>>;
encode_voted_for({NameAtom, NodeAtom}) ->
    NameAtomBin = atom_to_binary(NameAtom, utf8),
    NodeAtomBin = atom_to_binary(NodeAtom, utf8),
    <<(byte_size(NameAtomBin)):16/unsigned, NameAtomBin/binary,
      (byte_size(NodeAtomBin)):16/unsigned, NodeAtomBin/binary>>.

%% Decodes the VotedFor field: two length-prefixed (16-bit) UTF-8 atom
%% names followed by arbitrary trailing bytes. Returns `{Name, Node}' when
%% both names are present and non-empty, `undefined' for anything else.
parse_voted_for(<<NameLen:16/unsigned, NameBin:NameLen/binary,
                  NodeLen:16/unsigned, NodeBin:NodeLen/binary,
                  _Trailing/binary>>)
  when NameLen > 0, NodeLen > 0 ->
    {binary_to_atom(NameBin, utf8), binary_to_atom(NodeBin, utf8)};
parse_voted_for(_) ->
    undefined.

%%% ===================
%%% Internal unit tests
%%% ===================

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

%% Exercises the v1 on-disk format: encoding to a fixed-size binary,
%% decoding of VotedFor / term / last applied, a file round trip and a
%% few edge cases (long atoms, short atoms, max integers, bad header).
v1_format_test() ->
    CurrentTerm = rand:uniform(10000),
    LastApplied = rand:uniform(100000),
    VotedFor = {somename, somenode},

    % we always encode into a 1024-byte binary
    Data = encode_metadata(VotedFor, CurrentTerm, LastApplied),
    ?assertEqual(1024, byte_size(Data)),

    % we can reconstruct the VotedFor from the binary
    <<"RAM1", VotedForBin/binary>> = Data,
    ?assertEqual({somename, somenode}, parse_voted_for(VotedForBin)),

    % we can extract term and last applied from fixed positions
    <<_:1008/binary, ParsedTerm:64/unsigned, ParsedLastApplied:64/unsigned>> = Data,
    ?assertEqual(CurrentTerm, ParsedTerm),
    ?assertEqual(LastApplied, ParsedLastApplied),

    % "empty" metadata
    EmptyData = encode_metadata(undefined, 0, 0),
    ?assertEqual(1024, byte_size(EmptyData)),
    <<"RAM1", VotedForDataUndef/binary>> = EmptyData,
    ?assertEqual(undefined, parse_voted_for(VotedForDataUndef)),
    <<_:1008/binary, ZeroTerm:64/unsigned, ZeroLastApplied:64/unsigned>> = EmptyData,
    ?assertEqual(0, ZeroTerm),
    ?assertEqual(0, ZeroLastApplied),

    % end-to-end test
    % the temp file is removed before asserting so that a failing
    % assertion does not leave it behind
    TempFile = "test_new_meta", %% TODO - put in the right place
    ok = file:write_file(TempFile, Data),
    E2EResult = fetch_from_file(TempFile),
    ok = file:delete(TempFile),
    ?assertEqual({ok, {VotedFor, CurrentTerm, LastApplied}}, E2EResult),

    % Test edge cases

    % very long atom names, including UTF-8 in one of the atoms
    LongName = list_to_atom([$Σ || _ <- lists:seq(1, 255)]),
    LongNode = list_to_atom([$a || _ <- lists:seq(1, 255)]),
    LongVotedFor = {LongName, LongNode},
    DataLong = encode_metadata(LongVotedFor, 999999, 888888),
    ?assertEqual(1024, byte_size(DataLong)),
    <<"RAM1", VotedForDataLong/binary>> = DataLong,
    ?assertEqual(LongVotedFor, parse_voted_for(VotedForDataLong)),

    % single character atoms
    ShortVotedFor = {a, b},
    DataShort = encode_metadata(ShortVotedFor, 1, 2),
    ?assertEqual(1024, byte_size(DataShort)),
    <<"RAM1", VotedForDataShort/binary>> = DataShort,
    ?assertEqual(ShortVotedFor, parse_voted_for(VotedForDataShort)),

    % max values are handled
    MaxTerm = 18446744073709551615, % 2^64 - 1
    MaxApplied = 18446744073709551615,
    DataMax = encode_metadata(VotedFor, MaxTerm, MaxApplied),
    ?assertEqual(1024, byte_size(DataMax)),
    <<_:1008/binary, ParsedMaxTerm:64/unsigned, ParsedMaxApplied:64/unsigned>> = DataMax,
    ?assertEqual(MaxTerm, ParsedMaxTerm),
    ?assertEqual(MaxApplied, ParsedMaxApplied),

    % invalid magic header
    BadHeaderData = <<"ACME", VotedForBin/binary>>,
    TempFileBadHeader = "test_bad_header", %% TODO path
    ok = file:write_file(TempFileBadHeader, BadHeaderData),
    BadHeaderResult = fetch_from_file(TempFileBadHeader),
    ok = file:delete(TempFileBadHeader),
    ?assertEqual({error, invalid_format}, BadHeaderResult),

    ok.

-endif.
24 changes: 22 additions & 2 deletions test/ra_server_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,25 @@ setup_log() ->
fun(_Data, _OutOf, _Flag, SS) ->
{ok, SS}
end),
meck:expect(ra_server_meta, path, fun(_, U) -> U end),
meck:expect(ra_server_meta, fetch, fun(P, _, _) ->
case get(P) of
undefined ->
{ok, {undefined, 0, 0}};
Metadata ->
{ok, Metadata}
end
end),
meck:expect(ra_server_meta, store_sync, fun (P, V, T, L) ->
put(P, {V, T, L}), ok
end),
meck:expect(ra_server_meta, update_last_applied, fun
(undefined, _) ->
ok;
(P, L) ->
{V, T, _} = get(P),
put(P, {V, T, L}), ok
end),
meck:expect(ra_snapshot, abort_accept, fun(SS) -> SS end),
meck:expect(ra_snapshot, accepting, fun(_SS) -> undefined end),
meck:expect(ra_log, snapshot_state, fun ra_log_memory:snapshot_state/1),
Expand Down Expand Up @@ -215,6 +234,7 @@ init_test(_Config) ->
ok = ra_log_meta:store(ra_log_meta, UId, voted_for, some_server),
ok = ra_log_meta:store(ra_log_meta, UId, current_term, CurrentTerm),
meck:expect(ra_log, init, fun (_) -> Log0 end),
meck:expect(ra_server_meta, fetch, fun(_, _, _) -> {ok, {some_server, 5, 0}} end),
#{current_term := 5,
voted_for := some_server} = ra_server_init(InitConf),
% snapshot
Expand Down Expand Up @@ -1981,10 +2001,9 @@ is_new(_Config) ->
log_init_args => #{uid => <<>>},
machine => {simple, fun erlang:'+'/2, 0}},
NewState = ra_server:init(Args),
true = ra_server:is_new(NewState),
{leader, State, _} = ra_server:handle_leader(usr_cmd(1), NewState),
false = ra_server:is_new(State),
NewState = ra_server:init(Args),
true = ra_server:is_new(NewState),
ok.

command(_Config) ->
Expand Down Expand Up @@ -3175,6 +3194,7 @@ base_state(NumServers, MacMod) ->
},
#{cfg => Cfg,
leader_id => ?N1,
meta_fd => undefined,
cluster => Servers,
cluster_index_term => {0, 0},
cluster_change_permitted => true,
Expand Down