Skip to content

Add a coordinator threads framework#5749

Open
ndptech wants to merge 12 commits intoFreeRADIUS:masterfrom
ndptech:v4_coordinator_threads
Open

Add a coordinator threads framework#5749
ndptech wants to merge 12 commits intoFreeRADIUS:masterfrom
ndptech:v4_coordinator_threads

Conversation

@ndptech
Copy link
Member

@ndptech ndptech commented Feb 20, 2026

Add the ability for modules to register coordinators and the associated callbacks to run when messages are passed between workers and coordinators.

In addition to raw data messages, coord_pair adds the ability to use pair lists as the data sent, encoded with the internal encoder.

* If there is not enough space in the array of callbacks
* re-allocate the fr_control_t with a larger array.
*/
if (id >= ctrl->num_callbacks) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add someone to the control sockets that prevent adding callbacks after use. Just like we lock the dictionaries at runtime.

@arr2036
Copy link
Member

arr2036 commented Feb 23, 2026

@claude review for all issue types, thinking extra hard about concurrency issues

@arr2036
Copy link
Member

arr2036 commented Feb 24, 2026

...and the action is broken. That's great.

@FreeRADIUS FreeRADIUS deleted a comment from claude bot Feb 24, 2026
#include <mach/semaphore.h>

#undef sem_t
#define sem_t semaphore_t
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we just add fr variants? and do a typedef? These seem like very special cross-process semaphores, and we might want to identify them as such.

@arr2036
Copy link
Member

arr2036 commented Feb 26, 2026

