0

我正在尝试使用 libmpdclient 运行空闲(idle)循环,但在第一次空闲调用时,就进入了一个看起来无法恢复的错误状态。

我给 mpd_recv_idle 的 disable_timeout 参数传递了 false,以便可以从外部停止循环(它将在后台线程中运行),从而确保干净的关闭过程。

这是我的测试代码:

#include <string>
#include <stdexcept>
#include <memory>

#include <mpd/client.h>

// Owning handle for an MPD connection; mpd_connection_free () is invoked
// automatically when the handle goes out of scope.
using mpd_connection_ptr =
  std::unique_ptr<mpd_connection, decltype (&mpd_connection_free)>;

// Throws std::runtime_error with message s when the connection held by c
// reports any error state other than MPD_ERROR_SUCCESS; otherwise does nothing.
void
check_error (const mpd_connection_ptr &c, const std::string &s)
{
  const auto status = mpd_connection_get_error (c.get ());
  if (status == MPD_ERROR_SUCCESS)
    {
      return;
    }
  throw std::runtime_error (s);
}

// Test program: connects to MPD on 127.0.0.1:7701 with a one second timeout,
// sends a single "idle" request and waits for the answer with the timeout
// left enabled.  If the wait ends with MPD_ERROR_TIMEOUT the program tries to
// clear that error and throws when clearing is refused.
int
main (void)
{
  const std::string host = "127.0.0.1";
  const uint16_t port = 7701;
  const int timeout = 1 * 1000;

  mpd_connection_ptr c {mpd_connection_new (host.c_str (), port, timeout),
                        &mpd_connection_free};
  if (!c)
    {
      throw std::runtime_error ("connection_new returned nullptr");
    }

  if (!mpd_send_idle (c.get ()))
    {
      throw std::runtime_error ("mpd_send_idle returned false");
    }
  check_error (c, "mpd_send_idle caused error status");

  // false = keep the connection timeout active while waiting for the reply
  const auto idle = mpd_recv_idle (c.get (), false);
  if (idle != 0)
    {
      return 0;
    }

  // A zero result means no idle event was delivered; find out why.
  if (mpd_connection_get_error (c.get ()) != MPD_ERROR_TIMEOUT)
    {
      check_error (c, "mpd_recv_idle caused error status");
      return 0;
    }

  if (!mpd_connection_clear_error (c.get ()))
    {
      throw std::runtime_error ("mpd_connection_clear_error returned false");
    }
  return 0;
}

当我运行这段代码时(mpd 在 127.0.0.1:7701 上运行,我用 netstat 进行了检查),我得到了这个结果:

terminate called after throwing an instance of 'std::runtime_error'
  what():  mpd_connection_clear_error returned false

为什么我不能在这里清除超时错误,这似乎是一种可恢复的情况?

4

1 回答 1

1

通过研究libmpdclient源代码,我想我可以自己回答这个问题。

按照库的设计,超时是不可恢复的错误。这也正是 mpd_recv_idle () 一开始就提供 disable_timeout 参数的原因。

同步空闲请求预计将“永远”阻塞(直到 MPD 响应请求)。这与我想要的不兼容,我可能不得不使用低级异步接口来实现我想要的。

这是我的解决方案(错误检查最少)。

程序等待用户按 ENTER,同时在后台处理 MPD 空闲消息;后台处理每 200 毫秒可以被中断一次。

尚未实现的部分:

  • 返回码解析
  • 空闲消息响应解析

这是代码:

#include <atomic>
#include <chrono>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>
#include <thread>

#include <netdb.h>
#include <netinet/in.h>
#include <strings.h>
#include <unistd.h>

#include <mpd/async.h>
// #include <mpd/client.h>

// Owning handle for a libmpdclient async object; mpd_async_free () is invoked
// automatically when the handle goes out of scope.
using mpd_async_ptr =
  std::unique_ptr<mpd_async, decltype (&mpd_async_free)>;

// Throws std::runtime_error with message s when the async object held by c
// reports any error state other than MPD_ERROR_SUCCESS; otherwise does nothing.
void
check_error (const mpd_async_ptr &c, const std::string &s)
{
  const auto status = mpd_async_get_error (c.get ());
  if (status == MPD_ERROR_SUCCESS)
    {
      return;
    }
  throw std::runtime_error (s);
}

