我正在尝试使用以下配置构建一个可以根据工作负载向上/向下扩展的参与者系统。
akka {
actor {
serializers {
wire = "Akka.Serialization.WireSerializer, Akka.Serialization.Wire"
}
serialization-bindings {
"System.Object" = wire
}
deployment {
/analysis {
router = round-robin-pool
routees.paths = ["/user/analysis"]
resizer {
enabled = on
lower-bound = 1
upper-bound = 20
}
}
}
}
}
在我的主循环中,我正在创建 3000 条消息并推送给演员进行处理
var runAnalysisProps = Props.Create<RunAnalysisActor>().WithRouter(FromConfig.Instance);
//var runAnalysisProps = Props.Create<RunAnalysisActor>().WithRouter(new RoundRobinPool(0, new DefaultResizer(lower:1, upper: 10 ))); **This do not help either **
var analysisRef = RunAnalysisActorSystem.ActorOf(runAnalysisProps, "analysis");
Logger.LogMessage("Started Posting all messages", ConsoleColor.Blue);
for (int i = 0; i < 3000; i++)
{
analysisRef.Tell(new AnalysisMessage(100 + i, $"FIC{100+ i}", $"c:\\temp\\fic{100 + i}.dat"));
}
Logger.LogMessage("Completed posting all messages", ConsoleColor.Blue);
在我看来,我们在上面的 for 循环中是否看到任何错误,它看起来像是将所有消息发送到一个参与者 ref 而不是路由器。
我ReceiveActor
是一个非常简单的日志记录应用程序
public RunAnalysisActor()
{
Receive<AnalysisMessage>((message)=> {
Logger.LogMessage($"Analysis Started => Id: {message.AnalysisId}, Loop: {message.LoopName}, File:{message.FilePath}");
var waitTime = rnd.Next(5) * 100;
//Logger.LogMessage($"Waiting => {waitTime}");
Thread.Sleep(10);
Logger.LogMessage($"Analysis Completed=> AId: {message.AnalysisId}, Loop: {message.LoopName}, File:{message.FilePath}");
});
}
我还覆盖AroundPreStart
并PostStop
查看启动和停止了多少演员。但我看到,当应用程序启动时,它只创建lower-bound
或最少 2 个演员,我期待这将基于pressure-threshold
默认 1 创建更多演员,但我看到它没有发生,并且当有没有任何消息要处理。
有人可以帮我理解我在这里做错了什么。- 谢谢
更新1:
我想我弄清楚了实际问题。在发布消息时,我需要更改我的逻辑以将路径附加到我的部署配置routees.paths = ["/user/analysis"]
,并使用和函数来使用类似这样的东西Ask
获得正确的参考 。analysisRef.Ask<Routees>
RunAnalysisActorSystem.Scheduler.Advanced.ScheduleRepeatedly(
TimeSpan.FromMilliseconds(500),
TimeSpan.FromMilliseconds(500),
() => {
if (analysisRef.Ask<Routees>(new GetRoutees()).Result.Members.Any())
{
var i = rand.Next();
analysisRef.Tell(new AnalysisMessage(100 + i, $"FIC{100 + i}", $"c:\\temp\\fic{100 + i}.dat"));
}
}
);