2

我想在我的 Java 应用程序中使用带有订阅过滤器的 Windows Azure 服务总线主题。我正在使用Windows Azure Plugin for Eclipse with Java(由 Microsoft Open Technologies 提供)和 Windows Azure SDK 2.0。

我发现 Microsoft 网站上显示如何使用 Java API 执行此操作的基本示例代码不起作用。

基本示例代码应如何使用 API 从 Java 应用程序以编程方式完成以下任务?

1) 在我现有的命名空间中为现有的“TestTopic”获取一个 ServiceBusContract 对象。

2) 使用默认的 MatchAll 过滤器创建一个“AllMessages”订阅。这个订阅接收所有发送到它的虚拟队列中的“TestTopic”的消息。

3) 使用 SqlFilter 过滤器创建一个“LowMessages”订阅以过滤“MessageNumber <=3”。此订阅的虚拟队列应该只接收 MessageNumber 自定义属性值小于或等于 3 的消息。

4) 创建一个“HighMessages”订阅,使用 SqlFilter 过滤“MessageNumber <=3”。此订阅的虚拟队列应仅接收 MessageNumber 自定义属性值大于 3 的消息。

5)发送一批示例代理消息到“TestTopic”(0<= MessageNumber <7)

6) 接收所有三个订阅的消息并显示它们已被过滤。

我相信我找到了该示例不起作用的原因,并且我将更正后的 Java 代码作为下面的答案提供给其他可能也希望查看如何执行此操作的基本示例的人。

4

1 回答 1

5

从 Azure 门户获取我的默认管理凭据后,如文档中所述,我的应用程序能够为我的服务总线命名空间获取 ServiceBusContract 对象,如下所示:

public class Service 
{
private Configuration config;
public Service() 
{
   String namespace = "jasper";
   String issuer = "owner";
   String key = "BB9BB9BBBbBBBbBbbBBbB99SS9b+Bb9BbB+bbBBbBB9=";
   String serviceBusRootUri = ".servicebus.windows.net";
   String wrapRootUri = "-sb.accesscontrol.windows.net/WRAPv0.9";       
   this.config = ServiceBusConfiguration.configureWithWrapAuthentication(
      namespace, 
      issuer, 
      key,
      serviceBusRootUri,
      wrapRootUri);     
}   
public ServiceBusContract getservice()
{       
   ServiceBusContract service = ServiceBusService.create(config);
   return service;
}   
}

然后可以使用以下 Java 代码创建对“TestTopic”主题的三个订阅。“TestTopic”必须已经存在于“jasper”命名空间中。“AllMessages”订阅是默认订阅,它接收发送到该主题的所有消息。“LowMessages”和“HighMessages”订阅使用 SqlFilter 规则根据 MessageNumber 自定义属性的值过滤消息。原始示例代码省略了提供规则名称和删除默认规则的必要性。如果不删除默认规则,订阅仍会收到所有消息。

public class Make_sub_rule 
{
public static void main(String[] args) throws ServiceException {        
   Service creds = new Service();
   ServiceBusContract service = creds.getservice();         
   SubscriptionInfo subInfo = new SubscriptionInfo("AllMessages");
   service.createSubscription("TestTopic", subInfo);
   System.out.println(subInfo.getName() + " Default Rules");

   SubscriptionInfo subInfo1 = new SubscriptionInfo("LowMessages");
   CreateSubscriptionResult result1 = service.createSubscription("TestTopic", subInfo1);
   RuleInfo ruleInfo1 = new RuleInfo("RULENAME1");
   ruleInfo1 = ruleInfo1.withSqlExpressionFilter("MessageNumber <= 3");
   CreateRuleResult ruleResult1 = 
      service.createRule("TestTopic", "LowMessages", ruleInfo1);
   service.deleteRule("TestTopic", "LowMessages", "$Default");
   System.out.println(subInfo1.getName() + " " + result1.toString());
   System.out.println(ruleInfo1.getName() + " " + ruleResult1.toString());

   SubscriptionInfo subInfo2 = new SubscriptionInfo("HighMessages");
   CreateSubscriptionResult result2 = service.createSubscription("TestTopic", subInfo2);
   RuleInfo ruleInfo2 = new RuleInfo("RULENAME2");
   ruleInfo2 = ruleInfo2.withSqlExpressionFilter("MessageNumber > 3");
   CreateRuleResult ruleResult2 = 
      service.createRule("TestTopic", "HighMessages", ruleInfo2);
   service.deleteRule("TestTopic", "HighMessages", "$Default");
   System.out.println(subInfo2.getName() + " " + result2.toString());
   System.out.println(ruleInfo2.getName() + " " + ruleResult2.toString());
    }
}

下面将一批代理消息发送到“TestTopic”并递增 MessageNumber 自定义属性的值。

public class SendSbMsTopicB 
{
public static void main(String[] args) throws ServiceException {        
   Service creds = new Service();
   ServiceBusContract service = creds.getservice(); 
   for (int i=0; i<7; i++) 
   {
   BrokeredMessage message = new BrokeredMessage("Test message" + i);
   message.setLabel("Day" + i);
   message.setProperty("MessageNumber", i);
   message.setProperty("CustomProperty", "CustomTestValue" + i);
   service.sendTopicMessage("TestTopic", message);
   System.out.println("send MessageNumber " + i + " to topic");
   }
}
}

重复运行以下代码以读取“AllMessages”、“LowMessages”或“HighMessages”订阅的虚拟队列(更改subscriptionName 的值)。每条消息在读取时都会从虚拟队列中删除,直到该订阅的队列中没有剩余消息为止。请注意,多个订阅可以接收相同的消息,并且过滤的订阅不会接收所有消息。

public class GetSbMessSub 
{
public static void main(String[] args) throws ServiceException {        
   Service creds = new Service();
   ServiceBusContract service = creds.getservice();     
   String subscriptionName = "LowMessages";
   ReceiveMessageOptions opts = ReceiveMessageOptions.DEFAULT;
   opts.setReceiveMode(ReceiveMode.PEEK_LOCK);
   ReceiveSubscriptionMessageResult resultQM = service.receiveSubscriptionMessage("TestTopic", subscriptionName, opts);
   BrokeredMessage message = resultQM.getValue();       
   if (message != null && message.getMessageId() != null) 
   {
   try {
   System.out.println("Subscription: " + subscriptionName);
   System.out.println("MessageNumber: " + message.getProperty("MessageNumber"));
   service.deleteMessage(message);}
   catch (Exception ex){
   System.out.println("Inner exception encountered!");
   service.unlockMessage(message);}
   }
   else {System.out.println("There are no more messages.");}                
   }    
}
于 2013-07-02T22:22:55.630 回答