0

Tcp.OuttgoingConnection 从混音器收集数据并异步发送到处理数据的 sourceQueue。

发出命令后,不能保证下一位数据是响应。我如何反馈响应?

一种“肮脏”的方式是有一个静态变量,当使用线程暂停处理时我将数据放入其中以等待它,但这非常低效。是否有一个 akka 机制可以观察一个值的变化并给出一个 Future?

这是当前代码:

    public Q16SocketThread(ActorSystem system) {
        Logger.debug("Loading Q16SocketThread.");
        this.system = system;

        final Flow<ByteString, ByteString, CompletionStage<Tcp.OutgoingConnection>> connection =
                Tcp.get(system).outgoingConnection(ipAddress, port);

        int bufferSize = 10;
        final SourceQueueWithComplete<ByteBuffer> sourceQueue =
                Source.<ByteBuffer>queue(bufferSize, OverflowStrategy.fail())
                        .map(input -> Hex.encodeHexString(input.array()))
                        .to(Sink.foreach(this::startProcessing))
                        .run(system);

        final Flow<ByteString, ByteString, NotUsed> repl =
                Flow.of(ByteString.class)
                        .map(ByteString::toByteBuffer)
                        .map(sourceQueue::offer)
                        .map(
                                text -> {
                                    //Logger.debug("Server: " + Hex.encodeHexString(text.array()));
                                    String hexCmd;
                                    if (!nextCmd.isEmpty()) {
                                        hexCmd = nextCmd.take();
                                    } else {
                                        hexCmd = "fe";
                                    }
                                    return ByteString.fromArray(Hex.decodeHex(hexCmd));
                                }).async();

        CompletionStage<Tcp.OutgoingConnection> connectionCS = connection.join(repl).run(system);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(String.class, message -> {
                    if (message.equalsIgnoreCase("start")) {
                        Logger.debug("Q16 thread started.");
                        nextCmd.put(sysExHeaderAllCall + "1201F7");
                    } else if (message.equalsIgnoreCase("stop")) {
                        Logger.debug("Stopping of data gathering");
                        nextCmd.put(sysExHeaderAllCall + "1200F7");
                        //self().tell(PoisonPill.getInstance(), ActorRef.noSender());
                    } else if (message.equalsIgnoreCase("version")){
                        Logger.debug("Requesting version.");
                        nextCmd.put(sysExHeaderAllCall + "1001F7");
                    }
                }).build();
    }
4

1 回答 1

0

我通过观察变量来理解使用询问模式并接收消息。在您的情况下,您希望将消息包装在 Future 中。是你的意思吗?

如果是这样,来自 Akka 文档(https://doc.akka.io/docs/akka/2.5/futures.html#use-with-actors)可能会有所帮助:

通常有两种方法可以从 AbstractActor 获得回复:第一种是通过发送的消息(actorRef.tell(msg, sender)),它仅在原始发送者是 AbstractActor 时才有效),第二种是通过 Future。

使用 ActorRef 的 ask 方法发送消息将返回一个 Future。要等待并检索实际结果,最简单的方法是:

import akka.dispatch.*;
import jdocs.AbstractJavaTest;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Await;
import scala.concurrent.Promise;
import akka.util.Timeout;

Timeout timeout = Timeout.create(Duration.ofSeconds(5));
Future<Object> future = Patterns.ask(actor, msg, timeout);
String result = (String) Await.result(future, timeout.duration());
于 2020-11-14T11:54:35.500 回答