Skip to content

Commit 4d8631d

Browse files
committed
Switch from pg_local to pg
pg_local is based on the pg2 module, which got removed from Erlang/OTP years ago. It was replaced by the more efficient pg module, so let's use it directly. We use node-local scopes and single-item groups so that (de)registration is fast, but we can list all connections/channels (by listing all groups in the scope).
1 parent a1f9571 commit 4d8631d

File tree

6 files changed

+97
-24
lines changed

6 files changed

+97
-24
lines changed

deps/rabbit/src/rabbit.erl

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131
base_product_version/0,
3232
motd_file/0,
3333
motd/0,
34-
pg_local_scope/1]).
34+
pg_local_scope/1,
35+
pg_scope_amqp091_channel/0,
36+
pg_scope_amqp091_connection/0]).
3537
%% For CLI, testing and mgmt-agent.
3638
-export([set_log_level/1, log_locations/0, config_files/0]).
3739
-export([is_booted/1, is_booted/0, is_booting/1, is_booting/0]).
@@ -40,7 +42,11 @@
4042
%% Boot steps.
4143
-export([update_cluster_tags/0, maybe_insert_default_data/0, boot_delegate/0, recover/0,
4244
pg_local_amqp_session/0,
43-
pg_local_amqp_connection/0, prevent_startup_if_node_was_reset/0]).
45+
pg_local_amqp_connection/0,
46+
pg_local_amqp091_channel/0,
47+
pg_local_amqp091_connection/0,
48+
start_pg_local/0,
49+
prevent_startup_if_node_was_reset/0]).
4450

4551
-rabbit_boot_step({pre_boot, [{description, "rabbit boot start"}]}).
4652

@@ -157,6 +163,12 @@
157163
[{description, "kernel ready"},
158164
{requires, external_infrastructure}]}).
159165

