1

我已经实现了一个简单的 Spring Boot 应用程序,它接收网络消息,使用 appender.writeText(str) 将其排队到 SingleChronicleQueue,另一个线程使用 tailer.readText() 轮询消息。经过一些处理后,处理过的消息被放置在另一个 SingleChronicleQueue 中以发送出去。我在应用程序中有三个队列。

该应用程序每晚都会轮换文件,而第一个奇怪的事情是文件大小(对于每个 Q)是相同的(对于每个 Q 是不同的)。最大的 cq4 文件每天大约 220MB。

我面临的问题是,从开始到现在的三天内,内存从 480MB 增长到 1.6GB,这是不合理的。

我有一个想法,我在配置中遗漏了一些东西,或者我的实现很天真/糟糕。(我不会在每次使用后关闭 appender 和 tailer,应该这样)。

这是一个精简的示例,也许有人可以阐明。

@Service
public class QueuesService {
    private static Logger LOG = LoggerFactory.getLogger(QueuesService.class);

    @Autowired
    AppConfiguration conf;

    private SingleChronicleQueue Q = null;
    private ExcerptAppender QAppender = null;
    private ExcerptTailer QTailer = null;

    public QueuesService() {
    }

    @PostConstruct
    private void init() {

        Q = SingleChronicleQueueBuilder.binary(conf.getQueuePath()).indexSpacing(1).build();
        QAppender = Q.acquireAppender();
        QTailer = Q.createTailer();
    }

    public ExcerptAppender getQAppender() {
        return QAppender;
    }

    public ExcerptTailer getQTailer() {
        return QTailer;
    }
}


@Service
public class ProcessingService {
    private static Logger LOG = LoggerFactory.getLogger(ProcessingService.class);

    @Autowired
    AppConfiguration conf;

    @Autowired
    private TaskExecutor taskExecutor;

    @Autowired
    private QueuesService queueService;

    private QueueProcessor processor = null;

    public ProcessingService() {
    }

    @PostConstruct
    private void init() {
        processor = new QueueProcessor();
        processor.start();
    }

    @Override
    public Message processMessage(Message msg, Map<String, Object> metadata) throws SomeException {

        String strMsg = msg.getMessage().toString();

        if (LOG.isInfoEnabled()) {
            LOG.info("\n" + strMsg);
        }

        try {
            queueService.getQAppender().writeText(strMsg);

            if (LOG.isInfoEnabled()) {
                LOG.info("Added new message to queue. index: " + queueService.getQAppender().lastIndexAppended());
            }
        }
        catch(Exception e) {
            LOG.error("Unkbown error. reason: " + e.getMessage(), e);
        }
    }

    class QueueProcessor extends Thread {

        public void run() {
            while (!interrupted()) {
                try {
                    String msg = queueService.getEpicQTailer().readText();

                    if (msg != null) {
                        long index = queueService.getEpicQTailer().index();
                        // process
                    }
                    else {
                        Thread.sleep(10);
                    }
                }
                catch (InterruptedException e) {
                    LOG.warn(e);
                    this.interrupt();
                    break;
                }
            }

            ThreadPoolTaskExecutor tp = (ThreadPoolTaskExecutor) taskExecutor;
            tp.shutdown();
        }
    }
}
4

1 回答 1

1

Chronicle Queue 旨在使用比主内存(或堆)大得多的虚拟内存,而不会对您的系统产生重大影响。这使您可以快速随机访问数据。

这是一个在 3 小时内写入 1 TB 的进程的示例。

在此处输入图像描述

https://vanilla-java.github.io/2017/01/27/Chronicle-Queue-storing-1-TB-in-virtual-memory-on-a-128-GB-machine.html

这显示了随着队列的增长它会变慢多少

在此处输入图像描述

即使在 128 GB 的机器上它的大小为 1 TB,它在 2 秒内写入 1 GB 的时间也相当一致。

虽然这不会导致技术问题,但我们知道这确实会引起那些也觉得这种“奇怪”的人的关注,我们计划采用一种减少虚拟内存使用的模式(即使在某些用例中会慢一点)

于 2017-02-04T09:49:26.490 回答