1

抱歉,如果这看起来我想为我保存我的问题,但我前段时间偶然发现了 stact,它似乎是一个方便熟悉的库,我认为它可以很好地解决手头的问题。

我看了一下源代码,里面有很多!:) 不知道从哪里开始。

我需要的是一个组件(Actor?),它会定期发送异步 Web 请求并将结果存储在本地(首先会对结果进行一些解析)。其他线程将在不同时间询问结果。

据我所知,我需要一个调度器、一个光纤和一个通道来处理将结果返回给任何请求它们的人。类似于以下内容:

private static readonly ConcurrentDictionary<Uri, ServerLoad> ServerLoads = new ConcurrentDictionary<string, ServerLoad>();
public Channel<Request<IEnumerable<ServerLoad>>> ServerLoadChannel { get; private set; 
public LoadRetriever(Inbox inbox, Fiber fiber, Scheduler scheduler, ILoadBalancerConfiguration config)
{
    this.inbox = inbox;
    this.fiber = fiber;
    this.scheduler = scheduler;

    this.scheduler.Schedule(
        0,
        config.FetchIntervalMilliSecs,
        fiber,
        () =>
            {
                foreach (var server in config.Servers)
                {
                    // need someway to send async web request to url in
                    // server.LoadRetrievalAddress and save/update result
                    // in ServerLoads dictionary
                }
            });

    this.ServerLoadChannel = new ConsumerChannel<Request<IEnumerable<ServerLoad>>>(this.fiber, 
        request => request.Respond(ServerLoads.Values));
}

}

这暂时都是理论上的,可能完全错误,我遇到的主要问题是执行异步请求(WebClient.DownloadStringAsync() 和 DownloadStringCompleted)。与 AsyncResultChannel 相反的东西

任何提示/示例/推动正确方向将不胜感激!

(试图为 stact 创建一个标签,但我的声誉不够好:S)

4

2 回答 2

2

好吧,这里有几件事。首先,您应该创建一个作为 Actor 的类,使用:

public class MyServerCoordinator : Actor 
{}

然后,该参与者将加载配置并为每个服务器实例创建另一个参与者。这将允许每个配置的服务器按照自己的时间表检索其信息。

然后你需要为每种类型创建一个 ActorFactory ,使用:

var factory = ActorFactory.Create((i,f,s) => new MyServerCoordinator(i,f,s));
var actor = factory.GetActor();

对于消息传递,您应该为要在参与者之间交换的每条消息创建一个消息类,而不是使用原始 CLR 类型。

此外,在每个 actor 的构造函数中,您可以设置一个 Receive 循环来分别处理计划更新的请求。参与者可以通过发送请求来相互通信,其中包括对发送者的响应通道。这将允许每个服务器的参与者与协调者对话。

MyServerCoordinator(Inbox inbox)
{
    inbox.Loop(loop =>
    {
        loop.Receive<Request<ServerData>>(request =>
        {
            request.Respond(new ServerDataResult(_myCurrentValue)); 
            loop.Continue();
        });
    }
}

每个参与者都有自己的纤程,因此不需要使用并发集合类型或任何锁定机制。Actor 模型通过消息传递处理并发,因此不使用信号量等。

您还需要查看单元测试中的一些示例,以了解如何使用 AnonymousActor.New() 内联声明与非参与者代码中的参与者交互。

于 2011-08-29T02:28:14.533 回答
1

Cashbox https://github.com/Cashbox/Cashbox可能是一个不错的起点。我用旧版本的 Stact 编写了它,但大多数概念应该可以很好地移植。

https://github.com/Cashbox/Cashbox/blob/master/src/Cashbox/Engines/FileStorageEngine.cs是所有基于actor的魔法发生的地方。https://github.com/Cashbox/Cashbox/blob/master/src/Cashbox/Engines/FileStorageEngine.cs#L182是关于发出请求并等待响应的全部内容。

于 2011-08-29T02:21:48.090 回答