Permalink
Cannot retrieve contributors at this time
197 lines (163 sloc)
4.94 KB
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
codeql-action/node_modules/p-map/index.js
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import AggregateError from 'aggregate-error'; | |
/**
An error to be thrown when the request is aborted by AbortController.
DOMException is thrown instead of this Error when DOMException is available.

The instance's `name` is always `'AbortError'` and its `message` is whatever was passed to the constructor.

@param message - Text stored on the `message` property of the error.
*/
export class AbortError extends Error {
	constructor(message) {
		// `message` is assigned below rather than passed to `super()`, so it
		// ends up as a plain own property exactly as given (even `undefined`).
		super();
		this.name = 'AbortError';
		this.message = message;
	}
}
/**
Create the error object used to signal an aborted operation.

When `DOMException` exists on `globalThis`, a `DOMException` named `'AbortError'` is returned; otherwise the local `AbortError` class is used. Either way the returned error has `name === 'AbortError'` and carries `errorMessage` as its message.

TODO: Remove AbortError and just throw DOMException when targeting Node 18.

@param errorMessage - Message for the created error.
@returns A `DOMException` or an `AbortError`.
*/
const getDOMException = errorMessage => globalThis.DOMException === undefined
	? new AbortError(errorMessage)
	// The second argument sets the exception name; without it the name would be the generic 'Error'.
	: new DOMException(errorMessage, 'AbortError');