// Waits with select () until the MPD socket is ready for one of the events
// reported by mpd_async_events (), or until the timeout tv expires.
//
// async  libmpdclient async object to poll
// tv     timeout handed straight to select ()
//
// Returns the requested events that select () found ready, or 0 when select ()
// reported nothing (timeout) or failed.
// Throws std::runtime_error if mpd_async_events () returns 0.
mpd_async_event
async_poll (const mpd_async *async, timeval *tv)
{
  const int wanted = mpd_async_events (async);
  if (wanted == 0)
    {
      throw std::runtime_error ("mpd_async_events failed");
    }

  const int sock = mpd_async_get_fd (async);
  const int failure_bits = MPD_ASYNC_EVENT_HUP | MPD_ASYNC_EVENT_ERROR;

  fd_set readable, writable, exceptional;
  FD_ZERO (&readable);
  FD_ZERO (&writable);
  FD_ZERO (&exceptional);

  // Register the socket only in the sets matching the requested events.
  if (wanted & MPD_ASYNC_EVENT_READ)
    {
      FD_SET (sock, &readable);
    }
  if (wanted & MPD_ASYNC_EVENT_WRITE)
    {
      FD_SET (sock, &writable);
    }
  if (wanted & failure_bits)
    {
      FD_SET (sock, &exceptional);
    }

  if (select (sock + 1, &readable, &writable, &exceptional, tv) <= 0)
    {
      return (mpd_async_event) 0;
    }

  // Collect the bits whose set did not fire and strip them from the result.
  int not_ready = 0;
  if (!FD_ISSET (sock, &readable))
    {
      not_ready |= MPD_ASYNC_EVENT_READ;
    }
  if (!FD_ISSET (sock, &writable))
    {
      not_ready |= MPD_ASYNC_EVENT_WRITE;
    }
  if (!FD_ISSET (sock, &exceptional))
    {
      not_ready |= failure_bits;
    }
  return (mpd_async_event) (wanted & ~not_ready);
}

// Opens a blocking IPv4 TCP connection to host:port and returns the connected
// socket descriptor, which the caller then owns.
//
// host  host name or dotted-quad address, resolved with gethostbyname ()
// port  TCP port in host byte order
//
// Throws std::runtime_error if the socket cannot be created or the host does
// not resolve to an IPv4 address, and std::string ("ERROR connecting") if
// connect () fails (exception type kept as it was for existing callers).
// The socket is closed on every failure path.
int
socket_connect (const std::string &host, uint16_t port)
{
  const int sockfd = socket (AF_INET, SOCK_STREAM, 0);
  if (sockfd < 0)
    {
      throw std::runtime_error ("socket creation failed");
    }

  // gethostbyname () returns a null pointer when the lookup fails, so the
  // result must be validated before it is dereferenced.
  const hostent *server = gethostbyname (host.c_str ());
  if (server == nullptr
      || server->h_addrtype != AF_INET
      || server->h_addr_list[0] == nullptr
      || server->h_length != static_cast<int> (sizeof (in_addr)))
    {
      close (sockfd);
      throw std::runtime_error ("host lookup failed");
    }

  sockaddr_in server_addr {};   // value-initialised: all fields zero
  server_addr.sin_family = AF_INET;
  bcopy (server->h_addr_list[0], &server_addr.sin_addr,
         sizeof (server_addr.sin_addr));
  server_addr.sin_port = htons (port);

  if (::connect (sockfd, reinterpret_cast<const sockaddr *> (&server_addr),
                 sizeof (server_addr)) < 0)
    {
      close (sockfd);   // do not leak the descriptor when connecting fails
      throw std::string ("ERROR connecting");
    }
  return sockfd;
}

