1

我正在创建一个由 Log4J 附加程序组成的库,该附加程序将事件异步发送到远程服务器。当一个日志语句被创建时,appender 将异步记录事件到一个本地队列中,然后一个消费者池将检索并发送到远程。

完全在内存中的解决方案是创建一个 BlockingQueue 来处理并发问题。但是,我希望保留队列,以便如果远程服务器不可用,我不会无限制地增长队列或在有界队列的情况下开始丢弃消息。

我正在考虑使用嵌入式 H2 数据库在本地存储事件,然后使用轮询机制来检索事件并发送到远程。我宁愿使用 BlockingQueue 而不是轮询数据库表。

JMS 是答案吗?

编辑:

如果 JMS 是答案,而且似乎是这样,那么是否有人对可以配置为仅接受进程内消息的轻量级、可嵌入 JMS 解决方案提出建议?换句话说,我不想也可能不会被允许打开一个 TCP 套接字来监听。

编辑:

我现在嵌入了 ActiveMQ,它似乎正在工作。谢谢大家。

4

4 回答 4

2

Bob Lee 不久前开源了一个非常简单的磁盘支持队列,https://github.com/square/retrofit/blob/master/modules/android/src/retrofit/io/QueueFile.java - 可能会有所帮助,并且如果您可以接受本地持久性,那么引入肯定比 JMS 容易得多。

这个类是独立的——它可以被复制和粘贴。

于 2010-11-15T21:18:33.700 回答
1

您可以使用 JMS 将消息异步发送到远程机器(假设它当然可以接收它们),Log4j 有一个 JMS Appender 可以用于此目的。

于 2010-11-15T13:45:30.657 回答
1

您绝对可以为此目的使用 JMS。据我了解,您正在使用 Log4J JMS 附加程序。该组件将消息发送到预先配置的 JMS 目的地(通常是队列)。您可以将此队列配置为持久化。在这种情况下,插入队列的所有消息都将自动存储在某个持久存储(通常是数据库)中。不幸的是,此配置是特定于供应商的(取决于 JMS 供应商),但通常非常简单。请参阅您的 JMS 提供者的文档。

于 2010-11-15T13:57:08.790 回答
0

看看这是否有效

这段代码应该适合你——它是一个内存中的持久阻塞队列——需要一些文件调整但应该可以工作

       package test;

     import java.io.BufferedReader;
     import java.io.BufferedWriter;
     import java.io.File;
     import java.io.FileReader;
     import java.io.FileWriter;
     import java.io.IOException;
     import java.util.ArrayList;
     import java.util.Collections;
     import java.util.LinkedList;
     import java.util.List;

     public class BlockingQueue {

    //private static Long maxInMenorySize = 1L;
    private static Long minFlushSize = 3L;

    private static String baseDirectory = "/test/code/cache/";
    private static String fileNameFormat = "Table-";

    private static String  currentWriteFile = "";

    private static List<Object>  currentQueue = new LinkedList<Object>();
    private static List<Object>  lastQueue = new LinkedList<Object>();

    static{
        try {
            load();
        } catch (IOException e) {
            System.out.println("Unable To Load");
            e.printStackTrace();
        }
    }

    private static void load() throws IOException{
        File baseLocation = new File(baseDirectory);
        List<String> fileList = new ArrayList<String>();

        for(File entry : baseLocation.listFiles()){
            if(!entry.isDirectory() && entry.getName().contains(fileNameFormat)){
                fileList.add(entry.getAbsolutePath());
            }
        }

        Collections.sort(fileList);

        if(fileList.size()==0){
            //currentQueue = lastQueue = new ArrayList<Object>();
            currentWriteFile = baseDirectory + "Table-1";
            BufferedWriter writer = new BufferedWriter(new FileWriter(currentWriteFile));
            while (!lastQueue.isEmpty()){
                writer.write(lastQueue.get(0).toString()+ "\n");
                lastQueue.remove(0);
            }
            writer.close();
        }else{
            if(fileList.size()>0){
                    BufferedReader reader = new BufferedReader(new FileReader(fileList.get(0)));
                    String line=null;
                    while ((line=reader.readLine())!=null){
                        currentQueue.add(line);
                    }
                    reader.close();
                    File toDelete = new File(fileList.get(0));
                    toDelete.delete();
            }

            if(fileList.size()>0){
                BufferedReader reader = new BufferedReader(new FileReader(fileList.get(fileList.size()-1)));
                currentWriteFile = fileList.get(fileList.size()-1);
                String line=null;
                while ((line=reader.readLine())!=null){
                    lastQueue.add(line);
                }
                reader.close();
                //lastFileNameIndex=Long.parseLong(fileList.get(fileList.size()).substring(6, 9));
            }
        }

    }

    private void loadFirst() throws IOException{
        File baseLocation = new File(baseDirectory);
        List<String> fileList = new ArrayList<String>();

        for(File entry : baseLocation.listFiles()){
            if(!entry.isDirectory() && entry.getName().contains(fileNameFormat)){
                fileList.add(entry.getAbsolutePath());
            }
        }

        Collections.sort(fileList);

        if(fileList.size()>0){
                BufferedReader reader = new BufferedReader(new FileReader(fileList.get(0)));
                String line=null;
                while ((line=reader.readLine())!=null){
                    currentQueue.add(line);
                }
                reader.close();
                File toDelete = new File(fileList.get(0));
                toDelete.delete();
        }
    }

    public Object pop(){
        if(currentQueue.size()>0)
            return  currentQueue.remove(0);

        if(currentQueue.size()==0){
            try {
                loadFirst();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        if(currentQueue.size()>0)
            return  currentQueue.remove(0);
        else
            return null;
    }

    public synchronized Object waitTillPop() throws InterruptedException{
        if(currentQueue.size()==0){
            try {
                loadFirst();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            if(currentQueue.size()==0)
                wait();
        }
        return currentQueue.remove(0);
    }

    public synchronized void push(Object data) throws IOException{
        lastQueue.add(data);
        this.notifyAll();
        if(lastQueue.size()>=minFlushSize){
            BufferedWriter writer = new BufferedWriter(new FileWriter(currentWriteFile));
            while (!lastQueue.isEmpty()){
                writer.write(lastQueue.get(0).toString() + "\n");
                lastQueue.remove(0);
            }
            writer.close();

            currentWriteFile  = currentWriteFile.substring(0,currentWriteFile.indexOf("-")+1) + 
                    (Integer.parseInt(currentWriteFile.substring(currentWriteFile.indexOf("-")+1,currentWriteFile.length())) + 1);
        }
    }

    public static void main(String[] args) {
        try {
            BlockingQueue bq = new BlockingQueue();

            for(int i =0 ; i<=8 ; i++){
                bq.push(""+i);
            }

            System.out.println(bq.pop());
            System.out.println(bq.pop());
            System.out.println(bq.pop());

            System.out.println(bq.waitTillPop());
            System.out.println(bq.waitTillPop());
            System.out.println(bq.waitTillPop());
            System.out.println(bq.waitTillPop());



        } catch (Exception e) {
            e.printStackTrace();
        }
    }
于 2016-01-19T11:45:47.043 回答