0

我目前正在研究 zeromq 3.2.2(在 linux 上),我将它用于 C++ 应用程序和 PHP 应用程序之间的 IPC。我让他们都连接到以下“位置”:

“ipc:///tmp/unibroker.5558.pipe”

==>(经销商回复)

当我尝试将 C++ 客户端连接到 C++ 服务器时,这很有效,但是,当我尝试从 PHP 做同样的事情时,我什么也得不到!我最好的猜测是来自 linux 的访问权限,所以我做了 chmod 777 /tmp/unibroker.5558.pipe... 但它没有帮助:-(

[编辑] 我想补充一点,当我使用 tcp 时它确实有效,但我想使用 IPC 进行性能测试

有任何想法吗?

这是我的代码:

function setCache( $topic, $value )
    {
        $this->pBackPort->send( "", ZMQ::MODE_SNDMORE );
        $this->pBackPort->send( $topic, ZMQ::MODE_SNDMORE );
        $this->pBackPort->send( "INT", ZMQ::MODE_SNDMORE );
        $this->pBackPort->send( $value );
    }

    function changePort( $backPort, $frontPort )
    {
        $this->pFrontPort = new ZMQSocket($this->pContext, ZMQ::SOCKET_SUB );
        //$this->pFrontPort->connect('tcp://127.0.0.1:'.$frontPort);
        echo 'ipc:///tmp/unibroker.'.$frontPort.'.pipe';
        $this->pFrontPort->connect('ipc:///tmp/unibroker.'.$frontPort.'.pipe');

        $this->pBackPort = new ZMQSocket($this->pContext, ZMQ::SOCKET_DEALER);
        //$this->pBackPort->connect('tcp://127.0.0.1:'.$backPort);
        $this->pFrontPort->connect('ipc:///tmp/unibroker.'.$backPort.'.pipe');
    }

这是绑定方面:

UniBroker::UniBroker( unsigned short backPort, unsigned short frontPort )
    : pBackPort( backPort ), pFrontPort( frontPort )/*, Interconnected( pInterconnected )*/
{
    Version = new char[20];
    int major, minor, patch;
    zmq_version (&major, &minor, &patch);
    sprintf (Version, "%d.%d.%d", major, minor, patch);

    Neighbour = NULL;
    evt = NULL;
    pContext = zctx_new ();

    pSync = zsocket_new (pContext, ZMQ_REP);
    pBackend = zsocket_new (pContext, ZMQ_XPUB);

    char connection[50];
#ifdef __LINUX__
    sprintf(connection, "ipc:///tmp/unibroker.%d.pipe",pBackPort );
#else
    sprintf(connection, "tcp://*:%d",pBackPort );
#endif;
    zsocket_bind (pSync, connection);

#ifdef __LINUX__
    sprintf(connection, "ipc:///tmp/unibroker.%d.pipe",pFrontPort );
#else
    sprintf(connection, "tcp://*:%d",pFrontPort );
#endif
    zsocket_bind (pBackend, connection);

    int verbose = 1;
    zmq_setsockopt( pBackend, ZMQ_XPUB_VERBOSE, &verbose, sizeof( int ) );
}

void UniBroker::setCache( string topic, string value, bool external )
{
    pCache[topic] = value;

    zstr_sendm(pSync, "");
    zstr_sendm(pSync, topic.c_str() );
    zstr_sendm(pSync, external ? "EXT" : "INT" );
    zstr_send(pSync, value.c_str() );
}

void UniBroker::Run( int timeout )
{
    zmq_pollitem_t items [] = {
        { pBackend,  0, ZMQ_POLLIN, 0 },
        { pSync,  0, ZMQ_POLLIN, 0 }
    };
    if (zmq_poll (items, 2, timeout) == -1)
    {
        return;
    }

    //  Any new topic data we cache and then forward
    if (items [1].revents & ZMQ_POLLIN)
    {
        char *topic = zstr_recv (pSync);
        char *type = zstr_recv (pSync);
        char *current = zstr_recv (pSync);
        const char *previous = pCache[ topic ].c_str();

        if( current && strcmp( current , previous ) != 0 )
        {
            pCache[ topic ] = current;
            zstr_sendm (pBackend, topic);
            zstr_sendm (pBackend, type);
            zstr_send (pBackend, current);

            if( Neighbour != NULL )
            {
                Neighbour->setCache(topic, current, true);
            }

            UniEvent *event = pEvents[topic];

            if( event )
            {
                (*event).external = ( strcmp( "EXT", type ) == 0 );
                (*event)( topic, current );
            }

            event = pEvents["*"];
            if( event )
            {
                (*event).external = ( strcmp( "EXT", type ) == 0 );
                (*event)( topic, current );
            }
        }

        zstr_send(pSync, "");
    }

    if (items [0].revents & ZMQ_POLLIN)
    {
        zframe_t *frame = zframe_recv (pBackend);

        if( frame )
        {
            //  Event is one byte 0=unsub or 1=sub, followed by topic
            byte *event = zframe_data (frame);
            if (event [0] == 1) {
                char *topic = new char[zframe_size(frame)+1];
                memcpy (topic, event + 1, zframe_size (frame) - 1);
                topic[zframe_size(frame)]=NULL;

                printf("\t\tSubscribed for topic %s\r\n", topic);
                if( strlen( topic ) == 0 )
                {
                    map<string,string>::iterator it = pCache.begin();
                    while( it != pCache.end() )
                    {
                        const char *previous = (*it).second.c_str();

                        if (previous && strlen( previous ) > 0 ) {
                            zstr_sendm (pBackend, "%s", (*it).first.c_str());
                            zstr_sendm (pBackend, "%s", "INT");
                            zstr_send (pBackend, "%s", previous);
                        }

                        it++;
                    }
                }
                else
                {
                    const char *previous = pCache[topic].c_str();

                    if (previous && strlen( previous ) > 0 ) {
                        zstr_sendm (pBackend, "%s", topic);
                        zstr_sendm (pBackend, "%s", "INT");
                        zstr_send (pBackend, "%s", previous);
                    }
                }

                delete (topic);
            }
            zframe_destroy (&frame);
        }
    }
}
4

0 回答 0