我正在尝试使用 Node.js 的 worker_threads 并行发出 HTTPS 请求。除了 https.get()
函数之外,其余部分都能并行执行;该调用会阻塞其他请求。下面是我目前为止的尝试。
import 'source-map-support/register';
import http from 'http';
import https, { RequestOptions } from 'https';
import { URL } from 'url';
import { isMainThread, parentPort, Worker, workerData } from 'worker_threads';
/**
 * Downloads the bodies of many URLs by splitting the URL list into contiguous
 * ranges and handing each range to a `worker_threads` worker that re-executes
 * this same file (`__filename`).
 *
 * Each worker posts one message per URL (`{ identifier, url, index, content }`
 * on success, `{ identifier, url, index, error }` on failure). The main thread
 * passes every message through the caller-supplied callback and collects the
 * callback's return values.
 */
export class Downloader {
  /** Upper bound on the number of worker threads spawned for one batch. */
  private static readonly maxThreadCount = 8;

  /** Maximum number of redirects followed for a single URL before giving up. */
  private static readonly maxRedirects = 5;

  /** Status codes that are followed as redirects for a GET request. */
  private static readonly redirectStatusCodes: readonly number[] = [301, 302, 303, 307, 308];

  /** User-agent header sent with every request. */
  private static readonly userAgent =
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.131 Safari/537.36';

  /** Lazily created keep-alive agents, shared by all requests made in this thread. */
  private static httpAgent?: http.Agent;
  private static httpsAgent?: https.Agent;

  /**
   * Downloads every URL in `urls` and returns the values produced by `callback`.
   *
   * @param urls URLs to download. An empty list resolves to an empty array.
   * @param callback Invoked on the main thread as `callback(message, parameters)`
   *   for every worker message; its return value becomes the result entry.
   * @param timeout Optional per-request timeout in milliseconds.
   * @param parameters Optional opaque value forwarded to `callback`.
   * @returns Results in the same order as `urls`. Rejects with the first worker
   *   failure (worker error, abnormal exit, or a throwing callback).
   */
  downloadStringsAsync<Type>(urls: string[],
    callback: Function,
    timeout?: number,
    parameters?: Record<string, any>): Promise<Type[]> {
    // Without this guard the thread count would be 0 and the promise would never settle.
    if (urls.length === 0) {
      return Promise.resolve<Type[]>([]);
    }

    // Never spawn more workers than there are URLs.
    const threadCount = Math.min(Downloader.maxThreadCount, urls.length);
    const urlsPerThread = Math.floor(urls.length / threadCount);
    const remainingUrls = urls.length % threadCount;
    const pending: Promise<Type[]>[] = [];
    let startIndex = 0;

    for (let i = 0; i < threadCount; i++) {
      // The first `remainingUrls` workers take one extra URL each.
      const endIndex = startIndex + urlsPerThread + (i < remainingUrls ? 1 : 0);
      // Captured per iteration so the error log reports this worker's own range.
      const range = { identifier: i, startIndex, endIndex };

      pending.push(
        this.spawnWorkerAsync<Type>({ ...range, timeout, urls }, callback, parameters)
          .catch((error) => {
            console.log(`An error occurred in download worker thread.`, { ...range, error });
            throw error;
          }),
      );
      startIndex = endIndex;
    }

    // Promise.all keeps worker order, and each worker returns its range in
    // index order, so the flattened result lines up with `urls`.
    return Promise.all(pending).then((chunks) => ([] as Type[]).concat(...chunks));
  }

