2

是否可以在单个异步函数中封装对同一个 dart 隔离的重复发送/响应?

背景:

为了设计一个方便的API,我想让一个函数异步返回一个隔离生成的结果,例如

var ans = await askIsolate(isolateArgs);

如果我直接使用 spawnUri 调用生成的响应,这很好用,例如

Future<String> askIsolate(Map<String,dynamic> isolateArgs) {

ReceivePort response = new ReceivePort();
var uri = Uri.parse(ISOLATE_URI);

Future<Isolate> remote = Isolate.spawnUri(uri, [JSON.encode(isolateArgs)], response.sendPort);
return remote.then((i) => response.first)
               .catchError((e) { print("Failed to spawn isolate"); })
               .then((msg) => msg.toString());
}

然而,上述方法的缺点是,如果我需要重复调​​用 askIsolate,则每次都必须生成隔离。

相反,我想与正在运行的隔离进行通信,这当然可以通过让隔离返回一个 sendPort 给调用者来实现。但我相信自2013 年 Isolate 重构以来 ,这需要调用者在 receivePort 上收听后续消息,从而无法将封装在单个异步函数中。

是否有一些我缺少的机制来实现这一点?

4

3 回答 3

2

答案取决于您打算如何使用隔离

  • 您是否打算让它无限期地运行,向它发送输入并期望异步接收答案?

  • 您是否想一次向隔离发送多个(但有限)输入,期望异步接收答案,然后关闭隔离?

我猜是后者,当它收到所有答案时,您的askIsolate()函数需要立即返回,而不是完成。Future

循环可await for用于侦听流并使用事件,直到它关闭。

我对分离物不熟悉,所以我希望这没问题,我还没有测试过。我假设隔离终止并且响应关闭。

String askIsolate(Map<String,dynamic> isolateArgs) async {

  ReceivePort response = new ReceivePort();
  var uri = Uri.parse(ISOLATE_URI);

  Isolate.spawnUri(uri, [JSON.encode(isolateArgs)], response.sendPort)
    .catchError((e)) {
     throw ...;
   });

  List<String> answers = new List<String>;

  await for(var answer in response) {
    out.add(answer.toString());
  }

  return answers;
}

笔记:

  • response是您正在收听的答案流。它是在产生隔离之前创建的,因此您不需要(并且可能不应该)等待隔离未来完成后再收听它。

  • 我做了askIsolate()async 是因为这样可以很容易地立即返回一个在函数返回时完成的未来 - 没有那些繁琐的.then(...)链,我个人觉得它令人困惑且难以阅读。

顺便说一句,您的原始then(...).catchError(...)样式代码最好这样编写:

  Isolate.spawnUri(uri, [JSON.encode(isolateArgs)], response.sendPort)
    .catchError((e) { ... });

   return response.first)
     .then((msg) => msg.toString());

我相信在创建隔离后延迟将 catchError 处理程序附加到该行可能会让未来在处理程序到位之前完成错误。

请参阅:https ://www.dartlang.org/articles/futures-and-error-handling/#potential-problem-failing-to-register-error-handlers-early 。

于 2016-04-08T17:28:06.253 回答
2

我还建议查看IsolateRunnerin package:isolate,它旨在解决这样的问题 - 在同一个隔离中多次调用函数,而不是在隔离创建时只调用一次。

如果你不想这样,还有其他更原始的选择

异步函数可以等待期货或流,而 aReceivePort是一个流。对于快速破解,您可能可以await for在响应流上做一些事情,但这不是很方便。

将from包裹ReceivePort起来是一个更好的选择。这使您可以将单个事件转换为期货。就像是:StreamQueuepackage:async

myFunc() async {  
  var responses = new ReceivePort();
  var queue = new StreamQueue(responses);
  // queryFunction sends its own SendPort on the port you pass to it.
  var isolate = await isolate.spawn(queryFunction, [], responses.sendPort);
  var queryPort = await queue.next();
  for (var something in somethingToDo) {
    queryPort.send(something);
    var response = await queue.next();
    doSomethingWithIt(response);
  }
  queryPort.send("shutdown command");  
  // or isolate.kill(), but it's better to shut down cleanly.
  responses.close();  // Don't forget to close the receive port.
}
于 2016-04-10T21:48:49.140 回答
1

下面是一个基于 lrn 上述评论的快速工作示例。该示例通过 spawnURI 初始化一个隔离,然后通过传递一个新的 ReceivePort 与隔离进行通信,期望在该接收端口上进行回复。这允许 askIsolate 直接从正在运行的 spawnURI 隔离返回响应。

为清楚起见,省略了注释错误处理。

隔离代码:

import 'dart:isolate';
import 'dart:convert' show JSON;

main(List<String> initArgs, SendPort replyTo) async {
  ReceivePort receivePort = new ReceivePort();
  replyTo.send(receivePort.sendPort);

  receivePort.listen((List<dynamic> callArgs) async {
    SendPort thisResponsePort = callArgs.removeLast(); //last arg must be the offered sendport
    thisResponsePort.send("Map values: " + JSON.decode(callArgs[0]).values.join(","));
  });
}

调用代码:

import 'dart:async';
import 'dart:isolate';
import 'dart:convert';


const String ISOLATE_URI = "http://localhost/isolates/test_iso.dart";
SendPort isolateSendPort = null;

Future<SendPort> initIsolate(Uri uri) async {
    ReceivePort response = new ReceivePort();
    await Isolate.spawnUri(uri, [], response.sendPort, errorsAreFatal: true);
    print("Isolate spawned from $ISOLATE_URI");
    return await response.first;
}


Future<dynamic> askIsolate(Map<String,String> args) async {
  if (isolateSendPort == null) {
    print("ERROR: Isolate has not yet been spawned");
    isolateSendPort = await initIsolate(Uri.parse(ISOLATE_URI)); //try again
  }

  //Send args to the isolate, along with a receiveport upon which we listen for first response 
  ReceivePort response = new ReceivePort();
  isolateSendPort.send([JSON.encode(args), response.sendPort]);
  return await response.first;
}

main() async {
  isolateSendPort = await initIsolate(Uri.parse(ISOLATE_URI));

  askIsolate({ 'foo':'bar', 'biz':'baz'}).then(print);
  askIsolate({ 'zab':'zib', 'rab':'oof'}).then(print);
  askIsolate({ 'One':'Thanks', 'Two':'lrn'}).then(print);
}  

输出

Isolate spawned from http://localhost/isolates/test_iso.dart
Map values: bar,baz
Map values: zib,oof
Map values: Thanks,lrn
于 2016-04-12T13:58:40.497 回答