我目前正在研究 ZeroMQ 3.2.2(在 Linux 上),并将它用于 C++ 应用程序和 PHP 应用程序之间的 IPC。我让两者都连接到以下“位置”(端点):
“ipc:///tmp/unibroker.5558.pipe”
==>(DEALER–REP 套接字组合)
当我尝试将 C++ 客户端连接到 C++ 服务器时,一切正常;但是,当我尝试从 PHP 做同样的事情时,却什么也收不到!我猜测最可能的原因是 Linux 上的文件访问权限,所以我执行了 chmod 777 /tmp/unibroker.5558.pipe……但这并没有帮助 :-(
[编辑] 我想补充一点,当我使用 tcp 时它确实有效,但我想使用 IPC 进行性能测试
有任何想法吗?
这是我的代码:
/**
 * Sends one cache update on the back socket as a four-frame message:
 * an empty frame, the topic, the literal tag "INT", and finally the value.
 *
 * @param $topic  topic name, sent as the second frame
 * @param $value  value for the topic, sent as the last frame
 */
function setCache( $topic, $value )
{
    // Every frame except the last is flagged "more to follow".
    // NOTE(review): the leading empty frame is presumably the envelope
    // delimiter expected by the REP peer - confirm against the server side.
    $leadingFrames = array( "", $topic, "INT" );
    foreach ( $leadingFrames as $frame )
    {
        $this->pBackPort->send( $frame, ZMQ::MODE_SNDMORE );
    }

    // The final frame carries the value and closes the message.
    $this->pBackPort->send( $value );
}
/**
 * Creates the SUB and DEALER sockets and connects each one to its own
 * IPC endpoint of the form ipc:///tmp/unibroker.<port>.pipe.
 *
 * @param $backPort   number embedded in the endpoint the DEALER socket (pBackPort) connects to
 * @param $frontPort  number embedded in the endpoint the SUB socket (pFrontPort) connects to
 */
function changePort( $backPort, $frontPort )
{
    $this->pFrontPort = new ZMQSocket($this->pContext, ZMQ::SOCKET_SUB );
    //$this->pFrontPort->connect('tcp://127.0.0.1:'.$frontPort);
    echo 'ipc:///tmp/unibroker.'.$frontPort.'.pipe';
    $this->pFrontPort->connect('ipc:///tmp/unibroker.'.$frontPort.'.pipe');

    $this->pBackPort = new ZMQSocket($this->pContext, ZMQ::SOCKET_DEALER);
    //$this->pBackPort->connect('tcp://127.0.0.1:'.$backPort);
    // Fix: the back endpoint must be connected on the DEALER socket itself.
    // Previously pFrontPort was connected a second time here, which left
    // pBackPort unconnected so nothing sent on it was ever delivered.
    $this->pBackPort->connect('ipc:///tmp/unibroker.'.$backPort.'.pipe');
}
这是执行绑定(bind)的一方,即 C++ 服务端的代码:
/**
 * Constructs the broker: records the libzmq version string, creates the
 * CZMQ context, and binds the REP socket (pSync) and the XPUB socket
 * (pBackend) to endpoints derived from the two port numbers.
 *
 * @param backPort   number used to build the endpoint pSync is bound to
 * @param frontPort  number used to build the endpoint pBackend is bound to
 */
UniBroker::UniBroker( unsigned short backPort, unsigned short frontPort )
: pBackPort( backPort ), pFrontPort( frontPort )/*, Interconnected( pInterconnected )*/
{
    const size_t versionSize = 20;
    Version = new char[versionSize];
    int major, minor, patch;
    zmq_version (&major, &minor, &patch);
    // Bounded formatting: three ints could in principle exceed the buffer.
    snprintf (Version, versionSize, "%d.%d.%d", major, minor, patch);

    Neighbour = NULL;
    evt = NULL;

    pContext = zctx_new ();
    pSync = zsocket_new (pContext, ZMQ_REP);
    pBackend = zsocket_new (pContext, ZMQ_XPUB);

    // NOTE(review): GCC/Clang predefine __linux__, not __LINUX__. Unless the
    // build defines __LINUX__ explicitly, the tcp branches below are the ones
    // that get compiled - confirm against the build flags.
    char connection[50];
#ifdef __LINUX__
    snprintf(connection, sizeof(connection), "ipc:///tmp/unibroker.%d.pipe",pBackPort );
#else
    snprintf(connection, sizeof(connection), "tcp://*:%d",pBackPort );
#endif
    zsocket_bind (pSync, connection);

#ifdef __LINUX__
    snprintf(connection, sizeof(connection), "ipc:///tmp/unibroker.%d.pipe",pFrontPort );
#else
    snprintf(connection, sizeof(connection), "tcp://*:%d",pFrontPort );
#endif
    zsocket_bind (pBackend, connection);

    // Set ZMQ_XPUB_VERBOSE to 1 on the XPUB socket.
    int verbose = 1;
    zmq_setsockopt( pBackend, ZMQ_XPUB_VERBOSE, &verbose, sizeof( int ) );
}
/**
 * Stores `value` under `topic` in the local cache and sends the update on
 * pSync as four frames: an empty frame, the topic, an origin tag and the value.
 *
 * @param topic     cache key, sent as the second frame
 * @param value     new value, sent as the last frame
 * @param external  selects the origin tag: "EXT" when true, "INT" otherwise
 */
void UniBroker::setCache( string topic, string value, bool external )
{
    pCache[topic] = value;

    // zstr_sendm/zstr_send take a printf-style format (they are called with
    // "%s" elsewhere in this file), so payload text must be passed as an
    // argument: a '%' inside the topic or value must not be read as a
    // conversion specifier.
    zstr_sendm(pSync, "");
    zstr_sendm(pSync, "%s", topic.c_str() );
    zstr_sendm(pSync, "%s", external ? "EXT" : "INT" );
    zstr_send(pSync, "%s", value.c_str() );
}
void UniBroker::Run( int timeout )
{
zmq_pollitem_t items [] = {
{ pBackend, 0, ZMQ_POLLIN, 0 },
{ pSync, 0, ZMQ_POLLIN, 0 }
};
if (zmq_poll (items, 2, timeout) == -1)
{
return;
}
// Any new topic data we cache and then forward
if (items [1].revents & ZMQ_POLLIN)
{
char *topic = zstr_recv (pSync);
char *type = zstr_recv (pSync);
char *current = zstr_recv (pSync);
const char *previous = pCache[ topic ].c_str();
if( current && strcmp( current , previous ) != 0 )
{
pCache[ topic ] = current;
zstr_sendm (pBackend, topic);
zstr_sendm (pBackend, type);
zstr_send (pBackend, current);
if( Neighbour != NULL )
{
Neighbour->setCache(topic, current, true);
}
UniEvent *event = pEvents[topic];
if( event )
{
(*event).external = ( strcmp( "EXT", type ) == 0 );
(*event)( topic, current );
}
event = pEvents["*"];
if( event )
{
(*event).external = ( strcmp( "EXT", type ) == 0 );
(*event)( topic, current );
}
}
zstr_send(pSync, "");
}
if (items [0].revents & ZMQ_POLLIN)
{
zframe_t *frame = zframe_recv (pBackend);
if( frame )
{
// Event is one byte 0=unsub or 1=sub, followed by topic
byte *event = zframe_data (frame);
if (event [0] == 1) {
char *topic = new char[zframe_size(frame)+1];
memcpy (topic, event + 1, zframe_size (frame) - 1);
topic[zframe_size(frame)]=NULL;
printf("\t\tSubscribed for topic %s\r\n", topic);
if( strlen( topic ) == 0 )
{
map<string,string>::iterator it = pCache.begin();
while( it != pCache.end() )
{
const char *previous = (*it).second.c_str();
if (previous && strlen( previous ) > 0 ) {
zstr_sendm (pBackend, "%s", (*it).first.c_str());
zstr_sendm (pBackend, "%s", "INT");
zstr_send (pBackend, "%s", previous);
}
it++;
}
}
else
{
const char *previous = pCache[topic].c_str();
if (previous && strlen( previous ) > 0 ) {
zstr_sendm (pBackend, "%s", topic);
zstr_sendm (pBackend, "%s", "INT");
zstr_send (pBackend, "%s", previous);
}
}
delete (topic);
}
zframe_destroy (&frame);
}
}
}