20

我想做的是创建一种“管道”(就像进程之间的管道),但在同一程序中的 c++ iostream 之间。我有一个需要输入流作为参数的函数,但我的数据来自输出流。那么有没有一种标准方法可以将 a 的输出通过管道传输到 astd::ostream的输入中std::istream

4

2 回答 2

24

您可以创建一个std::streambuf输出到一个缓冲区并std::overflow()在缓冲区变满时阻塞的位置。另一方面,您将有一个输入缓冲区,underflow()当缓冲区变空时会阻塞。显然,阅读和写作将在两个不同的线程中。

棘手的事情是如何同步两个缓冲区:流在访问缓冲区时不使用任何同步操作。只有当任何一个虚函数被调用时,你才能拦截操作并处理同步。另一方面,不使用缓冲区是相当低效的。我解决这个问题的方法是使用一个相对较小的输出缓冲区(例如 256char秒),并覆盖sync()以使用此函数将字符传输到输入缓冲区。将streambuf使用互斥锁进行同步,并使用条件变量来阻塞输出时的完整输入缓冲区和输入时的空输入缓冲区。为了支持完全关闭,还应该有一个函数设置一个标志,表明没有更多的输入,并且所有进一步的输出操作都应该失败。

创建实际实现表明两个缓冲区是不够的:访问输入和输出缓冲区的线程可能在各自的其他缓冲区阻塞时处于活动状态。因此,需要第三个中间缓冲区。通过对上述计划的这个小改动,下面是一些代码(它使用微小的缓冲区来确保存在实际的上溢和下溢;对于实际使用,至少输入缓冲区可能应该更大)。

// threadbuf.cpp                                                      -*-C++-*-
// ----------------------------------------------------------------------------
//  Copyright (C) 2013 Dietmar Kuehl http://www.dietmar-kuehl.de         
//                                                                       
//  Permission is hereby granted, free of charge, to any person          
//  obtaining a copy of this software and associated documentation       
//  files (the "Software"), to deal in the Software without restriction, 
//  including without limitation the rights to use, copy, modify,        
//  merge, publish, distribute, sublicense, and/or sell copies of        
//  the Software, and to permit persons to whom the Software is          
//  furnished to do so, subject to the following conditions:             
//                                                                       
//  The above copyright notice and this permission notice shall be       
//  included in all copies or substantial portions of the Software.      
//                                                                       
//  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,      
//  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES      
//  OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND             
//  NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT          
//  HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,         
//  WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING         
//  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR        
//  OTHER DEALINGS IN THE SOFTWARE. 
// ----------------------------------------------------------------------------


#include <algorithm>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <streambuf>
#include <string>
#include <thread>

// ----------------------------------------------------------------------------

class threadbuf
    : public std::streambuf
{
private:
    typedef std::streambuf::traits_type traits_type;
    typedef std::string::size_type      string_size_t;

    std::mutex              d_mutex;
    std::condition_variable d_condition;
    std::string             d_out;
    std::string             d_in;
    std::string             d_tmp;
    char*                   d_current;
    bool                    d_closed;

public:
    threadbuf(string_size_t out_size = 16, string_size_t in_size = 64)
        : d_out(std::max(string_size_t(1), out_size), ' ')
        , d_in(std::max(string_size_t(1), in_size), ' ')
        , d_tmp(std::max(string_size_t(1), in_size), ' ')
        , d_current(&this->d_tmp[0])
        , d_closed(false)
    {
        this->setp(&this->d_out[0], &this->d_out[0] + this->d_out.size() - 1);
        this->setg(&this->d_in[0], &this->d_in[0], &this->d_in[0]);
    }
    void close()
    {
        {
            std::unique_lock<std::mutex> lock(this->d_mutex);
            this->d_closed = true;
            while (this->pbase() != this->pptr()) {
                this->internal_sync(lock);
            }
        }
        this->d_condition.notify_all();
    }

private:
    int_type underflow()
    {
        if (this->gptr() == this->egptr())
        {
            std::unique_lock<std::mutex> lock(this->d_mutex);
            while (&this->d_tmp[0] == this->d_current && !this->d_closed) {
                this->d_condition.wait(lock);
            }
            if (&this->d_tmp[0] != this->d_current) {
                std::streamsize size(this->d_current - &this->d_tmp[0]);
                traits_type::copy(this->eback(), &this->d_tmp[0],
                                  this->d_current - &this->d_tmp[0]);
                this->setg(this->eback(), this->eback(), this->eback() + size);
                this->d_current = &this->d_tmp[0];
                this->d_condition.notify_one();
            }
        }
        return this->gptr() == this->egptr()
            ? traits_type::eof()
            : traits_type::to_int_type(*this->gptr());
    }
    int_type overflow(int_type c)
    {
        std::unique_lock<std::mutex> lock(this->d_mutex);
        if (!traits_type::eq_int_type(c, traits_type::eof())) {
            *this->pptr() = traits_type::to_char_type(c);
            this->pbump(1);
        }
        return this->internal_sync(lock)
            ? traits_type::eof()
            : traits_type::not_eof(c);
    }
    int sync()
    {
        std::unique_lock<std::mutex> lock(this->d_mutex);
        return this->internal_sync(lock);
    }
    int internal_sync(std::unique_lock<std::mutex>& lock)
    {
        char* end(&this->d_tmp[0] + this->d_tmp.size());
        while (this->d_current == end && !this->d_closed) {
            this->d_condition.wait(lock);
        }
        if (this->d_current != end)
        {
            std::streamsize size(std::min(end - d_current,
                                          this->pptr() - this->pbase()));
            traits_type::copy(d_current, this->pbase(), size);
            this->d_current += size;
            std::streamsize remain((this->pptr() - this->pbase()) - size);
            traits_type::move(this->pbase(), this->pptr(), remain);
            this->setp(this->pbase(), this->epptr());
            this->pbump(remain);
            this->d_condition.notify_one();
            return 0;
        }
        return traits_type::eof();
    }
};

