我在 SimpleClusterListener 的 F# 实现中观察到以下错误:
[错误][2017 年 3 月 20 日上午 11:32:53][线程 0008][[akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%400.0.0.0%3A2552-5/endpointWriter#1522364225]] 为非本地收件人 [[akka.tcp://ClusterSystem@localhost:2552/]] 丢弃消息 [Akka.Actor.ActorSelectionMessage],到达地址 [akka.tcp://ClusterSystem@localhost:2552],入站地址 [akka.tcp://ClusterSystem@0.0.0.0:2552]
我运行了 C# 实现(在下面的附录中引用),没有任何问题。此外,我使用的端口与 C# 实现使用的端口相同。
注:
我是 Akka.NET 的新手,因此很难判断我移植的这个示例究竟哪里出了问题。
我的实现如下:
主文件
module Program
open System
open System.Configuration
open Akka.Configuration.Hocon
open Akka.Configuration
open Akka.Actor
open Samples.Cluster.Simple
/// Exit code returned by `main` once the program finishes normally.
[<Literal>]
let ExitWithSuccess = 0
/// Starts one "ClusterSystem" actor system for the given remoting port and
/// spawns a SimpleClusterListener named "clusterListener" inside it.
///
/// port: value appended to the `akka.remote.dot-netty.tcp.port=` override;
/// every other setting falls back to the HOCON of the "akka" configuration section.
let createActor port =
    let akkaSection = ConfigurationManager.GetSection "akka" :?> AkkaConfigurationSection
    // The per-node port override takes precedence; the section's HOCON is the fallback.
    let portOverride = ConfigurationFactory.ParseString("akka.remote.dot-netty.tcp.port=" + port)
    let nodeConfig = portOverride.WithFallback(akkaSection.AkkaConfig)
    let clusterSystem = ActorSystem.Create("ClusterSystem", nodeConfig)
    // The returned actor reference is not needed by the caller.
    clusterSystem.ActorOf(Props.Create(typeof<SimpleClusterListener>), "clusterListener")
    |> ignore
/// Starts one cluster node per entry of `ports`, in list order.
let startUp (ports:string list) =
    for port in ports do
        createActor port
/// Entry point: starts nodes for the ports "2551", "2552" and "0" (the first two
/// are the ports named in the configured seed-nodes), then waits for a line of
/// console input before returning the success exit code.
[<EntryPoint>]
let main args =
    [ "2551"; "2552"; "0" ] |> startUp
    Console.WriteLine("Press any key to exit")
    // Block until the user presses Enter; the text that was typed is irrelevant.
    let _ = Console.ReadLine()
    ExitWithSuccess
SimpleClusterListener.fs
namespace Samples.Cluster.Simple
open Akka.Actor
open Akka.Cluster
open Akka.Event
/// Actor that subscribes to cluster membership events and logs when a member
/// comes up, is detected as unreachable, or is removed.
type SimpleClusterListener() =
    inherit UntypedActor()

    /// Subscribes this actor to IMemberEvent and UnreachableMember notifications,
    /// using the InitialStateAsEvents subscription mode.
    override this.PreStart() =
        let cluster = Cluster.Get(UntypedActor.Context.System)
        let (events:System.Type array) =
            [| typeof<ClusterEvent.IMemberEvent>
               typeof<ClusterEvent.UnreachableMember> |]
        cluster.Subscribe(base.Self, ClusterEvent.InitialStateAsEvents, events)

    /// Logs the membership changes of interest. Other member events and the
    /// cluster-state snapshot are deliberately ignored; anything else is passed
    /// to Unhandled, as the C# reference implementation does, instead of being
    /// silently swallowed.
    override this.OnReceive(message:obj) =
        let log = UntypedActor.Context.GetLogger()
        match message with
        | :? ClusterEvent.MemberUp as e -> log.Info("Member is up: {0}", e.Member)
        | :? ClusterEvent.UnreachableMember as e -> log.Info("Member detected as unreachable: {0}", e.Member)
        | :? ClusterEvent.MemberRemoved as e -> log.Info("Member is removed: {0}", e.Member)
        // Must come after the specific cases above: MemberUp and MemberRemoved
        // are themselves member events.
        | :? ClusterEvent.IMemberEvent -> ()
        | :? ClusterEvent.CurrentClusterState -> ()
        | _ -> this.Unhandled(message)

    /// Removes this actor's cluster subscription when the actor stops.
    override this.PostStop() =
        let cluster = Akka.Cluster.Cluster.Get(UntypedActor.Context.System)
        cluster.Unsubscribe base.Self
上面的 OnReceive 方法从未被调用,而 PreStart 方法会被正常调用。
附录:
如前所述,我移植的是下面这份 C# 实现。这份 C# 代码我可以成功运行,因此我不明白自己在移植过程中哪里出了错。
//-----------------------------------------------------------------------
// <copyright file="Program.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------
using Akka.Actor;
using Akka.Configuration;
using Akka.Configuration.Hocon;
using System;
using System.Configuration;
namespace Samples.Cluster.Simple
{
    /// <summary>
    /// Console host that starts several "ClusterSystem" nodes inside one process.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Starts one node per command-line argument, or nodes for the ports
        /// "2551", "2552" and "0" when no arguments are supplied, then waits
        /// for a line of console input.
        /// </summary>
        /// <param name="args">Ports to start nodes on; may be empty.</param>
        static void Main(string[] args)
        {
            var ports = args;
            if (ports.Length == 0)
            {
                ports = new String[] { "2551", "2552", "0" };
            }

            StartUp(ports);
            Console.WriteLine("Press any key to exit");
            Console.ReadLine();
        }

        /// <summary>
        /// Reads the "akka" configuration section once and starts a node for
        /// every entry of <paramref name="ports"/>, in order.
        /// </summary>
        /// <param name="ports">Remoting ports, one per node to start.</param>
        public static void StartUp(string[] ports)
        {
            var akkaSection = (AkkaConfigurationSection)ConfigurationManager.GetSection("akka");
            foreach (var port in ports)
            {
                StartNode(akkaSection, port);
            }
        }

        /// <summary>
        /// Creates one actor system on <paramref name="port"/> and spawns the
        /// cluster listener actor in it.
        /// </summary>
        private static void StartNode(AkkaConfigurationSection akkaSection, string port)
        {
            // The port override wins; everything else comes from the section's HOCON.
            var portOverride = ConfigurationFactory.ParseString("akka.remote.dot-netty.tcp.port=" + port);
            var nodeConfig = portOverride.WithFallback(akkaSection.AkkaConfig);

            var clusterSystem = ActorSystem.Create("ClusterSystem", nodeConfig);

            // Actor that handles cluster domain events.
            var listenerProps = Props.Create(typeof(SimpleClusterListener));
            clusterSystem.ActorOf(listenerProps, "clusterListener");
        }
    }
}
//-----------------------------------------------------------------------
// <copyright file="SimpleClusterListener.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------
using Akka.Actor;
using Akka.Cluster;
using Akka.Event;
namespace Samples.Cluster.Simple
{
    /// <summary>
    /// Actor that subscribes to cluster membership events and logs when a
    /// member comes up, is detected as unreachable, or is removed.
    /// </summary>
    public class SimpleClusterListener : UntypedActor
    {
        protected ILoggingAdapter Log = Context.GetLogger();
        protected Akka.Cluster.Cluster Cluster = Akka.Cluster.Cluster.Get(Context.System);

        /// <summary>
        /// Subscribes this actor to member and unreachability events when it starts.
        /// </summary>
        protected override void PreStart()
        {
            var subscribedEvents = new[]
            {
                typeof(ClusterEvent.IMemberEvent),
                typeof(ClusterEvent.UnreachableMember)
            };
            Cluster.Subscribe(Self, ClusterEvent.InitialStateAsEvents, subscribedEvents);
        }

        /// <summary>
        /// Removes this actor's cluster subscription when it stops.
        /// </summary>
        protected override void PostStop()
        {
            Cluster.Unsubscribe(Self);
        }

        /// <summary>
        /// Logs the membership changes of interest, ignores other member events
        /// and cluster-state snapshots, and reports everything else as unhandled.
        /// </summary>
        /// <param name="message">The message delivered to this actor.</param>
        protected override void OnReceive(object message)
        {
            var up = message as ClusterEvent.MemberUp;
            if (up != null)
            {
                Log.Info("Member is Up: {0}", up.Member);
                return;
            }

            var unreachable = message as ClusterEvent.UnreachableMember;
            if (unreachable != null)
            {
                Log.Info("Member detected as unreachable: {0}", unreachable.Member);
                return;
            }

            var removed = message as ClusterEvent.MemberRemoved;
            if (removed != null)
            {
                Log.Info("Member is Removed: {0}", removed.Member);
                return;
            }

            // Remaining member events and state snapshots are intentionally ignored.
            if (message is ClusterEvent.IMemberEvent || message is ClusterEvent.CurrentClusterState)
            {
                return;
            }

            Unhandled(message);
        }
    }
}
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<!-- Registers the "akka" section so it is read through AkkaConfigurationSection;
     the program retrieves it with ConfigurationManager.GetSection("akka"). -->
<configSections>
<section name="akka" type="Akka.Configuration.Hocon.AkkaConfigurationSection, Akka"/>
</configSections>
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.2"/>
</startup>
<!-- HOCON used as the fallback configuration for every node. The program
     overrides akka.remote.dot-netty.tcp.port per node; the seed-nodes list
     names localhost ports 2551 and 2552, the first two ports the program starts. -->
<akka>
<hocon>
<![CDATA[
akka {
actor {
provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
}
remote {
log-remote-lifecycle-events = DEBUG
dot-netty.tcp {
hostname = "localhost"
port = 0
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@localhost:2551",
"akka.tcp://ClusterSystem@localhost:2552"]
#auto-down-unreachable-after = 30s
}
}
]]>
</hocon>
</akka>
</configuration>