add sequence partition operation
This commit is contained in:
@@ -456,6 +456,14 @@ export abstract class BaseAsyncSequence<TElement> extends AsyncSequenceMarker im
|
||||
return new OrderByAsyncSequence<TElement, TBy>(this, true, selector, comparer);
|
||||
}
|
||||
|
||||
partition(equater?: MaybeAsyncEquater<TElement>): AsyncSequence<Awaited<TElement>[]> {
|
||||
return new PartitionAsyncSequence<TElement>(this, equater);
|
||||
}
|
||||
|
||||
partitionBy<TBy>(selector: MaybeAsyncConverter<TElement, TBy>, equater?: MaybeAsyncEquater<TBy>): AsyncSequence<Awaited<TElement>[]> {
|
||||
return new PartitionByAsyncSequence<TElement, TBy>(this, selector, equater);
|
||||
}
|
||||
|
||||
distinct(equater?: MaybeAsyncEquater<TElement>): AsyncSequence<TElement> {
|
||||
return new DistinctAsyncSequence<TElement>(this, equater);
|
||||
}
|
||||
@@ -849,6 +857,14 @@ export class DelegatedAsyncSequence<TElement> extends AsyncSequenceMarker implem
|
||||
return this.#sequence.orderByDescending(selector, comparer);
|
||||
}
|
||||
|
||||
partition(equater?: MaybeAsyncEquater<TElement> | undefined): AsyncSequence<Awaited<TElement>[]> {
|
||||
return this.#sequence.partition(equater);
|
||||
}
|
||||
|
||||
partitionBy<TBy>(selector: MaybeAsyncConverter<TElement, TBy>, equater?: MaybeAsyncEquater<TBy> | undefined): AsyncSequence<Awaited<TElement>[]> {
|
||||
return this.#sequence.partitionBy(selector, equater);
|
||||
}
|
||||
|
||||
distinct(equater?: ((first: TElement, second: TElement) => MaybePromise<boolean>) | undefined): AsyncSequence<TElement> {
|
||||
return this.#sequence.distinct(equater);
|
||||
}
|
||||
@@ -2141,3 +2157,62 @@ export class CacheAsyncSequence<T> extends BaseAsyncSequence<T> {
|
||||
yield* this.#cache ??= await this.#sequence.toArray();
|
||||
}
|
||||
}
|
||||
|
||||
class PartitionAsyncSequence<T> extends BaseAsyncSequence<Awaited<T>[]> {
|
||||
readonly #sequence: AsyncSequence<T>;
|
||||
readonly #equater: MaybeAsyncEquater<T>;
|
||||
|
||||
constructor(sequence: AsyncSequence<T>, equater: MaybeAsyncEquater<T> | undefined) {
|
||||
super();
|
||||
|
||||
this.#sequence = sequence;
|
||||
this.#equater = equater ?? strictEquals;
|
||||
}
|
||||
|
||||
override async *iterator() {
|
||||
const partitions = createAsyncEqualityMap<T, Awaited<T>[]>(this.#equater);
|
||||
|
||||
for await (const obj of this.#sequence) {
|
||||
const partition = await partitions.get(obj);
|
||||
|
||||
if (partition) {
|
||||
partition.push(obj);
|
||||
} else {
|
||||
await partitions.set(obj, [obj]);
|
||||
}
|
||||
}
|
||||
|
||||
yield* partitions.values();
|
||||
}
|
||||
}
|
||||
|
||||
class PartitionByAsyncSequence<TElement, TBy> extends BaseAsyncSequence<Awaited<TElement>[]> {
|
||||
readonly #sequence: AsyncSequence<TElement>;
|
||||
readonly #selector: MaybeAsyncConverter<TElement, TBy>;
|
||||
readonly #equater: MaybeAsyncEquater<TBy>;
|
||||
|
||||
constructor(sequence: AsyncSequence<TElement>, selector: MaybeAsyncConverter<TElement, TBy>, equater: MaybeAsyncEquater<TBy> | undefined) {
|
||||
super();
|
||||
|
||||
this.#sequence = sequence;
|
||||
this.#selector = selector;
|
||||
this.#equater = equater ?? strictEquals;
|
||||
}
|
||||
|
||||
override async *iterator() {
|
||||
const partitions = createAsyncEqualityMap<TBy, Awaited<TElement>[]>(this.#equater);
|
||||
|
||||
for await (const obj of this.#sequence) {
|
||||
const key = await this.#selector(obj);
|
||||
const partition = await partitions.get(key);
|
||||
|
||||
if (partition) {
|
||||
partition.push(obj);
|
||||
} else {
|
||||
await partitions.set(key, [obj]);
|
||||
}
|
||||
}
|
||||
|
||||
yield* partitions.values();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user