166+
-rabbit_boot_step({pg_local,
167+
[{description, "local-only pg scope"},
168+
{mfa, {rabbit, start_pg_local, []}},
169+
{requires, kernel_ready},
170+
{enables, core_initialized}]}).
171+
160172
-rabbit_boot_step({guid_generator,
161173
[{description, "guid generator"},
162174
{mfa, {rabbit_sup, start_restartable_child,
@@ -292,11 +304,23 @@
292304
{enables, core_initialized}]}).
293305

294306
-rabbit_boot_step({pg_local_amqp_connection,
295-
[{description, "local-only pg scope for AMQP connections"},
307+
[{description, "local-only pg scope for AMQP 1.0 connections"},
296308
{mfa, {rabbit, pg_local_amqp_connection, []}},
297309
{requires, kernel_ready},
298310
{enables, core_initialized}]}).
299311

312+
-rabbit_boot_step({pg_local_amqp091_channel,
313+
[{description, "local-only pg scope for AMQP 0-9-1 channels"},
314+
{mfa, {rabbit, pg_local_amqp091_channel, []}},
315+
{requires, kernel_ready},
316+
{enables, core_initialized}]}).
317+
318+
-rabbit_boot_step({pg_local_amqp091_connection,
319+
[{description, "local-only pg scope for AMQP 0-9-1 connections"},
320+
{mfa, {rabbit, pg_local_amqp091_connection, []}},
321+
{requires, kernel_ready},
322+
{enables, core_initialized}]}).
323+
300324
%%---------------------------------------------------------------------------
301325

302326
-include_lib("rabbit_common/include/rabbit.hrl").
@@ -1146,6 +1170,9 @@ boot_delegate() ->
11461170
recover() ->
11471171
ok = rabbit_vhost:recover().
11481172

1173+
start_pg_local() ->
1174+
rabbit_sup:start_child(pg_local_scope, pg, [node()]).
1175+
11491176
pg_local_amqp_session() ->
11501177
PgScope = pg_local_scope(amqp_session),
11511178
rabbit_sup:start_child(pg_amqp_session, pg, [PgScope]).
@@ -1154,9 +1181,23 @@ pg_local_amqp_connection() ->
11541181
PgScope = pg_local_scope(amqp_connection),
11551182
rabbit_sup:start_child(pg_amqp_connection, pg, [PgScope]).
11561183

1184+
pg_local_amqp091_channel() ->
1185+
PgScope = pg_local_scope(amqp091_channel),
1186+
rabbit_sup:start_child(pg_amqp091_channel, pg, [PgScope]).
1187+
1188+
pg_local_amqp091_connection() ->
1189+
PgScope = pg_local_scope(amqp091_connection),
1190+
rabbit_sup:start_child(pg_amqp091_connection, pg, [PgScope]).
1191+
11571192
pg_local_scope(Prefix) ->
11581193
list_to_atom(io_lib:format("~s_~s", [Prefix, node()])).
11591194

1195+
pg_scope_amqp091_channel() ->
1196+
pg_local_scope(amqp091_channel).
1197+
1198+
pg_scope_amqp091_connection() ->
1199+
pg_local_scope(amqp091_connection).
1200+
11601201
-spec update_cluster_tags() -> 'ok'.
11611202

11621203
update_cluster_tags() ->

deps/rabbit/src/rabbit_channel.erl

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -304,9 +304,9 @@ send_command(Pid, Msg) ->
304304
%% Delete this function when feature flag rabbitmq_4.2.0 becomes required.
305305
-spec deliver_reply_local(pid(), binary(), mc:state()) -> ok.
306306
deliver_reply_local(Pid, Key, Message) ->
307-
case pg_local:in_group(rabbit_channels, Pid) of
308-
true -> gen_server2:cast(Pid, {deliver_reply, Key, Message});
309-
false -> ok
307+
case pg:get_local_members(pg_scope(), Pid) of
308+
[] -> ok;
309+
_ -> gen_server2:cast(Pid, {deliver_reply, Key, Message})
310310
end.
311311

312312
-spec list() -> [pid()].
@@ -318,7 +318,9 @@ list() ->
318318
-spec list_local() -> [pid()].
319319

320320
list_local() ->
321-
pg_local:get_members(rabbit_channels).
321+
try pg:which_groups(pg_scope())
322+
catch error:badarg -> []
323+
end.
322324

323325
-spec info_keys() -> rabbit_types:info_keys().
324326

@@ -436,6 +438,10 @@ update_user_state(Pid, UserState) when is_pid(Pid) ->
436438

437439
%%---------------------------------------------------------------------------
438440

441+
-spec pg_scope() -> atom().
442+
pg_scope() ->
443+
rabbit:pg_scope_amqp091_channel().
444+
439445
init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
440446
Capabilities, CollectorPid, LimiterPid, AmqpParams]) ->
441447
process_flag(trap_exit, true),
@@ -444,7 +450,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
444450

445451
?LG_PROCESS_TYPE(channel),
446452
?store_proc_name({ConnName, Channel}),
447-
ok = pg_local:join(rabbit_channels, self()),
453+
ok = pg:join(pg_scope(), self(), self()),
448454
Flow = case rabbit_misc:get_env(rabbit, classic_queue_flow_control, true) of
449455
true -> flow;
450456
false -> noflow
@@ -783,7 +789,7 @@ terminate(_Reason,
783789
queue_states = QueueCtxs}) ->
784790
rabbit_queue_type:close(QueueCtxs),
785791
{_Res, _State1} = notify_queues(State),
786-
pg_local:leave(rabbit_channels, self()),
792+
pg:leave(pg_scope(), self(), self()),
787793
rabbit_event:if_enabled(State, #ch.stats_timer,
788794
fun() -> emit_stats(State) end),
789795
[delete_stats(Tag) || {Tag, _} <- get()],

deps/rabbit/src/rabbit_networking.erl

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -461,13 +461,18 @@ node_client_listeners(Node) ->
461461
end, Xs)
462462
end.
463463

464+
pg_scope_amqp091_connection() ->
465+
rabbit:pg_scope_amqp091_connection().
466+
464467
-spec register_connection(pid()) -> ok.
465468

466-
register_connection(Pid) -> pg_local:join(rabbit_connections, Pid).
469+
register_connection(Pid) ->
470+
pg:join(pg_scope_amqp091_connection(), Pid, Pid).
467471

468472
-spec unregister_connection(pid()) -> ok.
469473

470-
unregister_connection(Pid) -> pg_local:leave(rabbit_connections, Pid).
474+
unregister_connection(Pid) ->
475+
pg:leave(pg_scope_amqp091_connection(), Pid, Pid).
471476

472477
-spec connections() -> [rabbit_types:connection()].
473478
connections() ->
@@ -476,17 +481,19 @@ connections() ->
476481

477482
-spec local_connections() -> [rabbit_types:connection()].
478483
local_connections() ->
479-
Amqp091Pids = pg_local:get_members(rabbit_connections),
484+
Amqp091Pids = try pg:which_groups(pg_scope_amqp091_connection())
485+
catch error:badarg -> []
486+
end,
480487
Amqp10Pids = rabbit_amqp1_0:list_local(),
481488
Amqp10Pids ++ Amqp091Pids.
482489

483490
-spec register_non_amqp_connection(pid()) -> ok.
484491

485-
register_non_amqp_connection(Pid) -> pg_local:join(rabbit_non_amqp_connections, Pid).
492+
register_non_amqp_connection(Pid) -> pg:join(node(), rabbit_non_amqp_connections, Pid).
486493

