1

我想我用错了线程,所以我想问一下这是否是好的设计。基本上我有一个程序从队列中提取数据然后处理它(处理是纯数学,所以它 100% cpu 密集型),然后如果数据是好的,它发送到一个“好”队列,否则它要么完全丢弃,要么是它的一部分被发送回初始的“工作”队列以进行进一步处理。这就是它的高级逻辑,当我的队列在内存中时,我的程序使用了所有内核并且速度非常快。随着数据的增长,我决定使用队列服务器来存储队列,然后将处理分配到多台机器上,现在它的速度很慢(每个内核只使用了 40%-60%)。

我试图分析我的代码(使用 yourkit 和 netbeans 中的内置代码),它说大部分时间(80%)都花在了队列程序上。我想我可以通过将所有外部程序的东西推到另一个线程来保持我的程序中不断地处理数字,但这对性能没有帮助,我想知道我是不是弄错了。我不确定,但我想知道是否从现有线程(父线程)启动线程(子线程),是否必须在父线程完成之前完成子线程?

我的代码很大,不需要 99%,所以我只写它的高级版本(它可能无法编译,但应该让你知道我在做什么)。

public class worker {

    private static ExecutorService executor;
    static {
        final int numberOfThreads = 4;
        executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 1000, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());    
    }
    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        // TODO Auto-generated method stub
        System.out.println("starting worker..");
        //Connection information goes here
        channel.basicQos(50); //this is part of the connection, the queue server only gives 50 messages without acknowledgment

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //gets data from queue
            String message = new String(delivery.getBody());  
            executor.submit(new DoWork(channel, message, delivery));
        }

class DoWork implements Runnable{ //where the main work happens
    //setup variables, basically passing queue connection information as well as data here, so I only need to rely on one connection    
    public void run() {
        new Thread(new Awk_to_Queue(channel, delivery)).start(); //this sends an Awk_to_Queue to the queue, I launch a thread for this so my program can keep working.

        if (data is good) {
            new Thread(new Send_to_Queue("success_queue", message1, channel)).start();                
            continue;
        }  else if (Data is not good but not bad either ) {
        new Thread(new Send_to_Queue("task_queue", message2, channel)).start();             
        } 

class Send_to_Queue implements Runnable{       
    public void run() {
        //takes data in and sends to queue in the way I used to previous do it, but just does it in a thread. queue connection is passed over so I only need to have one connection
    }
}


class Awk_to_Queue implements Runnable{
    public void run() {
        //awk's queue so queue server can send one more piece of data to queue up
    }
}

它在那里。如果它有点难以阅读,我很抱歉(我删除了很多东西只是为了向你展示我正在做的事情的结构)。我做错了什么,分叉线程不会影响速度(它没有看到它变得更快,也没有改变探查器的结果)?是我分叉线程(new Thread(new Awk_to_Queue(channel, delivery)).start();)的方式有问题,还是像我的线程设计一样?

4

1 回答 1

6

想到两件事:

1)读取远程队列的唯一线程似乎是在 main() 方法中运行无限循环的主线程。不管你把东西塞进它的速度有多快,你处理它们的速度永远不会超过你把它们取出来的速度。

2) Spawning new Thread();s 是一个“昂贵”的操作。不断为单个短任务创建新线程只是通过内存分配和本机资源搅动。您应该将那些“队列放置”卸载到ExecutorService可以调整大小的一秒钟,而不是产生无限数量的线程。

于 2012-04-16T22:23:15.063 回答