Skip to content

Commit 16952f6

Browse files
committed
⚡ Use Atomics
Signed-off-by: kei-g <km.8k6ce+github@gmail.com>
1 parent 6000b25 commit 16952f6

File tree

2 files changed

+22
-17
lines changed

2 files changed

+22
-17
lines changed

async-iterable-queue.ts

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { EventEmitter } from 'stream'
2+
import { assert } from 'console'
23

34
/**
45
* 非同期反復可能な先入れ先出し型の待ち行列への非同期反復子
@@ -39,7 +40,11 @@ class AIQAsyncIterator<T> implements AsyncIterator<T> {
3940
/**
4041
* 非同期反復可能な先入れ先出し型の待ち行列の状態を表す型
4142
*/
42-
type AIQState = 'ending' | 'finished'
43+
enum AIQState {
44+
ending = 1,
45+
finished = 2,
46+
undefined = 0,
47+
}
4348

4449
/**
4550
* 非同期反復可能な先入れ先出し型の待ち行列
@@ -63,18 +68,14 @@ export class AsyncIterableQueue<T> implements AsyncIterable<T> {
6368
/**
6469
* この待ち行列の現在の状態
6570
*/
66-
#state?: AIQState
71+
readonly #state = new Uint8Array([AIQState.undefined])
6772

6873
/**
6974
* コンストラクタ
7075
*/
7176
constructor() {
7277
const resolveAsync = createAsyncResolver({
73-
finish: () => {
74-
const state = this.#state
75-
this.#state = 'finished'
76-
return state
77-
},
78+
finish: () => Atomics.exchange(this.#state, 0, AIQState.finished),
7879
resolvers: this.#resolvers,
7980
})
8081
this.#emitter.on('deq', async () => {
@@ -95,10 +96,14 @@ export class AsyncIterableQueue<T> implements AsyncIterable<T> {
9596
end(cb?: NoParameterCallback): Promise<void> {
9697
return new Promise(
9798
(resolve: Resolver<void>, reject: SingleParameterAction<unknown>) => {
98-
const state = this.#state
99-
if (state)
100-
return reject(new Error(state))
101-
this.#state = 'ending'
99+
const state = Atomics.compareExchange(
100+
this.#state,
101+
0,
102+
AIQState.undefined,
103+
AIQState.ending,
104+
)
105+
if (state !== AIQState.undefined)
106+
return reject(new Error(AIQState[state]))
102107
this.#emitter.emit('enq', new Terminator(cb))
103108
return resolve()
104109
}
@@ -112,9 +117,9 @@ export class AsyncIterableQueue<T> implements AsyncIterable<T> {
112117
push(value: T): Promise<void> {
113118
return new Promise(
114119
(resolve: Resolver<void>, reject: SingleParameterAction<unknown>) => {
115-
const state = this.#state
116-
if (state)
117-
return reject(new Error(state))
120+
const state = Atomics.load(this.#state, 0)
121+
if (state !== AIQState.undefined)
122+
return reject(new Error(AIQState[state]))
118123
this.#emitter.emit('enq', value)
119124
return resolve()
120125
}
@@ -220,8 +225,8 @@ const createAsyncResolver = <T>(param: AsyncResolverCreateParameter<T>) => {
220225
if (value instanceof Terminator) {
221226
const state = param.finish()
222227
await resolveAsync({ done: true } as IteratorResult<T>)
223-
if (state === 'ending')
224-
await value.call()
228+
assert(state === AIQState.ending)
229+
await value.call()
225230
}
226231
else
227232
await resolveAsync({

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
"main": "lib/async-iterable-queue.js",
4242
"name": "async-iterable-queue",
4343
"nyc": {
44-
"branches": 93,
44+
"branches": 100,
4545
"functions": 100,
4646
"lines": 100,
4747
"statements": 100

0 commit comments

Comments
 (0)