  /**
   * Spawns one worker for the URL range `[startIndex, endIndex)` described by
   * `workerData` and collects the callback results for that range.
   *
   * @param workerData Data handed to the worker; `startIndex` and `endIndex` are read here.
   * @param callback Invoked as `callback(message, parameters)` for each worker message.
   * @param parameters Optional opaque value forwarded to `callback`.
   * @returns The callback results ordered by the `index` field of the messages.
   */
  private spawnWorkerAsync<Type>(workerData: Record<string, any>,
    callback: Function,
    parameters?: Record<string, any>): Promise<Type[]> {
    return new Promise<Type[]>((resolve, reject) => {
      const { startIndex, endIndex } = workerData;
      const urlCount = endIndex - startIndex;

      // An empty range would never produce a message, so there is nothing to wait for.
      if (!(urlCount > 0)) {
        resolve([]);
        return;
      }

      const results: { order: number; content: Type }[] = [];
      let failed = false;
      const worker = new Worker(__filename, { workerData });

      worker.on('message', (value) => {
        if (failed) { return; }
        try {
          const content: Type = callback(value, parameters);
          // Messages arrive in completion order; remember the URL index so the
          // range can be put back into input order. Fall back to arrival order
          // if a message carries no numeric index.
          const order = typeof value?.index === 'number' ? value.index : startIndex + results.length;
          results.push({ order, content });
        } catch (error) {
          // A throwing callback must fail the batch instead of crashing the process.
          failed = true;
          reject(error);
          return;
        }
        if (results.length === urlCount) {
          results.sort((a, b) => a.order - b.order);
          resolve(results.map((entry) => entry.content));
        }
      });
      worker.on('error', (error) => {
        console.log(error);
        reject(error);
      });
      worker.on('exit', (exitCode) => {
        if (exitCode !== 0) {
          const error = new Error(`Worker stopped with exit code ${exitCode}`);
          console.log(error);
          reject(error);
        } else if (results.length < urlCount) {
          // Clean exit without all results: reject rather than hang forever.
          // (No-op when the promise has already been settled.)
          reject(new Error(`Worker exited after delivering ${results.length} of ${urlCount} results.`));
        }
      });
    });
  }

  /**
   * Returns the request target (path plus query string) of `urlString`.
   *
   * @param host Host the URL is expected to point at (`hostname` or `hostname:port`);
   *   compared case-insensitively.
   * @param urlString Absolute URL to extract the path from.
   * @returns The path and query, without any `#fragment`; an empty string if
   *   `urlString` cannot be parsed or does not belong to `host`.
   */
  private static extractPath(host: string, urlString: string): string {
    let parsed: URL;
    try {
      parsed = new URL(urlString);
    } catch {
      return '';
    }
    // The parsed host is normalised, so compare against it instead of searching
    // the raw string, which fails for upper-case hosts.
    const expectedHost = host.toLowerCase();
    if (parsed.host !== expectedHost && parsed.hostname !== expectedHost) {
      return '';
    }
    return `${parsed.pathname}${parsed.search}`;
  }

  /** Returns the shared keep-alive agent for the protocol, creating it on first use. */
  private static getAgent(isHttpsProtocol: boolean): http.Agent {
    if (isHttpsProtocol) {
      if (!Downloader.httpsAgent) {
        Downloader.httpsAgent = new https.Agent({ keepAlive: true });
      }
      return Downloader.httpsAgent;
    }
    if (!Downloader.httpAgent) {
      Downloader.httpAgent = new http.Agent({ keepAlive: true });
    }
    return Downloader.httpAgent;
  }

