@@ -20,7 +20,10 @@ class AIQAsyncIterator<T> implements AsyncIterator<T> {
20
20
* @param emitter 事象発生器への参照
21
21
* @param resolvers 反復結果解決関数の配列への参照
22
22
*/
23
- constructor ( emitter : EventEmitter , resolvers : IteratorResultResolver < T > [ ] ) {
23
+ constructor (
24
+ emitter : EventEmitter ,
25
+ resolvers : IteratorResultResolver < T > [ ] ,
26
+ ) {
24
27
this . #emitter = emitter
25
28
this . #resolvers = resolvers
26
29
}
@@ -30,10 +33,14 @@ class AIQAsyncIterator<T> implements AsyncIterator<T> {
30
33
* @returns 次の要素
31
34
*/
32
35
next ( ) : Promise < IteratorResult < T > > {
33
- return new Promise ( ( resolve : IteratorResultResolver < T > ) => (
34
- this . #resolvers. push ( resolve ) ,
35
- this . #emitter. emit ( 'deq' )
36
- ) )
36
+ return new Promise (
37
+ (
38
+ resolve : IteratorResultResolver < T > ,
39
+ ) => {
40
+ this . #resolvers. push ( resolve )
41
+ this . #emitter. emit ( 'deq' )
42
+ }
43
+ )
37
44
}
38
45
}
39
46
@@ -75,37 +82,62 @@ export class AsyncIterableQueue<T> implements AsyncIterable<T> {
75
82
*/
76
83
constructor ( ) {
77
84
this . #state[ 0 ] = AIQState . undefined
78
- const resolveAsync = createAsyncResolver ( {
79
- finish : ( ) => Atomics . exchange ( this . #state, 0 , AIQState . finished ) ,
80
- resolvers : this . #resolvers,
81
- } )
82
- this . #emitter. on ( 'deq' , async ( ) => {
83
- while ( this . #queue. length && this . #resolvers. length )
84
- await resolveAsync ( this . #queue. shift ( ) )
85
- } )
86
- this . #emitter. on ( 'enq' , async ( value : Terminatable < T > ) =>
87
- this . #resolvers. length ?
88
- await resolveAsync ( value ) :
89
- this . #queue. push ( value )
85
+ const resolveAsync = createAsyncResolver (
86
+ {
87
+ finish : ( ) =>
88
+ Atomics . exchange (
89
+ this . #state,
90
+ 0 ,
91
+ AIQState . finished ,
92
+ ) ,
93
+ resolvers : this . #resolvers,
94
+ }
95
+ )
96
+ this . #emitter. on (
97
+ 'deq' ,
98
+ async ( ) => {
99
+ while ( this . #queue. length
100
+ && this . #resolvers. length )
101
+ await resolveAsync (
102
+ this . #queue. shift ( )
103
+ )
104
+ }
105
+ )
106
+ this . #emitter. on (
107
+ 'enq' ,
108
+ async ( value : Terminatable < T > ) =>
109
+ this . #resolvers. length
110
+ ? await resolveAsync ( value )
111
+ : this . #queue. push ( value )
90
112
)
91
113
}
92
114
93
115
/**
94
116
* この待ち行列への要素の追加を終了する
95
117
* @param cb 終端が読み取られた後に呼ばれるコールバック関数
96
118
*/
97
- end ( cb ?: NoParameterCallback ) : Promise < void > {
119
+ end (
120
+ cb ?: NoParameterCallback ,
121
+ ) : Promise < void > {
98
122
return new Promise (
99
- ( resolve : Resolver < void > , reject : SingleParameterAction < unknown > ) => {
123
+ (
124
+ resolve : Resolver < void > ,
125
+ reject : SingleParameterAction < unknown > ,
126
+ ) => {
100
127
const state = Atomics . compareExchange (
101
128
this . #state,
102
129
0 ,
103
130
AIQState . undefined ,
104
131
AIQState . ending ,
105
132
)
106
133
if ( state !== AIQState . undefined )
107
- return reject ( new Error ( AIQState [ state ] ) )
108
- this . #emitter. emit ( 'enq' , new Terminator ( cb ) )
134
+ return reject (
135
+ new Error ( AIQState [ state ] )
136
+ )
137
+ this . #emitter. emit (
138
+ 'enq' ,
139
+ new Terminator ( cb ) ,
140
+ )
109
141
return resolve ( )
110
142
}
111
143
)
@@ -117,11 +149,22 @@ export class AsyncIterableQueue<T> implements AsyncIterable<T> {
117
149
*/
118
150
push ( value : T ) : Promise < void > {
119
151
return new Promise (
120
- ( resolve : Resolver < void > , reject : SingleParameterAction < unknown > ) => {
121
- const state = Atomics . load ( this . #state, 0 )
152
+ (
153
+ resolve : Resolver < void > ,
154
+ reject : SingleParameterAction < unknown > ,
155
+ ) => {
156
+ const state = Atomics . load (
157
+ this . #state,
158
+ 0 ,
159
+ )
122
160
if ( state !== AIQState . undefined )
123
- return reject ( new Error ( AIQState [ state ] ) )
124
- this . #emitter. emit ( 'enq' , value )
161
+ return reject (
162
+ new Error ( AIQState [ state ] )
163
+ )
164
+ this . #emitter. emit (
165
+ 'enq' ,
166
+ value ,
167
+ )
125
168
return resolve ( )
126
169
}
127
170
)
@@ -132,7 +175,10 @@ export class AsyncIterableQueue<T> implements AsyncIterable<T> {
132
175
* @returns 非同期反復子
133
176
*/
134
177
[ Symbol . asyncIterator ] ( ) : AsyncIterator < T > {
135
- return new AIQAsyncIterator ( this . #emitter, this . #resolvers)
178
+ return new AIQAsyncIterator (
179
+ this . #emitter,
180
+ this . #resolvers,
181
+ )
136
182
}
137
183
}
138
184
@@ -155,22 +201,26 @@ type AsyncResolverCreateParameter<T> = {
155
201
/**
156
202
* 反復結果解決関数型
157
203
*/
158
- type IteratorResultResolver < T > = Resolver < IteratorResult < T > >
204
+ type IteratorResultResolver < T > =
205
+ Resolver < IteratorResult < T > >
159
206
160
207
/**
161
208
* 引数無しコールバック関数型
162
209
*/
163
- type NoParameterCallback = ( ) => PromiseLike < void > | void
210
+ type NoParameterCallback =
211
+ ( ) => PromiseLike < void > | void
164
212
165
213
/**
166
214
* 解決関数型
167
215
*/
168
- type Resolver < T > = SingleParameterAction < T >
216
+ type Resolver < T > =
217
+ SingleParameterAction < T >
169
218
170
219
/**
171
220
* 引数1個の関数型
172
221
*/
173
- type SingleParameterAction < T > = ( arg : T ) => void
222
+ type SingleParameterAction < T > =
223
+ ( arg : T ) => void
174
224
175
225
/**
176
226
* 終端
@@ -180,59 +230,80 @@ class Terminator {
180
230
* コンストラクタ
181
231
* @param cb コールバック関数
182
232
*/
183
- constructor ( private readonly cb ?: NoParameterCallback ) {
233
+ constructor (
234
+ private readonly cb ?: NoParameterCallback ,
235
+ ) {
184
236
}
185
237
186
238
/**
187
239
* コールバック関数を呼び出す
188
240
*/
189
241
call ( ) : Promise < void > {
190
- return new Promise ( (
191
- resolve : Resolver < void > ,
192
- reject : SingleParameterAction < unknown > ,
193
- ) => {
194
- if ( this . cb )
195
- try {
196
- const result = this . cb ( )
197
- if ( result instanceof Promise )
198
- return result . catch ( reject ) . then ( resolve )
199
- }
200
- catch ( err : unknown ) {
201
- return reject ( err )
202
- }
203
- return resolve ( )
204
- } )
242
+ return new Promise (
243
+ (
244
+ resolve : Resolver < void > ,
245
+ reject : SingleParameterAction < unknown > ,
246
+ ) => {
247
+ if ( this . cb )
248
+ try {
249
+ const result = this . cb ( )
250
+ if ( result instanceof Promise )
251
+ return result . catch ( reject ) . then ( resolve )
252
+ }
253
+ catch ( err : unknown ) {
254
+ return reject ( err )
255
+ }
256
+ return resolve ( )
257
+ }
258
+ )
205
259
}
206
260
}
207
261
208
262
/**
209
263
* 終端可能型
210
264
*/
211
- type Terminatable < T > = Terminator | T
265
+ type Terminatable < T > =
266
+ Terminator | T
212
267
213
268
/**
214
269
* 反復結果解決関数を非同期的に処理する関数を作成する
215
270
* @param param パラメータ
216
271
* @returns 反復結果解決関数を非同期的に処理する関数を返す
217
272
*/
218
- const createAsyncResolver = < T > ( param : AsyncResolverCreateParameter < T > ) => {
219
- const resolveAsync = ( result : IteratorResult < T > ) =>
220
- new Promise ( ( callback : Resolver < void > ) => {
221
- const resolver = param . resolvers . shift ( )
222
- resolver ( result )
223
- callback ( )
224
- } )
225
- return async ( value : Terminatable < T > ) => {
273
+ const createAsyncResolver = < T > (
274
+ param : AsyncResolverCreateParameter < T > ,
275
+ ) => {
276
+ const resolveAsync = (
277
+ result : IteratorResult < T > ,
278
+ ) =>
279
+ new Promise (
280
+ (
281
+ callback : Resolver < void > ,
282
+ ) => {
283
+ const resolver = param . resolvers . shift ( )
284
+ resolver ( result )
285
+ callback ( )
286
+ }
287
+ )
288
+ return async (
289
+ value : Terminatable < T > ,
290
+ ) => {
226
291
if ( value instanceof Terminator ) {
227
292
const state = param . finish ( )
228
- await resolveAsync ( { done : true } as IteratorResult < T > )
293
+ await resolveAsync (
294
+ {
295
+ done : true
296
+ } as IteratorResult < T >
297
+ )
229
298
assert ( state === AIQState . ending )
230
299
await value . call ( )
231
300
}
232
301
else
233
- await resolveAsync ( {
234
- done : false ,
235
- value,
236
- } )
302
+ await resolveAsync (
303
+ {
304
+ done : false ,
305
+ value,
306
+ }
307
+ )
237
308
}
238
309
}
0 commit comments