487494
-spec unregister_non_amqp_connection(pid()) -> ok.
488495

489-
unregister_non_amqp_connection(Pid) -> pg_local:leave(rabbit_non_amqp_connections, Pid).
496+
unregister_non_amqp_connection(Pid) -> pg:leave(node(), rabbit_non_amqp_connections, Pid).
490497

491498
-spec non_amqp_connections() -> [rabbit_types:connection()].
492499

@@ -496,7 +503,7 @@ non_amqp_connections() ->
496503

497504
-spec local_non_amqp_connections() -> [rabbit_types:connection()].
498505
local_non_amqp_connections() ->
499-
pg_local:get_members(rabbit_non_amqp_connections).
506+
pg:get_local_members(node(), rabbit_non_amqp_connections).
500507

501508
-spec connection_info(rabbit_types:connection(), rabbit_types:info_keys()) ->
502509
rabbit_types:infos().

deps/rabbit/src/rabbit_volatile_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ local_call(Pid, Request) ->
235235

236236
is_local(Pid) ->
237237
rabbit_amqp_session:is_local(Pid) orelse
238-
pg_local:in_group(rabbit_channels, Pid).
238+
pg:get_local_members(rabbit:pg_scope_amqp091_channel(), Pid) =/= [].
239239

240240
handle_event(QName, {deliver, Msg}, #?STATE{name = QName,
241241
ctag = Ctag,

deps/rabbit/test/proxy_protocol_SUITE.erl

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,15 +110,25 @@ proxy_protocol_v2_local(Config) ->
110110
ok.
111111

112112
connection_name() ->
113-
?awaitMatch([_], pg_local:get_members(rabbit_connections), 30000),
114-
[Pid] = pg_local:get_members(rabbit_connections),
113+
Scope = rabbit:pg_scope_amqp091_connection(),
114+
GetGroups = fun() ->
115+
try pg:which_groups(Scope)
116+
catch error:badarg -> []
117+
end
118+
end,
119+
?awaitMatch([_], GetGroups(), 30000),
120+
[Pid] = GetGroups(),
115121
{dictionary, Dict} = process_info(Pid, dictionary),
116122
{process_name, {rabbit_reader, ConnectionName}} = lists:keyfind(process_name, 1, Dict),
117123
ConnectionName.
118124

119125
wait_for_connection_close(Config) ->
120126
?awaitMatch(
121127
[],
122-
rabbit_ct_broker_helpers:rpc(
123-
Config, 0, pg_local, get_members, [rabbit_connnections]),
128+
begin
129+
Scope = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit, pg_scope_amqp091_connection, []),
130+
try rabbit_ct_broker_helpers:rpc(Config, 0, pg, which_groups, [Scope])
131+
catch error:badarg -> []
132+
end
133+
end,
124134
30000).

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2214,8 +2214,17 @@ cleanup_queue_state_on_channel_after_publish(Config) ->
22142214
[NCh1, NCh2] = rpc:call(Server, rabbit_channel, list, []),
22152215
%% Check the channel state contains the state for the quorum queue on
22162216
%% channel 1 and 2
2217-
wait_for_cleanup(Server, NCh1, 0),
2218-
wait_for_cleanup(Server, NCh2, 1),
2217+
%% Note: pg:get_local_members doesn't guarantee order, so we need to identify
2218+
%% which channel has queue state
2219+
{ChWithoutState, ChWithState} = case length(rpc:call(Server,
2220+
rabbit_channel,
2221+
list_queue_states,
2222+
[NCh1])) of
2223+
0 -> {NCh1, NCh2};
2224+
1 -> {NCh2, NCh1}
2225+
end,
2226+
wait_for_cleanup(Server, ChWithoutState, 0),
2227+
wait_for_cleanup(Server, ChWithState, 1),
22192228
%% then delete the queue and wait for the process to terminate
22202229
?assertMatch(#'queue.delete_ok'{},
22212230
amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
@@ -2225,8 +2234,8 @@ cleanup_queue_state_on_channel_after_publish(Config) ->
22252234
[?SUPNAME]))
22262235
end, 30000),
22272236
%% Check that all queue states have been cleaned
2228-
wait_for_cleanup(Server, NCh2, 0),
2229-
wait_for_cleanup(Server, NCh1, 0).
2237+
wait_for_cleanup(Server, ChWithState, 0),
2238+
wait_for_cleanup(Server, ChWithoutState, 0).
22302239

22312240
cleanup_queue_state_on_channel_after_subscribe(Config) ->
22322241
%% Declare/delete the queue and publish in one channel, while consuming on a

0 commit comments

Comments
 (0)