// Background worker: connects to MPD at 127.0.0.1:7701 and re-issues an "idle"
// command after every line starting with "OK", printing each received line to
// stdout, until app_is_running becomes false.
//
// app_is_running  flag cleared by another thread to request shutdown.  It is
//                 atomic because main () writes it while this thread reads
//                 it.  The flag is checked between select () calls, so a
//                 shutdown request is noticed after at most one 200 ms
//                 timeout.
//
// Throws std::runtime_error if the async object cannot be created, the
// connection is closed or a command cannot be queued; exceptions from
// socket_connect () propagate as well.
void
mpd_notify_thread_proc (const std::atomic<bool> &app_is_running)
{
  const std::string host = "127.0.0.1";
  const uint16_t port = 7701;

  const int sockfd = socket_connect (host, port);
  mpd_async_ptr async_ptr {mpd_async_new (sockfd), mpd_async_free};
  auto async = async_ptr.get ();
  if (async == nullptr)
    {
      // No async object was created, so nothing has taken over the socket;
      // close it here to avoid leaking the descriptor.
      close (sockfd);
      throw std::runtime_error ("mpd_async_new failed");
    }

  while (app_is_running.load ())
    {
      // select () may modify the timeout, so it is rebuilt on every pass.
      timeval tv;
      tv.tv_sec = 0;
      tv.tv_usec = 200 * 1000;
      const auto events = async_poll (async, &tv);
      if (events == 0)
        {
          continue;
        }

      if (!mpd_async_io (async, events))
        {
          throw std::runtime_error ("connection was closed");
        }

      char *line_ptr;
      while ((line_ptr = mpd_async_recv_line (async)) != nullptr)
        {
          std::cout << "recv: " << line_ptr << "\n";
          const std::string line {line_ptr};
          // A line starting with "OK" ends a response (or is the server
          // greeting): queue the next idle request.
          if (line.compare (0, 2, "OK") == 0)
            {
              if (!mpd_async_send_command (async, "idle", nullptr))
                {
                  throw std::runtime_error ("mpd_async_send_command failed");
                }
            }
        }
    }
}

// Starts the MPD notification worker, waits for the user to press ENTER and
// then asks the worker to stop and joins it.
int
main (void)
{
  // Shared between this thread (writer) and the worker (reader).
  std::atomic<bool> app_is_running {true};
  std::thread mpd_notify_thread ([&app_is_running] ()
  {
    mpd_notify_thread_proc (app_is_running);
  });

  std::string response;
  getline (std::cin, response);
  std::cout << "shutting down...\n";
  app_is_running.store (false);
  mpd_notify_thread.join ();
}

改进的版本可以“尽可能快地”中断:调用 select () 时不传 timeval,并同时监视一个用于通知关闭的 pipe ()。

#include <string>
#include <stdexcept>
#include <memory>
#include <iostream>
#include <thread>
#include <chrono>

#include <netinet/in.h>
#include <netdb.h>
#include <strings.h>
#include <unistd.h>

#include <mpd/async.h>
// #include <mpd/client.h>

// Owning handle for a libmpdclient async object; mpd_async_free () is invoked
// automatically when the handle goes out of scope.
using mpd_async_ptr =
  std::unique_ptr<mpd_async, decltype (&mpd_async_free)>;

// Raises std::runtime_error (s) unless the async object held by c is in the
// MPD_ERROR_SUCCESS state.
void
check_error (const mpd_async_ptr &c, const std::string &s)
{
  const bool failed = mpd_async_get_error (c.get ()) != MPD_ERROR_SUCCESS;
  if (failed)
    {
      throw std::runtime_error (s);
    }
}

