0

我遇到了 PUB/SUB 模式的问题,其中 SUB 有时会随机(一天左右一次)停止接收数据。我想每秒添加一些从 PUB 到 SUB 的自定义心跳消息,但我必须从同一个线程发送它,因为 PUB 不是线程安全的。为此,我可以使用一些 java Timer/Scheduler 每秒执行一次,但问题是 Timer/Scheduler 创建了一个新线程。我正在从 PUB 中的外部 websocket 接收数据,但我看不到每个人都在谈论的发送心跳的方法。唯一的方法是使用 XPUB-XSUB 添加代理...

知道怎么做吗?

4

1 回答 1

0

问: 知道怎么做吗?

答:
是的,有几个——首先,ZeroMQ 有内部.poll()-er,它可以侦听与套接字相关的事件,不超过规定的数量[ms],因此可以通过这种方式实现私有事件感知循环(使用stumped inproc://voidPAIR-socket PAIR-archetype(由于没有任何传输类协议/堆栈而确实具有零开销)接收

或者使用这种简单的结构化但分段的控制回路。ZeroMQ 可以将“job-ToDo-command”(s)注入控制循环,但在没有收到任何这样的实际工作命令时,控制循环可以.send()在适当的时候“软”心跳,留下可定制的休息量/ sleep、re-.send() 和它自己的“软”标记的终止信号。

忍耐一下,这确实是一个“够幼稚”的——DEMO running(live)

// 如果看到这个,控制回路专家会撕掉所有剩余的头发...

;o)

public class Main
{
    public static void main(String[] args) throws InterruptedException
    {                                                           // DEMO
        long    aNAP          =  500;                           // DEMO  500 [ms] ~ 0.5 [s]
        long    aNEXT         = 1000;                           // DEMO 1000 [ms] ~ 1.0 [s]
        long    aLAST         = 5000;                           // DEMO 5000 [ms] ~ 5.0 [s]
        long    aNextFireTIME = aNEXT + System.currentTimeMillis();
        long    aLastFireTIME = aLAST + aNextFireTIME;
        
        String  ZERO_MSG_PAYLOAD = "";
        
        boolean aTerminateFLAG = true;
        
        System.out.print(   System.currentTimeMillis() );
        System.out.println( " starts now..." );
        
        while(  aTerminateFLAG )
        {
            while ( aNextFireTIME > System.currentTimeMillis() )    
            {                                                   // DEMO
            // test & serve any critical events' contexts
            // do some useful work
            // do  any slice of low-prio maintenance work
               Thread.sleep( Math.min( aNAP,                    // DEMO
                                       aNextFireTIME - System.currentTimeMillis()
                                       )
                               );
            // sleep to take a rest, releasing CPU for other tasks
            // also may test here any incoming service-socket messages to get handled
            // using .poll( aZmqBuiltInPollerTIMEOUT );
               System.out.print(   System.currentTimeMillis() );
               System.out.println( " aNAP taken till now..." );
               System.out.flush();
            }
            if ( aLastFireTIME < System.currentTimeMillis() )   // DEMO
            {
                aTerminateFLAG = false;
            }
            aNextFireTIME = aNEXT + System.currentTimeMillis(); // DEMO
            
            System.out.print(   System.currentTimeMillis() );
            System.out.println( " PUB.send( ZERO_MSG_PAYLOAD, 0 ); # a ZERO-sized 'soft' HeartBeat message" );
            System.out.println( aNextFireTIME );
            System.out.flush();
        }
    }
}

1641539382710 starts now...
1641539383211 aNAP taken till now...
1641539383710 aNAP taken till now...
1641539383710 PUB.send( ZERO_MSG_PAYLOAD, 0 ); # a ZERO-sized 'soft' HeartBeat message
1641539384710
1641539384211 aNAP taken till now...
1641539384710 aNAP taken till now...
1641539384710 PUB.send( ZERO_MSG_PAYLOAD, 0 ); # a ZERO-sized 'soft' HeartBeat message
1641539385710
1641539385210 aNAP taken till now...
1641539385712 aNAP taken till now...
1641539385712 PUB.send( ZERO_MSG_PAYLOAD, 0 ); # a ZERO-sized 'soft' HeartBeat message
1641539386712
1641539386212 aNAP taken till now...
1641539386712 aNAP taken till now...
1641539386712 PUB.send( ZERO_MSG_PAYLOAD, 0 ); # a ZERO-sized 'soft' HeartBeat message
1641539387712
1641539387212 aNAP taken till now...
1641539387720 aNAP taken till now...
1641539387720 PUB.send( ZERO_MSG_PAYLOAD, 0 ); # a ZERO-sized 'soft' HeartBeat message
1641539388720
1641539388228 aNAP taken till now...
1641539388720 aNAP taken till now...
1641539388721 PUB.send( ZERO_MSG_PAYLOAD, 0 ); # a ZERO-sized 'soft' HeartBeat message
1641539389721
...
于 2022-01-07T07:25:53.470 回答