0

我正在处理的当前项目要求我实现一种方法来有效地将一组对象从一个连续运行的线程传递到主线程。当前设置如下所示。

我有一个创建新线程的主线程。这个新线程连续运行并调用基于计时器的方法。此方法从在线资源中获取一组消息并将它们组织在 TreeSet 中。

然后需要将此 TreeSet 传递回主线程,以便可以独立于循环计时器处理它包含的消息。

为了更好地参考,我的代码如下所示

// Called by the main thread on start.  
void StartProcesses()
{
    if(this.IsWindowing)
    {
        return;
    }

    this._windowTimer = Executors.newSingleThreadScheduledExecutor();

    Runnable task = new Runnable() {
        public void run() {
            WindowCallback();
        }
    };

    this.CancellationToken = false; 
    _windowTimer.scheduleAtFixedRate(task,
            0, this.SQSWindow, TimeUnit.MILLISECONDS);

    this.IsWindowing = true;
}

/////////////////////////////////////////////////////////////////////////////////

private void WindowCallback()
{
    ArrayList<Message> messages = new ArrayList<Message>();

    //TODO create Monitor
    if((!CancellationToken))
    {
        try
        {
            //TODO fix epochWindowTime
            long epochWindowTime = 0;
            int numberOfMessages = 0;
            Map<String, String> attributes;

            // Setup the SQS client
            AmazonSQS client = new AmazonSQSClient(new 
                    ClasspathPropertiesFileCredentialsProvider());

            client.setEndpoint(this.AWSSQSServiceUrl);

            // get the NumberOfMessages to optimize how to 
            // Receive all of the messages from the queue

            GetQueueAttributesRequest attributesRequest = 
                    new GetQueueAttributesRequest();
            attributesRequest.setQueueUrl(this.QueueUrl);
            attributesRequest.withAttributeNames(
                    "ApproximateNumberOfMessages");
            attributes = client.getQueueAttributes(attributesRequest).
                    getAttributes();

            numberOfMessages = Integer.valueOf(attributes.get(
                    "ApproximateNumberOfMessages")).intValue();

            // determine if we need to Receive messages from the Queue
            if (numberOfMessages > 0)
            {

                if (numberOfMessages < 10)
                {
                    // just do it inline it's less expensive than 
                    //spinning threads
                    ReceiveTask(numberOfMessages);
                }
                else
                {
                    //TODO Create a multithreading version for this
                    ReceiveTask(numberOfMessages);
                }
            }

            if (!CancellationToken)
            {

                //TODO testing
                _setLock.lock();

                Iterator<Message> _setIter = _set.iterator();
                //TODO
                while(_setIter.hasNext())
                {
                    Message temp = _setIter.next();

                    Long value = Long.valueOf(temp.getAttributes().
                            get("Timestamp"));
                    if(value.longValue() < epochWindowTime)
                    {
                        messages.add(temp);
                        _set.remove(temp);
                    }
                }

                _setLock.unlock();

                // TODO deduplicate the messages

                // TODO reorder the messages

                // TODO raise new Event with the results
            }

            if ((!CancellationToken) && (messages.size() > 0))
            {
                if (messages.size() < 10)
                {
                    Pair<Integer, Integer> range = 
                            new Pair<Integer, Integer>(Integer.valueOf(0), 
                                    Integer.valueOf(messages.size()));
                    DeleteTask(messages, range);
                }
                else
                {
                    //TODO Create a way to divide this work among 
                    //several threads
                    Pair<Integer, Integer> range = 
                            new Pair<Integer, Integer>(Integer.valueOf(0), 
                                    Integer.valueOf(messages.size()));
                    DeleteTask(messages, range);
                }
            }
        }catch (AmazonServiceException ase){
            ase.printStackTrace();
        }catch (AmazonClientException ace) {
            ace.printStackTrace();
        }
    }
}

从一些评论中可以看出,我目前首选的处理方法是在有消息的情况下在计时器线程中创建一个事件。然后主线程将监听这个事件并适当地处理它。

目前我不熟悉 Java 如何处理事件,或者如何创建/监听它们。我也不知道是否可以创建事件并让其中包含的信息在线程之间传递。

有人可以就我的方法是否可行给我一些建议/见解吗?如果是这样,我在哪里可以找到一些关于如何实现它们的信息,因为我目前的搜索尝试没有证明是富有成效的。

如果没有,我能否就如何解决这个问题提出一些建议,请记住,如果可能的话,我想避免管理套接字。

编辑1:

主线程还将负责根据它收到的消息发出命令,或发出命令以获取所需的信息。由于这个原因,主线程不能等待接收消息,并且应该以基于事件的方式处理它们。

4

1 回答 1

1

生产者-消费者模式:

一个线程(生产者)不断地将对象(消息)堆叠在队列中。另一个线程(消费者)从队列中读取并删除对象。

如果您的问题适合此,请尝试“BlockingQueue”。 http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html

它既简单又有效。

如果队列为空,消费者将被“阻塞”,这意味着线程等待(所以不要使用 cpu 时间)直到生产者放置一些对象。否则,消费者会不断地消费对象。如果队列已满,生产者将被阻塞,直到消费者消费一些对象以在队列中腾出空间,反之亦然。

这是一个示例:(队列在生产者和消费者中应该是相同的对象)


(生产者线程)

Message message = createMessage();
queue.put(message);

(消费者线程)

Message message = queue.take();
handleMessage(message);
于 2013-06-17T10:03:19.607 回答