Skip to content

Commit 7998fc2

Browse files
committed
Switch from pg_local to pg
pg_local was based on the pg2 module, which got removed from Erlang/OTP years ago. It was replaced by the more efficient pg module, so let's use it directly. We use the node() as the local scope, so we maintain isolation between nodes and avoid local groups from being synchronized between nodes.
1 parent a1f9571 commit 7998fc2

File tree

8 files changed

+41
-21
lines changed

8 files changed

+41
-21
lines changed

deps/rabbit/src/rabbit.erl

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@
4040
%% Boot steps.
4141
-export([update_cluster_tags/0, maybe_insert_default_data/0, boot_delegate/0, recover/0,
4242
pg_local_amqp_session/0,
43-
pg_local_amqp_connection/0, prevent_startup_if_node_was_reset/0]).
43+
pg_local_amqp_connection/0,
44+
start_pg_local/0,
45+
prevent_startup_if_node_was_reset/0]).
4446

4547
-rabbit_boot_step({pre_boot, [{description, "rabbit boot start"}]}).
4648

@@ -157,6 +159,12 @@
157159
[{description, "kernel ready"},
158160
{requires, external_infrastructure}]}).
159161

162+
-rabbit_boot_step({pg_local,
163+
[{description, "local-only pg scope"},
164+
{mfa, {rabbit, start_pg_local, []}},
165+
{requires, kernel_ready},
166+
{enables, core_initialized}]}).
167+
160168
-rabbit_boot_step({guid_generator,
161169
[{description, "guid generator"},
162170
{mfa, {rabbit_sup, start_restartable_child,
@@ -1146,6 +1154,9 @@ boot_delegate() ->
11461154
recover() ->
11471155
ok = rabbit_vhost:recover().
11481156

1157+
start_pg_local() ->
1158+
rabbit_sup:start_child(pg_local_scope, pg, [node()]).
1159+
11491160
pg_local_amqp_session() ->
11501161
PgScope = pg_local_scope(amqp_session),
11511162
rabbit_sup:start_child(pg_amqp_session, pg, [PgScope]).

deps/rabbit/src/rabbit_channel.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ send_command(Pid, Msg) ->
304304
%% Delete this function when feature flag rabbitmq_4.2.0 becomes required.
305305
-spec deliver_reply_local(pid(), binary(), mc:state()) -> ok.
306306
deliver_reply_local(Pid, Key, Message) ->
307-
case pg_local:in_group(rabbit_channels, Pid) of
307+
case lists:member(Pid, pg:get_local_members(node(), rabbit_channels)) of
308308
true -> gen_server2:cast(Pid, {deliver_reply, Key, Message});
309309
false -> ok
310310
end.
@@ -318,7 +318,7 @@ list() ->
318318
-spec list_local() -> [pid()].
319319

320320
list_local() ->
321-
pg_local:get_members(rabbit_channels).
321+
pg:get_local_members(node(), rabbit_channels).
322322

323323
-spec info_keys() -> rabbit_types:info_keys().
324324

@@ -444,7 +444,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
444444

445445
?LG_PROCESS_TYPE(channel),
446446
?store_proc_name({ConnName, Channel}),
447-
ok = pg_local:join(rabbit_channels, self()),
447+
ok = pg:join(node(), rabbit_channels, self()),
448448
Flow = case rabbit_misc:get_env(rabbit, classic_queue_flow_control, true) of
449449
true -> flow;
450450
false -> noflow
@@ -783,7 +783,7 @@ terminate(_Reason,
783783
queue_states = QueueCtxs}) ->
784784
rabbit_queue_type:close(QueueCtxs),
785785
{_Res, _State1} = notify_queues(State),
786-
pg_local:leave(rabbit_channels, self()),
786+
pg:leave(node(), rabbit_channels, self()),
787787
rabbit_event:if_enabled(State, #ch.stats_timer,
788788
fun() -> emit_stats(State) end),
789789
[delete_stats(Tag) || {Tag, _} <- get()],

deps/rabbit/src/rabbit_networking.erl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -463,11 +463,11 @@ node_client_listeners(Node) ->
463463

464464
-spec register_connection(pid()) -> ok.
465465

466-
register_connection(Pid) -> pg_local:join(rabbit_connections, Pid).
466+
register_connection(Pid) -> pg:join(node(), rabbit_connections, Pid).
467467

468468
-spec unregister_connection(pid()) -> ok.
469469

470-
unregister_connection(Pid) -> pg_local:leave(rabbit_connections, Pid).
470+
unregister_connection(Pid) -> pg:leave(node(), rabbit_connections, Pid).
471471

472472
-spec connections() -> [rabbit_types:connection()].
473473
connections() ->
@@ -476,17 +476,17 @@ connections() ->
476476

477477
-spec local_connections() -> [rabbit_types:connection()].
478478
local_connections() ->
479-
Amqp091Pids = pg_local:get_members(rabbit_connections),
479+
Amqp091Pids = pg:get_local_members(node(), rabbit_connections),
480480
Amqp10Pids = rabbit_amqp1_0:list_local(),
481481
Amqp10Pids ++ Amqp091Pids.
482482

483483
-spec register_non_amqp_connection(pid()) -> ok.
484484

485-
register_non_amqp_connection(Pid) -> pg_local:join(rabbit_non_amqp_connections, Pid).
485+
register_non_amqp_connection(Pid) -> pg:join(node(), rabbit_non_amqp_connections, Pid).
486486

487487
-spec unregister_non_amqp_connection(pid()) -> ok.
488488

489-
unregister_non_amqp_connection(Pid) -> pg_local:leave(rabbit_non_amqp_connections, Pid).
489+
unregister_non_amqp_connection(Pid) -> pg:leave(node(), rabbit_non_amqp_connections, Pid).
490490

491491
-spec non_amqp_connections() -> [rabbit_types:connection()].
492492

@@ -496,7 +496,7 @@ non_amqp_connections() ->
496496

497497
-spec local_non_amqp_connections() -> [rabbit_types:connection()].
498498
local_non_amqp_connections() ->
499-
pg_local:get_members(rabbit_non_amqp_connections).
499+
pg:get_local_members(node(), rabbit_non_amqp_connections).
500500

501501
-spec connection_info(rabbit_types:connection(), rabbit_types:info_keys()) ->
502502
rabbit_types:infos().

deps/rabbit/src/rabbit_volatile_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ local_call(Pid, Request) ->
235235

236236
is_local(Pid) ->
237237
rabbit_amqp_session:is_local(Pid) orelse
238-
pg_local:in_group(rabbit_channels, Pid).
238+
lists:member(Pid, pg:get_local_members(node(), rabbit_channels)).
239239

240240
handle_event(QName, {deliver, Msg}, #?STATE{name = QName,
241241
ctag = Ctag,

deps/rabbit/test/proxy_protocol_SUITE.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,8 @@ proxy_protocol_v2_local(Config) ->
110110
ok.
111111

112112
connection_name() ->
113-
?awaitMatch([_], pg_local:get_members(rabbit_connections), 30000),
114-
[Pid] = pg_local:get_members(rabbit_connections),
113+
?awaitMatch([_], pg:get_local_members(node(), rabbit_connections), 30000),
114+
[Pid] = pg:get_local_members(node(), rabbit_connections),
115115
{dictionary, Dict} = process_info(Pid, dictionary),
116116
{process_name, {rabbit_reader, ConnectionName}} = lists:keyfind(process_name, 1, Dict),
117117
ConnectionName.
@@ -120,5 +120,5 @@ wait_for_connection_close(Config) ->
120120
?awaitMatch(
121121
[],
122122
rabbit_ct_broker_helpers:rpc(
123-
Config, 0, pg_local, get_members, [rabbit_connnections]),
123+
Config, 0, pg, get_local_members, [node(), rabbit_connections]),
124124
30000).

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2214,8 +2214,17 @@ cleanup_queue_state_on_channel_after_publish(Config) ->
22142214
[NCh1, NCh2] = rpc:call(Server, rabbit_channel, list, []),
22152215
%% Check the channel state contains the state for the quorum queue on
22162216
%% channel 1 and 2
2217-
wait_for_cleanup(Server, NCh1, 0),
2218-
wait_for_cleanup(Server, NCh2, 1),
2217+
%% Note: pg:get_local_members doesn't guarantee order, so we need to identify
2218+
%% which channel has queue state
2219+
{ChWithoutState, ChWithState} = case length(rpc:call(Server,
2220+
rabbit_channel,
2221+
list_queue_states,
2222+
[NCh1])) of
2223+
0 -> {NCh1, NCh2};
2224+
1 -> {NCh2, NCh1}
2225+
end,
2226+
wait_for_cleanup(Server, ChWithoutState, 0),
2227+
wait_for_cleanup(Server, ChWithState, 1),
22192228
%% then delete the queue and wait for the process to terminate
22202229
?assertMatch(#'queue.delete_ok'{},
22212230
amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
@@ -2225,8 +2234,8 @@ cleanup_queue_state_on_channel_after_publish(Config) ->
22252234
[?SUPNAME]))
22262235
end, 30000),
22272236
%% Check that all queue states have been cleaned
2228-
wait_for_cleanup(Server, NCh2, 0),
2229-
wait_for_cleanup(Server, NCh1, 0).
2237+
wait_for_cleanup(Server, ChWithState, 0),
2238+
wait_for_cleanup(Server, ChWithoutState, 0).
22302239

22312240
cleanup_queue_state_on_channel_after_subscribe(Config) ->
22322241
%% Declare/delete the queue and publish in one channel, while consuming on a

deps/rabbitmq_stream/src/rabbit_stream.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ kill_connection(ConnectionName) ->
144144
after 1000 -> ok
145145
end
146146
end,
147-
pg_local:get_members(rabbit_stream_connections)).
147+
pg:get_local_members(node(), rabbit_stream_connections)).
148148

149149
emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
150150
Pids =

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,7 @@ transition_to_opened(Transport,
473473
% TODO remove registration to rabbit_stream_connections
474474
% just meant to be able to close the connection remotely
475475
% should be possible once the connections are available in ctl list_connections
476-
pg_local:join(rabbit_stream_connections, self()),
476+
pg:join(node(), rabbit_stream_connections, self()),
477477
Connection1 =
478478
rabbit_event:init_stats_timer(NewConnection,
479479
#stream_connection.stats_timer),

0 commit comments

Comments
 (0)