  /**
   * Performs a GET request and resolves with the response body decoded as UTF-8.
   *
   * @param urlString Absolute URL to download. URLs whose protocol is not
   *   `https:` are requested with the `http` client.
   * @param timeout Optional socket timeout in milliseconds; falsy disables it.
   * @param redirectsLeft Number of redirects that may still be followed.
   * @returns The response body of the final 200 response. Rejects on an invalid
   *   URL, a request or response error, a timeout, a truncated body, a redirect
   *   without a usable `Location`, too many redirects, or any other status code.
   */
  private static downloadStringAsync(urlString: string,
    timeout?: number,
    redirectsLeft: number = Downloader.maxRedirects): Promise<string> {
    return new Promise<string>((resolve, reject) => {
      // Throws for an invalid URL, which rejects the promise.
      const url = new URL(urlString);
      // Use the parsed (lower-cased) protocol instead of a raw prefix check.
      const isHttpsProtocol = url.protocol === 'https:';
      const httpClient = isHttpsProtocol ? https : http;
      // `url.hostname` wraps IPv6 literals in brackets; the socket layer wants them bare.
      const hostname = url.hostname.startsWith('[') ? url.hostname.slice(1, -1) : url.hostname;
      const requestOptions: RequestOptions = {
        method: 'GET',
        hostname: hostname,
        // Honour an explicit port from the URL; fall back to the protocol default.
        port: url.port || (isHttpsProtocol ? 443 : 80),
        path: Downloader.extractPath(url.host, urlString),
        // One shared keep-alive agent per protocol so sockets can be reused.
        agent: Downloader.getAgent(isHttpsProtocol),
        headers: { 'user-agent': Downloader.userAgent },
      };

      const request = httpClient.get(requestOptions, (response) => {
        const { statusCode } = response;

        if (statusCode === 200) {
          // Decode as a stream so multi-byte characters split across chunks survive.
          response.setEncoding('utf8');
          let content = '';
          response.on('data', (chunk: string) => {
            content += chunk;
          });
          response.on('error', reject);
          response.on('end', () => {
            if (!response.complete) {
              reject(new Error(`Connection was closed before the full response was received for URL ${urlString}.`));
              return;
            }
            resolve(content);
          });
          return;
        }

        // The body is not needed; drain it so the keep-alive socket is released.
        response.resume();

        if (statusCode !== undefined && Downloader.redirectStatusCodes.includes(statusCode)) {
          const { location } = response.headers;
          if (!location) {
            reject(new Error(`Server responded with status code ${statusCode} for URL ${urlString} with no location set in the header.`));
            return;
          }
          if (redirectsLeft <= 0) {
            reject(new Error(`Too many redirects while downloading URL ${urlString}.`));
            return;
          }
          let target: string;
          try {
            // Resolves relative `Location` values against the current URL.
            target = new URL(location, url).toString();
          } catch (error) {
            reject(error);
            return;
          }
          // Follow the redirect with the same timeout and one hop less.
          Downloader.downloadStringAsync(target, timeout, redirectsLeft - 1).then(resolve, reject);
          return;
        }

        reject(new Error(`Server responded with status code ${statusCode} for URL ${urlString}.`));
      });

      request.on('error', (error) => { reject(error); });

      if (timeout) {
        request.setTimeout(timeout, () => {
          // Reject first so the caller sees the timeout, not the resulting socket error.
          reject(new Error(`Request timed out for URL ${urlString} after ${timeout / 1000.0} seconds.`));
          request.destroy();
        });
      }
    });
  }

  /**
   * Worker-thread entry point: downloads the URLs in `[startIndex, endIndex)`
   * taken from `workerData` and posts one message per URL to the parent port,
   * carrying either `content` or `error`.
   */
  public static executeWorkerTask(): void {
    const {
      startIndex, endIndex, urls,
      identifier, timeout,
    } = workerData;

    // All requests of the range are started up front; results are posted as they finish.
    for (let i = startIndex; i < endIndex; i++) {
      const url = urls[i];
      Downloader.downloadStringAsync(url, timeout)
        .then((content) => {
          parentPort?.postMessage({
            identifier: identifier,
            url: url,
            index: i,
            content: content,
          });
        })
        .catch((error) => {
          parentPort?.postMessage({
            identifier: identifier,
            url: url,
            index: i,
            error: error,
          });
        });
    }
  }
}
// Entry point shared by both roles of this file: the main thread starts a
// sample batch, while every spawned worker thread runs its download task.
if (isMainThread) {
  const sampleUrls = [
    'https://www.google.com',
    'https://www.wikipedia.com',
    'https://www.facebook.com',
    'https://www.youtube.com',
    'https://www.wwe.com',
  ];

  // Maps a worker message to its content, or to '' when the download failed
  // or produced nothing.
  const pickContent = (response: any, parameters?: Record<string, any>) => {
    const { content, error } = response;
    const unusable = error || !content || content.length === 0;
    return unusable ? '' : content;
  };

  new Downloader()
    .downloadStringsAsync<string>(sampleUrls, pickContent)
    .then((contents) => {
      console.log(contents.length);
    })
    .catch((error) => {
      console.log(error);
    });
} else {
  // Running inside a worker thread: process the assigned URL range.
  Downloader.executeWorkerTask();
}
代码可以运行,但 HTTP 请求实际上是串行发出的,彼此互相等待,而不是并行执行。非常感谢任何帮助。
请不要建议使用任何第三方模块。我使用的是 TypeScript v4.3.4,编译选项为 target = "es2020"、lib = ["es2020"]。
提前致谢。