1

SyncArbiter我正在尝试使用一个供不同参与者使用的 10 个 Redis 连接池。假设我们有一个名叫 Bob 的演员,他必须使用 Redis 演员来完成它的任务。

虽然这可以通过以下方式实现:

// crate, use and mod statements have been omitted to lessen clutter

/// FILE main.rs
pub struct AppState {
    pub redis: Addr<Redis>,
    pub bob: Addr<Bob>
}

fn main() {
    let system = actix::System::new("theatre");

    server::new(move || {
        let redis_addr = SyncArbiter::start(10, || Redis::new("redis://127.0.0.1").unwrap());
        let bob_addr = SyncArbiter::start(10, || Bob::new());

        let state = AppState {
            redis: redis_addr,
            bob: bob_addr
        };

        App::with_state(state).resource("/bob/eat", |r| {
            r.method(http::Method::POST)
                .with_async(controllers::bob::eat)
        })
    })
    .bind("0.0.0.0:8080")
    .unwrap()
    .start();

    println!("Server started.");

    system.run();
}

/// FILE controllers/bob.rs
pub struct Food {
  name: String,
  kcal: u64
}

pub fn eat(
    (req, state): (Json<Food>, State<AppState>),
) -> impl Future<Item = HttpResponse, Error = Error> {
    state
        .bob
        .send(Eat::new(req.into_inner()))
        .from_err()
        .and_then(|res| match res {
            Ok(val) => {
                println!("==== BODY ==== {:?}", val);
                Ok(HttpResponse::Ok().into())
            }
            Err(_) => Ok(HttpResponse::InternalServerError().into()),
        })
}

/// FILE actors/redis.rs
#[derive(Debug)]
pub struct Redis {
    pub client: Client
}

pub struct RunCommand(Cmd);

impl RunCommand {
    pub fn new(cmd: Cmd) -> Self {
        RunCommand(cmd)
    }
}

impl Message for RunCommand {
    type Result = Result<RedisResult<String>, ()>;
}

impl Actor for Redis {
    type Context = SyncContext<Self>;
}

impl Handler<RunCommand> for Redis {
    type Result = Result<RedisResult<String>, ()>;

    fn handle(&mut self, msg: RunCommand, _context: &mut Self::Context) -> Self::Result {
        println!("Redis received command!");
        Ok(Ok("OK".to_string()))
    }
}

impl Redis {
    pub fn new(url: &str) -> Result<Self, RedisError> {
        let client = match Client::open(url) {
            Ok(client) => client,
            Err(error) => return Err(error)
        };

        let redis = Redis {
            client: client,
        };

        Ok(redis)
    }
}

/// FILE actors/bob.rs
pub struct Bob;

pub struct Eat(Food);

impl Message for Eat {
    type Result = Result<Bob, ()>;
}

impl Actor for Eat {
    type Context = SyncContext<Self>;
}

impl Handler<Eat> for Bob {
    type Result = Result<(), ()>;

    fn handle(&mut self, msg: Eat, _context: &mut Self::Context) -> Self::Result {
        println!("Bob received {:?}", &msg);

        // How to get a Redis actor and pass data to it here?

        Ok(msg.datapoint)
    }
}

impl Bob {
    pub fn new() -> () {
        Bob {}
    }
}

从 Bob 中的上述句柄实现,我们不清楚 Bob 是如何获得 Redis Actor 的地址的。或将任何消息发送给任何Actor运行在SyncArbiter.

Arbiter使用常规和 a也可以实现相同的目的,Registry但据我所知,Actix 不允许多个相同的演员(例如,我们不能使用常规启动 10 个 Redis 演员Arbiter)。

正式化我的问题:

  • 有没有Registry适合SyncArbiter演员的
  • 我可以在常规中启动多个相同类型的演员吗Arbiter
  • 是否有更好/更规范的方式来实现连接池

编辑

版本:

  • 行动 0.7.9
  • actix_web 0.7.19
  • 期货=“0.1.26”
  • 生锈 1.33.0
4

1 回答 1

2

我自己找到了答案。

开箱即用,无法从注册表中检索Actor带有 a 的 a 。SyncContext

鉴于我上面的例子。为了让actorBob向actor发送任何类型的消息,Redis它需要知道Redisactor的地址。Bob可以得到Redis明确的地址 - 包含在发送给它的消息中或从某种共享状态中读取。

我想要一个类似于 Erlang 的系统,所以我决定不通过消息传递参与者的地址,因为它看起来太费力,容易出错,而且在我看来,它违背了拥有基于参与者的并发模型的目的(因为没有一个参与者可以向任何其他参与者发送消息)演员)。

因此,我研究了共享状态的想法,并决定实现我自己的SyncRegistry,它类似于 Actix 标准Registry——它完全符合我的要求,但不适用于带有SyncContext.

这是我编写的幼稚解决方案:https ://gist.github.com/monorkin/c463f34764ab23af2fd0fb0c19716177

使用以下设置:

fn main() {
    let system = actix::System::new("theatre");

    let addr = SyncArbiter::start(10, || Redis::new("redis://redis").unwrap());
    SyncRegistry::set(addr);
    let addr = SyncArbiter::start(10, || Bob::new());
    SyncRegistry::set(addr);


    server::new(move || {
        let state = AppState {};

        App::with_state(state).resource("/foo", |r| {
            r.method(http::Method::POST)
                .with_async(controllers::foo::create)
        })
    })
    .bind("0.0.0.0:8080")
    .unwrap()
    .start();

    println!("Server started.");

    system.run();
}

参与者Bob可以通过Redis以下方式从程序中的任何点获取地址:

impl Handler<Eat> for Bob {
    type Result = Result<(), ()>;

    fn handle(&mut self, msg: Eat, _context: &mut Self::Context) -> Self::Result {
        let redis = match SyncRegistry::<Redis>::get() {
            Some(redis) => redis,
            _ => return Err(())
        };

        let cmd = redis::cmd("XADD")
            .arg("things_to_eat")
            .arg("*")
            .arg("data")
            .arg(&msg.0)
            .to_owned();

        redis.clone().lock().unwrap().send(RunCommand::new(cmd)).wait().unwrap();
    }
}
于 2019-04-10T17:56:46.283 回答