Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
-include_lib("kernel/include/logger.hrl").

-define(LINK_CREDIT_TIMEOUT, 20_000).
-define(AWAIT_SEND_MSG_TIMEOUT, 1_000).

-type state() :: rabbit_shovel_behaviour:state().
-type uri() :: rabbit_shovel_behaviour:uri().
Expand Down Expand Up @@ -374,7 +375,11 @@ send_msg(Link, Msg) ->
send_msg(Link, Msg)
after ?LINK_CREDIT_TIMEOUT ->
{stop, credited_timeout}
end
end;
{error, remote_incoming_window_exceeded} ->
%% We could be blocked because of an alarm
timer:sleep(?AWAIT_SEND_MSG_TIMEOUT),
send_msg(Link, Msg)
end.

add_timestamp_header(#{dest := #{add_timestamp_header := true}}, Msg) ->
Expand Down
81 changes: 75 additions & 6 deletions deps/rabbitmq_shovel/src/rabbit_local_shovel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

-export([
boot_step/0,
conserve_resources/3,
parse/2,
connect_source/1,
connect_dest/1,
Expand Down Expand Up @@ -76,6 +77,12 @@ boot_step() ->
rabbit_global_counters:init(Labels#{queue_type => rabbit_quorum_queue}),
rabbit_global_counters:init(Labels#{queue_type => rabbit_stream_queue}).

-spec conserve_resources(pid(),
rabbit_alarm:resource_alarm_source(),
rabbit_alarm:resource_alert()) -> ok.
conserve_resources(Pid, Source, {_, Conserve, _}) ->
gen_server:cast(Pid, {conserve_resources, Source, Conserve}).

parse(_Name, {source, Source}) ->
Queue = parse_parameter(queue, fun parse_binary/1,
proplists:get_value(queue, Source)),
Expand Down Expand Up @@ -234,14 +241,17 @@ init_dest(#{name := Name,
dest := #{add_forward_headers := AFH} = Dst} = State) ->
rabbit_global_counters:publisher_created(?PROTOCOL),
_TRef = erlang:send_after(1000, self(), send_confirms_and_nacks),
Alarms0 = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
Alarms = sets:from_list(Alarms0),
case AFH of
true ->
Props = #{<<"x-opt-shovelled-by">> => rabbit_nodes:cluster_name(),
<<"x-opt-shovel-type">> => rabbit_data_coercion:to_binary(Type),
<<"x-opt-shovel-name">> => rabbit_data_coercion:to_binary(Name)},
State#{dest => Dst#{cached_forward_headers => Props}};
State#{dest => Dst#{cached_forward_headers => Props,
alarms => Alarms}};
false ->
State
State#{dest => Dst#{alarms => Alarms}}
end.

source_uri(_State) ->
Expand Down Expand Up @@ -359,6 +369,19 @@ handle_dest({{'DOWN', #resource{kind = queue,
{eol, QState1, _QRef} ->
State0#{dest => Dest#{current => Current#{queue_states => QState1}}}
end;
handle_dest({conserve_resources, Alarm, Conserve}, #{dest := #{alarms := Alarms0} = Dest} = State0) ->
Alarms = case Conserve of
true -> sets:add_element(Alarm, Alarms0);
false -> sets:del_element(Alarm, Alarms0)
end,
State = State0#{dest => Dest#{alarms => Alarms}},
case {sets:is_empty(Alarms0), sets:is_empty(Alarms)} of
{false, true} ->
%% All alarms cleared
forward_pending_delivery(State);
{_, _} ->
State
end;
handle_dest(_Msg, State) ->
State.

Expand All @@ -374,7 +397,16 @@ forward(_, _, #{source := #{remaining_unacked := 0}} = State) ->
%% come back. So drop subsequent messages on the floor to be
%% requeued later
State;
forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} = Dest,
forward(Tag, Msg, State) ->
case is_blocked(State) of
true ->
PendingEntry = {Tag, Msg},
add_pending_delivery(PendingEntry, State);
false ->
do_forward(Tag, Msg, State)
end.

do_forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} = Dest,
ack_mode := AckMode} = State0) ->
{Options, #{dest := #{current := Current1} = Dest1} = State} =
case AckMode of
Expand Down Expand Up @@ -437,10 +469,15 @@ add_routing(Msg0, Dest) ->
RK -> mc:set_annotation(?ANN_ROUTING_KEYS, [RK], Msg)
end.

status(_) ->
running.
status(State) ->
case is_blocked(State) of
true -> blocked;
false -> running
end.

pending_count(_State) ->
pending_count(#{dest := #{pending_delivery := Pending}}) ->
queue:len(Pending);
pending_count(_) ->
0.

%% Internal
Expand Down Expand Up @@ -903,3 +940,35 @@ messages_delivered(QName, S0) ->
_ ->
ok
end.

is_blocked(#{dest := #{alarms := Alarms}}) ->
not sets:is_empty(Alarms);
is_blocked(_) ->
false.

add_pending_delivery(Elem, State = #{dest := Dest}) ->
Pending = maps:get(pending_delivery, Dest, queue:new()),
State#{dest => Dest#{pending_delivery => queue:in(Elem, Pending)}}.

pop_pending_delivery(State = #{dest := Dest}) ->
Pending = maps:get(pending_delivery, Dest, queue:new()),
case queue:out(Pending) of
{empty, _} ->
empty;
{{value, Elem}, Pending2} ->
{Elem, State#{dest => Dest#{pending_delivery => Pending2}}}
end.

forward_pending_delivery(State) ->
case pop_pending_delivery(State) of
empty ->
State;
{{Tag, Mc}, S} ->
S2 = do_forward(Tag, Mc, S),
case is_blocked(S2) of
true ->
S2;
false ->
forward_pending_delivery(S2)
end
end.
46 changes: 45 additions & 1 deletion deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ groups() ->
local_to_local_delete_dest_queue,
local_to_local_stream_credit_flow_no_ack,
local_to_local_simple_uri,
local_to_local_counters
local_to_local_counters,
local_to_local_alarms
]}
].

Expand Down Expand Up @@ -247,6 +248,41 @@ local_to_local_counters(Config) ->
get_global_counters(Config), 30_000)
end).

local_to_local_alarms(Config) ->
Src = ?config(srcq, Config),
Dest = ?config(destq, Config),
ShovelArgs = [{<<"src-protocol">>, <<"local">>},
{<<"src-queue">>, Src},
{<<"dest-protocol">>, <<"local">>},
{<<"dest-queue">>, Dest}],
with_amqp10_session(
Config,
fun (Sess) ->
amqp10_publish(Sess, Src, <<"hello">>, 1000),
rabbit_ct_broker_helpers:set_alarm(Config, 0, disk),
rabbit_ct_broker_helpers:set_alarm(Config, 0, disk),
shovel_test_utils:set_param(Config, ?PARAM, ShovelArgs),
?awaitMatch({running, blocked}, get_blocked_status(Config), 30000),
amqp10_expect_empty(Sess, Dest),
rabbit_ct_broker_helpers:clear_alarm(Config, 0, disk),
?awaitMatch({running, running}, get_blocked_status(Config), 30000),
amqp10_expect_count(Sess, Dest, 1000),

shovel_test_utils:clear_param(Config, ?PARAM),

amqp10_publish(Sess, Src, <<"hello">>, 1000),
rabbit_ct_broker_helpers:set_alarm(Config, 0, disk),
rabbit_ct_broker_helpers:set_alarm(Config, 0, memory),
shovel_test_utils:set_param(Config, ?PARAM, ShovelArgs),
?awaitMatch({running, blocked}, get_blocked_status(Config), 30000),
amqp10_expect_empty(Sess, Dest),
rabbit_ct_broker_helpers:clear_alarm(Config, 0, disk),
?awaitMatch({running, blocked}, get_blocked_status(Config), 30000),
amqp10_expect_empty(Sess, Dest),
rabbit_ct_broker_helpers:clear_alarm(Config, 0, memory),
?awaitMatch({running, running}, get_blocked_status(Config), 30000),
amqp10_expect_count(Sess, Dest, 1000)
end).
%%----------------------------------------------------------------------------
declare_queue(Config, VHost, QName) ->
declare_queue(Config, VHost, QName, []).
Expand Down Expand Up @@ -303,3 +339,11 @@ get_global_counters(Config) ->
get_global_counters0(Config, Key) ->
Overview = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_global_counters, overview, []),
maps:get(Key, Overview).

get_blocked_status(Config) ->
case rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, status, []) of
[{_, _, {Status, PropList}, _, _}] ->
{Status, proplists:get_value(blocked_status, PropList)};
_ ->
empty
end.
17 changes: 16 additions & 1 deletion deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ tests() ->
application_properties,
delete_src_queue,
shovel_status,
change_definition
change_definition,
disk_alarm
].

%% -------------------------------------------------------------------
Expand Down Expand Up @@ -594,6 +595,20 @@ change_definition(Config) ->
amqp10_expect_empty(Sess, Dest2)
end).

disk_alarm(Config) ->
Src = ?config(srcq, Config),
Dest = ?config(destq, Config),
with_amqp10_session(Config,
fun (Sess) ->
ShovelArgs = ?config(shovel_args, Config),
amqp10_publish(Sess, Src, <<"hello">>, 10),
rabbit_ct_broker_helpers:set_alarm(Config, 0, disk),
set_param(Config, ?PARAM, ShovelArgs),
amqp10_expect_empty(Sess, Dest),
rabbit_ct_broker_helpers:clear_alarm(Config, 0, disk),
amqp10_expect_count(Sess, Dest, 10)
end).

%%----------------------------------------------------------------------------
maybe_skip_local_protocol(Config) ->
[Node] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Expand Down
Loading