Skip to content
Permalink
9bfb9ba527
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
285 lines (226 sloc) 5.92 KB
import pTimeout from 'p-timeout';
const normalizeEmitter = emitter => {
const addListener = emitter.on || emitter.addListener || emitter.addEventListener;
const removeListener = emitter.off || emitter.removeListener || emitter.removeEventListener;
if (!addListener || !removeListener) {
throw new TypeError('Emitter is not compatible');
}
return {
addListener: addListener.bind(emitter),
removeListener: removeListener.bind(emitter),
};
};
export function pEventMultiple(emitter, event, options) {
let cancel;
const returnValue = new Promise((resolve, reject) => {
options = {
rejectionEvents: ['error'],
multiArgs: false,
resolveImmediately: false,
...options,
};
if (!(options.count >= 0 && (options.count === Number.POSITIVE_INFINITY || Number.isInteger(options.count)))) {
throw new TypeError('The `count` option should be at least 0 or more');
}
// Allow multiple events
const events = [event].flat();
const items = [];
const {addListener, removeListener} = normalizeEmitter(emitter);
const onItem = (...arguments_) => {
const value = options.multiArgs ? arguments_ : arguments_[0];
// eslint-disable-next-line unicorn/no-array-callback-reference
if (options.filter && !options.filter(value)) {
return;
}
items.push(value);
if (options.count === items.length) {
cancel();
resolve(items);
}
};
const rejectHandler = error => {
cancel();
reject(error);
};
cancel = () => {
for (const event of events) {
removeListener(event, onItem);
}
for (const rejectionEvent of options.rejectionEvents) {
removeListener(rejectionEvent, rejectHandler);
}
};
for (const event of events) {
addListener(event, onItem);
}
for (const rejectionEvent of options.rejectionEvents) {
addListener(rejectionEvent, rejectHandler);
}
if (options.resolveImmediately) {
resolve(items);
}
});
returnValue.cancel = cancel;
if (typeof options.timeout === 'number') {
const timeout = pTimeout(returnValue, options.timeout);
timeout.cancel = cancel;
return timeout;
}
return returnValue;
}
export function pEvent(emitter, event, options) {
if (typeof options === 'function') {
options = {filter: options};
}
options = {
...options,
count: 1,
resolveImmediately: false,
};
const arrayPromise = pEventMultiple(emitter, event, options);
const promise = arrayPromise.then(array => array[0]); // eslint-disable-line promise/prefer-await-to-then
promise.cancel = arrayPromise.cancel;
return promise;
}
export function pEventIterator(emitter, event, options) {
if (typeof options === 'function') {
options = {filter: options};
}
// Allow multiple events
const events = [event].flat();
options = {
rejectionEvents: ['error'],
resolutionEvents: [],
limit: Number.POSITIVE_INFINITY,
multiArgs: false,
...options,
};
const {limit} = options;
const isValidLimit = limit >= 0 && (limit === Number.POSITIVE_INFINITY || Number.isInteger(limit));
if (!isValidLimit) {
throw new TypeError('The `limit` option should be a non-negative integer or Infinity');
}
if (limit === 0) {
// Return an empty async iterator to avoid any further cost
return {
[Symbol.asyncIterator]() {
return this;
},
async next() {
return {
done: true,
value: undefined,
};
},
};
}
const {addListener, removeListener} = normalizeEmitter(emitter);
let isDone = false;
let error;
let hasPendingError = false;
const nextQueue = [];
const valueQueue = [];
let eventCount = 0;
let isLimitReached = false;
const valueHandler = (...arguments_) => {
eventCount++;
isLimitReached = eventCount === limit;
const value = options.multiArgs ? arguments_ : arguments_[0];
if (nextQueue.length > 0) {
const {resolve} = nextQueue.shift();
resolve({done: false, value});
if (isLimitReached) {
cancel();
}
return;
}
valueQueue.push(value);
if (isLimitReached) {
cancel();
}
};
const cancel = () => {
isDone = true;
for (const event of events) {
removeListener(event, valueHandler);
}
for (const rejectionEvent of options.rejectionEvents) {
removeListener(rejectionEvent, rejectHandler);
}
for (const resolutionEvent of options.resolutionEvents) {
removeListener(resolutionEvent, resolveHandler);
}
while (nextQueue.length > 0) {
const {resolve} = nextQueue.shift();
resolve({done: true, value: undefined});
}
};
const rejectHandler = (...arguments_) => {
error = options.multiArgs ? arguments_ : arguments_[0];
if (nextQueue.length > 0) {
const {reject} = nextQueue.shift();
reject(error);
} else {
hasPendingError = true;
}
cancel();
};
const resolveHandler = (...arguments_) => {
const value = options.multiArgs ? arguments_ : arguments_[0];
// eslint-disable-next-line unicorn/no-array-callback-reference
if (options.filter && !options.filter(value)) {
return;
}
if (nextQueue.length > 0) {
const {resolve} = nextQueue.shift();
resolve({done: true, value});
} else {
valueQueue.push(value);
}
cancel();
};
for (const event of events) {
addListener(event, valueHandler);
}
for (const rejectionEvent of options.rejectionEvents) {
addListener(rejectionEvent, rejectHandler);
}
for (const resolutionEvent of options.resolutionEvents) {
addListener(resolutionEvent, resolveHandler);
}
return {
[Symbol.asyncIterator]() {
return this;
},
async next() {
if (valueQueue.length > 0) {
const value = valueQueue.shift();
return {
done: isDone && valueQueue.length === 0 && !isLimitReached,
value,
};
}
if (hasPendingError) {
hasPendingError = false;
throw error;
}
if (isDone) {
return {
done: true,
value: undefined,
};
}
return new Promise((resolve, reject) => {
nextQueue.push({resolve, reject});
});
},
async return(value) {
cancel();
return {
done: isDone,
value,
};
},
};
}
export {TimeoutError} from 'p-timeout';