Manually triggered review

  Bugs — High Severity
                                                                                                                                                              
  1. sizeof(fr_coord_t) instead of sizeof(fr_coord_data_t) in fr_message_set_create — coord.c (diff ~1129)

  coord->ms[i] = fr_message_set_create(coord, FR_CONTROL_MAX_MESSAGES, sizeof(fr_coord_t),
                                       coord_reg->outbound_rb_size, true);

  The message_size parameter is documented as "the size of each message, INCLUDING fr_message_t" (message.c:121). The existing codebase consistently passes
  the concrete message struct: sizeof(fr_channel_data_t) in both worker.c:309 and network.c:1430-1431.

  Here, the messages are fr_coord_data_t (embeds fr_message_t m + uint32_t coord_cb_id), but the code passes sizeof(fr_coord_t) — the entire coordinator
  struct with pointers to event lists, rbtrees, heaps, slab allocators, etc. message_size is capped at 1024 (message.c:148), so if sizeof(fr_coord_t) > 1024,
  fr_message_set_create returns NULL and the coordinator fails to start. If it's under 1024, it massively over-allocates each message slot. The
  worker->coordinator direction at diff ~1497 correctly uses sizeof(fr_coord_data_t).

  Fix: sizeof(fr_coord_t) -> sizeof(fr_coord_data_t)

  2. Off-by-one in callback bounds check — coord.c (diff ~807)

  if (cd->coord_cb_id > coord->num_callbacks) {

  Should be >=. If coord_cb_id == num_callbacks, this indexes one past the end of the array. The worker-side check at diff ~834 correctly uses >=:

  if (cd->coord_cb_id >= cw->num_callbacks) {

  3. coord_pair_reg->cb_id is never set — coord_pair.c / coord_pair.h

  The fr_coord_pair_reg_t struct has a cb_id field (diff ~1742), used in fr_coord_to_worker_reply_send (diff ~1867) and fr_worker_to_coord_pair_send (diff
  ~1893):

  ret = fr_coord_to_worker_send(packet_ctx->coord, worker_id, coord_pair_reg->cb_id, &dbuff);

  But fr_coord_pair_register never sets coord_pair_reg->cb_id — it's zero-initialized via the compound literal. The FR_COORD_PAIR_CB_CTX_SET macro (diff
  ~1977) takes _id but only sets uctx, not cb_id. So coord_pair_reg->cb_id is always 0, meaning pair-list replies will always be dispatched to callback ID 0
  on the receiver side regardless of intent.

  Fix: Add coord_pair_reg->cb_id = reg_ctx->cb_id; in fr_coord_pair_register.

  4. Missing mmap() error check — semaphore.c:33-38

  sem = mmap(NULL, sizeof(sem_t), PROT_READ | PROT_WRITE, MAP_ANON | MAP_SHARED, -1, 0);

  if (sem_init(sem, 0, SEMAPHORE_LOCKED) != 0) {

  mmap() returns MAP_FAILED ((void *)-1) on failure, but the return is never checked. If mmap fails, sem_init is called on (void *)-1 — SIGSEGV.

  ---
  Bugs — Medium Severity

  5. fr_control_open frees caller's object on error — control.c (PR diff ~141-144)

  int fr_control_open(fr_control_t *c) {
      if (pipe(c->pipe) < 0) {
          talloc_free(c);     // <-- Frees the caller's object!

  This was carried over from when the code was inside fr_control_create() (a constructor, where freeing on failure is idiomatic). Now that fr_control_open is
  a separate post-creation call, the caller still holds a pointer and all call sites do goto fail -> talloc_free(parent) which would cascade-free c again. The
   talloc_free(c) calls in fr_control_open should be removed — the caller owns cleanup.

  Same issue on the fr_event_fd_insert failure path at diff ~156.

  6. fr_message_alloc return value unchecked — coord.c (diff ~1535-1537, ~1569-1571)

  fr_coord_data_t *cd = NULL;
  cd = (fr_coord_data_t *)fr_message_alloc(coord->ms[worker_id], (fr_message_t *)cd, fr_dbuff_used(dbuff));
  memcpy(cd->m.data, fr_dbuff_buff(dbuff), fr_dbuff_used(dbuff));  // NULL deref if alloc failed

  If the message ring buffer is exhausted, fr_message_alloc returns NULL. The subsequent memcpy segfaults. Both fr_coord_to_worker_send and
  fr_worker_to_coord_send have this.

  7. fr_atomic_queue_push return value unchecked — coord.c (diff ~1539, ~1573)

  fr_atomic_queue_push(coord->worker_data_aq[worker_id], cd);
  // ... then sends control signal anyway

  fr_atomic_queue_push returns false when the queue is full. If it fails, the data message is lost but the control signal is still sent. The receiver gets the
   pipe notification, calls fr_atomic_queue_pop, gets nothing, and silently discards. The sender returns success. Silent data loss in both
  fr_coord_to_worker_send and fr_worker_to_coord_send.

  8. fr_control_callback_add return unchecked in fr_coord_attach — coord.c (diff ~1506-1507)

  fr_control_callback_add(&cw->control, FR_CONTROL_ID_COORD_WORKER_ACK, cw, coordinate_worker_ack);
  fr_control_callback_add(&cw->control, FR_CONTROL_ID_COORD_CALLBACK, cw, coord_worker_recv_message);

  Neither call checks the return value. If either fails, the worker has a broken control plane but proceeds to attach to the coordinator.

  9. fr_control_open return unchecked in fr_coord_attach — coord.c (diff ~1509)

  fr_control_open(cw->control);  // No error check

  If fr_control_open fails, the worker proceeds with a non-functional control plane.

  ---
  Concurrency — Medium Severity

  10. Multiple workers writing to same coordinator pipe — coord.c (diff ~1573-1576)

  Workers call fr_control_message_send(cw->coord->control, cw->rb, ...) from different threads. Each worker has its own cw->rb ring buffer, and the atomic
  queue is MPMC-safe. However, fr_control_message_send at control.c:354 writes to c->pipe[1]:

  if (write(c->pipe[1], ".", 1) >= 0) return 0;

  Multiple workers write to the same cw->coord->control->pipe[1] concurrently. POSIX guarantees atomicity for write() of <= PIPE_BUF bytes (1 byte here), so
  this is technically safe. But fr_control_message_send has a documented contract at control.c:329: "This function is called ONLY from the originating
  thread." This is a novel usage that violates the documented assumption.

  Deserves at minimum a comment on cw->coord->control documenting that it's intentionally shared across worker threads, and explaining why it's safe (1-byte
  writes, per-worker ring buffers, MPMC atomic queue).

  ---
  Bugs — Low Severity

  11. UNUSED annotation on parameter that IS used — coord_pair.c (diff ~1810)

  void coord_recv_pair_data(fr_coord_t *coord, uint32_t worker_id, fr_dbuff_t *dbuff,
                            UNUSED fr_time_t now, void *uctx)
  {
      ...
      coord_request_bootstrap(coord, worker_id, dbuff, now, coord_pair_reg);
                                                       ^^^

  UNUSED is __attribute__((unused)) so this compiles fine, but it's misleading. Remove the UNUSED.

  12. Comma operator instead of semicolon — coord.c (diff ~1517)

  fr_control_message_send(cw->coord->control, cw->rb, FR_CONTROL_ID_COORD_WORKER_ATTACH,
                          msg, sizeof(fr_coord_worker_msg_t)),
  talloc_free(msg);

  The comma silently discards the return value. Should be a semicolon, and ideally the return value checked.

  13. Typo — coord.c (diff ~805)

  "Recevied" -> "Received"

  ---
  Design / Fragility — Low Severity

  14. pshared=0 vs stated inter-process goal — semaphore.c:39

  sem = mmap(NULL, sizeof(sem_t), PROT_READ | PROT_WRITE, MAP_ANON | MAP_SHARED, -1, 0);
  sem_init(sem, 0, SEMAPHORE_LOCKED);  // pshared=0

  The commit message says "In preparation for using separate processes in place of threads", but pshared=0 only supports thread sharing per POSIX. For
  inter-process sharing, pshared must be non-zero. Works now (threads only) but won't work for the stated future goal.

  15. Any worker detach unconditionally sets coord->exiting = true — coord.c (diff ~886)

  When any worker detaches, exiting is set and the coordinator loop will exit once num_workers == 0. No mechanism for graceful worker replacement or partial
  detach.

  16. Stack-allocated search key with garbage fields — coord.c (diff ~1487)

  cw->coord = fr_rb_find(&coords, &(fr_coord_t){ .coord_reg = coord_reg });

  Creates a stack-allocated fr_coord_t with only coord_reg initialized. coord_cmp only accesses coord_reg, so this works. But if coord_cmp ever changes to
  compare additional fields, this reads uninitialized memory. Consider using a designated initializer that zeros the struct: &(fr_coord_t){ 0, .coord_reg =
  coord_reg } or a comment.

  ---
  Style — Low Severity

  17. Indentation error — control.c (diff ~211)

          memset(...);

                 ctrl->num_callbacks = id + 1;  // over-indented

  18. Brace style inconsistency — control.c (diff ~141)

  int fr_control_open(fr_control_t *c) {

  Rest of codebase uses next-line braces for function definitions.

  ---
  Summary

  ┌─────┬──────────────────────────────────────────┬─────────────┬──────────┐
  │  #  │                  Issue                   │    Type     │ Severity │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 1   │ sizeof(fr_coord_t) in message_set_create │ Bug         │ High     │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 2   │ Off-by-one > vs >= in callback check     │ Bug         │ High     │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 3   │ coord_pair_reg->cb_id never set          │ Bug         │ High     │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 4   │ Missing mmap() error check               │ Bug         │ High     │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 5   │ fr_control_open frees caller's object    │ Bug         │ Medium   │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 6   │ fr_message_alloc return unchecked        │ Bug         │ Medium   │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 7   │ fr_atomic_queue_push return unchecked    │ Concurrency │ Medium   │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 8   │ fr_control_callback_add return unchecked │ Bug         │ Medium   │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 9   │ fr_control_open return unchecked         │ Bug         │ Medium   │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 10  │ Multi-thread pipe writes undocumented    │ Concurrency │ Medium   │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 11  │ UNUSED on used parameter                 │ Bug         │ Low      │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 12  │ Comma instead of semicolon               │ Bug         │ Low      │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 13  │ Typo "Recevied"                          │ Style       │ Low      │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 14  │ pshared=0 vs inter-process intent        │ Design      │ Low      │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 15  │ Detach unconditionally sets exiting      │ Design      │ Low      │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 16  │ Stack search key with garbage fields     │ Fragility   │ Low      │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 17  │ Over-indentation                         │ Style       │ Low      │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 18  │ Brace style inconsistency                │ Style       │ Low      │
  └─────┴──────────────────────────────────────────┴─────────────┴──────────┘

@ndptech ndptech force-pushed the v4_coordinator_threads branch from 3df3086 to 6750655 Compare February 26, 2026 17:44
To allow the control plane to be used for an arbitrary number of message
IDs
In preparation for using separate processes in place of threads
@ndptech ndptech force-pushed the v4_coordinator_threads branch 2 times, most recently from 2a2602a to 517dcf4 Compare February 27, 2026 09:32
@ndptech ndptech force-pushed the v4_coordinator_threads branch from 517dcf4 to a54820e Compare February 27, 2026 09:46
uint32_t max_workers; //!< Maximum number of workers we expect.
uint32_t num_workers; //!< How many workers are attached.

fr_atomic_queue_t *aq; //!< Atomic queue for worker -> coordinator control messages.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this field, as a pointer is held by control

uint32_t num_workers; //!< How many workers are attached.

fr_atomic_queue_t *aq; //!< Atomic queue for worker -> coordinator control messages.
fr_control_t *control; //!< Control plane for worker -> coordinator messages.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

coord_recv_control


fr_atomic_queue_t *aq; //!< Atomic queue for worker -> coordinator control messages.
fr_control_t *control; //!< Control plane for worker -> coordinator messages.
fr_atomic_queue_t *data_aq; //!< Atomic queue for worker -> coordinator
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

coord_recv_aq

fr_atomic_queue_t *aq; //!< Atomic queue for worker -> coordinator control messages.
fr_control_t *control; //!< Control plane for worker -> coordinator messages.
fr_atomic_queue_t *data_aq; //!< Atomic queue for worker -> coordinator
fr_ring_buffer_t **rb; //!< Ring buffers for coordinator -> worker control messages.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

coord_send_rb

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe remove if the message set can hold the pointer (like control)

fr_control_t *control; //!< Control plane for worker -> coordinator messages.
fr_atomic_queue_t *data_aq; //!< Atomic queue for worker -> coordinator
fr_ring_buffer_t **rb; //!< Ring buffers for coordinator -> worker control messages.
fr_message_set_t **ms; //!< Message sets for coordinator -> worker messages.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

coord_send_ms

fr_ring_buffer_t **rb; //!< Ring buffers for coordinator -> worker control messages.
fr_message_set_t **ms; //!< Message sets for coordinator -> worker messages.
fr_control_t **worker_control; //!< Control planes for coordinator -> worker messages.
fr_atomic_queue_t **worker_data_aq; //!< Atomic queues for coordinator -> worker data.
Copy link
Member

@arr2036 arr2036 Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe remove if worker_control is the only user

fr_atomic_queue_t *data_aq; //!< Atomic queue for worker -> coordinator
fr_ring_buffer_t **rb; //!< Ring buffers for coordinator -> worker control messages.
fr_message_set_t **ms; //!< Message sets for coordinator -> worker messages.
fr_control_t **worker_control; //!< Control planes for coordinator -> worker messages.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

coord_send_control

fr_atomic_queue_t **worker_data_aq; //!< Atomic queues for coordinator -> worker data.

unlang_interpret_t *intp; //!< Interpreter for running requests.
fr_heap_t *runnable; //!< Current runnable requests.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move both intp and runnable into coord_pair

unlang_interpret_t *intp; //!< Interpreter for running requests.
fr_heap_t *runnable; //!< Current runnable requests.

fr_timer_list_t *timeout; //!< Track when requests timeout using a dlist.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move to coord_pair

fr_time_tracking_t tracking; //!< How much time the coordinator has spent doing things.
uint64_t num_active; //!< Number of active requests.
request_slab_list_t *slab; //!< slab allocator for request_t

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All move to coord_pair

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants