Skip to content

Commit c281044

Browse files
committed
🔀 Merge branch 'devel' into release
- Performance improvement - Call 'reject' rather than throwing Error - Use Atomics - README.md is updated - logo of badges are updated - package.json is updated - 'badges' is added Signed-off-by: kei-g <km.8k6ce+github@gmail.com>
2 parents aa2ca7e + a706fb9 commit c281044

File tree

4 files changed

+86
-37
lines changed

4 files changed

+86
-37
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
# async-iterable-queue [![License](https://img.shields.io/github/license/kei-g/async-iterable-queue)](https://opensource.org/licenses/BSD-3-Clause) [![Libraries.io dependency status for latest release](https://img.shields.io/librariesio/release/npm/async-iterable-queue)](https://npmjs.com/package/async-iterable-queue?activeTab=dependencies) [![Travis CI](https://img.shields.io/travis/com/kei-g/async-iterable-queue?logo=travis)](https://www.travis-ci.com/github/kei-g/async-iterable-queue) [![npm](https://img.shields.io/npm/v/async-iterable-queue)](https://npmjs.com/package/async-iterable-queue)
1+
# async-iterable-queue [![License](https://img.shields.io/github/license/kei-g/async-iterable-queue)](https://opensource.org/licenses/BSD-3-Clause) [![Libraries.io dependency status for latest release](https://img.shields.io/librariesio/release/npm/async-iterable-queue?logo=nodedotjs)](https://npmjs.com/package/async-iterable-queue?activeTab=dependencies) [![Travis CI](https://img.shields.io/travis/com/kei-g/async-iterable-queue?logo=travis)](https://www.travis-ci.com/github/kei-g/async-iterable-queue) [![npm](https://img.shields.io/npm/v/async-iterable-queue?logo=npm)](https://npmjs.com/package/async-iterable-queue)
22

3-
[![npms.io (maintenance)](https://img.shields.io/npms-io/maintenance-score/async-iterable-queue)](https://npms.io/search?q=async-iterable-queue) [![npms.io (quality)](https://img.shields.io/npms-io/quality-score/async-iterable-queue)](https://npms.io/search?q=async-iterable-queue)
3+
[![npms.io (maintenance)](https://img.shields.io/npms-io/maintenance-score/async-iterable-queue?logo=npm)](https://npms.io/search?q=async-iterable-queue) [![npms.io (quality)](https://img.shields.io/npms-io/quality-score/async-iterable-queue?logo=npm)](https://npms.io/search?q=async-iterable-queue)
44

55
Async Iterable Queue

async-iterable-queue.ts

Lines changed: 36 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { EventEmitter } from 'stream'
2+
import { assert } from 'console'
23

34
/**
45
* 非同期反復可能な先入れ先出し型の待ち行列への非同期反復子
@@ -39,7 +40,11 @@ class AIQAsyncIterator<T> implements AsyncIterator<T> {
3940
/**
4041
* 非同期反復可能な先入れ先出し型の待ち行列の状態を表す型
4142
*/
42-
type AIQState = 'ending' | 'finished'
43+
enum AIQState {
44+
ending = 1,
45+
finished = 2,
46+
undefined = 0,
47+
}
4348

4449
/**
4550
* 非同期反復可能な先入れ先出し型の待ち行列
@@ -63,18 +68,14 @@ export class AsyncIterableQueue<T> implements AsyncIterable<T> {
6368
/**
6469
* この待ち行列の現在の状態
6570
*/
66-
#state?: AIQState
71+
readonly #state = new Uint8Array([AIQState.undefined])
6772

6873
/**
6974
* コンストラクタ
7075
*/
7176
constructor() {
7277
const resolveAsync = createAsyncResolver({
73-
finish: () => {
74-
const state = this.#state
75-
this.#state = 'finished'
76-
return state
77-
},
78+
finish: () => Atomics.exchange(this.#state, 0, AIQState.finished),
7879
resolvers: this.#resolvers,
7980
})
8081
this.#emitter.on('deq', async () => {
@@ -93,28 +94,36 @@ export class AsyncIterableQueue<T> implements AsyncIterable<T> {
9394
* @param cb 終端が読み取られた後に呼ばれるコールバック関数
9495
*/
9596
end(cb?: NoParameterCallback): Promise<void> {
96-
const state = this.#state
97-
if (state)
98-
throw new Error(state)
99-
this.#state = 'ending'
100-
return new Promise((resolve: Resolver<void>) => (
101-
this.#emitter.emit('enq', new Terminator(cb)),
102-
resolve()
103-
))
97+
return new Promise(
98+
(resolve: Resolver<void>, reject: SingleParameterAction<unknown>) => {
99+
const state = Atomics.compareExchange(
100+
this.#state,
101+
0,
102+
AIQState.undefined,
103+
AIQState.ending,
104+
)
105+
if (state !== AIQState.undefined)
106+
return reject(new Error(AIQState[state]))
107+
this.#emitter.emit('enq', new Terminator(cb))
108+
return resolve()
109+
}
110+
)
104111
}
105112

106113
/**
107114
* この待ち行列の末尾に要素を追加する
108115
* @param value 要素の値
109116
*/
110117
push(value: T): Promise<void> {
111-
const state = this.#state
112-
if (state)
113-
throw new Error(state)
114-
return new Promise((resolve: Resolver<void>) => (
115-
this.#emitter.emit('enq', value),
116-
resolve()
117-
))
118+
return new Promise(
119+
(resolve: Resolver<void>, reject: SingleParameterAction<unknown>) => {
120+
const state = Atomics.load(this.#state, 0)
121+
if (state !== AIQState.undefined)
122+
return reject(new Error(AIQState[state]))
123+
this.#emitter.emit('enq', value)
124+
return resolve()
125+
}
126+
)
118127
}
119128

120129
/**
@@ -185,15 +194,12 @@ class Terminator {
185194
try {
186195
const result = this.cb()
187196
if (result instanceof Promise)
188-
result.catch(reject).then(resolve)
189-
else
190-
resolve()
197+
return result.catch(reject).then(resolve)
191198
}
192199
catch (err: unknown) {
193-
reject(err)
200+
return reject(err)
194201
}
195-
else
196-
resolve()
202+
return resolve()
197203
})
198204
}
199205
}
@@ -219,8 +225,8 @@ const createAsyncResolver = <T>(param: AsyncResolverCreateParameter<T>) => {
219225
if (value instanceof Terminator) {
220226
const state = param.finish()
221227
await resolveAsync({ done: true } as IteratorResult<T>)
222-
if (state === 'ending')
223-
await value.call()
228+
assert(state === AIQState.ending)
229+
await value.call()
224230
}
225231
else
226232
await resolveAsync({

package.json

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,38 @@
33
"email": "km.8k6ce+github@gmail.com",
44
"name": "kei-g"
55
},
6+
"badges": [
7+
{
8+
"description": "license",
9+
"href": "https://img.shields.io/github/license/kei-g/async-iterable-queue",
10+
"url": "https://opensource.org/licenses/BSD-3-Clause"
11+
},
12+
{
13+
"description": "dependency",
14+
"href": "https://img.shields.io/librariesio/release/npm/async-iterable-queue?logo=nodedotjs",
15+
"url": "https://npmjs.com/package/async-iterable-queue?activeTab=dependencies"
16+
},
17+
{
18+
"description": "Travis CI",
19+
"href": "https://img.shields.io/travis/com/kei-g/async-iterable-queue?logo=travis",
20+
"url": "https://www.travis-ci.com/github/kei-g/async-iterable-queue"
21+
},
22+
{
23+
"description": "version",
24+
"href": "https://img.shields.io/npm/v/async-iterable-queue?logo=npm",
25+
"url": "https://npmjs.com/package/async-iterable-queue"
26+
},
27+
{
28+
"description": "maintainance",
29+
"href": "https://img.shields.io/npms-io/maintenance-score/async-iterable-queue?logo=npm",
30+
"url": "https://npms.io/search?q=async-iterable-queue"
31+
},
32+
{
33+
"description": "quality",
34+
"href": "https://img.shields.io/npms-io/quality-score/async-iterable-queue?logo=npm",
35+
"url": "https://npms.io/search?q=async-iterable-queue"
36+
}
37+
],
638
"bugs": {
739
"url": "https://github.com/kei-g/async-iterable-queue/issues"
840
},
@@ -41,7 +73,7 @@
4173
"main": "lib/async-iterable-queue.js",
4274
"name": "async-iterable-queue",
4375
"nyc": {
44-
"branches": 93,
76+
"branches": 100,
4577
"functions": 100,
4678
"lines": 100,
4779
"statements": 100

test/async-iterable-queue.spec.ts

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { AsyncIterableQueue } from '../async-iterable-queue'
22
import { describe, it } from 'mocha'
33
import { expect } from 'chai'
4-
import { throws } from 'assert'
54

65
const source = [
76
Math.LN2,
@@ -23,7 +22,13 @@ describe('failure mission', async () => {
2322
for (const value of source)
2423
await q.push(value)
2524
await q.end()
26-
throws(() => q.end())
25+
let error: Error
26+
await q.end()
27+
.catch((reason: unknown) => {
28+
if (reason instanceof Error)
29+
error = reason
30+
})
31+
.finally(() => expect(error).instanceOf(Error))
2732
}
2833
await Promise.all([popAsync(q), pushAsync()])
2934
})
@@ -33,7 +38,13 @@ describe('failure mission', async () => {
3338
for (const value of source)
3439
await q.push(value)
3540
await q.end()
36-
throws(() => q.push(Math.SQRT2))
41+
let error: Error
42+
await q.push(Math.SQRT2)
43+
.catch((reason?: unknown) => {
44+
if (reason instanceof Error)
45+
error = reason
46+
})
47+
.finally(() => expect(error).instanceOf(Error))
3748
}
3849
await Promise.all([popAsync(q), pushAsync()])
3950
})
@@ -44,7 +55,7 @@ describe('failure mission', async () => {
4455
await q.push(value)
4556
await q.end(() => {
4657
throw new Error()
47-
})
58+
}).catch((reason?: unknown) => expect(reason).instanceOf(Error))
4859
}
4960
await Promise.all([popAsync(q), pushAsync()])
5061
})

0 commit comments

Comments
 (0)