// ----------------------------------------------------------------------------

static void writer(std::ostream& out)
{
    for (std::string line; std::getline(std::cin, line); )
    {
        out << "writer: '" << line << "'\n";
    }
}

// ----------------------------------------------------------------------------

static void reader(std::istream& in)
{
    for (std::string line; std::getline(in, line); )
    {
        std::cout << "reader: '" << line << "'\n";
    }
}

// ----------------------------------------------------------------------------

int main()
{
    try
    {
        threadbuf sbuf;
        std::ostream out(&sbuf);
        std::istream in(&sbuf);

        std::thread write(&::writer, std::ref(out));
        std::thread read(&::reader, std::ref(in));

        write.join();
        sbuf.close();
        read.join();
    }
    catch (std::exception const& ex)
    {
        std::cerr << "ERROR: " << ex.what() << "\n";
    }
}
于 2012-09-13T19:19:13.180 回答
3

Dietmar Kühl 的帖子非常完整。它提供了一种在线程间执行此操作的方法,做得很好。但是有一个简单的方法使用boost::iostreams. 我今天遇到了这个问题,因为我需要学习如何处理流(设备)和过滤器。我第一次尝试它就成功了。它是您问题的一条线解决方案。

boost::iostreams::copy(istream,ostream);

这是我所在位置的一个工作示例:

#include <iostream>
#include <sstream>
#include <boost/iostreams/copy.hpp>

const char* testz = R"(H4sIAJSHnl4AA61Uy3LCMAw8t19B + wF2QmAoM67ptZ / QU8Y4Alzix9hOH39fhRgnQGd66Qmx
Wq + 0kiZs86Xb2Qf4oKx5fixJ8bjhd / dsB9BshTxiPJsxD876WGuIohFRnECErd / XRmjgb + Jg
7cPs1UjCaEYTC7RQLXc2RC1CBP / SaOEl + e7fEGk1owMj0VMt1fBy + bRaVGWxXpJVWc3nK0bH
ZGJjO1B7YfbncohtYa / M6XW1KJ6KgtEByQQwSXy + KtdrrG + yHr0SzCUvvDNnWyW / a9dtWxUO
MLZj0YrhrTjCJ2yJgYiKA5YYojkqzT2jQ3BGg9udwP43YY57eAeJCi5DMvKyN9QHQ3u / doJD
lNbnrrz9HM0H23kJtXK8IvOqIuW6IAscwohnqrSdwYKMDkHGU034EG2H42pypp + ACrhqFfGc
uLEG0P8EmRJ7 + 06EgIxxEkOLOIQhM45j4vW6Lu4oG2SqARPVTuFFjy8PIBrw9c5bfbmbaeIs
dqvARBcPtYfQtXGiet32n8tP993LJH / pz2jxQpNN7f9TgcmB0RtbPT8dDqOTT8APTbkhEyYE
AAA =)";

int main(){
    std::istringstream source(testz, std::ios_base::in | std::ios_base::binary);

    namespace bio = boost::iostreams;
    bio::copy(source, std::cout);
}

增强流的真正美妙之处在于,您只需添加过滤器即可在流上进行转换。就我而言,以及我留下长testz字符串的原因是,我将在流程中添加一个过滤器,以解压缩数据。

bio::filtering_istreambuf in;
in.push(bio::gzip_decompressor());
in.push(source);
bio::copy(in, std::cout);

好吧,这还不行,我必须弄清楚我需要什么额外的过滤器才能将电子邮件附件转换为合格的 gzip 流。但这是处理流的一种非常优雅的方式。代码在顶部保持简单。我只需要把我的新过滤器推到过滤器前面gzip_decompressor

于 2020-04-22T17:49:03.887 回答