// Blocks in select () without a timeout until the MPD socket is ready for one
// of the events reported by mpd_async_events (), or until the shutdown
// descriptor fires.
//
// async        libmpdclient async object to poll
// shutdown_fd  in/out: descriptor watched for a shutdown notification (the
//              read end of a pipe in this program).  It is set to 0 once the
//              descriptor becomes readable -- which includes end-of-file after
//              the write end has been closed -- or reports an exceptional
//              condition.
//
// Returns the requested events that select () found ready on the MPD socket,
// or 0 if select () failed.
// Throws std::runtime_error if mpd_async_events () returns 0 or if either
// descriptor cannot be stored in an fd_set.
mpd_async_event
async_poll (const mpd_async *async, int *shutdown_fd)
{
  int events = mpd_async_events (async);
  if (events == 0)
    {
      throw std::runtime_error ("mpd_async_events failed");
    }
  const int fd = mpd_async_get_fd (async);

  // Work on a snapshot: *shutdown_fd is overwritten with 0 below, and testing
  // the overwritten value would query descriptor 0 (stdin) instead.
  const int stop_fd = *shutdown_fd;

  // FD_SET/FD_ISSET are only defined for descriptors in [0, FD_SETSIZE).
  if (fd < 0 || fd >= FD_SETSIZE || stop_fd < 0 || stop_fd >= FD_SETSIZE)
    {
      throw std::runtime_error ("file descriptor out of range for select");
    }

  fd_set rfds, wfds, efds;
  FD_ZERO (&rfds);
  FD_ZERO (&wfds);
  FD_ZERO (&efds);

  if (events & MPD_ASYNC_EVENT_READ)
    {
      FD_SET (fd, &rfds);
    }
  if (events & MPD_ASYNC_EVENT_WRITE)
    {
      FD_SET (fd, &wfds);
    }
  if (events & (MPD_ASYNC_EVENT_HUP | MPD_ASYNC_EVENT_ERROR))
    {
      FD_SET (fd, &efds);
    }

  // The shutdown descriptor is only ever read from, so it is watched for
  // readability and exceptional conditions, not for writability.
  FD_SET (stop_fd, &rfds);
  FD_SET (stop_fd, &efds);

  const int max_fd = fd > stop_fd ? fd : stop_fd;
  const int ret = select (max_fd + 1, &rfds, &wfds, &efds, nullptr);
  if (ret <= 0)
    {
      return static_cast<mpd_async_event> (0);
    }

  // Keep only the events whose descriptor set actually fired.
  if (!FD_ISSET (fd, &rfds))
    {
      events &= ~MPD_ASYNC_EVENT_READ;
    }
  if (!FD_ISSET (fd, &wfds))
    {
      events &= ~MPD_ASYNC_EVENT_WRITE;
    }
  if (!FD_ISSET (fd, &efds))
    {
      events &= ~(MPD_ASYNC_EVENT_HUP | MPD_ASYNC_EVENT_ERROR);
    }

  if (FD_ISSET (stop_fd, &rfds) || FD_ISSET (stop_fd, &efds))
    {
      *shutdown_fd = 0;   // 0 is the "shutdown requested" marker for the caller
    }
  return static_cast<mpd_async_event> (events);
}

// Opens a blocking IPv4 TCP connection to host:port and returns the connected
// socket descriptor, which the caller then owns.
//
// host  host name or dotted-quad address, resolved with gethostbyname ()
// port  TCP port in host byte order
//
// Throws std::runtime_error if the socket cannot be created or the host does
// not resolve to an IPv4 address, and std::string ("ERROR connecting") if
// connect () fails (exception type kept as it was for existing callers).
// The socket is closed on every failure path.
int
socket_connect (const std::string &host, uint16_t port)
{
  const int sockfd = socket (AF_INET, SOCK_STREAM, 0);
  if (sockfd < 0)
    {
      throw std::runtime_error ("socket creation failed");
    }

  // gethostbyname () returns a null pointer when the lookup fails, so the
  // result must be validated before it is dereferenced.
  const hostent *server = gethostbyname (host.c_str ());
  if (server == nullptr
      || server->h_addrtype != AF_INET
      || server->h_addr_list[0] == nullptr
      || server->h_length != static_cast<int> (sizeof (in_addr)))
    {
      close (sockfd);
      throw std::runtime_error ("host lookup failed");
    }

  sockaddr_in server_addr {};   // value-initialised: all fields zero
  server_addr.sin_family = AF_INET;
  bcopy (server->h_addr_list[0], &server_addr.sin_addr,
         sizeof (server_addr.sin_addr));
  server_addr.sin_port = htons (port);

  if (::connect (sockfd, reinterpret_cast<const sockaddr *> (&server_addr),
                 sizeof (server_addr)) < 0)
    {
      close (sockfd);   // do not leak the descriptor when connecting fails
      throw std::string ("ERROR connecting");
    }
  return sockfd;
}

