|
19 | 19 |
|
20 | 20 | import java.time.Instant;
|
21 | 21 |
|
| 22 | +/** API to configure and create a {@link Consumer}. */ |
22 | 23 | public interface ConsumerBuilder {
|
23 | 24 |
|
| 25 | + /** |
| 26 | + * The queue to consume from. |
| 27 | + * |
| 28 | + * @param queue queue |
| 29 | + * @return this builder instance |
| 30 | + */ |
24 | 31 | ConsumerBuilder queue(String queue);
|
25 | 32 |
|
| 33 | + /** |
| 34 | + * The callback for inbound messages. |
| 35 | + * |
| 36 | + * @param handler callback |
| 37 | + * @return this builder instance |
| 38 | + */ |
26 | 39 | ConsumerBuilder messageHandler(Consumer.MessageHandler handler);
|
27 | 40 |
|
| 41 | + /** |
| 42 | + * The initial number credits to grant to the AMQP receiver. |
| 43 | + * |
| 44 | + * <p>The default is 100. |
| 45 | + * |
| 46 | + * @param initialCredits number of initial credits |
| 47 | + * @return this buidler instance |
| 48 | + */ |
28 | 49 | ConsumerBuilder initialCredits(int initialCredits);
|
29 | 50 |
|
| 51 | + /** |
| 52 | + * The consumer priority. |
| 53 | + * |
| 54 | + * @param priority consumer priority |
| 55 | + * @return this builder instance |
| 56 | + * @see <a href="https://www.rabbitmq.com/docs/consumer-priority">Consumer Priorities</a> |
| 57 | + */ |
30 | 58 | ConsumerBuilder priority(int priority);
|
31 | 59 |
|
| 60 | + /** |
| 61 | + * Add {@link com.rabbitmq.client.amqp.Resource.StateListener}s to the consumer. |
| 62 | + * |
| 63 | + * @param listeners listeners |
| 64 | + * @return this builder instance |
| 65 | + */ |
32 | 66 | ConsumerBuilder listeners(Resource.StateListener... listeners);
|
33 | 67 |
|
| 68 | + /** |
| 69 | + * Options for a consumer consuming from a stream. |
| 70 | + * |
| 71 | + * @return stream options |
| 72 | + * @see <a href="https://www.rabbitmq.com/docs/streams">Streams</a> |
| 73 | + * @see <a href="https://www.rabbitmq.com/docs/streams#consuming">Stream Consumers</a> |
| 74 | + */ |
34 | 75 | StreamOptions stream();
|
35 | 76 |
|
| 77 | + /** |
| 78 | + * Build the consumer. |
| 79 | + * |
| 80 | + * @return the configured consumer instance |
| 81 | + */ |
36 | 82 | Consumer build();
|
37 | 83 |
|
| 84 | + /** |
| 85 | + * Options for a consumer consuming from a stream. |
| 86 | + * |
| 87 | + * @see <a href="https://www.rabbitmq.com/docs/streams">Streams</a> |
| 88 | + * @see <a href="https://www.rabbitmq.com/docs/streams#consuming">Stream Consumers</a> |
| 89 | + */ |
38 | 90 | interface StreamOptions {
|
39 | 91 |
|
| 92 | + /** |
| 93 | + * The offset to start consuming from. |
| 94 | + * |
| 95 | + * @param offset offset |
| 96 | + * @return stream options |
| 97 | + */ |
40 | 98 | StreamOptions offset(long offset);
|
41 | 99 |
|
| 100 | + /** |
| 101 | + * A point in time to start consuming from. |
| 102 | + * |
| 103 | + * <p>Be aware consumers can receive messages published a bit before the specified timestamp. |
| 104 | + * |
| 105 | + * @param timestamp the timestamp |
| 106 | + * @return stream options |
| 107 | + */ |
42 | 108 | StreamOptions offset(Instant timestamp);
|
43 | 109 |
|
| 110 | + /** |
| 111 | + * The offset to start consuming from. |
| 112 | + * |
| 113 | + * @param specification offset specification |
| 114 | + * @return stream options |
| 115 | + * @see StreamOffsetSpecification |
| 116 | + */ |
44 | 117 | StreamOptions offset(StreamOffsetSpecification specification);
|
45 | 118 |
|
| 119 | + /** |
| 120 | + * The offset to start consuming from as an interval string value. |
| 121 | + * |
| 122 | + * <p>Valid units are Y, M, D, h, m, s. Examples: <code>7D</code> (7 days), <code>12h</code> (12 |
| 123 | + * hours). |
| 124 | + * |
| 125 | + * @param interval the interval |
| 126 | + * @return stream options |
| 127 | + * @see <a href="https://www.rabbitmq.com/docs/streams#retention">Interval Syntax</a> |
| 128 | + */ |
46 | 129 | StreamOptions offset(String interval);
|
47 | 130 |
|
| 131 | + /** |
| 132 | + * Filter values. |
| 133 | + * |
| 134 | + * @param values filter values |
| 135 | + * @return stream options |
| 136 | + * @see <a href="https://www.rabbitmq.com/docs/streams#filtering">Stream Filtering</a> |
| 137 | + */ |
48 | 138 | StreamOptions filterValues(String... values);
|
49 | 139 |
|
| 140 | + /** |
| 141 | + * Whether messages without a filter value should be sent. |
| 142 | + * |
| 143 | + * <p>Default is <code>false</code> (messages without a filter value are not sent). |
| 144 | + * |
| 145 | + * @param matchUnfiltered true to send messages without a filter value |
| 146 | + * @return stream options |
| 147 | + */ |
50 | 148 | StreamOptions filterMatchUnfiltered(boolean matchUnfiltered);
|
51 | 149 |
|
| 150 | + /** |
| 151 | + * Return the consumer builder. |
| 152 | + * |
| 153 | + * @return the consumer builder |
| 154 | + */ |
52 | 155 | ConsumerBuilder builder();
|
53 | 156 | }
|
54 | 157 |
|
| 158 | + /** Offset specification to start consuming from. */ |
55 | 159 | enum StreamOffsetSpecification {
|
| 160 | + /** Beginning of the stream. */ |
56 | 161 | FIRST,
|
| 162 | + /** Last chunk of the stream. */ |
57 | 163 | LAST,
|
| 164 | + /** Very end of the stream (new chunks). */ |
58 | 165 | NEXT
|
59 | 166 | }
|
60 | 167 | }
|
0 commit comments