@@ -3,9 +3,10 @@ import { identity } from '../util/identity.js';
33import { wrapWithAbort } from './operators/withabort.js' ;
44import { throwIfAborted } from '../aborterror.js' ;
55import { safeRace } from '../util/safeRace.js' ;
6+ import { returnAsyncIterator } from '../util/returniterator.js' ;
67
78// eslint-disable-next-line @typescript-eslint/no-empty-function
8- const NEVER_PROMISE = new Promise ( ( ) => { } ) ;
9+ const NEVER_PROMISE = new Promise < never > ( ( ) => { } ) ;
910
1011type MergeResult < T > = { value : T ; index : number } ;
1112
@@ -28,39 +29,46 @@ export class CombineLatestAsyncIterable<TSource> extends AsyncIterableX<TSource[
2829 const length = this . _sources . length ;
2930 const iterators = new Array < AsyncIterator < TSource > > ( length ) ;
3031 const nexts = new Array < Promise < MergeResult < IteratorResult < TSource > > > > ( length ) ;
31- let hasValueAll = false ;
32- const values = new Array < TSource > ( length ) ;
33- const hasValues = new Array < boolean > ( length ) ;
34- let active = length ;
3532
36- hasValues . fill ( false ) ;
33+ let active = length ;
34+ let allValuesAvailable = false ;
35+ const values = new Array < TSource > ( length ) ;
36+ const hasValues = new Array < boolean > ( length ) . fill ( false ) ;
3737
3838 for ( let i = 0 ; i < length ; i ++ ) {
3939 const iterator = wrapWithAbort ( this . _sources [ i ] , signal ) [ Symbol . asyncIterator ] ( ) ;
4040 iterators [ i ] = iterator ;
4141 nexts [ i ] = wrapPromiseWithIndex ( iterator . next ( ) , i ) ;
4242 }
4343
44- while ( active > 0 ) {
45- const next = safeRace ( nexts ) ;
46- const {
47- value : { value : value$ , done : done$ } ,
48- index,
49- } = await next ;
50- if ( done$ ) {
51- nexts [ index ] = < Promise < MergeResult < IteratorResult < TSource > > > > NEVER_PROMISE ;
52- active -- ;
53- } else {
54- values [ index ] = value$ ;
55- hasValues [ index ] = true ;
56-
57- const iterator$ = iterators [ index ] ;
58- nexts [ index ] = wrapPromiseWithIndex ( iterator$ . next ( ) , index ) ;
59-
60- if ( hasValueAll || ( hasValueAll = hasValues . every ( identity ) ) ) {
61- yield values ;
44+ try {
45+ while ( active > 0 ) {
46+ const next = safeRace ( nexts ) ;
47+
48+ const {
49+ value : { value, done } ,
50+ index,
51+ } = await next ;
52+
53+ if ( done ) {
54+ nexts [ index ] = NEVER_PROMISE ;
55+ active -- ;
56+ } else {
57+ values [ index ] = value ;
58+ hasValues [ index ] = true ;
59+ allValuesAvailable = allValuesAvailable || hasValues . every ( identity ) ;
60+
61+ nexts [ index ] = wrapPromiseWithIndex ( iterators [ index ] . next ( ) , index ) ;
62+
63+ if ( allValuesAvailable ) {
64+ yield values ;
65+ }
6266 }
6367 }
68+ } finally {
69+ for ( const iterator of iterators ) {
70+ await returnAsyncIterator ( iterator ) ;
71+ }
6472 }
6573 }
6674}
@@ -176,5 +184,5 @@ export function combineLatest<T, T2, T3, T4, T5, T6>(
176184 */
177185export function combineLatest < T > ( ...sources : AsyncIterable < T > [ ] ) : AsyncIterableX < T [ ] > ;
178186export function combineLatest < T > ( ...sources : any [ ] ) : AsyncIterableX < T [ ] > {
179- return new CombineLatestAsyncIterable < T > ( sources ) ;
187+ return new CombineLatestAsyncIterable ( sources ) ;
180188}
0 commit comments