/**
Produce the rejection reason for an aborted signal.

- If `signal.reason` is `undefined`, a default abort error with the message 'This operation was aborted.' is returned.
- If `signal.reason` is an `Error` instance, it is returned unchanged.
- Any other reason is passed to `getDOMException` as the error message.

TODO: Remove below function and just 'reject(signal.reason)' when targeting Node 18.

@param signal - Object whose `reason` property is read.
@returns An error suitable for rejecting with.
*/
const getAbortedReason = signal => {
	const {reason} = signal;

	if (reason === undefined) {
		return getDOMException('This operation was aborted.');
	}

	// Non-error reasons (for example a string) are wrapped so callers always receive an error object.
	return reason instanceof Error ? reason : getDOMException(reason);
};
/**
Map over the items of an iterable or async iterable with `mapper`, running at most `concurrency` mapper calls at the same time.

Items are awaited before being handed to `mapper`, and each result is stored at the index of its source item. Results equal to `pMapSkip` are left out of the resolved array.

@param iterable - An object implementing `Symbol.iterator` or `Symbol.asyncIterator`.
@param mapper - Called as `mapper(element, index)`; may return a promise.
@param options.concurrency - Integer `>= 1` or `Infinity` (default). Upper bound on in-flight mapper calls.
@param options.stopOnError - When `true` (default), the first mapper error rejects the returned promise. When `false`, mapper errors are collected and the promise rejects with an `AggregateError` once the iterable is exhausted and all mappers have finished.
@param options.signal - Optional abort signal. When it is (or becomes) aborted, the returned promise rejects with the reason computed by `getAbortedReason`.
@returns A promise for the array of mapped values.

Rejects with a `TypeError` when `iterable`, `mapper` or `concurrency` is invalid. An error thrown by the iterator itself always rejects, regardless of `stopOnError`.
*/
export default async function pMap(
	iterable,
	mapper,
	{
		concurrency = Number.POSITIVE_INFINITY,
		stopOnError = true,
		signal,
	} = {},
) {
	return new Promise((resolve_, reject_) => {
		// Throwing inside the executor rejects the returned promise.
		if (iterable[Symbol.iterator] === undefined && iterable[Symbol.asyncIterator] === undefined) {
			throw new TypeError(`Expected \`input\` to be either an \`Iterable\` or \`AsyncIterable\`, got (${typeof iterable})`);
		}

		if (typeof mapper !== 'function') {
			throw new TypeError('Mapper function is required');
		}

		if (!((Number.isSafeInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency >= 1)) {
			throw new TypeError(`Expected \`concurrency\` to be an integer from 1 and up or \`Infinity\`, got \`${concurrency}\` (${typeof concurrency})`);
		}

		const result = [];
		const errors = [];
		const skippedIndexesMap = new Map();
		let isRejected = false;
		let isResolved = false;
		let isIterableDone = false;
		let resolvingCount = 0;
		let currentIndex = 0;
		const iterator = iterable[Symbol.iterator] === undefined ? iterable[Symbol.asyncIterator]() : iterable[Symbol.iterator]();

		const onAbort = () => {
			reject(getAbortedReason(signal));
		};

		// Detach from the signal once settled so a long-lived signal does not
		// keep accumulating listeners (and the closures they retain).
		const cleanup = () => {
			if (signal) {
				signal.removeEventListener('abort', onAbort);
			}
		};

		const resolve = value => {
			cleanup();
			resolve_(value);
		};

		const reject = reason => {
			isRejected = true;
			isResolved = true;
			cleanup();
			reject_(reason);
		};

		if (signal) {
			if (signal.aborted) {
				// Already aborted: nothing to run and nothing to subscribe to.
				reject(getAbortedReason(signal));
				return;
			}

			signal.addEventListener('abort', onAbort, {once: true});
		}

		const next = async () => {
			if (isResolved) {
				return;
			}

			const nextItem = await iterator.next();

			const index = currentIndex;
			currentIndex++;

			// Note: `iterator.next()` can be called many times in parallel.
			// This can cause multiple calls to this `next()` function to
			// receive a `nextItem` with `done === true`.
			// The shutdown logic that rejects/resolves must be protected
			// so it runs only one time as the `skippedIndex` logic is
			// non-idempotent.
			if (nextItem.done) {
				isIterableDone = true;

				if (resolvingCount === 0 && !isResolved) {
					if (!stopOnError && errors.length > 0) {
						reject(new AggregateError(errors));
						return;
					}

					isResolved = true;

					if (skippedIndexesMap.size === 0) {
						resolve(result);
						return;
					}

					const pureResult = [];

					// Support multiple `pMapSkip`'s.
					for (const [resultIndex, value] of result.entries()) {
						if (skippedIndexesMap.get(resultIndex) === pMapSkip) {
							continue;
						}

						pureResult.push(value);
					}

					resolve(pureResult);
				}

				return;
			}

			resolvingCount++;

			// Intentionally detached
			(async () => {
				try {
					const element = await nextItem.value;

					if (isResolved) {
						return;
					}

					const value = await mapper(element, index);

					// Use Map to stage the index of the element.
					if (value === pMapSkip) {
						skippedIndexesMap.set(index, value);
					}

					result[index] = value;
				} catch (error) {
					if (stopOnError) {
						reject(error);
						return;
					}

					errors.push(error);
				}

				// Exactly one decrement per started item, whether it succeeded or failed.
				resolvingCount--;

				// A failure of the iterator itself is fatal regardless of `stopOnError`:
				// an iterable is likely to keep throwing after it throws once, so
				// continuing to call `next()` could loop forever. Keeping this call
				// outside the mapper's `try` also stops an iterator error from being
				// counted as a mapper error, which would decrement `resolvingCount`
				// twice and leave the promise pending forever.
				try {
					await next();
				} catch (error) {
					reject(error);
				}
			})();
		};

		// Create the concurrent runners in a detached (non-awaited)
		// promise. We need this so we can await the `next()` calls
		// to stop creating runners before hitting the concurrency limit
		// if the iterable has already been marked as done.
		// NOTE: We *must* do this for async iterators otherwise we'll spin up
		// infinite `next()` calls by default and never start the event loop.
		(async () => {
			for (let index = 0; index < concurrency; index++) {
				try {
					// eslint-disable-next-line no-await-in-loop
					await next();
				} catch (error) {
					reject(error);
					break;
				}

				if (isIterableDone || isRejected) {
					break;
				}
			}
		})();
	});
}
/**
Sentinel a mapper can return to have that item omitted from the resolved array.
*/
export const pMapSkip = Symbol('skip');