0

我试图让 SDK 工作以从 SPARK 项目向 Azure 服务总线发送一条简单的消息。当我在本地 spark 上下文中本地运行代码时,代码运行良好。

但是当我将它提交给 YARN 时,起初我很难让集群找到 JAR 文件,现在它执行时出现以下错误:

java.lang.RuntimeException: Service or property not registered:  com.microsoft.windowsazure.services.servicebus.ServiceBusContract interface com.microsoft.windowsazure.services.servicebus.ServiceBusContract

我调用服务总线的代码如下,我已经尝试过使用和不使用类加载器部分,仍然是完全相同的问题:

  @throws[Exception]
  def SendMessageToSB(message: String) {


    try {

      // Get current context class loader
      val contextLoader = Thread.currentThread().getContextClassLoader();
      // Change context classloader to class context loader
      contextLoader.loadClass("com.microsoft.windowsazure.services.servicebus.ServiceBusContract")

      // Call Azure API and reset back the context loader

      val config: com.microsoft.windowsazure.Configuration = ServiceBusConfiguration.configureWithSASAuthentication("CTRATEST-NS$")
      val service: ServiceBusContract = ServiceBusService.create(config)
      val queueInfo: QueueInfo = new QueueInfo("inputdataqueue")
      try {
        val bm: BrokeredMessage = new BrokeredMessage(message)
        service.sendQueueMessage("inputdataqueue", bm)
      }
      catch {
        case e: Exception => {
          System.out.print("ServiceException encountered: ")
          System.out.println(e.getMessage)
          System.exit(-1)
        }
      }


      Thread.currentThread().setContextClassLoader(contextLoader);
    }
  }

我见过一个与storm类似的线程,但它指向了类加载器问题,因为我使用的是scala,所以这里没有解决。

Azure SDK 版本是通过 maven 构建的 0.9.0。

帮助!

4

1 回答 1

0

根据我的理解,根据我的经验,我认为您可以尝试下面的代码。

import com.microsoft.windowsazure.services.servicebus.ServiceBusContract


try {
    // Get current context class loader
    val contextLoader = Thread.currentThread().getContextClassLoader();
    // Change context classloader to class context loader
    Thread.currentThread().setContextClassLoader(ServiceBusContract.class.getClassLoader))
    // Call Azure API and reset back the context loader

    val config: com.microsoft.windowsazure.Configuration = ServiceBusConfiguration.configureWithSASAuthentication("CTRATEST-NS$")
    val service: ServiceBusContract = ServiceBusService.create(config)
    val queueInfo: QueueInfo = new QueueInfo("inputdataqueue")
    try {
        val bm: BrokeredMessage = new BrokeredMessage(message)
        service.sendQueueMessage("inputdataqueue", bm)
    } catch {
        case e: Exception => {
            System.out.print("ServiceException encountered: ")
            System.out.println(e.getMessage)
            System.exit(-1)
        }
    }
} catch {
    // handle exceptions  
} finally {
    // Reset back class loader
    Thread.currentThread().setContextClassLoader(contextLoader)
}

如果您想按名称将类加载到 JVM,您可以尝试使用下面的代码。

Class.forName("com.microsoft.windowsazure.services.servicebus.ServiceBusContract")
于 2016-05-04T07:18:06.123 回答