1

嗨,我正在学习使用 Akka.net,我想做的是创建一个简单的 TCP 服务器,它会定期将数据发送到 tcp 连接。(然后将由 processingjs 客户端获取并显示在输出中)

不知道我在这里缺少什么。你们中的任何一位专家都可以对这个问题有所了解吗?

这些是我的演员:

using Akka.Actor;
using Akka.IO;
using System;
using System.Text;
using System.Threading.Tasks;

namespace ActorTcpProcessing
{
    public class ProcessingServer : ReceiveActor
    {  
        public ProcessingServer()
        { 
            Receive<Tcp.Bound>(TcpBoundHandler);
            Receive<Tcp.Connected>(TcpConnectedHandler);
        }

        internal static Props Create()
        {
            return Props.Create(() => new ProcessingServer());
        }

        private void TcpBoundHandler(Tcp.Bound bound)
        {
            Console.WriteLine("Listening on {0}", bound.LocalAddress);
        }
         
        private void TcpConnectedHandler(Tcp.Connected connected)
        { 
            var tcpConnection = Sender; 

            var connectionHandler = Context.ActorOf(SensorReader.CreateReader(tcpConnection));
           
            tcpConnection.Tell(new Tcp.Register(connectionHandler));
        }
    }

    internal class SensorReader : ReceiveActor
    {
        private readonly IActorRef _tcpConnection;

        public SensorReader(IActorRef tcpConnection)
        {
            _tcpConnection = tcpConnection;

            Receive<SensorDataReceived>(TcpWriteSensorData);
        }

        internal static Props CreateReader(IActorRef connectionHandler)
        {
            return Props.Create(() => new SensorReader(connectionHandler));
        }

        private void TcpWriteSensorData(SensorDataReceived receivedData)
        {
            _tcpConnection.Tell(Tcp.Write.Create(ByteString.FromString(receivedData.SensorData + "\n")));
        }
    }
}

我的印象是,当我SensorReaderTcp.Connected消息上注册时,我需要做的就是向SensorData流发送消息。像这样:_sys.EventStream.Publish(new SensorData("sensor-data[x]"));

但是,这似乎不起作用。我收到以下日志消息:

[INFO][11/09/2021 10:22:05 AM][Thread 0004][akka://tcp-processing/deadLetters] Message [Bound] from [akka://tcp-processing/system/IO-TCP/$a#1070400244] to [akka://tcp-processing/deadLetters] was not delivered. [1] dead letters encountered. If this is not an expected behavior then [akka://tcp-processing/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

这就是我将 Tcp.Bind 消息发送到我的服务器的方式。

using System;
using Akka.IO;
using Akka.Actor;
using System.Net;
using Topshelf;

namespace ActorTcpProcessing
{
    class StreamTcpService : ServiceControl
    {
        ActorSystem _sys;

        public bool Start(HostControl hostControl)
        {
            _sys = ActorSystem.Create("tcp-processing");
            var server = _sys.ActorOf(ProcessingServer.Create(), nameof(ProcessingServer));
            var tcpManager = _sys.Tcp();

            
            tcpManager.Tell(new Tcp.Bind(
                server, 
                new IPEndPoint(IPAddress.Any, 10002)));

            _sys.EventStream.Publish(new SensorData("sensor-data[x]"));

            return true;
        }

        public bool Stop(HostControl hostControl)
        {
            _sys.Terminate().ContinueWith(task =>
            {
                if (task.IsCompleted && !task.IsCanceled)
                    Console.WriteLine("Stream Tcp stopped");
            });

            return true;
        }
    }
}

如您所见,我将 指定server为 Bound 消息的处理程序。还是我完全错了?

另外,我必须提到,当我启动我的 processingjs 客户端时,我确实收到一条Tcp.Connected消息到 my ProcessingServer,这反过来创建SensorReader并注册为处理程序。

我还想知道我EventStream是否正确使用了发布?

_sys.EventStream.Publish(new SensorData("sensor-data[x]"));

对此的任何帮助都将受到高度赞赏。谢谢你。

4

1 回答 1

2

你的问题是:

tcpManager.Tell(new Tcp.Bind(server,new IPEndPoint(IPAddress.Any, 10002)));

这有点误导,但server在 的构造函数中Tcp.Bind是所有传入连接的处理程序Tcp.Connected,而不是将接收Tcp.Bound消息的参与者。

演员将Tcp回复消息Tcp.BoundSenderTcp.Bind要解决您的问题,您应该传递IActorRef预期接收Tcp.Bound消息的 Actor 的引用而不是原始发送者。在你的情况下:

tcpManager.Tell(new Tcp.Bind(server, new IPEndPoint(IPAddress.Any, 10002)), server);
于 2021-09-12T14:38:13.567 回答