Skip to content

Commit 07d411b

Browse files
committed
New server metadata persistenace
Until now, we persisted metadata for all servers in a given Ra system in a single DETS file, owned by ra_log_meta gen batch server. This was a source of bottlenecks, so we move each servers metadata to a simple binary file. If the file doesn't exist, we try to fetch metadata from ra_log_meta and immediately persist it, so that we don't need ra_log_meta going forward.
1 parent 2ee7a7e commit 07d411b

File tree

3 files changed

+250
-14
lines changed

3 files changed

+250
-14
lines changed

src/ra_server.erl

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,8 @@
9999
queries_waiting_heartbeats := queue:queue({non_neg_integer(),
100100
consistent_query_ref()}),
101101
pending_consistent_queries := [consistent_query_ref()],
102-
commit_latency => option(non_neg_integer())
102+
commit_latency => option(non_neg_integer()),
103+
meta_fd => option(file:fd())
103104
}.
104105

105106
-type state() :: ra_server_state().
@@ -349,9 +350,11 @@ init(#{id := Id,
349350
end,
350351

351352
MetaName = meta_name(SystemConfig),
352-
CurrentTerm = ra_log_meta:fetch(MetaName, UId, current_term, 0),
353-
LastApplied = ra_log_meta:fetch(MetaName, UId, last_applied, 0),
354-
VotedFor = ra_log_meta:fetch(MetaName, UId, voted_for, undefined),
353+
DataDir = maps:get(data_dir, SystemConfig),
354+
MetaFile = ra_server_meta:path(DataDir, UId),
355+
356+
{ok, {VotedFor, CurrentTerm, LastApplied}} = ra_server_meta:fetch(MetaFile, MetaName, UId),
357+
{ok, MetaFd} = file:open(MetaFile, [read, write, raw, binary]),
355358

356359
LatestMacVer = ra_machine:version(Machine),
357360
InitialMachineVersion = min(LatestMacVer,
@@ -420,7 +423,8 @@ init(#{id := Id,
420423
aux_state => ra_machine:init_aux(MacMod, Name),
421424
query_index => 0,
422425
queries_waiting_heartbeats => queue:new(),
423-
pending_consistent_queries => []}.
426+
pending_consistent_queries => [],
427+
meta_fd => MetaFd}.
424428

425429
recover(#{cfg := #cfg{log_id = LogId,
426430
machine_version = MacVer,
@@ -2238,8 +2242,8 @@ persist_last_applied(#{persisted_last_applied := PLA,
22382242
% if last applied is less than PL for some reason do nothing
22392243
State;
22402244
persist_last_applied(#{last_applied := LastApplied,
2241-
cfg := #cfg{uid = UId} = Cfg} = State) ->
2242-
ok = ra_log_meta:store(meta_name(Cfg), UId, last_applied, LastApplied),
2245+
meta_fd := MetaFd} = State) ->
2246+
ok = ra_server_meta:update_last_applied(MetaFd, LastApplied),
22432247
State#{persisted_last_applied => LastApplied}.
22442248

22452249

@@ -2355,13 +2359,17 @@ handle_node_status(RaftState, Type, Node, Status, _Info,
23552359

23562360
-spec terminate(ra_server_state(), Reason :: {shutdown, delete} | term()) -> ok.
23572361
terminate(#{log := Log,
2362+
meta_fd := MetaFd,
23582363
cfg := #cfg{log_id = LogId}} = _State, {shutdown, delete}) ->
23592364
?NOTICE("~ts: terminating with reason 'delete'", [LogId]),
2365+
_ = file:close(MetaFd),
23602366
catch ra_log:delete_everything(Log),
23612367
ok;
2362-
terminate(#{cfg := #cfg{log_id = LogId}} = State, Reason) ->
2368+
terminate(#{cfg := #cfg{log_id = LogId}, meta_fd := MetaFd} = State, Reason) ->
23632369
?DEBUG("~ts: terminating with reason '~w'", [LogId, Reason]),
23642370
#{log := Log} = persist_last_applied(State),
2371+
_ = file:sync(MetaFd),
2372+
_ = file:close(MetaFd),
23652373
catch ra_log:close(Log),
23662374
ok.
23672375

@@ -2563,18 +2571,17 @@ peer(PeerId, #{cluster := Nodes}) ->
25632571
put_peer(PeerId, Peer, #{cluster := Peers} = State) ->
25642572
State#{cluster => Peers#{PeerId => Peer}}.
25652573

2566-
update_term_and_voted_for(Term, VotedFor, #{cfg := #cfg{uid = UId} = Cfg,
2567-
current_term := CurTerm} = State) ->
2574+
update_term_and_voted_for(Term, VotedFor, #{cfg := Cfg,
2575+
current_term := CurTerm,
2576+
meta_fd := MetaFd} = State) ->
25682577
CurVotedFor = maps:get(voted_for, State, undefined),
25692578
case Term =:= CurTerm andalso VotedFor =:= CurVotedFor of
25702579
true ->
25712580
%% no update needed
25722581
State;
25732582
false ->
2574-
MetaName = meta_name(Cfg),
2575-
%% as this is a rare event it is ok to go sync here
2576-
ok = ra_log_meta:store(MetaName, UId, current_term, Term),
2577-
ok = ra_log_meta:store_sync(MetaName, UId, voted_for, VotedFor),
2583+
LastApplied = maps:get(last_applied, State, 0),
2584+
ok = ra_server_meta:store_sync(MetaFd, VotedFor, Term, LastApplied),
25782585
incr_counter(Cfg, ?C_RA_SRV_TERM_AND_VOTED_FOR_UPDATES, 1),
25792586
put_counter(Cfg, ?C_RA_SVR_METRIC_TERM, Term),
25802587
%% this is probably not necessary

src/ra_server_meta.erl

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.
6+
-module(ra_server_meta).
7+
8+
-include_lib("stdlib/include/assert.hrl").
9+
10+
-export([
11+
path/2,
12+
fetch/3,
13+
fetch_from_file/1,
14+
store_sync/4,
15+
update_last_applied/2
16+
]).
17+
18+
%% This module implements persistance for server metadata
19+
%% Before Ra 3.0, metadata was stored in a DETS file, shared
20+
%% by all Ra servers in a Ra system.
21+
%% Now, we store each server's metadata in a separate file,
22+
%% in the server's data directory.
23+
%% The structure of the metadata file is as follows:
24+
%% - 4 bytes magic header (RAM1)
25+
%% - 1004 bytes VotedFor field, which is a binary
26+
%% - 1 byte for the size of the first atom (server name)
27+
%% - first atom (server name) as a binary
28+
%% - 1 byte for the size of the second atom (node name)
29+
%% - second atom (node name) as a binary
30+
%% - padding (zeroed)
31+
%% - 8 bytes CurrentTerm (unsigned 64-bit integer)
32+
%% - 8 bytes LastApplied (unsigned 64-bit integer)
33+
%% for a total of 1024 bytes
34+
%%
35+
%% When VotedFor/Term change, the file is updated and fsynced.
36+
%% If only the LastApplied changes, we update but do not fsync,
37+
%% since this would be prohibitively slow.
38+
39+
-define(FILENAME, "server.meta").
40+
-define(MAGIC, "RAM1").
41+
-define(TOTAL_SIZE, 1024).
42+
-define(LAST_APPLIED_POSITION, ?TOTAL_SIZE - 8).
43+
-define(TERM_POSITION, ?TOTAL_SIZE - ?LAST_APPLIED_POSITION - 8).
44+
45+
path(DataDir, UId) ->
46+
ServerDir = filename:join(DataDir, UId),
47+
filename:join(ServerDir, ?FILENAME).
48+
49+
fetch(Path, MetaName, UId) ->
50+
case fetch_from_file(Path) of
51+
{ok, Metadata} when is_tuple(Metadata) ->
52+
{ok, Metadata};
53+
{error, _} ->
54+
%% metadata migration case:
55+
%% fetch from ra_log_meta and store in a file
56+
{VotedFor, CurrentTerm, LastApplied} = fetch_from_ra_log_meta(MetaName, UId),
57+
case store_sync(Path, VotedFor, CurrentTerm, LastApplied) of
58+
ok ->
59+
ra_log_meta:delete(MetaName, UId),
60+
{ok, {VotedFor, CurrentTerm, LastApplied}};
61+
Err ->
62+
Err
63+
end
64+
end.
65+
66+
fetch_from_file(Path) ->
67+
case file:read_file(Path) of
68+
{ok, <<?MAGIC, VotedForBin:1004/binary, CurrentTerm:64/unsigned, LastApplied:64/unsigned>>} ->
69+
VotedFor = try
70+
parse_voted_for(VotedForBin)
71+
catch
72+
_:_ -> undefined
73+
end,
74+
{ok, {VotedFor, CurrentTerm, LastApplied}};
75+
{ok, _} ->
76+
{error, invalid_format};
77+
Err ->
78+
Err
79+
end.
80+
81+
fetch_from_ra_log_meta(MetaName, UId) ->
82+
VotedFor = ra_log_meta:fetch(MetaName, UId, voted_for, undefined),
83+
CurrentTerm = ra_log_meta:fetch(MetaName, UId, current_term, 0),
84+
LastApplied = ra_log_meta:fetch(MetaName, UId, last_applied, 0),
85+
{VotedFor, CurrentTerm, LastApplied}.
86+
87+
store_sync(MetaFile, VotedFor, CurrentTerm, LastApplied) when is_binary(MetaFile) ->
88+
{ok, MetaFd} = file:open(MetaFile, [write, binary, raw]),
89+
store_sync(MetaFd, VotedFor, CurrentTerm, LastApplied),
90+
file:close(MetaFd);
91+
store_sync(MetaFd, VotedFor, CurrentTerm, LastApplied) ->
92+
Data = encode_metadata(VotedFor, CurrentTerm, LastApplied),
93+
ok = file:pwrite(MetaFd, 0, Data),
94+
ok = file:sync(MetaFd).
95+
96+
update_last_applied(MetaFd, LastApplied) ->
97+
ok = file:pwrite(MetaFd, ?LAST_APPLIED_POSITION, <<LastApplied:64>>).
98+
99+
encode_metadata(VotedFor, CurrentTerm, LastApplied) ->
100+
VotedForBin = case VotedFor of
101+
undefined ->
102+
<<0, 0>>;
103+
{NameAtom, NodeAtom} ->
104+
NameAtomBin = atom_to_binary(NameAtom, utf8),
105+
NodeAtomBin = atom_to_binary(NodeAtom, utf8),
106+
NameSize = byte_size(NameAtomBin),
107+
NodeSize = byte_size(NodeAtomBin),
108+
<<NameSize:8/unsigned, NameAtomBin/binary,
109+
NodeSize:8/unsigned, NodeAtomBin/binary>>
110+
end,
111+
112+
HeaderSize = length(?MAGIC),
113+
VotedForSize = byte_size(VotedForBin),
114+
UsedSize = HeaderSize + VotedForSize,
115+
PaddingSize = 1008 - UsedSize,
116+
Padding = <<0:PaddingSize/unit:8>>,
117+
118+
<<?MAGIC, VotedForBin/binary, Padding/binary,
119+
CurrentTerm:64/unsigned, LastApplied:64/unsigned>>.
120+
121+
parse_voted_for(<<NameAtomSize:8/unsigned, Rest/binary>>) when NameAtomSize > 0 ->
122+
case Rest of
123+
<<NameAtom:NameAtomSize/binary, NodeAtomSize:8/unsigned, NodeAtom:NodeAtomSize/binary, _/binary>>
124+
when NodeAtomSize > 0 ->
125+
{binary_to_atom(NameAtom, utf8), binary_to_atom(NodeAtom, utf8)};
126+
_ ->
127+
undefined
128+
end;
129+
parse_voted_for(_) ->
130+
undefined.
131+
132+
%%% ===================
133+
%%% Internal unit tests
134+
%%% ===================
135+
136+
-ifdef(TEST).
137+
-include_lib("eunit/include/eunit.hrl").
138+
139+
v1_format_test() ->
140+
CurrentTerm = rand:uniform(10000),
141+
LastApplied = rand:uniform(100000),
142+
VotedFor = {somename, somenode},
143+
144+
% we always encode into a 1024-byte binary
145+
Data = encode_metadata(VotedFor, CurrentTerm, LastApplied),
146+
?assertEqual(1024, byte_size(Data)),
147+
148+
% we can reconstruct the VotedFor from the binary
149+
<<"RAMD", VotedForBin/binary>> = Data,
150+
?assertEqual({somename, somenode}, parse_voted_for(VotedForBin)),
151+
152+
% we can extract term and last applied from fixed positions
153+
<<_:1008/binary, ParsedTerm:64/unsigned, ParsedLastApplied:64/unsigned>> = Data,
154+
?assertEqual(CurrentTerm, ParsedTerm),
155+
?assertEqual(LastApplied, ParsedLastApplied),
156+
157+
% "empty" metadata
158+
EmptyData = encode_metadata(undefined, 0, 0),
159+
?assertEqual(1024, byte_size(EmptyData)),
160+
<<"RAMD", VotedForDataUndef/binary>> = EmptyData,
161+
?assertEqual(undefined, parse_voted_for(VotedForDataUndef)),
162+
<<_:1008/binary, ZeroTerm:64/unsigned, ZeroLastApplied:64/unsigned>> = EmptyData,
163+
?assertEqual(ZeroTerm, 0),
164+
?assertEqual(ZeroLastApplied, 0),
165+
166+
% end-to-end test
167+
TempFile = "test_new_meta", %% TODO - put in the right place
168+
file:write_file(TempFile, Data),
169+
{ok, {E2EVotedFor, E2ECurrentTerm, E2ELastApplied}} = fetch_from_file(TempFile),
170+
file:delete(TempFile),
171+
?assertEqual(VotedFor, E2EVotedFor),
172+
?assertEqual(CurrentTerm, E2ECurrentTerm),
173+
?assertEqual(LastApplied, E2ELastApplied),
174+
175+
% Test edge cases
176+
177+
% very long atom names
178+
LongName = list_to_atom([$a || _ <- lists:seq(1, 255)]),
179+
LongNode = list_to_atom([$b || _ <- lists:seq(1, 255)]),
180+
LongVotedFor = {LongName, LongNode},
181+
DataLong = encode_metadata(LongVotedFor, 999999, 888888),
182+
?assertEqual(1024, byte_size(DataLong)),
183+
<<"RAMD", VotedForDataLong/binary>> = DataLong,
184+
?assertEqual(LongVotedFor, parse_voted_for(VotedForDataLong)),
185+
186+
% single character atoms
187+
ShortVotedFor = {a, b},
188+
DataShort = encode_metadata(ShortVotedFor, 1, 2),
189+
?assertEqual(1024, byte_size(DataShort)),
190+
<<"RAMD", VotedForDataShort/binary>> = DataShort,
191+
?assertEqual(ShortVotedFor, parse_voted_for(VotedForDataShort)),
192+
193+
% max values are handled
194+
MaxTerm = 18446744073709551615, % 2^64 - 1
195+
MaxApplied = 18446744073709551615,
196+
DataMax = encode_metadata(VotedFor, MaxTerm, MaxApplied),
197+
?assertEqual(1024, byte_size(DataMax)),
198+
<<_:1008/binary, ParsedMaxTerm:64/unsigned, ParsedMaxApplied:64/unsigned>> = DataMax,
199+
?assertEqual(MaxTerm, ParsedMaxTerm),
200+
?assertEqual(MaxApplied, ParsedMaxApplied),
201+
202+
% invalid magic header
203+
BadHeaderData = <<"ACME", VotedForBin/binary>>,
204+
TempFileBadHeader = "test_bad_header", %% TODO path
205+
file:write_file(TempFileBadHeader, BadHeaderData),
206+
?assertEqual({error, invalid_format}, fetch_from_file(TempFileBadHeader)),
207+
file:delete(TempFileBadHeader),
208+
209+
ok.
210+
211+
-endif.

test/ra_server_SUITE.erl

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,22 @@ setup_log() ->
159159
fun(_Data, _OutOf, _Flag, SS) ->
160160
{ok, SS}
161161
end),
162+
meck:expect(ra_server_meta, path, fun(_, U) -> U end),
163+
meck:expect(ra_server_meta, fetch, fun(P, _, _) ->
164+
case get(P) of
165+
undefined ->
166+
{ok, {undefined, 0, 0}};
167+
Metadata ->
168+
{ok, Metadata}
169+
end
170+
end),
171+
meck:expect(ra_server_meta, store_sync, fun (P, V, T, L) ->
172+
put(P, {V, T, L}), ok
173+
end),
174+
meck:expect(ra_server_meta, update_last_applied, fun (P, L) ->
175+
{V, T, _} = get(P),
176+
put(P, {V, T, L}), ok
177+
end),
162178
meck:expect(ra_snapshot, abort_accept, fun(SS) -> SS end),
163179
meck:expect(ra_snapshot, accepting, fun(_SS) -> undefined end),
164180
meck:expect(ra_log, snapshot_state, fun ra_log_memory:snapshot_state/1),
@@ -215,6 +231,7 @@ init_test(_Config) ->
215231
ok = ra_log_meta:store(ra_log_meta, UId, voted_for, some_server),
216232
ok = ra_log_meta:store(ra_log_meta, UId, current_term, CurrentTerm),
217233
meck:expect(ra_log, init, fun (_) -> Log0 end),
234+
meck:expect(ra_server_meta, fetch, fun(_, _, _) -> {ok, {some_server, 5, 0}} end),
218235
#{current_term := 5,
219236
voted_for := some_server} = ra_server_init(InitConf),
220237
% snapshot
@@ -3175,6 +3192,7 @@ base_state(NumServers, MacMod) ->
31753192
},
31763193
#{cfg => Cfg,
31773194
leader_id => ?N1,
3195+
meta_fd => fake_fd,
31783196
cluster => Servers,
31793197
cluster_index_term => {0, 0},
31803198
cluster_change_permitted => true,

0 commit comments

Comments
 (0)