void
mpd_notify_thread_proc (int shutdown_fd)
{
  const std::string host = "127.0.0.1";
  const uint16_t port = 7701;

  auto sockfd = socket_connect (host, port);
  mpd_async_ptr async_ptr {mpd_async_new (sockfd), mpd_async_free};
  auto async = async_ptr.get ();
  if (async == nullptr)
    {
      throw std::runtime_error ("mpd_async_new failed");
    }

  while (shutdown_fd != 0)
    {
      auto events = async_poll (async, &shutdown_fd);
      if (shutdown_fd == 0)
    {
      break;
    }
      if (events != 0)
        {
          if (!mpd_async_io (async, (mpd_async_event) events))
            {
              throw std::runtime_error ("connection was closed");
            }

          char* line_ptr;
          while ((line_ptr = mpd_async_recv_line (async)) != nullptr)
            {
              std::cout << "recv: " << line_ptr << "\n";
              std::string line {line_ptr};
              if (line.find ("OK") == 0)
                {
                  if (!mpd_async_send_command (async, "idle", nullptr))
                    {
                      throw std::runtime_error ("mpd_async_send_command failed");
                    }
                }
            }
        }
    }
}

// Starts the MPD notification worker, waits for the user to press ENTER and
// then wakes the worker by closing the write end of the shutdown pipe.
//
// Throws std::runtime_error if the shutdown pipe cannot be created.
int
main (void)
{
  int shutdown_pipe[2];
  // Without this check a failed pipe () would leave both entries
  // uninitialised and the worker would wait on a garbage descriptor.
  if (pipe (shutdown_pipe) != 0)
    {
      throw std::runtime_error ("pipe failed");
    }

  // NOTE(review): the worker treats descriptor value 0 as "shutdown
  // requested"; this presumably relies on stdin still being open so that
  // pipe () never hands out 0 as the read end -- confirm.
  const int read_end = shutdown_pipe[0];
  std::thread mpd_notify_thread ([read_end] ()
  {
    mpd_notify_thread_proc (read_end);
  });

  std::string response;
  getline (std::cin, response);
  std::cout << "shutting down...\n";
  // Closing the write end makes the read end readable (end-of-file), which
  // wakes the select () call in the worker.
  close (shutdown_pipe[1]);
  mpd_notify_thread.join ();
  close (shutdown_pipe[0]);
}

如果您愿意以基于事件的风格(使用 libuv 和 uvw)编写代码,还有更好的解决方案:

#include <string>
#include <stdexcept>
#include <memory>
#include <iostream>

#include <uvw.hpp>

// Event-driven variant: connects to MPD at 127.0.0.1:7701 through a uvw TCP
// handle, splits the incoming byte stream into lines, answers every line that
// starts with "OK" with a new "idle" command and prints all other lines.
int
main (void)
{
  auto loop = uvw::Loop::getDefault ();
  if (!loop)
    {
      throw std::runtime_error ("loop init failed");
    }
  auto tcp = loop->resource<uvw::TcpHandle> ();
  if (!tcp)
    {
      throw std::runtime_error ("tcp init failed");
    }

  // Start reading as soon as the connection is established.
  tcp->once<uvw::ConnectEvent> ([] (const uvw::ConnectEvent &, uvw::TcpHandle &handle)
  {
    handle.read ();
  });

  tcp->once<uvw::ErrorEvent> ([] (const uvw::ErrorEvent &, uvw::TcpHandle &)
  {
    std::cerr << "Connection error\n";
  });

  // Bytes received so far that do not yet form a complete line.
  std::string pending;
  tcp->on<uvw::DataEvent> ([&] (const uvw::DataEvent &event, uvw::TcpHandle &handle)
  {
    pending.append (event.data.get (), event.length);

    for (auto eol = pending.find ('\n'); eol != std::string::npos;
         eol = pending.find ('\n'))
      {
        const std::string line = pending.substr (0, eol);
        pending.erase (0, eol + 1);

        if (line.compare (0, 2, "OK") != 0)
          {
            std::cout << line << "\n";
            continue;
          }

        // write () takes ownership of the buffer, so the command is copied
        // into a freshly allocated array.
        const std::string idle = "idle\n";
        std::unique_ptr<char[]> payload {new char[idle.size ()]};
        idle.copy (payload.get (), idle.size ());
        handle.write (std::move (payload), idle.size ());
      }
  });

  tcp->connect ("127.0.0.1", 7701);

  loop->run<uvw::Loop::Mode::DEFAULT> ();
}
于 2017-01-06T16:50:51.347 回答