|
| 1 | +# TaskExecutor Implementation Notes |
| 2 | + |
| 3 | +The asynchronous task executor powers every non-blocking logger. It accepts |
| 4 | +work from multiple producer threads and drains it on a dedicated worker. This |
| 5 | +document describes how the executor behaves across build configurations, |
| 6 | +provides guidance on tuning the backpressure policies, and explains the |
| 7 | +lifetime guarantees that logger integrations rely on. |
| 8 | + |
| 9 | +## 1. Implementation variants |
| 10 | + |
| 11 | +### Default deque worker (without `LOGIT_USE_MPSC_RING`) |
| 12 | + |
| 13 | +* Structure: one worker thread (`m_worker_thread`) consumes a `std::deque` |
| 14 | + protected by `m_queue_mutex`. |
| 15 | +* Synchronisation: producers and the worker coordinate through |
| 16 | + `m_queue_condition` and the `m_stop_flag` atomic. |
| 17 | +* Backpressure policies are implemented literally on the protected deque. |
| 18 | +* Intended for environments where a simple mutex-protected queue is sufficient |
| 19 | + or where the lock-free ring cannot be used. |
| 20 | + |
| 21 | +### Lock-free MPSC ring (`LOGIT_USE_MPSC_RING`) |
| 22 | + |
| 23 | +* Structure: producers push tasks into `m_mpsc_queue`, a lock-free |
| 24 | + `MpscRingAny<std::function<void()>>` with a single consumer thread. |
| 25 | +* Synchronisation primitives: |
| 26 | + * `m_cv` + `m_cv_mutex` coordinate sleepers for both the worker and producers |
| 27 | + that wait for capacity during `QueuePolicy::Block`. |
| 28 | + * `m_queue_condition` wakes `wait()` callers once the queue drains. |
| 29 | + * `m_active_tasks` tracks in-flight work so that `Block` limits concurrent |
| 30 | + execution and `wait()` can determine quiescence. |
| 31 | + * `m_stop_flag` terminates the worker and stops accepting new tasks. |
| 32 | +* Enables very low producer overhead while maintaining FIFO ordering on the |
| 33 | + consumer side. |
| 34 | + |
| 35 | +### Emscripten builds without pthreads |
| 36 | + |
| 37 | +* Structure: single-threaded `std::deque` guarded by `m_mutex`. |
| 38 | +* No dedicated worker thread is created. Instead, tasks are drained via |
| 39 | + `emscripten_async_call` scheduled from the main loop. |
| 40 | +* Not thread-safe — intended for WebAssembly builds where pthreads are not |
| 41 | + available. |
| 42 | + |
| 43 | +## 2. Backpressure semantics |
| 44 | + |
| 45 | +`QueuePolicy` controls what happens when the queue reaches `max_queue_size` |
| 46 | +(`0` means "unbounded"). |
| 47 | + |
| 48 | +* `Block` |
| 49 | + * Uses `m_active_tasks` to count in-flight work. If the counter reaches the |
| 50 | + limit, producers wait. The non-MPSC build waits on |
| 51 | + `m_queue_condition`. The MPSC build parks on `m_cv` with short sleeps while |
| 52 | + the worker drains tasks. This policy avoids loss at the expense of |
| 53 | + producer-side backpressure. |
| 54 | +* `DropNewest` |
| 55 | + * Non-MPSC: the incoming task is discarded when the deque is full. |
| 56 | + * MPSC: identical semantics — the incoming task is dropped and |
| 57 | + `m_dropped_tasks` is incremented. |
| 58 | +* `DropOldest` |
| 59 | + * Non-MPSC: the oldest dequeued element is removed, then the incoming task is |
| 60 | + enqueued, providing literal "drop the oldest" behaviour. |
| 61 | + * MPSC: **drop-incoming semantics**. The executor rejects the incoming task |
| 62 | + instead of racing to remove an old element. This preserves the order of |
| 63 | + tasks already accepted by the consumer, avoids lock-step coordination |
| 64 | + between multiple producers and the worker, and keeps the implementation |
| 65 | + TSAN-clean. `m_dropped_tasks` still counts these rejections. |
| 66 | + |
| 67 | +The drop counter is observable via `TaskExecutor::dropped_tasks()` and exposed to |
| 68 | +end users through `LOGIT_GET_DROPPED_TASKS()`. |
| 69 | + |
| 70 | +## 3. Hot queue resize (`LOGIT_USE_MPSC_RING`) |
| 71 | + |
| 72 | +`set_max_queue_size()` performs a "hot" resize without tearing down the |
| 73 | +application. |
| 74 | + |
| 75 | +1. `m_resizing` is set to `true` with release semantics. |
| 76 | +2. `wait()` drains the queue and ensures `m_active_tasks == 0`. |
| 77 | +3. The worker is stopped by setting `m_stop_flag`, notifying sleepers, and |
| 78 | + joining the thread so it no longer touches `m_mpsc_queue`. |
| 79 | +4. In a single thread the ring is rebuilt with the new capacity. The resize |
| 80 | + keeps `m_dropped_tasks` intact but resets `m_active_tasks` to 0 because the |
| 81 | + queue is empty. |
| 82 | +5. The worker thread is restarted and the stop flag cleared. |
| 83 | +6. `m_resizing` flips back to `false` and `m_resize_cv.notify_all()` wakes |
| 84 | + producers that parked at the start of `add_task()`. |
| 85 | + |
| 86 | +While the resize is in progress, producers briefly wait on `m_resize_cv`. No |
| 87 | +accepted tasks are lost, and the consumer thread never observes partially |
| 88 | +initialised ring buffers. |
| 89 | + |
| 90 | +## 4. Ordering and completion guarantees |
| 91 | + |
| 92 | +* Exactly one consumer thread executes tasks, so work is processed in the order |
| 93 | + accepted by the consumer. |
| 94 | +* When the ring build is enabled, `DropNewest` and `DropOldest` both drop the |
| 95 | + incoming task; accepted tasks keep their order. |
| 96 | +* `wait()` returns once the queue is empty and `m_active_tasks == 0`, or when a |
| 97 | + shutdown is requested. |
| 98 | +* `shutdown()` blocks until the worker thread terminates. It is safe to call |
| 99 | + multiple times. |
| 100 | + |
| 101 | +## 5. Singleton and lifetime management |
| 102 | + |
| 103 | +`TaskExecutor::get_instance()` intentionally stores the singleton inside a |
| 104 | +`static TaskExecutor* instance = new TaskExecutor();`. This lets the executor |
| 105 | +outlive static destructors inside logger components. Applications may call |
| 106 | +`shutdown()` explicitly (for example during test teardown), but the singleton |
| 107 | +remains valid until the process terminates. |
| 108 | + |
| 109 | +## 6. Emscripten (no pthreads) |
| 110 | + |
| 111 | +When targeting Emscripten without pthread support: |
| 112 | + |
| 113 | +* The executor remains single-threaded and therefore not thread-safe. |
| 114 | +* `Block` is approximated by invoking `drain()` from the producer path until the |
| 115 | + deque has room. `DropNewest`/`DropOldest` mirror the deque operations exactly. |
| 116 | +* Tasks are executed by `emscripten_async_call`, which schedules a drain on the |
| 117 | + browser event loop. This keeps logging compatible with the cooperative |
| 118 | + execution model used in WebAssembly UI scenarios. |
| 119 | +* Typical use cases: browser-hosted tools or demos that need asynchronous-style |
| 120 | + logging without pulling in pthread support. |
| 121 | + |
| 122 | +## 7. API surface and macros |
| 123 | + |
| 124 | +Public methods exposed by `TaskExecutor`: |
| 125 | + |
| 126 | +* `set_max_queue_size(std::size_t size)` — change the queue capacity (`0` |
| 127 | + disables the limit). Trigger a hot resize on MPSC builds. |
| 128 | +* `set_queue_policy(QueuePolicy policy)` — change overflow behaviour. |
| 129 | +* `add_task(std::function<void()> fn)` — enqueue work for the background worker. |
| 130 | +* `wait()` — block until the queue drains or stop is requested. |
| 131 | +* `shutdown()` — stop the worker thread and release resources. |
| 132 | +* `dropped_tasks()` and `reset_dropped_tasks()` — inspect or reset the overflow |
| 133 | + counter. |
| 134 | + |
| 135 | +Macros in `<logit_cpp/logit/log_macros.hpp>` map directly onto these calls: |
| 136 | + |
| 137 | +* `LOGIT_SET_MAX_QUEUE(size)` → `set_max_queue_size(size)` |
| 138 | +* `LOGIT_SET_QUEUE_POLICY(mode)` → `set_queue_policy(mode)` |
| 139 | +* `LOGIT_QUEUE_BLOCK`, `LOGIT_QUEUE_DROP_NEWEST`, `LOGIT_QUEUE_DROP_OLDEST` |
| 140 | + select the enum value. |
| 141 | +* `LOGIT_GET_DROPPED_TASKS()` and `LOGIT_RESET_DROPPED_TASKS()` forward to the |
| 142 | + counter helpers. |
| 143 | + |
| 144 | +### Examples |
| 145 | + |
| 146 | +Basic setup using macros: |
| 147 | + |
| 148 | +```cpp |
| 149 | +#include <logit.hpp> |
| 150 | + |
| 151 | +int main() { |
| 152 | + LOGIT_ADD_CONSOLE_DEFAULT(); |
| 153 | + LOGIT_SET_QUEUE_POLICY(LOGIT_QUEUE_BLOCK); |
| 154 | + |
| 155 | + LOGIT_INFO("async logging is live"); |
| 156 | + LOGIT_WAIT(); |
| 157 | +} |
| 158 | +``` |
| 159 | + |
| 160 | +Hot resize while the system is running (only with `LOGIT_USE_MPSC_RING`): |
| 161 | + |
| 162 | +```cpp |
| 163 | +auto& executor = logit::detail::TaskExecutor::get_instance(); |
| 164 | +LOGIT_SET_QUEUE_POLICY(LOGIT_QUEUE_BLOCK); |
| 165 | + |
| 166 | +// Later, increase the capacity without losing accepted tasks. |
| 167 | +LOGIT_SET_MAX_QUEUE(1024); // producers briefly wait for resize to finish |
| 168 | +``` |
| 169 | +
|
| 170 | +Inspecting drops under `DropNewest`: |
| 171 | +
|
| 172 | +```cpp |
| 173 | +LOGIT_SET_QUEUE_POLICY(LOGIT_QUEUE_DROP_NEWEST); |
| 174 | +LOGIT_SET_MAX_QUEUE(16); |
| 175 | +LOGIT_RESET_DROPPED_TASKS(); |
| 176 | +
|
| 177 | +for (int i = 0; i < 1000; ++i) { |
| 178 | + LOGIT_INFO("burst", i); |
| 179 | +} |
| 180 | +
|
| 181 | +LOGIT_WAIT(); |
| 182 | +const auto lost = LOGIT_GET_DROPPED_TASKS(); |
| 183 | +``` |
| 184 | + |
| 185 | +## 8. Thread-safety and TSAN considerations |
| 186 | + |
| 187 | +* All public methods on non-Emscripten builds are thread-safe. Producers may |
| 188 | + call `add_task()` concurrently with `set_max_queue_size()` and |
| 189 | + `set_queue_policy()`. |
| 190 | +* The hot-resize barrier uses `m_resizing` and `m_resize_cv` so producers never |
| 191 | + touch a ring buffer that is being rebuilt. This eliminates the data races that |
| 192 | + TSAN previously reported on `try_pop()` vs. buffer assignment. |
| 193 | +* Non-MPSC builds rely solely on mutexes and had no known data races. |
| 194 | +* The Emscripten path is single-threaded and should not be used concurrently. |
| 195 | + |
| 196 | +## 9. Performance and tuning |
| 197 | + |
| 198 | +* `QueuePolicy::Block` limits the number of in-flight tasks tracked by |
| 199 | + `m_active_tasks`. Use it to introduce producer-side backpressure when the |
| 200 | + downstream sinks are expensive. |
| 201 | +* The worker drains up to 2048 tasks per iteration when the ring is enabled. |
| 202 | + Increase this "budget" in `TaskExecutor::worker_function()` if your workload |
| 203 | + generates extremely large bursts and the worker sleeps too often. Reducing it |
| 204 | + can lower per-iteration latency for latency-sensitive applications. |
| 205 | +* Adjust `LOGIT_TASK_EXECUTOR_DEFAULT_RING_CAPACITY` at compile time to select a |
| 206 | + different default capacity when `LOGIT_USE_MPSC_RING` is active. |
| 207 | +* Monitor `dropped_tasks()` during load testing to verify that the chosen policy |
| 208 | + matches the application's tolerance for loss. |
0 commit comments