0

我正在为 Service Fabric 编写 Akka.Persistence 的实现,但我似乎无法让快照工作。当它尝试恢复状态时,它会获取最新的快照,但不会重播自最新快照以来的事件。我不清楚如果我只是没有正确连接组件,或者我的持久性库的实现不正确。我的演员是一个简单的计数器,我的状态只是当前的计数。我希望首先调用 Recover,然后在最后一个快照和最高序列号之间的每个日记条目中调用 Recover。日志中有一个函数 ReplayMessagesAsync(...) 看起来应该这样做,但它没有被调用。我的计数器的代码如下,我的其余代码是:代码

using Akka.Actor;
using Akka.Persistence;
using Akka.Persistence.ServiceFabric.Journal;
using Akka.Persistence.ServiceFabric.Snapshot;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace AkkaPersistence.Actors
{
    public class Counter : ReceivePersistentActor
    {
        public class GetCount { }

        private int counter;

        private CounterState State = new CounterState();

        private int _msgsSinceLastSnapshot = 0;

        public Counter()
        {
            Recover<Evt>(evt =>
            {
                State.Update(evt);
            });

            Recover<SnapshotOffer>(offer => {
                var snapshotEntry = offer.Snapshot as SnapshotEntry;
                if (snapshotEntry != null)
                {
                    State = (CounterState)snapshotEntry.Snapshot;
                }
            });

            Command<string>(str => Persist(str, s =>
            {
                ++counter;
                var evt = new Evt(s);
                State.Update(evt);

                if (++_msgsSinceLastSnapshot % 10 == 0)
                {
                    //time to save a snapshot
                    SaveSnapshot(State.Copy());
                }
            }));

            Command<GetCount>(get => Sender.Tell(State.Count));

            Command<SaveSnapshotSuccess>(success =>
            {
                ServiceEventSource.Current.Message($"Saved snapshot");
                DeleteMessages(success.Metadata.SequenceNr);
            });

            Command<SaveSnapshotFailure>(failure => {
                // handle snapshot save failure...
                ServiceEventSource.Current.Message($"Snapshot failure");
            });

        }

        public override string PersistenceId
        {
            get
            {
                return "counter";
            }
        }
    }

    internal class CounterState
    {
        private long count = 0L;

        public long Count
        {
            get { return count; }
            set { count = value; }
        }

        public CounterState(long count)
        {
            this.Count = count;
        }

        public CounterState() : this(0)
        {
        }

        public CounterState Copy()
        {
            return new CounterState(count);
        }

        public void Update(Evt evt)
        {
            ++Count;
        }
    }

    public class Evt
    {
        public Evt(string data)
        {
            Data = data;
        }

        public string Data { get; }

    }

    public class Cmd
    {
        public Cmd(string data)
        {
            Data = data;
        }

        public string Data { get; }
    }

}
4

1 回答 1

1

有几件事我错了:1)我需要返回传入的内容,而不是我的 SnapshotEntry,它是我的持久性机制的实现细节。2) 一个简单的失误,因为我从保存字符串转换为尝试将对象保存为日志的一部分。3) 最后还有一个问题,那就是根本问题,就是子对象的序列化失败。在这段代码中,我不想包含子对象的类型,因此我为 Journal 添加了一个自定义序列化器(Wire 序列化器)以及已经存在的 SnapshotSerializer,它现在正在工作。

于 2016-08-22T19:28:56.073 回答