1

我需要使用tiberius收集表的所有行并输出它们。我的简单代码是:

extern crate futures;
extern crate tokio_core;
extern crate tiberius;

use futures::Future;
use tokio_core::reactor::Core;
use tiberius::SqlConnection;
use tiberius::stmt::ResultStreamExt;

fn main() {
    let mut core = Core::new().unwrap();

    let future = SqlConnection::connect(core.handle(), "server=tcp:127.0.0.1,1433;username=SA;password=qweasdZXC123!!;")
        .and_then(|conn| {
            let mut v: Vec<String> = Vec::new();

            conn.simple_query("SELECT id, name FROM test").for_each_row(|row| {

                let id: i32 = row.get(0);
                let name: &str = row.get(1);

                v.push(format!("{} - {}", id, name));

                Ok(())
            });

            println!("{:?}", v);

            Ok(())
        });

    core.run(future).unwrap();
}

此代码打印一个空向量,但我需要完整的字符串向量。我读过一些关于期货的文章,但作为 Rust 的新手,它们对我来说看起来太复杂了。

4

1 回答 1

1

在您的原始代码中,您有

conn.simple_query("SELECT id, name FROM users").for_each_row(|row| {
    // ...
}).wait().unwrap();

你说“它可以编译,但根据请求挂起。我认为有问题wait()。”。

如果您阅读 的文档Future::wait,您会看到这个警告,强调我的:

注意:此方法不适合在事件循环或类似 I/O 情况下调用,因为它会阻止事件循环继续进行(这会阻塞线程)。只有在保证与此未来相关的阻塞工作将由另一个线程完成时,才应调用此方法。

在您更新的代码中,您有

conn.simple_query("SELECT id, name FROM test").for_each_row(|row| {
    // ...
});

这构建了一个未来,但随后立即将其丢弃,因此外部向量不会发生任何事情。由于这个原因,期货箱中的所有期货都附有警告:

警告:必须使用的未使用的`futures::FutureResult`:除非被轮询,否则期货什么也不做

我已经提交了一个问题,以便图书馆添加这个。


这是一段完全未经测试的代码。我没有一个 SQL Server 实例来实际测试它,但它确实可以编译并且具有正确的形状。

extern crate futures;
extern crate futures_state_stream;
extern crate tokio_core;
extern crate tiberius;

use futures::{Future, Stream};
use futures_state_stream::StateStream;
use tiberius::SqlConnection;
use tokio_core::reactor::Core;

fn main() {
    let mut core = Core::new().unwrap();

    let connection_string = "server=tcp:127.0.0.1,1433;username=SA;password=qweasdZXC123!!;";

    let future = SqlConnection::connect(core.handle(), connection_string)
        .and_then(|conn| {

            let query = conn.query("SELECT * FROM test WHERE id > @P1", &[&0i32])
                .into_stream()
                .take(1);

            query.flatten()
                .map(|row| {
                    let id: i32 = row.get(0);
                    let name: &str = row.get(1);

                    format!("{} - {}", id, name)
                })
                .collect()
        });

    let all_rows = core.run(future).unwrap();
.
    println!("{:?}", all_rows);
}

要点:

  1. conn.query可以与多个查询语句一起使用,因此它返回结果集
  2. conn.query实际上实现了 a StateStream,而不是 a futures::Stream。出于示例的目的,我将其转换回futures::Streamwith .into_stream()conn这是不理想的,因为我们失去了事后恢复的能力。
  3. 我只使用.take(1).
  4. 由于我们现在有一个Streamof Streams,我们使用它Stream::flatten来移除嵌套。
  5. 每行都map连接到一个String.
  6. 被编成一个单一StreamcollectFutureVec<String>
于 2017-09-21T15:20:48.013 回答