21

我正在尝试在 Dart 中编写一个可以并行处理多个请求的 HTTP 服务器。到目前为止,我一直未能成功实现“并行”部分。

这是我一开始尝试的:

import 'dart:io';

main() {
  HttpServer.bind(InternetAddress.ANY_IP_V4, 8080).then((HttpServer server) {
    server.listen((HttpRequest request) {
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.start();
      while (stopwatch.elapsedMilliseconds < 1000) { /* do nothing */ }
      request.response.statusCode = HttpStatus.OK;
      request.response.write(stopwatch.elapsedMilliseconds.toString());
      request.response.close().catchError(print);
    });
  });
}

对于每个请求,它都会忙工作一秒钟,然后完成。我让它以这种方式处理请求,以便它的时间是可预测的,所以我可以很容易地在 Windows 任务管理器中看到请求的效果(CPU 核心跳到 100% 的使用率)。

我可以说这不是并行处理请求,因为:

  1. 如果我加载几个浏览器选项卡http://example:8080/然后全部刷新它们,这些选项卡会依次加载,每个选项卡之间大约 1 秒。

  2. 如果我将负载测试工具wrk与这些设置一起使用... wrk -d 10 -c 8 -t 8 http://example:8080/ ...它会在我给它的 10 秒内完成 5 到 8 个请求。如果服务器正在使用我所有的 8 个内核,我预计会有接近 80 个请求的数字。

  3. 当我在 wrk 测试期间打开 Windows 任务管理器时,我观察到我的核心只有一个接近 100% 的使用率,其余的几乎都处于空闲状态。

所以,然后我尝试使用隔离,希望为每个请求手动生成一个新的隔离/线程:

import 'dart:io';
import 'dart:isolate';

main() {
  HttpServer.bind(InternetAddress.ANY_IP_V4, 8080).then((HttpServer server) {
    server.listen((HttpRequest request) {
      spawnFunction(handleRequest).send(request);
    });
  });
}

handleRequest() {
  port.receive((HttpRequest request, SendPort sender) {
    Stopwatch stopwatch = new Stopwatch();
    stopwatch.start();
    while (stopwatch.elapsedMilliseconds < 1000) { /* do nothing */ }
    request.response.statusCode = HttpStatus.OK;
    request.response.write(stopwatch.elapsedMilliseconds.toString());
    request.response.close().catchError(print);
  });
}

这根本不起作用。它不喜欢我试图将 HttpRequest 作为消息发送给隔离。这是错误:

#0      _SendPortImpl._sendInternal (dart:isolate-patch/isolate_patch.dart:122:3)
#1      _SendPortImpl._sendNow (dart:isolate-patch/isolate_patch.dart:95:18)
#2      _SendPortImpl.send (dart:isolate-patch/isolate_patch.dart:91:18)
#3      main.<anonymous closure>.<anonymous closure> (file:///C:/Development/dartbenchmark/simple2.dart:7:40)
#4      _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
#5      _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
#6      _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
#7      _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
#8      _StreamImpl._add (dart:async/stream_impl.dart:51:16)
#9      StreamController.add (dart:async/stream_controller.dart:10:35)
#10     _HttpServer._handleRequest (http_impl.dart:1261:20)
#11     _HttpConnection._HttpConnection.<anonymous closure> (http_impl.dart:1188:33)
#12     _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
#13     _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
#14     _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
#15     _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
#16     _StreamImpl._add (dart:async/stream_impl.dart:51:16)
#17     StreamController.add (dart:async/stream_controller.dart:10:35)
#18     _HttpParser._doParse (http_parser.dart:415:26)
#19     _HttpParser._parse (http_parser.dart:161:15)
#20     _HttpParser._onData._onData (http_parser.dart:509:11)
#21     _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
#22     _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
#23     _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
#24     _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
#25     _StreamImpl._add (dart:async/stream_impl.dart:51:16)
#26     StreamController.add (dart:async/stream_controller.dart:10:35)
#27     _Socket._onData._onData (dart:io-patch/socket_patch.dart:726:42)
#28     _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
#29     _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
#30     _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
#31     _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
#32     _StreamImpl._add (dart:async/stream_impl.dart:51:16)
#33     StreamController.add (dart:async/stream_controller.dart:10:35)
#34     _RawSocket._RawSocket.<anonymous closure> (dart:io-patch/socket_patch.dart:452:52)
#35     _NativeSocket.multiplex (dart:io-patch/socket_patch.dart:253:18)
#36     _NativeSocket.connectToEventHandler.<anonymous closure> (dart:io-patch/socket_patch.dart:338:54)
#37     _ReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:81:92)

