Skip to content
This repository was archived by the owner on Nov 27, 2023. It is now read-only.

Commit 6ce6f20

Browse files
authored
Adding telemetry support to gen_rmq (#136)
1 parent 823a71c commit 6ce6f20

File tree

7 files changed

+389
-31
lines changed

7 files changed

+389
-31
lines changed

README.md

Lines changed: 100 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -10,24 +10,24 @@ GenRMQ is a set of [behaviours][behaviours] meant to be used to create RabbitMQ
1010
Internally it is using [AMQP][amqp] elixir RabbitMQ client. The idea is to reduce boilerplate consumer / publisher
1111
code, which usually includes:
1212

13-
* creating connection / channel and keeping it in a state
14-
* creating and binding queue
15-
* handling reconnections / consumer cancellations
13+
- creating connection / channel and keeping it in a state
14+
- creating and binding queue
15+
- handling reconnections / consumer cancellations
1616

1717
The project currently provides the following functionality:
1818

19-
* `GenRMQ.Consumer` - a behaviour for implementing RabbitMQ consumers
20-
* `GenRMQ.Publisher` - a behaviour for implementing RabbitMQ publishers
21-
* `GenRMQ.Processor` - a behaviour for implementing RabbitMQ message processors
22-
* `GenRMQ.RabbitCase` - test utilities for RabbitMQ ([example usage][rabbit_case_example])
19+
- `GenRMQ.Consumer` - a behaviour for implementing RabbitMQ consumers
20+
- `GenRMQ.Publisher` - a behaviour for implementing RabbitMQ publishers
21+
- `GenRMQ.Processor` - a behaviour for implementing RabbitMQ message processors
22+
- `GenRMQ.RabbitCase` - test utilities for RabbitMQ ([example usage][rabbit_case_example])
2323

2424
## Installation
2525

26-
~~~elixir
26+
```elixir
2727
def deps do
2828
[{:gen_rmq, "~> 2.3.0"}]
2929
end
30-
~~~
30+
```
3131

3232
## Migrations
3333

@@ -39,7 +39,7 @@ More thorough examples for using `GenRMQ.Consumer` and `GenRMQ.Publisher` can be
3939

4040
### Consumer
4141

42-
~~~elixir
42+
```elixir
4343
defmodule Consumer do
4444
@behaviour GenRMQ.Consumer
4545

@@ -62,26 +62,26 @@ defmodule Consumer do
6262
...
6363
end
6464
end
65-
~~~
65+
```
6666

67-
~~~elixir
67+
```elixir
6868
GenRMQ.Consumer.start_link(Consumer, name: Consumer)
69-
~~~
69+
```
7070

7171
This will result in:
7272

73-
* durable `gen_rmq_exchange.deadletter` exchange created or redeclared
74-
* durable `gen_rmq_in_queue_error` queue created or redeclared. It will be bound to `gen_rmq_exchange.deadletter`
75-
* durable topic `gen_rmq_exchange` exchange created or redeclared
76-
* durable `gen_rmq_in_queue` queue created or redeclared. It will be bound to `gen_rmq_exchange` exchange and has a deadletter exchange set to `gen_rmq_exchange.deadletter`
77-
* every `handle_message` callback will executed in separate process. This can be disabled by setting `concurrency: false` in `init` callback
78-
* on failed rabbitmq connection it will wait for a bit and then reconnect
73+
- durable `gen_rmq_exchange.deadletter` exchange created or redeclared
74+
- durable `gen_rmq_in_queue_error` queue created or redeclared. It will be bound to `gen_rmq_exchange.deadletter`
75+
- durable topic `gen_rmq_exchange` exchange created or redeclared
76+
- durable `gen_rmq_in_queue` queue created or redeclared. It will be bound to `gen_rmq_exchange` exchange and has a deadletter exchange set to `gen_rmq_exchange.deadletter`
77+
- every `handle_message` callback will executed in separate process. This can be disabled by setting `concurrency: false` in `init` callback
78+
- on failed rabbitmq connection it will wait for a bit and then reconnect
7979

8080
There are many options to control the consumer setup details, please check the `c:GenRMQ.Consumer.init/0` [docs][consumer_doc] for all available settings.
8181

8282
### Publisher
8383

84-
~~~elixir
84+
```elixir
8585
defmodule Publisher do
8686
@behaviour GenRMQ.Publisher
8787

@@ -92,20 +92,95 @@ defmodule Publisher do
9292
]
9393
end
9494
end
95-
~~~
95+
```
9696

97-
~~~elixir
97+
```elixir
9898
GenRMQ.Publisher.start_link(Publisher, name: Publisher)
9999
GenRMQ.Publisher.publish(Publisher, Jason.encode!(%{msg: "msg"}))
100-
~~~
100+
```
101+
102+
## Telemetry
103+
104+
GenRMQ emits [Telemetry][https://github.com/beam-telemetry/telemetry] events for both consumers and publishers.
105+
It currently exposes the following events:
106+
107+
- `[:gen_rmq, :publisher, :connection, :start]` - Dispatched by a GenRMQ publisher when a connection to RabbitMQ is started
108+
109+
- Measurement: `%{time: System.monotonic_time}`
110+
- Metadata: `%{exchange: String.t}`
111+
112+
* `[:gen_rmq, :publisher, :connection, :stop]` - Dispatched by a GenRMQ publisher when a connection to RabbitMQ has been established
113+
114+
- Measurement: `%{time: System.monotonic_time, duration: native_time}`
115+
- Metadata: `%{exchange: String.t}`
116+
117+
* `[:gen_rmq, :publisher, :connection, :down]` - Dispatched by a GenRMQ publisher when a connection to RabbitMQ has been lost
118+
119+
- Measurement: `%{time: System.monotonic_time}`
120+
- Metadata: `%{module: atom, reason: atom}`
121+
122+
* `[:gen_rmq, :publisher, :message, :start]` - Dispatched by a GenRMQ publisher when a message is about to be published to RabbitMQ
123+
124+
- Measurement: `%{time: System.monotonic_time}`
125+
- Metadata: `%{exchange: String.t, message: String.t}`
126+
127+
* `[:gen_rmq, :publisher, :message, :stop]` - Dispatched by a GenRMQ publisher when a message has been published to RabbitMQ
128+
129+
- Measurement: `%{time: System.monotonic_time, duration: native_time}`
130+
- Metadata: `%{exchange: String.t, message: String.t}`
131+
132+
* `[:gen_rmq, :publisher, :message, :error]` - Dispatched by a GenRMQ publisher when a message failed to be published to RabbitMQ
133+
134+
- Measurement: `%{time: System.monotonic_time, duration: native_time}`
135+
- Metadata: `%{exchange: String.t, message: String.t, kind: atom, reason: atom}`
136+
137+
* `[:gen_rmq, :consumer, :message, :ack]` - Dispatched by a GenRMQ consumer when a message has been acknowledged
138+
139+
- Measurement: `%{time: System.monotonic_time}`
140+
- Metadata: `%{message: String.t}`
141+
142+
* `[:gen_rmq, :consumer, :message, :reject]` - Dispatched by a GenRMQ consumer when a message has been rejected
143+
144+
- Measurement: `%{time: System.monotonic_time}`
145+
- Metadata: `%{message: String.t, requeue: boolean}`
146+
147+
* `[:gen_rmq, :consumer, :message, :start]` - Dispatched by a GenRMQ consumer when the processing of a message has begun
148+
149+
- Measurement: `%{time: System.monotonic_time}`
150+
- Metadata: `%{message: String.t, module: atom}`
151+
152+
* `[:gen_rmq, :consumer, :message, :stop]` - Dispatched by a GenRMQ consumer when the processing of a message has completed
153+
154+
- Measurement: `%{time: System.monotonic_time, duration: native_time}`
155+
- Metadata: `%{message: String.t, module: atom}`
156+
157+
* `[:gen_rmq, :consumer, :connection, :start]` - Dispatched by a GenRMQ consumer when a connection to RabbitMQ is started
158+
159+
- Measurement: `%{time: System.monotonic_time}`
160+
- Metadata: `%{module: atom, attempt: integer, queue: String.t, exchange: String.t, routing_key: String.t}`
161+
162+
* `[:gen_rmq, :consumer, :connection, :stop]` - Dispatched by a GenRMQ consumer when a connection to RabbitMQ has been established
163+
164+
- Measurement: `%{time: System.monotonic_time, duration: native_time}`
165+
- Metadata: `%{module: atom, attempt: integer, queue: String.t, exchange: String.t, routing_key: String.t}`
166+
167+
* `[:gen_rmq, :consumer, :connection, :error]` - Dispatched by a GenRMQ consumer when a connection to RabbitMQ could not be made
168+
169+
- Measurement: `%{time: System.monotonic_time}`
170+
- Metadata: `%{module: atom, attempt: integer, queue: String.t, exchange: String.t, routing_key: String.t, error: any}`
171+
172+
* `[:gen_rmq, :consumer, :connection, :down]` - Dispatched by a GenRMQ consumer when a connection to RabbitMQ has been lost
173+
174+
- Measurement: `%{time: System.monotonic_time}`
175+
- Metadata: `%{module: atom, reason: atom}`
101176

102177
## Running tests
103178

104179
You need [docker-compose][docker_compose] installed.
105180

106-
~~~bash
181+
```bash
107182
$ make test
108-
~~~
183+
```
109184

110185
## How to contribute
111186

lib/consumer.ex

Lines changed: 116 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,9 @@ defmodule GenRMQ.Consumer do
193193
`message` - `GenRMQ.Message` struct
194194
"""
195195
@spec ack(message :: %GenRMQ.Message{}) :: :ok
196-
def ack(%Message{state: %{in: channel}, attributes: %{delivery_tag: tag}}) do
196+
def ack(%Message{state: %{in: channel}, attributes: %{delivery_tag: tag}} = message) do
197+
emit_message_ack_event(message)
198+
197199
Basic.ack(channel, tag)
198200
end
199201

@@ -205,7 +207,9 @@ defmodule GenRMQ.Consumer do
205207
`requeue` - indicates if message should be requeued
206208
"""
207209
@spec reject(message :: %GenRMQ.Message{}, requeue :: boolean) :: :ok
208-
def reject(%Message{state: %{in: channel}, attributes: %{delivery_tag: tag}}, requeue \\ false) do
210+
def reject(%Message{state: %{in: channel}, attributes: %{delivery_tag: tag}} = message, requeue \\ false) do
211+
emit_message_reject_event(message, requeue)
212+
209213
Basic.reject(channel, tag, requeue: requeue)
210214
end
211215

@@ -254,6 +258,8 @@ defmodule GenRMQ.Consumer do
254258
def handle_info({:DOWN, _ref, :process, _pid, reason}, %{module: module, config: config} = state) do
255259
Logger.info("[#{module}]: RabbitMQ connection is down! Reason: #{inspect(reason)}")
256260

261+
emit_connection_down_event(module, reason)
262+
257263
config
258264
|> Keyword.get(:reconnect, true)
259265
|> handle_reconnect(state)
@@ -333,14 +339,26 @@ defmodule GenRMQ.Consumer do
333339
end
334340

335341
defp handle_message(payload, attributes, %{module: module} = state, false) do
342+
start_time = System.monotonic_time()
336343
message = Message.create(attributes, payload, state)
337-
apply(module, :handle_message, [message])
344+
345+
emit_message_start_event(start_time, message, module)
346+
result = apply(module, :handle_message, [message])
347+
emit_message_stop_event(start_time, message, module)
348+
349+
result
338350
end
339351

340352
defp handle_message(payload, attributes, %{module: module} = state, true) do
341353
spawn(fn ->
354+
start_time = System.monotonic_time()
342355
message = Message.create(attributes, payload, state)
343-
apply(module, :handle_message, [message])
356+
357+
emit_message_start_event(start_time, message, module)
358+
result = apply(module, :handle_message, [message])
359+
emit_message_stop_event(start_time, message, module)
360+
361+
result
344362
end)
345363
end
346364

@@ -361,8 +379,16 @@ defmodule GenRMQ.Consumer do
361379
end
362380

363381
defp get_connection(%{config: config, module: module, reconnect_attempt: attempt} = state) do
382+
start_time = System.monotonic_time()
383+
queue = config[:queue]
384+
exchange = config[:exchange]
385+
routing_key = config[:routing_key]
386+
387+
emit_connection_start_event(start_time, module, attempt, queue, exchange, routing_key)
388+
364389
case Connection.open(config[:uri]) do
365390
{:ok, conn} ->
391+
emit_connection_stop_event(start_time, module, attempt, queue, exchange, routing_key)
366392
Process.monitor(conn.pid)
367393
Map.put(state, :conn, conn)
368394

@@ -372,6 +398,8 @@ defmodule GenRMQ.Consumer do
372398
"#{inspect(strip_key(config, :uri))}, reason #{inspect(e)}"
373399
)
374400

401+
emit_connection_error_event(start_time, module, attempt, queue, exchange, routing_key, e)
402+
375403
retry_delay_fn = config[:retry_delay_function] || (&linear_delay/1)
376404
next_attempt = attempt + 1
377405
retry_delay_fn.(next_attempt)
@@ -441,6 +469,90 @@ defmodule GenRMQ.Consumer do
441469
end
442470
end
443471

472+
defp emit_message_ack_event(message) do
473+
start_time = System.monotonic_time()
474+
measurements = %{time: start_time}
475+
metadata = %{message: message}
476+
477+
:telemetry.execute([:gen_rmq, :consumer, :message, :ack], measurements, metadata)
478+
end
479+
480+
defp emit_message_reject_event(message, requeue) do
481+
start_time = System.monotonic_time()
482+
measurements = %{time: start_time}
483+
metadata = %{message: message, requeue: requeue}
484+
485+
:telemetry.execute([:gen_rmq, :consumer, :message, :reject], measurements, metadata)
486+
end
487+
488+
defp emit_message_start_event(start_time, message, module) do
489+
measurements = %{time: start_time}
490+
metadata = %{message: message, module: module}
491+
492+
:telemetry.execute([:gen_rmq, :consumer, :message, :start], measurements, metadata)
493+
end
494+
495+
defp emit_message_stop_event(start_time, message, module) do
496+
stop_time = System.monotonic_time()
497+
measurements = %{time: stop_time, duration: stop_time - start_time}
498+
metadata = %{message: message, module: module}
499+
500+
:telemetry.execute([:gen_rmq, :consumer, :message, :stop], measurements, metadata)
501+
end
502+
503+
defp emit_connection_down_event(module, reason) do
504+
start_time = System.monotonic_time()
505+
measurements = %{time: start_time}
506+
metadata = %{module: module, reason: reason}
507+
508+
:telemetry.execute([:gen_rmq, :consumer, :connection, :down], measurements, metadata)
509+
end
510+
511+
defp emit_connection_start_event(start_time, module, attempt, queue, exchange, routing_key) do
512+
measurements = %{time: start_time}
513+
514+
metadata = %{
515+
module: module,
516+
attempt: attempt,
517+
queue: queue,
518+
exchange: exchange,
519+
routing_key: routing_key
520+
}
521+
522+
:telemetry.execute([:gen_rmq, :consumer, :connection, :start], measurements, metadata)
523+
end
524+
525+
defp emit_connection_stop_event(start_time, module, attempt, queue, exchange, routing_key) do
526+
stop_time = System.monotonic_time()
527+
measurements = %{time: stop_time, duration: stop_time - start_time}
528+
529+
metadata = %{
530+
module: module,
531+
attempt: attempt,
532+
queue: queue,
533+
exchange: exchange,
534+
routing_key: routing_key
535+
}
536+
537+
:telemetry.execute([:gen_rmq, :consumer, :connection, :stop], measurements, metadata)
538+
end
539+
540+
defp emit_connection_error_event(start_time, module, attempt, queue, exchange, routing_key, error) do
541+
stop_time = System.monotonic_time()
542+
measurements = %{time: stop_time, duration: stop_time - start_time}
543+
544+
metadata = %{
545+
module: module,
546+
attempt: attempt,
547+
queue: queue,
548+
exchange: exchange,
549+
routing_key: routing_key,
550+
error: error
551+
}
552+
553+
:telemetry.execute([:gen_rmq, :consumer, :connection, :error], measurements, metadata)
554+
end
555+
444556
defp strip_key(keyword_list, key) do
445557
keyword_list
446558
|> Keyword.delete(key)

0 commit comments

Comments
 (0)