Add a coordinator threads framework#5749
Conversation
| * If there is not enough space in the array of callbacks | ||
| * re-allocate the fr_control_t with a larger array. | ||
| */ | ||
| if (id >= ctrl->num_callbacks) { |
There was a problem hiding this comment.
Could we add someone to the control sockets that prevent adding callbacks after use. Just like we lock the dictionaries at runtime.
|
@claude review for all issue types, thinking extra hard about concurrency issues |
|
...and the action is broken. That's great. |
src/lib/util/semaphore.h
Outdated
| #include <mach/semaphore.h> | ||
|
|
||
| #undef sem_t | ||
| #define sem_t semaphore_t |
There was a problem hiding this comment.
could we just add fr variants? and do a typedef? These seem like very special cross-process semaphores, and we might want to identify them as such.
|
Manually triggered review |
3df3086 to
6750655
Compare
To allow the control plane to be used for an arbitrary number of message IDs
In preparation for using separate processes in place of threads
2a2602a to
517dcf4
Compare
To start a request being processed through a coordinator
Control planes can be multi-producer, single consumer - so mulitple threads can send / push a message.
517dcf4 to
a54820e
Compare
| uint32_t max_workers; //!< Maximum number of workers we expect. | ||
| uint32_t num_workers; //!< How many workers are attached. | ||
|
|
||
| fr_atomic_queue_t *aq; //!< Atomic queue for worker -> coordinator control messages. |
There was a problem hiding this comment.
Remove this field, as a pointer is held by control
| uint32_t num_workers; //!< How many workers are attached. | ||
|
|
||
| fr_atomic_queue_t *aq; //!< Atomic queue for worker -> coordinator control messages. | ||
| fr_control_t *control; //!< Control plane for worker -> coordinator messages. |
|
|
||
| fr_atomic_queue_t *aq; //!< Atomic queue for worker -> coordinator control messages. | ||
| fr_control_t *control; //!< Control plane for worker -> coordinator messages. | ||
| fr_atomic_queue_t *data_aq; //!< Atomic queue for worker -> coordinator |
| fr_atomic_queue_t *aq; //!< Atomic queue for worker -> coordinator control messages. | ||
| fr_control_t *control; //!< Control plane for worker -> coordinator messages. | ||
| fr_atomic_queue_t *data_aq; //!< Atomic queue for worker -> coordinator | ||
| fr_ring_buffer_t **rb; //!< Ring buffers for coordinator -> worker control messages. |
There was a problem hiding this comment.
Maybe remove if the message set can hold the pointer (like control)
| fr_control_t *control; //!< Control plane for worker -> coordinator messages. | ||
| fr_atomic_queue_t *data_aq; //!< Atomic queue for worker -> coordinator | ||
| fr_ring_buffer_t **rb; //!< Ring buffers for coordinator -> worker control messages. | ||
| fr_message_set_t **ms; //!< Message sets for coordinator -> worker messages. |
| fr_ring_buffer_t **rb; //!< Ring buffers for coordinator -> worker control messages. | ||
| fr_message_set_t **ms; //!< Message sets for coordinator -> worker messages. | ||
| fr_control_t **worker_control; //!< Control planes for coordinator -> worker messages. | ||
| fr_atomic_queue_t **worker_data_aq; //!< Atomic queues for coordinator -> worker data. |
There was a problem hiding this comment.
Maybe remove if worker_control is the only user
| fr_atomic_queue_t *data_aq; //!< Atomic queue for worker -> coordinator | ||
| fr_ring_buffer_t **rb; //!< Ring buffers for coordinator -> worker control messages. | ||
| fr_message_set_t **ms; //!< Message sets for coordinator -> worker messages. | ||
| fr_control_t **worker_control; //!< Control planes for coordinator -> worker messages. |
| fr_atomic_queue_t **worker_data_aq; //!< Atomic queues for coordinator -> worker data. | ||
|
|
||
| unlang_interpret_t *intp; //!< Interpreter for running requests. | ||
| fr_heap_t *runnable; //!< Current runnable requests. |
There was a problem hiding this comment.
Move both intp and runnable into coord_pair
| unlang_interpret_t *intp; //!< Interpreter for running requests. | ||
| fr_heap_t *runnable; //!< Current runnable requests. | ||
|
|
||
| fr_timer_list_t *timeout; //!< Track when requests timeout using a dlist. |
| fr_time_tracking_t tracking; //!< How much time the coordinator has spent doing things. | ||
| uint64_t num_active; //!< Number of active requests. | ||
| request_slab_list_t *slab; //!< slab allocator for request_t | ||
|
|
Add the ability for modules to register coordinators and the associated callbacks to run when messages are passed between workers and coordinators.
In addition to raw data messages,
coord_pairadds the ability to use pair lists as the data sent, encoded with the internal encoder.