我一直在研究这个数据收集模块,它应该从不同的比特币市场获取数据并将所有接收到的数据标准化,以便可以将其插入到 mongodb 数据库中以供以后使用。(模块写在node 4.3.x中)
我遇到的问题是我需要数据在数据库中的表示方式保持一致。因此,每当请求超时时,我不想捕获 get 请求并记录错误,而是要解析“0”。此外,接收到的数据包含需要按顺序削减的交易。这需要发生,以便可以正确削减交易,因此数据不会被写入两次。为此,我实现了两个队列:
1:TimestampQueue - 保存时间戳。[0] 中的时间戳是下一个预期响应
2:objectQueue - 保存收到的响应
=> 只要 objectQueue[0] 中的对象等于 timestampQueue[0] 中的时间戳 => 进行数据操作并插入数据库。
问题在于应该捕获超时的 axios.get 请求并没有始终如一地做到这一点。
它发生在随机时间范围之后,但平均而言,队列会在 2 小时后卡住。
为了让事情更清楚,这里有一些重要的代码片段: httpclient 发出 axios 请求:
get(url) {
return this.instance.get(url) //instance just defined a timeout. Nothing special
.then(response => {
return response.data;
})
.catch(error => {
throw error; //THIS SEEMINGLY DOESN'T GET EXECUTED IN THE DESCRIBED CASE
});
}
现在解析请求的marketHandler:
getMethodFromMarket(method, market, timestamp){
if(this.markets[market]){
if(this.markets[market].methods[method]) {
var url = this.markets[market].methods[method];
let result = {};
result[method] = {};
result[method][market] = {};
return this.client.get(url)
.then(data => {
result[method][market] = data;
log.debug("Successfully received " + method + " for " + market + " : " + timestamp);
return result;
})
.catch(err => {
result[method][market] = 0;
log.error(new Error("Failed to get " + method + " for " + market + ": " + timestamp));
log.error(err);
return result;
});
} else{
return Promise.reject(new Error(method + " not available for " + market));
}
} else {
return Promise.reject(new Error("Market not specified in config"));
}
}
向所有已定义市场(针对一种方法)发出请求并将它们连接到一个对象中的代码:
//returns promise to get *method* from all markets specified in
//config.json
getAllMarkets(method, timestamp){
let getMarketsPromises = [];
let result = {};
result[method] = {};
Object.keys(this.markets).forEach(market => {
result[method][market] = {};
getMarketsPromises.push(this.getMethodFromMarket(method, market, timestamp));
});
return Promise.all(getMarketsPromises)
.then(results => {
for(let i = 0; i < results.length; i++){
let market = Object.keys(results[i][method])[0];
result[method][market] = results[i][method][market];
}
log.debug("Got all markets for " + method + " for " + timestamp);
return result;
})
}
对所有方法和市场发出请求并将它们连接到从不同模块操作并插入数据库的最终对象中的代码:
//returns promise to get trades and depths from markets specified in
//config.json
getEverything(timestamp){
let getMethodPromises = [];
let result = {timestamp};
this.methods.forEach(method => {
result[method] = {};
getMethodPromises.push(this.getAllMarkets(method, timestamp))
});
return Promise.all(getMethodPromises)
.then(results =>{
for(let i = 0; i < results.length; i++){
let method = Object.keys(results[i])[0];
result[method] = results[i][method];
}
log.debug("Got everything for " + timestamp);
return result;
})
}
我已经测试了整个过程,没有任何数据操作。只有那些功能并将其插入数据库。
2个队列的实现:
//handles the incoming responses from markets and sorts
//them according to their timestamp
queueResponse(marketInfo){
this.marketInfoQueue.push(marketInfo);
this.marketInfoQueue.sort(function(a, b){
return a.timestamp - b.timestamp;
})
}
//returns queued Responses in order of timestamps.
getQueuedResponses(){
var i = 0;
var results = [];
log.debug("TimestampQueue: "+ this.timestampQueue[0] + " | objectQueue: " + this.marketInfoQueue[0].timestamp);
while(this.marketInfoQueue[i] && this.timestampQueue[i] == this.marketInfoQueue[i].timestamp){
results.push(this.marketInfoQueue.shift());
this.timestampQueue.shift();
i++;
}
return results;
}
//pushes new timestamp into timestampQueue to keep
//incoming responses in order
queueTimestamp(timestamp){
this.timestampQueue.push(timestamp);
}
我已经尝试解决这个问题超过 3 周了,我完全一无所知。
TLDR:Axios 获取请求未解决或拒绝。即使在 httpClient 模块中使用的实例中定义了 5000 毫秒的超时。