0

我写了一个函数,它返回一对QDateTime可观察的,就像这个:

rxcpp::observable<std::tuple<QDateTime, QDateTime>> experimentOne(const QDateTimeAxis * const axis 
{
   return rxcpp::observable<>::create<std::tuple<QDateTime, QDateTime>>(
     [axis](rxcpp::subscriber<std::tuple<QDateTime, QDateTime>> s) {

       auto rangeCallback = [s](QDateTime minv, QDateTime maxv) {

          if (s.is_subscribed()) {

              // send to the subscriber
              s.on_next(std::make_tuple<QDateTime, QDateTime>(std::move(minv), std::move(maxv)));
          }

       };

       QObject::connect(axis, &QDateTimeAxis::rangeChanged, rangeCallback);
   }); 
}

因此,使用此功能,我可以订阅 a 轴上日期范围的更改QChart

我还写了另一个函数,给定两个日期,返回一个带有来自 sqlite db 的值的 observable,如下所示

rxcpp::observable<std::tuple<double, double>> Database::getValueRange(const std::string& table, unsigned long start, unsigned long end)
{

   return rxcpp::observable<>::create<std::tuple<double, double>>(
      [this, table, start, end](rxcpp::subscriber<std::tuple<double, double>> s) {

    // get the prepared statement for the query 1, i.e. ohlcv values
    // within a date range
    sqlite3_stmt *stmt = this->m_query3_stms[table].get();

    // bind first parameter, the start timestamp
    int rc = sqlite3_bind_int64(stmt, 1, start);
    checkSqliteCode(rc, m_db.get());

    // bind the second parameter, the end timestamp
    rc = sqlite3_bind_int64(stmt, 2, end);
    checkSqliteCode(rc, m_db.get());

    // step through the query results
    while ( sqlite3_step(stmt)==SQLITE_ROW && s.is_subscribed() ) {

        // extract name values from the current result row
        float minv = sqlite3_column_double(stmt, 0);
        float maxv = sqlite3_column_double(stmt, 1);

        // send to the subscriber
        s.on_next(std::make_tuple<double, double>(minv, maxv));
    }

    // reset the statement for reuse
    sqlite3_reset(stmt);

    // send complete to the subscriber
    s.on_completed();

   });
}

如何在 RxCpp 中以惯用的形式将第一个函数(两个日期)的值作为输入传递给第二个函数?在管道结束时,我可以根据输入日期订阅来自数据库的值吗?

4

1 回答 1

1

为每对新日期值创建新值范围的规范方法是使用 map 后跟一个展平运算符

auto valueRanges = experimentOne(/*params*/).
    map(rxcpp::util::apply_to([](QDateTime d1, QDateTime d2){
      return getValueRange(/*params*/).
          map(rxcpp::util::apply_to([=](double db1, double db2){ 
              return std::make_tuple(d1, d2, db1, db2); 
          }));
    })).
    /* switch_on_next() or merge() or concat() */
    /* this will produce std::tuple< QDateTime, QDateTime, double, double>
  • switch_on_next将取消先前的值范围并开始新的值范围。
  • merge将尽快生成所有值范围。
  • concat将按顺序一次生成一个值范围。

在值范围在不同线程上运行的情况下,必须传递线程安全协调,merge以便安全地交错值范围。

要选择特定范围,请使用filter()。如果您希望能够将范围拆分为单独的表达式,publish()请先使用共享范围。

auto hotValueRanges = valueRanges.
    publish().ref_count();

auto aDateRange = hotValueRanges.
    filter(rxcpp::util::apply_to([](QDateTime d1, QDateTime d2, double, double){
        return isADesiredDate(d1, d2);
    })).
    subscribe(/*use the range*/);

auto anotherDateRange = hotValueRanges.
    filter(rxcpp::util::apply_to([](QDateTime d1, QDateTime d2, double, double){
        return isAnotherDesiredDate(d1, d2);
    })).
    subscribe(/*use the range*/);
于 2018-04-18T04:50:52.030 回答