对于自定义服务器,我打算使用int zmq::poll( zmq_pollitemt_t * items
, int nitems, long timeout = -1)
. 我认为它是围绕 unix poll 函数的包装器,但包含zmq::socket_t
在文件描述符旁边。该功能按预期工作,直到我按 ctrl+x 或$kill my_server_pid
在终端中运行。我希望民意调查以 -1 终止或抛出包含 a和消息的 a zmq::error_t
(派生自)。这应该表明存在中断。然后我的服务器应该优雅地处理信号并保存一些数据并关闭。std::exception
errno
strerr
下面我有一段代码演示了这个问题。首先我展示一下我的环境以及我是如何编译它的:
mja@gijs:~/Desktop/sample_programs/zeromq/poll$ g++ -v
Using built-in specs.
COLLECT_GCC=g++
COLLECT_LTO_WRAPPER=/usr/lib/gcc/x86_64-linux-gnu/4.6/lto-wrapper
Target: x86_64-linux-gnu
Configured with: ../src/configure -v --with-pkgversion='Ubuntu/Linaro 4.6.3-1ubuntu5' --with-bugurl=file:///usr/share/doc/gcc-4.6/README.Bugs --enable-languages=c,c++,fortran,objc,obj-c++ --prefix=/usr --program-suffix=-4.6 --enable-shared --enable-linker-build-id --with-system-zlib --libexecdir=/usr/lib --without-included-gettext --enable-threads=posix --with-gxx-include-dir=/usr/include/c++/4.6 --libdir=/usr/lib --enable-nls --with-sysroot=/ --enable-clocale=gnu --enable-libstdcxx-debug --enable-libstdcxx-time=yes --enable-gnu-unique-object --enable-plugin --enable-objc-gc --disable-werror --with-arch-32=i686 --with-tune=generic --enable-checking=release --build=x86_64-linux-gnu --host=x86_64-linux-gnu --target=x86_64-linux-gnu
Thread model: posix
gcc version 4.6.3 (Ubuntu/Linaro 4.6.3-1ubuntu5)
mja@gijs:~/Desktop/sample_programs/zeromq/poll$ pkg-config --modversion `enter code here`libzmq
2.2.0
mja@gijs:~/Desktop/sample_programs/zeromq/poll$ g++ -pthread -std=c++0x -Wall -Wextra -pedantic -o poll polling.cpp $(pkg-config --cflags --libs libzmq )
mja@gijs:~/Desktop/sample_programs/zeromq/poll$
现在是 polling.cpp 的代码:
#include <zmq.hpp>
#include <thread>
#include <cstdlib>
#include <string>
#include <iostream>
#include <signal.h>
const char* bind_addres = "tcp://*:2345";
const char* connect_addres = "tcp://localhost:2345";
inline void
send_str( zmq::socket_t& sock, const std::string& s) throw (zmq::error_t) {
zmq::message_t msg ( s.size() );
memcpy( msg.data(), s.c_str(), msg.size() );
sock.send( msg );
}
inline void recv_str( zmq::socket_t& sock, std::string& s) throw( zmq::error_t) {
zmq::message_t msg;
sock.recv(&msg);
s = std::string( static_cast<const char*>(msg.data()), msg.size());
}
static int interrupted = 0;
static void handle_signal ( int signal )
{
interrupted = signal;
std::cerr << "Interrupted by signal: " << signal << std::endl;
}
void catch_signals (void)
{
struct sigaction action;
action.sa_handler = handle_signal;
action.sa_flags = 0;
sigemptyset(&action.sa_mask);
sigaction(SIGINT, &action, NULL);
sigaction(SIGTERM, &action, NULL);
}
int get_interrupted(void)
{
std::cout << "interrupted = " << interrupted << std::endl;
return interrupted;
}
void req_thread ( zmq::context_t* c ){
zmq::socket_t sock( *c, ZMQ_REQ);
sock.connect(connect_addres);
zmq::pollitem_t items[]{
{ sock, 0, ZMQ_POLLIN, 0}
};
while (true){
try {
// zmq 3.x.x takes ms instead of us so change to eg 1000 or be patient.
int rc = zmq::poll(items, 1, 1000000);
if (rc > 0){
if ( items[0].revents & ZMQ_POLLIN){
std::string s;
recv_str(sock, s);
std::cout << s << std::endl;
}
}
else if ( rc == 0){ //timeout
send_str( sock, "Hello");
}
else{
std::cout << __func__ << " " << __LINE__ << get_interrupted() << std::endl;
}
}
catch( zmq::error_t& e ){
std::cout << __func__ << " " << __LINE__ << e.what() << std::endl;
}
}
}
void rep_thread ( zmq::context_t* c ){
zmq::socket_t sock( *c, ZMQ_REP);
sock.bind(bind_addres);
zmq::pollitem_t items[]{
{ sock, 0, ZMQ_POLLIN, 0}
};
while (true){
try{
int rc = zmq::poll(items, 1 , -1);
if (rc > 0){
if ( items[0].revents & ZMQ_POLLIN){
std::string s;
recv_str(sock, s);
s+=" world!";
send_str(sock, s);
}
}
else{
std::cout << __func__ << " " << __LINE__ << get_interrupted() << std::endl;
}
}
catch( zmq::error_t& e ){
std::cout << __func__ << " " << __LINE__ << e.what() << std::endl;
}
}
}
int main(){
zmq::context_t context(1);
catch_signals();
std::thread t1 ( rep_thread, &context);
std::thread t2 ( req_thread, &context);
t1.join();
t2.join();
return 0;
}
最后我展示了一些示例输出,说明我的问题zmq_poll
似乎不受在终端中按 ctrl+c 的影响:
mja@gijs:~/Desktop/sample_programs/zeromq/poll$ ./poll
Hello world!
Hello world!
Hello world!
^CInterrupted by signal: 2
Hello world!
Hello world!
^Z
[1]+ Stopped ./poll
mja@gijs:~/Desktop/sample_programs/zeromq/poll$ kill -9 %1
[1]+ Stopped ./poll
mja@gijs:~/Desktop/sample_programs/zeromq/poll$
所以这里可以通过终端上的输出看到没有 zmq::error_t 被抛出, zmq::poll() 也没有返回 -1;
它应该如何工作在下一个示例 simple_poll.cpp 中可以看到一个 zmq::error_t 被抛出:
#include<zmq.hpp>
#include<signal.h>
#include<iostream>
static int interrupted = 0;
static void handle_signal ( int signal )
{
interrupted = signal;
/*just to show that the signal handler works*/
std::cerr << "Interrupted by signal: " << signal << std::endl;
}
void catch_signals (void)
{
struct sigaction action;
action.sa_handler = handle_signal;
action.sa_flags = 0;
sigemptyset(&action.sa_mask);
sigaction(SIGINT, &action, NULL);
sigaction(SIGTERM, &action, NULL);
}
using namespace std;
int main(){
zmq::context_t context(1);
catch_signals();
zmq::socket_t sock( context, ZMQ_REP );
/*listen on port 2346 on all available interfaces.*/
sock.bind("tcp://*:2346");
zmq::pollitem_t items[] = {
{sock, 0 , ZMQ_POLLIN, 0}
};
try {
/*wait for a event*/
zmq::poll( items, 1, -1);
/*for zmq users read message and respond*/
}
catch (zmq::error_t& e){
cout << "error occured: " <<e.what() << endl;
cout << "We were interrupted by: " << interrupted << endl;
}
return 0;
}
这会在终端中的 ctrl+x 上产生以下结果,表明 zmq::error_t 已被捕获并且信号已被处理。
mja@gijs:~/Desktop/sample_programs/zeromq/poll$ ./simple_poll ^CInterrupted by signal: 2
error occured: Interrupted system call
We were interrupted by: 2
mja@gijs:~/Desktop/sample_programs/zeromq/poll$