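In the code below I have a recurring timer that fetches a list of messages from an online queue. The purpose of this code is to test whether the recurring timer thread can do nothing but retrieve the messages and hand the results to the main thread for processing.
Instead, what I see is that, even though the event listener lives in the main thread, everything the recurring timer thread does after raising the event always happens only after the event has been fully handled. I know this could happen some of the time, depending on which thread is on the CPU at any given moment, but I should still see the print statements interleaved now and then.
I also tested this by adding about 50 messages to the queue, and I still see the same result.
My code is as follows: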
public class Service implements NewWindowEventArgsListener
{
    private final static String accessKey =
        "secret";
    private final static String secretKey =
        "secret";
    private final static String AWSSQSServiceUrl =
        "secret";
    private boolean IsWindowing = false;
    private ScheduledExecutorService _windowTimer;
    private long SQSWindow = 60000;
    private NewWindowEventArgs handler = new NewWindowEventArgs();
    private static List<Message> messages = new ArrayList<Message>();

    public void StartProcesses()
    {
        if (this.IsWindowing)
        {
            return;
        }
        handler.addListener(this);
        this._windowTimer = Executors.newSingleThreadScheduledExecutor();
        Runnable task = new Runnable() {
            public void run() {
                WindowCallback();
            }
        };
        this._windowTimer.scheduleAtFixedRate(task,
            0, SQSWindow, TimeUnit.MILLISECONDS);
        IsWindowing = true;
    }

    private void WindowCallback()
    {
        Date now = new Date();
        System.out.println("The service is running: " + now);
        int numberOfMessages = 0;
        ArrayList<String> attributes = new ArrayList<String>();
        AWSCredentials cred = new BasicAWSCredentials(accessKey, secretKey);
        ClientConfiguration config = new ClientConfiguration();
        config.setMaxErrorRetry(10);
        AmazonSQS client = new AmazonSQSClient(cred, config);
        client.setEndpoint(AWSSQSServiceUrl);
        System.out.println("Receiving messages from the Queue.\n");
        ReceiveMessageRequest receiveMessageRequest =
            new ReceiveMessageRequest(AWSSQSServiceUrl);
        receiveMessageRequest.setMaxNumberOfMessages(10);
        GetQueueAttributesRequest numMessages =
            new GetQueueAttributesRequest(AWSSQSServiceUrl);
        attributes.add("ApproximateNumberOfMessages");
        numMessages.setAttributeNames(attributes);
        numberOfMessages = Integer.valueOf(
            (client.getQueueAttributes(numMessages)).getAttributes().
                get("ApproximateNumberOfMessages")).intValue();
        System.out.println("Expected number of Messages: " + numberOfMessages);
        do
        {
            messages.addAll(client.receiveMessage(receiveMessageRequest).
                getMessages());
        } while (messages.size() < numberOfMessages);
        System.out.println("Starting the printing of messages");
        if (messages.size() > 0)
        {
            System.out.println("A message exists!");
            System.out.println();
            handler.NewWindowEvent(messages);
            System.out.println("//////////////////////////////////");
            System.out.println("\tEmptying message list");
            messages.clear();
            System.out.println("\tMessage list empty");
            System.out.println("//////////////////////////////////");
            System.out.println();
        }
    }

    @Override
    public void JetstreamService_NewWindow(List<Message> messages) {
        System.out.println("Number of messages: " + messages.size() + "\n");
        ObjectMapper mapper = new ObjectMapper();
        try
        {
            for (Message message : messages)
            {
                //System.out.println(message.getBody() + "\n");
                //byte[] bytes = DatatypeConverter.parseBase64Binary(message.getBody());
                //String messageBody = new String(bytes, "UTF-8");
                //System.out.println(messageBody + "\n");
                AmazonSNSMessage b;
                b = mapper.readValue(message.getBody(), AmazonSNSMessage.class);
                String subject = b.getSubject().trim().toLowerCase();
                System.out.println(subject);
                if (subject.equals("heartbeatevent"))
                {
                    HeartbeatEvent heartbeat = new HeartbeatEvent();
                    heartbeat.Deserialize(b.getMessage());
                    System.out.println(heartbeat.getHeaderEventTime() + "\n");
                }
                else if (subject.equals("logicaldeviceaddedevent"))
                {
                    LogicalDeviceAddedEvent logical =
                        new LogicalDeviceAddedEvent();
                    logical.Deserialize(b.getMessage());
                    System.out.println(
                        logical.getLogicalDeviceAddedEventRegion() + "\n");
                }
                else if (subject.equals("logicaldeviceremovedevent"))
                {
                    LogicalDeviceRemovedEvent logical =
                        new LogicalDeviceRemovedEvent();
                    logical.Deserialize(b.getMessage());
                    System.out.println(
                        logical.getHeaderEventId());
                    System.out.println(
                        logical.getHeaderEventTime() + "\n");
                }
            }
        } catch (JsonParseException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (JsonMappingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

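Can someone either explain why the messages are not being processed in the main thread, or explain why the "Emptying message list" print statements always appear only after all the messages have been handled?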
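
For what it's worth, here is a minimal sketch of the kind of check that might narrow this down. It is not taken from the code above: `ThreadHandoffDemo`, `Listener`, `directCall` and `queueHandoff` are hypothetical names, and it assumes (the source of `NewWindowEventArgs` is not shown) that `NewWindowEvent` simply calls each registered listener directly. Under that assumption the listener would run on whichever thread raises the event, regardless of which thread registered it, while a `BlockingQueue` is one possible way to hand a batch over to another thread.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ThreadHandoffDemo {

    // Hypothetical stand-in for NewWindowEventArgsListener.
    interface Listener {
        void onNewWindow(List<String> messages);
    }

    // Creates a single-threaded scheduler whose worker thread has a known name.
    private static ScheduledExecutorService newTimer() {
        return Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "timer-thread"));
    }

    // The timer task calls the listener directly (assumed to mirror
    // handler.NewWindowEvent(messages)). Returns the name of the thread
    // the listener body executed on.
    static String directCall() throws InterruptedException {
        ScheduledExecutorService timer = newTimer();
        BlockingQueue<String> seen = new LinkedBlockingQueue<>();
        Listener listener = msgs -> seen.add(Thread.currentThread().getName());
        timer.schedule(() -> listener.onNewWindow(Arrays.asList("a", "b")),
                0, TimeUnit.MILLISECONDS);
        String name = seen.poll(5, TimeUnit.SECONDS);
        timer.shutdown();
        return name;
    }

    // The timer task only enqueues the batch; the calling thread takes it
    // off the queue and runs the listener itself.
    static String queueHandoff() throws InterruptedException {
        ScheduledExecutorService timer = newTimer();
        BlockingQueue<List<String>> batches = new LinkedBlockingQueue<>();
        String[] seen = new String[1];
        Listener listener = msgs -> seen[0] = Thread.currentThread().getName();
        timer.schedule(() -> { batches.add(Arrays.asList("a", "b")); },
                0, TimeUnit.MILLISECONDS);
        List<String> batch = batches.poll(5, TimeUnit.SECONDS);
        timer.shutdown();
        listener.onNewWindow(batch);
        return seen[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("direct call ran on:    " + directCall());
        System.out.println("queue hand-off ran on: " + queueHandoff());
    }
}
```

If the direct call reports the timer thread, that would be consistent with the output never interleaving: the code after `handler.NewWindowEvent(messages)` cannot run until the listener returns on that same thread.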