Unhandled exception:
Illegal argument(s): Illegal argument in isolate message : (object is a closure)
#0      _throwDelayed.<anonymous closure> (dart:async/stream_impl.dart:22:5)
#1      _asyncRunCallback._asyncRunCallback (dart:async/event_loop.dart:15:17)
#2      _asyncRunCallback._asyncRunCallback (dart:async/event_loop.dart:25:9)
#3      Timer.run.<anonymous closure> (dart:async/timer.dart:17:21)
#4      Timer.run.<anonymous closure> (dart:async/timer.dart:25:13)
#5      Timer.Timer.<anonymous closure> (dart:async-patch/timer_patch.dart:9:15)
#6      _Timer._createTimerHandler._handleTimeout (timer_impl.dart:99:28)
#7      _Timer._createTimerHandler._handleTimeout (timer_impl.dart:107:7)
#8      _Timer._createTimerHandler.<anonymous closure> (timer_impl.dart:115:23)
#9      _ReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:81:92)

使用的版本:

  • Dart 编辑器版本 0.5.9_r22879
  • Dart SDK 版本 0.5.9.0_r22879

是否可以使用 Dart 使用我机器的所有可用内核并行处理这些请求?

4

3 回答 3

9

不久前,我编写了一个名为 dart-isoserver 的库来执行此操作。它现在已经严重腐烂了,但是您可以看到这种方法。

https://code.google.com/p/dart-isoserver/

我所做的是通过隔离端口代理 HttpRequest 和 HttpResponse,因为您不能直接发送它们。它奏效了,尽管有一些警告:

  1. 请求和响应的 I/O 通过主隔离,因此该部分不是并行的。不过,在工作隔离中完成的其他工作并没有阻塞主隔离。真正应该发生的是套接字连接应该可以在隔离之间转移。
  2. 隔离中的异常会导致整个服务器宕机。spawnFunction()现在有一个未捕获的异常处理程序参数,所以这在某种程度上是可以修复的,但 spawnUri() 没有。dart-isoserver 使用 spawnUri() 来实现热加载,因此必须将其删除。
  3. 隔离启动起来有点慢,对于 nginx 和 node.js 所针对的数千个并发连接用例,您可能不希望每个连接一个。带有工作队列的隔离池可能会执行得更好,尽管这会消除您可以在工作程序中使用阻塞 I/O 的好功能。

关于您的第一个代码示例的注释。正如您所注意到的,这绝对不会并行运行,因为 Dart 是单线程的。同一隔离区中的任何 Dart 代码都不会同时运行。

于 2013-05-23T01:18:33.467 回答
7

即使有当前 HttpServer 的限制,也可以通过在 Apache 或 Nginx 等反向代理服务器后面运行多个服务器进程来利用多个内核。在 Dart 中,您还可以分叉子进程来拆分计算密集型任务。

一个不错的起点是阅读有关缩放 node.js 的信息,因为这也使用单线程每进程架构。

编辑:答案现在已经过时了,现在可以在隔离之间共享请求,允许 Dart 进程使用多个核心。

请参阅ServerSocket.bind(shared)的文档。

“可选参数 shared 指定是否可以从同一个 Dart 进程额外绑定到相同的地址、端口和 v6Only 组合。如果 shared 为真并且执行了额外的绑定,那么传入的连接将分布在该组 ServerSockets 之间。一个使用它的方法是拥有多个隔离区,在这些隔离区之间分配传入连接。”

于 2013-05-23T09:12:19.160 回答
6

你需要:

  1. shared: true在 HttpServer.bind 中设置
  2. 产生一些 Isolate 来并行处理传入的请求。

这是一个准系统、最小的 Dart 服务器,它将传入的请求分发到 6 个 Isolate:

import 'dart:io';
import 'dart:isolate';

void main() async {
  for (var i = 1; i < 6; i++) {
    Isolate.spawn(_startServer, []);
  }

  // Bind one server in current Isolate
  _startServer();

  print('Serving at http://localhost:8080/');
  await ProcessSignal.sigterm.watch().first;
}

void _startServer([List args]) async {
  final server = await HttpServer.bind(
    InternetAddress.loopbackIPv4,
    8080,
    shared: true, // This is the magic sauce
  );

  await for (final request in server) {
    _handleRequest(request);
  }
}

void _handleRequest(HttpRequest request) async {
  // Fake delay
  await Future.delayed(const Duration(seconds: 2));

  request.response.write('Hello, world!');
  await request.response.close();
}
于 2020-02-07T12:48:34.580 回答