Skip to content

Commit 4cba3ec

Browse files
committed
Bump version to v1.0.14: Improve README and documentation, update dev dependency
1 parent 2191060 commit 4cba3ec

7 files changed

+177
-296
lines changed

README.md

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ Each use case necessitates distinct handling capabilities, which will be discuss
2121
* [Getter Methods](#getter-methods)
2222
* [1st use-case: Multiple Jobs Execution](#first-use-case)
2323
* [2nd use-case: Single Job Execution](#second-use-case)
24-
* [Graceful Termination](#graceful-termination)
24+
* [Graceful Teardown](#graceful-teardown)
2525
* [Error Handling for Background Jobs](#error-handling)
2626
* [Unavoidable / Implicit Backpressure](#unavoidable-backpressure)
2727
* [Promise Semaphores Are Not Promise Pools](#not-promise-pool)
@@ -31,8 +31,8 @@ Each use case necessitates distinct handling capabilities, which will be discuss
3131
## Key Features :sparkles:<a id="key-features"></a>
3232

3333
- __Weighted Jobs :weight_lifting_woman:__: Suitable for situations where jobs have **varying** processing requirements, such as in backend applications managing resource load. For instance, consider multiple machine learning models being trained on a shared GPU resource. Each model demands different amounts of GPU memory and processing power. A weighted semaphore can regulate the total GPU memory usage, ensuring that only a specific combination of models is trained concurrently, thus preventing the GPU capacity from being exceeded.
34-
- __Backpressure Control__: Ideal for job workers and background services. Concurrency control alone isn't sufficient to ensure stability and performance if backpressure control is overlooked. Without backpressure control, the heap can become overloaded, resulting in space complexity of O(*semaphore-slots* + *pending-jobs*) instead of O(*semaphore-slots*).
35-
- __Graceful Termination__: Await the completion of all currently executing jobs via the `waitForAllExecutingJobsToComplete` method.
34+
- __Backpressure Control :vertical_traffic_light:__: Ideal for job workers and background services. Concurrency control alone isn't sufficient to ensure stability and performance if backpressure control is overlooked. Without backpressure control, the heap can become overloaded, resulting in space complexity of O(*semaphore-slots* + *pending-jobs*) instead of O(*semaphore-slots*).
35+
- __Graceful Teardown :hourglass_flowing_sand:__: Await the completion of all currently executing jobs via the `waitForAllExecutingJobsToComplete` method. Example use cases include **application shutdowns** (e.g., `onModuleDestroy` in Nest.js applications) or maintaining a clear state between unit-tests.
3636
- __High Efficiency :gear:__: All state-altering operations have a constant time complexity, O(1).
3737
- __Comprehensive Documentation :books:__: The class is thoroughly documented, enabling IDEs to provide helpful tooltips that enhance the coding experience.
3838
- __Robust Error Handling__: Uncaught errors from background jobs triggered by `startExecution` are captured and can be accessed using the `extractUncaughtErrors` method.
@@ -106,15 +106,16 @@ interface ModelInfo {
106106

107107
const totalAllowedWeight = 180;
108108
const estimatedMaxNumberOfConcurrentJobs = 12;
109-
const trainingSemaphore = new ZeroBackpressureWeightedSemaphore<void>(
110-
totalAllowedWeight,
111-
estimatedMaxNumberOfConcurrentJobs // Optional argument; can reduce dynamic slot allocations for optimization purposes.
112-
);
113109

114110
async function trainModels(models: AsyncGenerator<ModelInfo>) {
111+
const trainingSemaphore = new ZeroBackpressureWeightedSemaphore<void>(
112+
totalAllowedWeight,
113+
// Optional argument; can reduce dynamic slot allocations for optimization purposes.
114+
estimatedMaxNumberOfConcurrentJobs
115+
);
115116
let fetchedModelsCounter = 0;
116117

117-
for (const model of models) {
118+
for await (const model of models) {
118119
++fetchedModelsCounter;
119120

120121
// Until the semaphore can start training the current model, adding more
@@ -151,24 +152,32 @@ interface ModelInfo {
151152
// Additional model fields.
152153
};
153154

154-
interface CustomModelError extends Error {
155-
model: ModelInfo; // In this manner, later you can associate an error with its model.
156-
// Alternatively, a custom error may contain just a few fields of interest.
155+
// Since errors are extracted during a post-processing phase, using a
156+
// custom error class allows each error to be associated with its model.
157+
class CustomModelError extends Error {
158+
constructor(
159+
message: string,
160+
public readonly model: ModelInfo
161+
) {
162+
super(message);
163+
this.name = this.constructor.name;
164+
Object.setPrototypeOf(this, CustomModelError.prototype);
165+
}
157166
}
158167

159168
const totalAllowedWeight = 180;
160169
const estimatedMaxNumberOfConcurrentJobs = 12;
161-
const trainingSemaphore =
170+
171+
async function trainModels(models: AsyncGenerator<ModelInfo>) {
162172
// Notice the 2nd generic parameter (Error by default).
163-
new ZeroBackpressureWeightedSemaphore<void, CustomModelError>(
173+
const trainingSemaphore = new ZeroBackpressureWeightedSemaphore<void, CustomModelError>(
164174
totalAllowedWeight,
165-
estimatedMaxNumberOfConcurrentJobs // Optional argument; can reduce dynamic slot allocations for optimization purposes.
175+
// Optional argument; can reduce dynamic slot allocations for optimization purposes.
176+
estimatedMaxNumberOfConcurrentJobs
166177
);
167-
168-
async function trainModels(models: AsyncGenerator<ModelInfo>) {
169178
let fetchedModelsCounter = 0;
170179

171-
for (const model of models) {
180+
for await (const model of models) {
172181
++fetchedModelsCounter;
173182

174183
// Until the semaphore can start training the current model, adding more
@@ -253,13 +262,13 @@ app.get('/user/', async (req, res) => {
253262
});
254263
```
255264

256-
## Graceful Termination :hourglass:<a id="graceful-termination"></a>
265+
## Graceful Teardown :hourglass_flowing_sand:<a id="graceful-teardown"></a>
257266

258-
The `waitForAllExecutingJobsToComplete` method is essential for scenarios where it is necessary to wait for all ongoing jobs to finish, such as logging a success message or executing subsequent logic. Without this built-in capability, developers would have to implement periodic polling of the semaphore or other indicators to monitor progress, which can increase both implementation complexity and resource usage.
267+
The `waitForAllExecutingJobsToComplete` method is essential for scenarios where it is necessary to **wait for all ongoing jobs to finish**, such as logging a success message or executing subsequent logic. Without this built-in capability, developers would have to implement periodic polling of the semaphore or other indicators to monitor progress, which can increase both implementation complexity and resource usage.
259268

260-
A key use case for this method is ensuring stable unit tests. Each test should start with a clean state, independent of others, to avoid interference. This prevents scenarios where a job from Test A inadvertently continues to execute during Test B.
269+
Example use cases include application shutdowns (e.g., `onModuleDestroy` in Nest.js applications) or maintaining a clear state between unit-tests to ensure stability. Each test should start with a clean state, independent of others, to avoid interference. This prevents scenarios where a job from Test A inadvertently continues to execute during Test B.
261270

262-
If your component has a termination method (`stop`, `terminate`, or similar), keep that in mind.
271+
If your component has a termination method (`stop`, `terminate`, `dispose` or similar), keep that in mind.
263272

264273
## Error Handling for Background Jobs :warning:<a id="error-handling"></a>
265274

dist/zero-backpressure-weighted-promise-semaphore.d.ts

Lines changed: 25 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
*/
1616
export type SemaphoreJob<T> = () => Promise<T>;
1717
/**
18-
* ZeroBackpressureWeightedSemaphore
19-
*
2018
* The `ZeroBackpressureWeightedSemaphore` class implements a Promise Semaphore for Node.js projects,
2119
* enabling users to limit the concurrency of *weighted* jobs.
2220
* Each job is associated with a natural-number weight (1, 2, 3, ...). The semaphore guarantees that the
@@ -70,7 +68,6 @@ export type SemaphoreJob<T> = () => Promise<T>;
7068
* - **Initialization**: O(estimatedMaxNumberOfConcurrentJobs) for both time and space.
7169
* - **startExecution, waitForCompletion**: O(1) for both time and space, excluding the job execution itself.
7270
* - All the getter methods have O(1) complexity for both time and space.
73-
*
7471
*/
7572
export declare class ZeroBackpressureWeightedSemaphore<T = void, UncaughtErrorType = Error> {
7673
private readonly _totalAllowedWeight;
@@ -83,36 +80,26 @@ export declare class ZeroBackpressureWeightedSemaphore<T = void, UncaughtErrorTy
8380
private _waitForSufficientWeight?;
8481
private _notifyPendingAllotment?;
8582
/**
86-
* Constructor
87-
*
88-
* @param totalAllowedWeight - The maximum allowed sum of weights (inclusive) for jobs executed concurrently.
89-
* @param estimatedMaxNumberOfConcurrentJobs - Estimated maximum number of concurrently executing jobs.
90-
* A higher estimate reduces the likelihood of additional slot
91-
* allocations during runtime. Please observe that the upper bound
92-
* is `totalAllowedWeight`, as the minimum weight is 1.
83+
* @param totalAllowedWeight The maximum allowed sum of weights (inclusive) for jobs executed concurrently.
84+
* @param estimatedMaxNumberOfConcurrentJobs Estimated maximum number of concurrently executing jobs.
85+
* A higher estimate reduces the likelihood of additional slot
86+
* allocations during runtime. Please observe that the upper bound
87+
* is `totalAllowedWeight`, as the minimum weight is 1.
9388
*/
9489
constructor(totalAllowedWeight: number, estimatedMaxNumberOfConcurrentJobs?: number);
9590
/**
96-
* totalAllowedWeight
97-
*
9891
* @returns The maximum allowed sum of weights (inclusive) for jobs executed concurrently.
9992
*/
10093
get totalAllowedWeight(): number;
10194
/**
102-
* availableWeight
103-
*
10495
* @returns The currently available, non-allotted amount of weight.
10596
*/
10697
get availableWeight(): number;
10798
/**
108-
* amountOfCurrentlyExecutingJobs
109-
*
11099
* @returns The number of jobs currently being executed by the semaphore.
111100
*/
112101
get amountOfCurrentlyExecutingJobs(): number;
113102
/**
114-
* amountOfUncaughtErrors
115-
*
116103
* Indicates the number of uncaught errors from background jobs triggered by `startExecution`,
117104
* that are currently stored by the instance.
118105
* These errors have not yet been extracted using `extractUncaughtErrors`.
@@ -124,8 +111,6 @@ export declare class ZeroBackpressureWeightedSemaphore<T = void, UncaughtErrorTy
124111
*/
125112
get amountOfUncaughtErrors(): number;
126113
/**
127-
* startExecution
128-
*
129114
* This method resolves once the given job has *started* its execution, indicating that the
130115
* semaphore has allotted sufficient weight for the job.
131116
* Users can leverage this to prevent backpressure of pending jobs:
@@ -145,15 +130,13 @@ export declare class ZeroBackpressureWeightedSemaphore<T = void, UncaughtErrorTy
145130
* `extractUncaughtError` method. Users are encouraged to specify a custom `UncaughtErrorType`
146131
* generic parameter to the class if jobs may throw errors.
147132
*
148-
* @param backgroundJob - The job to be executed once the semaphore is available.
149-
* @param weight - A natural number representing the weight associated with the job.
150-
* @throws - Error if the weight is not a natural number (1, 2, 3, ...).
133+
* @param backgroundJob The job to be executed once the semaphore is available.
134+
* @param weight A natural number representing the weight associated with the job.
135+
* @throws Error if the weight is not a natural number (1, 2, 3, ...).
151136
* @returns A promise that resolves when the job starts execution.
152137
*/
153138
startExecution(backgroundJob: SemaphoreJob<T>, weight: number): Promise<void>;
154139
/**
155-
* waitForCompletion
156-
*
157140
* This method executes the given job in a controlled manner, once the semaphore has allotted
158141
* sufficient weight for the job. It resolves or rejects when the job finishes execution, returning
159142
* the job's value or propagating any error it may throw.
@@ -167,16 +150,14 @@ export declare class ZeroBackpressureWeightedSemaphore<T = void, UncaughtErrorTy
167150
* This method allows you to execute the job with controlled concurrency. Once the job resolves
168151
* or rejects, you can continue the route handler's flow based on the result.
169152
*
170-
* @param job - The job to be executed once the semaphore is available.
171-
* @param weight - A natural number representing the weight associated with the job.
172-
* @throws - Error if the weight is not a natural number (1, 2, 3, ...).
173-
* Alternatively, an error thrown by the job itself.
153+
* @param job The job to be executed once the semaphore is available.
154+
* @param weight A natural number representing the weight associated with the job.
155+
* @throws Error if the weight is not a natural number (1, 2, 3, ...).
156+
* Alternatively, an error thrown by the job itself.
174157
* @returns A promise that resolves with the job's return value or rejects with its error.
175158
*/
176159
waitForCompletion(job: SemaphoreJob<T>, weight: number): Promise<T>;
177160
/**
178-
* waitForAllExecutingJobsToComplete
179-
*
180161
* This method allows the caller to wait until all *currently* executing jobs have finished,
181162
* meaning once all running promises have either resolved or rejected.
182163
*
@@ -186,13 +167,18 @@ export declare class ZeroBackpressureWeightedSemaphore<T = void, UncaughtErrorTy
186167
* Note that the returned promise only awaits jobs that were executed at the time this method
187168
* was called. Specifically, it awaits all jobs initiated by this instance that had not completed
188169
* at the time of invocation.
170+
* In certain scenarios - such as during application shutdown - you might also want to wait for
171+
* any potentially pending jobs. This can be achieved using the following pattern:
172+
* ```
173+
* while (semaphore.amountOfCurrentlyExecutingJobs > 0) {
174+
* await semaphore.waitForAllExecutingJobsToComplete();
175+
* }
176+
* ```
189177
*
190178
* @returns A promise that resolves when all currently executing jobs are completed.
191179
*/
192180
waitForAllExecutingJobsToComplete(): Promise<void>;
193181
/**
194-
* extractUncaughtErrors
195-
*
196182
* This method returns an array of uncaught errors, captured by the semaphore while executing
197183
* background jobs added by `startExecution`. The term `extract` implies that the semaphore
198184
* instance will no longer hold these error references once extracted, unlike `get`. In other
@@ -214,8 +200,6 @@ export declare class ZeroBackpressureWeightedSemaphore<T = void, UncaughtErrorTy
214200
private _allotWeight;
215201
private _getAvailableSlot;
216202
/**
217-
* _handleJobExecution
218-
*
219203
* This method manages the execution of a given job in a controlled manner. It ensures that
220204
* the job is executed within the constraints of the semaphore and handles updating the
221205
* internal state once the job has completed.
@@ -225,12 +209,12 @@ export declare class ZeroBackpressureWeightedSemaphore<T = void, UncaughtErrorTy
225209
* - Updates the internal state to make the allotted slot available again once the job is finished.
226210
* - Release the weight-allotment lock if the requested amount is available.
227211
*
228-
* @param job - The job to be executed in the given slot.
229-
* @param allottedSlot - The slot number in which the job should be executed.
230-
* @param weight - A natural number representing the weight associated with the job.
231-
* @param isBackgroundJob - A flag indicating whether the caller expects a return value to proceed
232-
* with its work. If `true`, no return value is expected, and any error
233-
* thrown by the job should not be propagated to the event loop.
212+
* @param job The job to be executed in the given slot.
213+
* @param allottedSlot The slot number in which the job should be executed.
214+
* @param weight A natural number representing the weight associated with the job.
215+
* @param isBackgroundJob A flag indicating whether the caller expects a return value to proceed
216+
* with its work. If `true`, no return value is expected, and any error
217+
* thrown by the job should not be propagated to the event loop.
234218
* @returns A promise that resolves with the job's return value or rejects with its error.
235219
* Rejection occurs only if triggered by `waitForCompletion`.
236220
*/

0 commit comments

Comments
 (0)