0

在下面的代码中,我有一个循环计时器,它从在线队列中获取消息列表。这段代码的目的是测试是否有可能让循环计时器线程只获取消息并将结果传递给主线程进行处理。

相反,我看到的是,尽管事件侦听器在主线程中,但在事件引发后调用的所有操作,在循环计时器线程中,总是在事件处理后完成。我知道这种情况有时可能会发生,具体取决于在任何给定时间 cpu 上的哪个线程,但我仍然应该不时地看到打印语句交织在一起。

我还通过向队列中添加大约 50 条消息来测试这种效果,我仍然看到相同的结果。

我的代码如下

public class Service implements NewWindowEventArgsListener
{

private final static String accessKey = 
        "secret";

private final static String secretKey = 
        "secret";

private final static String AWSSQSServiceUrl =
        "secret";


private boolean IsWindowing = false;
private ScheduledExecutorService _windowTimer;
private long SQSWindow = 60000;
private NewWindowEventArgs handler = new NewWindowEventArgs();
private static List<Message> messages = new ArrayList<Message>();


public void StartProcesses()
{
    if(this.IsWindowing)
    {
        return;
    }

    handler.addListener(this);

    this._windowTimer = Executors.newSingleThreadScheduledExecutor();

    Runnable task = new Runnable() {
        public void run() {
            WindowCallback();
        }
    };

    this._windowTimer.scheduleAtFixedRate(task,
            0, SQSWindow, TimeUnit.MILLISECONDS);

    IsWindowing = true;
}

private void WindowCallback()
{
    Date now = new Date();
    System.out.println("The service is running: " + now);

    int numberOfMessages = 0;
    ArrayList<String> attributes = new ArrayList<String>();
    AWSCredentials cred = new BasicAWSCredentials(accessKey, secretKey);
    ClientConfiguration config = new ClientConfiguration();
    config.setMaxErrorRetry(10);

    AmazonSQS client = new AmazonSQSClient(cred, config);

    client.setEndpoint(AWSSQSServiceUrl);

    System.out.println("Receiving messages from the Queue.\n");
    ReceiveMessageRequest receiveMessageRequest = 
            new ReceiveMessageRequest(AWSSQSServiceUrl);

    receiveMessageRequest.setMaxNumberOfMessages(10);

    GetQueueAttributesRequest numMessages = 
            new GetQueueAttributesRequest(AWSSQSServiceUrl); 

    attributes.add("ApproximateNumberOfMessages");
    numMessages.setAttributeNames(attributes);

    numberOfMessages = Integer.valueOf(
            (client.getQueueAttributes(numMessages)).getAttributes().
            get("ApproximateNumberOfMessages")).intValue();

    System.out.println("Expected number of Messages: " + numberOfMessages);

    do
    {
        messages.addAll(client.receiveMessage(receiveMessageRequest).
            getMessages());
    }while(messages.size() < numberOfMessages);

    System.out.println("Starting the printing of messages");

    if ( messages.size() > 0)
    {
        System.out.println("A message exists!");
        System.out.println();
        handler.NewWindowEvent(messages);
        System.out.println("//////////////////////////////////");
        System.out.println("\tEmptying message list");
        messages.clear();
        System.out.println("\tMessage list empty");
        System.out.println("//////////////////////////////////");
        System.out.println();
    }
}

@Override
public void JetstreamService_NewWindow(List<Message> messages) {
    System.out.println("Number of messages: " + messages.size() + "\n");

    ObjectMapper mapper = new ObjectMapper();

    try 
    {
        for (Message message : messages)
        {

            //System.out.println(message.getBody() + "\n");
            //byte[] bytes = DatatypeConverter.parseBase64Binary(message.getBody());

            //String messageBody = new String(bytes, "UTF-8");

            //System.out.println(messageBody + "\n");

            AmazonSNSMessage b;

            b = mapper.readValue(message.getBody(), AmazonSNSMessage.class);

            String subject = b.getSubject().trim().toLowerCase();
            System.out.println(subject);

            if (subject.equals("heartbeatevent"))
            {
                HeartbeatEvent heartbeat = new HeartbeatEvent();

                heartbeat.Deserialize(b.getMessage());

                System.out.println(heartbeat.getHeaderEventTime() + "\n");
            }

            else if(subject.equals("logicaldeviceaddedevent"))
            {
                LogicalDeviceAddedEvent logical = 
                        new LogicalDeviceAddedEvent();

                logical.Deserialize(b.getMessage());

                System.out.println(
                        logical.getLogicalDeviceAddedEventRegion() + "\n");
            }

            else if(subject.equals("logicaldeviceremovedevent"))
            {
                LogicalDeviceRemovedEvent logical = 
                        new LogicalDeviceRemovedEvent();

                logical.Deserialize(b.getMessage());

                System.out.println(
                        logical.getHeaderEventId());

                System.out.println(
                        logical.getHeaderEventTime() + "\n");
            }
        }
    } catch (JsonParseException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (JsonMappingException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }   
}

有人可以解释为什么在主线程中没有处理这些消息,或者提供并解释为什么在所有消息都被处理后总是出现清晰的消息打印语句。

4

1 回答 1

1

不确定我是否理解您不理解的内容,但我会尝试澄清会发生什么:

  • 你启动计时器,它开始等待下一个时间表

  • 当计时器回调被执行时,选择了一个线程:不是您在 UI 事件循环中忙或等待或工作的主线程,而是另一个线程;例如,在 .Net 中,计时器实现使用全局线程池中的线程

  • 所以你的回调,即WindowCallback,在这个线程上同步执行(似乎代码中没有任何异步)

  • 它到达handler.NewWindowEvent(messages):这里我们必须区分 2 个正交概念:语义、引发事件以及它的执行方式,就像任何方法调用一样,在同一个线程上同步

  • 所以JetstreamService_NewWindow被执行,处理消息并返回

  • 在 handler.NewWindowEvent 之后恢复执行并清除消息列表

这是一个总结它的模式(ala UML):

O 表示方法的开始

X 它的末端

    timer  WindowCallback   NewWindowEvent  JetstreamService_NewWindow
    O
    |        
    +------->O
    |        |
    |        +---------------->O
    |        |                 |
    |        |                 +------------------>O
    |        |                 |                   |
    |        |                 +<------------------X
    |        |                 |
    |        +<----------------X
    |        |clear
    |<-------X
    X

好吧,我绝对不是艺术家:(

总结一下:从计时器滴答开始,只有一个执行线程一个接一个地同步运行所有方法。

希望这有帮助

于 2013-06-17T23:29:10.960 回答