@@ -340,46 +340,49 @@ process_connect(Implicit, Frame,
340340 {? HEADER_HEART_BEAT ,
341341 io_lib :format (" ~B ,~B " , [SendTimeout , ReceiveTimeout ])},
342342 {? HEADER_VERSION , Version }],
343- ok ('CONNECTED' ,
344- case application :get_env (rabbitmq_stomp , hide_server_info , false ) of
345- true -> Headers ;
346- false -> [{? HEADER_SERVER , server_header ()} | Headers ]
347- end ,
348- " " ,
349- StateN1 # state {cfg = # cfg {
343+
344+ Res = ok (" CONNECTED" ,
345+ case application :get_env (rabbitmq_stomp , hide_server_info , false ) of
346+ true -> Headers ;
347+ false -> [{? HEADER_SERVER , server_header ()} | Headers ]
348+ end ,
349+ " " ,
350+ StateN1 # state {cfg = # cfg {
350351 session_id = SessionId ,
351- version = Version
352- },
353- user = User ,
354- authz_ctx = AuthzCtx })
352+ version = Version
353+ },
354+ user = User ,
355+ authz_ctx = AuthzCtx }),
356+ self () ! connection_created ,
357+ Res
355358 else
356359 {error , no_common_version } ->
357360 error (" Version mismatch" ,
358361 " Supported versions are ~ts~n " ,
359362 [string :join (? SUPPORTED_VERSIONS , " ," )],
360363 StateN );
361- {error , not_allowed } ->
364+ {error , not_allowed , EUsername , EVHost } ->
362365 rabbit_log :warning (" STOMP login failed for user '~ts ': "
363- " virtual host access not allowed" , [Username ]),
366+ " virtual host access not allowed" , [EUsername ]),
364367 error (" Bad CONNECT" , " Virtual host '" ++
365- binary_to_list (VHost ) ++
368+ binary_to_list (EVHost ) ++
366369 " ' access denied" , State );
367370 {refused , Username1 , _Msg , _Args } ->
368371 rabbit_log :warning (" STOMP login failed for user '~ts ': authentication failed" , [Username1 ]),
369372 error (" Bad CONNECT" , " Access refused for user '" ++
370373 binary_to_list (Username1 ) ++ " '" , [], State );
371- {error , not_loopback } ->
374+ {error , not_loopback , EUsername } ->
372375 rabbit_log :warning (" STOMP login failed for user '~ts ': "
373- " this user's access is restricted to localhost" , [Username ]),
376+ " this user's access is restricted to localhost" , [EUsername ]),
374377 error (" Bad CONNECT" , " non-loopback access denied" , State )
375- end ,
378+ end
376379 case {Res , Implicit } of
377380 {{ok , _ , StateN2 }, implicit } ->
378381 self () ! connection_created , ok (StateN2 );
379382 _ ->
380383 self () ! connection_created , Res
381- end
382- end ,
384+
385+ end ,
383386 State ).
384387
385388creds (_ , _ , # cfg {default_login = DefLogin ,
@@ -912,16 +915,6 @@ do_send(Destination, _DestHdr,
912915
913916 io :format (" Message: ~p~n " , [Message ]),
914917
915- % % {ok, BasicMessage} = rabbit_basic:message(ExchangeName, RoutingKey, Content),
916-
917- % % Delivery = #delivery{
918- % % mandatory = false,
919- % % confirm = DoConfirm,
920- % % sender = self(),
921- % % message = BasicMessage,
922- % % msg_seq_no = MsgSeqNo,
923- % % flow = Flow
924- % % },
925918 QNames = rabbit_exchange :route (Exchange , Message , #{return_binding_keys => true }),
926919 io :format (" QNames ~p~n " , [QNames ]),
927920
@@ -1318,8 +1311,11 @@ ensure_reply_queue(TempQueueId, State = #state{reply_queues = RQS,
13181311 # resource {name = QNameBin } = QName = amqqueue :get_name (Queue ),
13191312
13201313 ConsumerTag = rabbit_stomp_util :consumer_tag_reply_to (TempQueueId ),
1314+
1315+
1316+ {ok , {_Global , DefaultPrefetch }} = application :get_env (rabbit , default_consumer_prefetch ),
13211317 Spec = #{no_ack => true ,
1322- prefetch_count => application : get_env ( rabbit , default_consumer_prefetch ) ,
1318+ prefetch_count => DefaultPrefetch ,
13231319 consumer_tag => ConsumerTag ,
13241320 exclusive_consume => false ,
13251321 args => []},
@@ -1718,7 +1714,6 @@ check_resource_access(User, Resource, Perm, Context) ->
17181714
17191715handle_down ({{'DOWN' , QName }, _MRef , process , QPid , Reason },
17201716 State0 = # state {queue_states = QStates0 } = State ) ->
1721- credit_flow :peer_down (QPid ),
17221717 case rabbit_queue_type :handle_down (QPid , QName , Reason , QStates0 ) of
17231718 {ok , QStates1 , Actions } ->
17241719 State1 = State0 # state {queue_states = QStates1 },
@@ -1782,13 +1777,6 @@ handle_queue_actions(Actions, #state{} = State0) ->
17821777 record_rejects (Rej , S );
17831778 ({queue_down , QRef }, S0 ) ->
17841779 handle_consuming_queue_down_or_eol (QRef , S0 );
1785- % % TODO: I have no idea about the scope of credit_flow
1786- ({block , QName }, S0 ) ->
1787- credit_flow :block (QName ),
1788- S0 ;
1789- ({unblock , QName }, S0 ) ->
1790- credit_flow :unblock (QName ),
1791- S0 ;
17921780 % % TODO: in rabbit_channel there code for handling
17931781 % % send_drained and send_credit_reply
17941782 % % I'm doing catch all here to not crash?
0 commit comments