为此,我想到了两种方法。批处理和滑动窗口方法。批处理可能更容易,但使用滑动窗口将是更有效的实现。
使用 Promise.all() 进行批处理
此方法创建批量请求,最多达到指定的batchSize
只有当一批请求全部完成后,才会发出下一批请求。
您需要在此处添加一些错误处理,以防请求失败。
import fetch from "node-fetch";
// list of items that you might want to use to compose your URL (not actually used here)
let itemObject = [
{ itemName: "", itemID: "" },
{ itemName: "", itemID: "" },
{ itemName: "", itemID: "" },
{ itemName: "", itemID: "" },
{ itemName: "", itemID: "" },
{ itemName: "", itemID: "" },
{ itemName: "", itemID: "" },
{ itemName: "", itemID: "" },
{ itemName: "", itemID: "" },
{ itemName: "", itemID: "" },
{ itemName: "", itemID: "" },
{ itemName: "", itemID: "" },
];
(async () => {
// number of concurrent requests in one batch
const batchSize = 4;
// request counter
let curReq = 0;
// as long as there are items in the list continue to form batches
while (curReq < itemObject.length) {
// a batch is either limited by the batch size or it is smaller than the batch size when there are less items required
const end = itemObject.length < curReq + batchSize ? itemObject.length: curReq + batchSize;
// we know the number of concurrent request so reserve memory for this
const concurrentReq = new Array(batchSize);
// issue one request for each item in the batch
for (let index = curReq; index < end; index++) {
concurrentReq.push(fetch("https://postman-echo.com/get"))
console.log(`sending request ${curReq}...`)
curReq++;
}
// wait until all promises are done or one promise is rejected
await Promise.all(concurrentReq);
console.log(`requests ${curReq - batchSize}-${curReq} done.`)
}
})();
预期结果:
sending request 0...
sending request 1...
sending request 2...
sending request 3...
requests 0-4 done.
sending request 4...
sending request 5...
sending request 6...
sending request 7...
requests 4-8 done.
sending request 8...
sending request 9...
sending request 10...
sending request 11...
requests 8-12 done.
带信号量的滑动窗口
这种方法使用了一个滑动窗口 https://en.wikipedia.org/wiki/Sliding_window_protocol并在另一个请求完成后立即安排一个新请求,同时始终保持请求计数低于或等于最大数量n
任意时刻的并发请求。
你需要实现的是信号 https://en.wikipedia.org/wiki/Semaphore_(programming).
JavaScript 中有一个用于此目的的库,名为异步互斥体 https://github.com/DirtyHairy/async-mutex.
下面是使用该库向 Postman Echo API 同时发送 2 个请求的示例程序。在信号量允许的情况下,永远不会有更多的请求同时运行(在您的情况下,限制为 5,这里是 2)。
import { Semaphore } from "async-mutex";
import fetch from "node-fetch";
// list of items that you might want to use to compose your URL (not actually used here)
let itemObject = [
{ itemName: "", itemID: "" },
{ itemName: "", itemID: "" },
{ itemName: "", itemID: "" },
{ itemName: "", itemID: "" },
{ itemName: "", itemID: "" },
{ itemName: "", itemID: "" },
{ itemName: "", itemID: "" },
{ itemName: "", itemID: "" },
{ itemName: "", itemID: "" },
{ itemName: "", itemID: "" },
{ itemName: "", itemID: "" },
{ itemName: "", itemID: "" },
];
(async () => {
// allow two concurrent requests (adjust for however many are required)
const semaphore = new Semaphore(2);
itemObject.forEach(async (item, idx) => {
// acquire the semaphore
const [value, release] = await semaphore.acquire();
// at this point the semaphore has been acquired and the job needs to be done
try {
console.log(`sending request ${idx}...`)
const response = await fetch("https://postman-echo.com/get")
if(!response.ok){
console.log(`request failed with status code ${response.status}`)
}
}
catch (error) {
console.log("request failed.")
}
finally {
console.log(`request ${idx} done...`)
// release the semaphore again so a new request can be issued
release();
}
})
})();
预期输出(顺序可能有所不同):
sending request 0...
sending request 1...
request 1 done...
sending request 2...
request 2 done...
sending request 3...
request 3 done...
sending request 4...
request 0 done...
sending request 5...
request 4 done...
sending request 6...
request 5 done...
sending request 7...
request 6 done...
sending request 8...
request 7 done...
sending request 9...
request 8 done...
sending request 10...
request 9 done...
sending request 11...
request 10 done...
request 11 done...