
How do I implement failover in an Akka.NET cluster using the Akka.FSharp API?

I have the following cluster node that acts as a seed:

open Akka
open Akka.FSharp
open Akka.Cluster
open System
open System.Configuration

let systemName = "script-cluster"
let nodeName = sprintf "cluster-node-%s" Environment.MachineName
let akkaConfig = Configuration.parse("""akka {  
                                          actor {
                                            provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
                                          }
                                          remote {
                                            log-remote-lifecycle-events = off
                                            helios.tcp {
                                                hostname = "127.0.0.1"
                                                port = 2551       
                                            }
                                          }
                                          cluster {
                                            roles = ["seed"]  # custom node roles
                                            seed-nodes = ["akka.tcp://script-cluster@127.0.0.1:2551"]
                                            # when a node cannot be reached within 10 sec, mark it as down
                                            auto-down-unreachable-after = 10s
                                          }
                                        }""")
let actorSystem = akkaConfig |> System.create systemName

let clusterHostActor =
    spawn actorSystem nodeName (fun (inbox: Actor<ClusterEvent.IClusterDomainEvent>) -> 
        let cluster = Cluster.Get actorSystem
        cluster.Subscribe(inbox.Self, [| typeof<ClusterEvent.IClusterDomainEvent> |])
        inbox.Defer(fun () -> cluster.Unsubscribe(inbox.Self))
        let rec messageLoop () = 
            actor {
                let! message = inbox.Receive()                        
                // TODO: Handle messages
                match message with
                | :? ClusterEvent.MemberJoined as joined -> printfn "Member %s Joined the Cluster at %O" joined.Member.Address.Host DateTime.Now
                | :? ClusterEvent.MemberLeft as left -> printfn "Member %s Left the Cluster at %O" left.Member.Address.Host DateTime.Now
                | other -> printfn "Cluster Received event %O at %O" other DateTime.Now

                return! messageLoop()
            }
        messageLoop())

Then I have an arbitrary node that may die:

open Akka
open Akka.FSharp
open Akka.Cluster
open System
open System.Configuration

let systemName = "script-cluster"
let nodeName = sprintf "cluster-node-%s" Environment.MachineName
let akkaConfig = Configuration.parse("""akka {  
                                          actor {
                                            provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
                                          }
                                          remote {
                                            log-remote-lifecycle-events = off
                                            helios.tcp {
                                                hostname = "127.0.0.1"
                                                port = 0       
                                            }
                                          }
                                          cluster {
                                            roles = ["role-a"]  # custom node roles
                                            seed-nodes = ["akka.tcp://script-cluster@127.0.0.1:2551"]
                                            # when a node cannot be reached within 10 sec, mark it as down
                                            auto-down-unreachable-after = 10s
                                          }
                                        }""")
let actorSystem = akkaConfig |> System.create systemName

let listenerRef =  
    spawn actorSystem "temp2"
    <| fun mailbox ->
        let cluster = Cluster.Get (mailbox.Context.System)
        cluster.Subscribe (mailbox.Self, [| typeof<ClusterEvent.IMemberEvent>|])
        mailbox.Defer <| fun () -> cluster.Unsubscribe (mailbox.Self)
        printfn "Created an actor on node [%A] with roles [%s]" cluster.SelfAddress (String.Join(",", cluster.SelfRoles))
        let rec seed () = 
            actor {
                let! (msg: obj) = mailbox.Receive ()
                match msg with
                | :? ClusterEvent.MemberRemoved          -> printfn "Actor removed %A" msg
                | :? ClusterEvent.IMemberEvent           -> printfn "Cluster event %A" msg
                | _ -> printfn "Received: %A" msg
                return! seed () }
        seed ()

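What is the recommended practice for implementing failover within a cluster?

Specifically, is there a code example that shows how a cluster should behave when one of its nodes is no longer available?

  • Should my cluster node spin up a replacement, or is there a different behavior?
  • Is there a configuration that handles this automatically, which I can set without having to write code?
  • What code would I have to implement, and where?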

1 Answer


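First of all, it is a better idea to rely on the MemberUp and MemberRemoved events (both implement the ClusterEvent.IMemberEvent interface, so subscribe to that), because they mark the phases at which the node join/leave procedure has completed. The joined and left events don't necessarily ensure that the node is fully operational at the signaled point in time.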
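As a minimal sketch of that subscription, in the style of the listener from the question: the helper name `spawnMembershipListener` and the actor name "membership-listener" are made up for illustration, and the handlers only print, where real failover logic would go.

```fsharp
open System
open Akka.Actor
open Akka.Cluster
open Akka.FSharp

// Hypothetical helper: spawns a listener that reacts only to
// completed membership transitions (MemberUp / MemberRemoved).
let spawnMembershipListener (system: ActorSystem) =
    spawn system "membership-listener" (fun (mailbox: Actor<obj>) ->
        let cluster = Cluster.Get(mailbox.Context.System)
        // Subscribing to IMemberEvent delivers MemberUp and MemberRemoved (among others).
        cluster.Subscribe(mailbox.Self, [| typeof<ClusterEvent.IMemberEvent> |])
        mailbox.Defer(fun () -> cluster.Unsubscribe(mailbox.Self))
        let rec loop () =
            actor {
                let! msg = mailbox.Receive()
                match msg with
                | :? ClusterEvent.MemberUp as up ->
                    // The node has finished joining and can be given work.
                    printfn "Node %O is up with roles [%s]" up.Member.Address (String.Join(",", up.Member.Roles))
                | :? ClusterEvent.MemberRemoved as removed ->
                    // The node is gone for good; this is a place to trigger failover.
                    printfn "Node %O was removed (previous status: %O)" removed.Member.Address removed.PreviousStatus
                | _ -> () // other member events and the initial cluster state snapshot
                return! loop ()
            }
        loop ())
```

Calling `spawnMembershipListener actorSystem` on any node would start the listener there.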

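Regarding failover scenarios:

  • Automatically spinning up replacements can be done with the Akka.Cluster.Sharding plugin (read articles 1 and 2 to learn more about how it works). There is no equivalent in Akka.FSharp, but you may use the Akkling.Cluster.Sharding plugin instead: see the example code.
  • Another way is to create replacement actors up front on each of the nodes. You can route messages to them using clustered routers or distributed publish/subscribe. This, however, fits stateless scenarios, where each actor can pick up the work of another at any time. It is the more general solution for distributing work among many actors living on many different nodes.
  • You may also set up watchers over the processing actors. Using the monitor function, you can order your actor to watch another actor (no matter where it lives). If the node fails, information about the dying actor is sent to all of its watchers as a Terminated message. This way you can implement your own logic, e.g. recreating the actor on another node. This is actually the most generic way, as it doesn't use any extra plugins or configuration, but you need to describe the behavior yourself.
answered 2017-03-14T20:58:16.743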
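The watcher approach from the last point could be sketched roughly as follows. This is an illustration under assumptions, not a definitive implementation: `spawnWatcher`, the actor name "failover-watcher" and the `recreate` factory are hypothetical names, and how you obtain the `target` reference (for example by resolving an actor selection on the remote node) is left out. It calls `Watch` on the actor context directly.

```fsharp
open Akka.Actor
open Akka.FSharp

// Hypothetical helper: watches `target` and, once it terminates,
// asks `recreate` for a replacement and starts watching that instead.
let spawnWatcher (system: ActorSystem)
                 (target: IActorRef)
                 (recreate: IActorRefFactory -> IActorRef) =
    spawn system "failover-watcher" (fun (mailbox: Actor<obj>) ->
        // Register the death watch; Terminated arrives as an ordinary message.
        mailbox.Context.Watch(target) |> ignore
        let rec loop (current: IActorRef) =
            actor {
                let! msg = mailbox.Receive()
                match msg with
                | :? Terminated as t when t.ActorRef = current ->
                    // The watched actor (or its node) is gone: create a replacement.
                    let replacement = recreate (mailbox.Context :> IActorRefFactory)
                    mailbox.Context.Watch(replacement) |> ignore
                    return! loop replacement
                | _ ->
                    return! loop current
            }
        loop target)
```

Here `recreate` decides where the replacement lives. Passing a function that spawns the worker locally makes the watching node take over, while a remote deployment would place it elsewhere. For an actor on another node, expect the Terminated message only after the cluster has marked that node as down or removed it.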