
I'm trying to use the JMX BulkLoader to ETL data from a remote node into a Cassandra cluster:

https://github.com/PatrickCallaghan/datastax-analytics-example/blob/master/src/main/java/com/datastax/jmxloader/JmxBulkLoader.java

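However, although the JMX connection is established successfully, the bulk load does not seem to work.

Note that the bulk load is initiated from a remote node, not from a node inside the Cassandra cluster.

It almost seems as if it expects to be running local to the Cassandra cluster (i.e. localhost to the Cassandra cluster).

Am I missing something here? Can anyone advise?

The exception is below: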

java.lang.IllegalArgumentException: Invalid directory /XXXXXXXXX
    at org.apache.cassandra.service.StorageService.bulkLoadInternal(StorageService.java:3970)
    at org.apache.cassandra.service.StorageService.bulkLoadAsync(StorageService.java:3962)
    at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at sun.reflect.misc.Trampoline.invoke(MethodUtil.java:75)
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at sun.reflect.misc.MethodUtil.invoke(MethodUtil.java:279)
    at com.sun.jmx.mbeanserver.StandardMBeanIntrospector.invokeM2(StandardMBeanIntrospector.java:112)
    at com.sun.jmx.mbeanserver.StandardMBeanIntrospector.invokeM2(StandardMBeanIntrospector.java:46)
    at com.sun.jmx.mbeanserver.MBeanIntrospector.invokeM(MBeanIntrospector.java:237)
    at com.sun.jmx.mbeanserver.PerInterface.invoke(PerInterface.java:138)
    at com.sun.jmx.mbeanserver.MBeanSupport.invoke(MBeanSupport.java:252)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.
    ...

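import java.util.{HashMap, Timer}
import javax.management.{JMX, ObjectName}
import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL}
import org.apache.cassandra.service.StorageServiceMBean

// Stopwatch and Logger are not shown in the question; assumed to be project utilities.
class JmxBulkLoader(host: String, port: Int) {

  private var connector: JMXConnector = _
  private var storageBean: StorageServiceMBean = _
  private var timer: Timer = new Timer()

  // Connect with the constructor arguments: a bare host name or IP plus the
  // JMX port (7199 by default), not an "http://" URL.
  connect(host, port)

  private def connect(host: String, port: Int): Unit = {
    val jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi".format(host, port))
    val env = new HashMap[String, Any]()
    connector = JMXConnectorFactory.connect(jmxUrl, env)
    // Log only after the connection has actually succeeded.
    Logger.info("Connected to JMX entity " + jmxUrl)
    val mbeanServerConn = connector.getMBeanServerConnection
    val name = new ObjectName("org.apache.cassandra.db:type=StorageService")
    storageBean = JMX.newMBeanProxy(mbeanServerConn, name, classOf[StorageServiceMBean])
  }

  def close(): Unit = {
    connector.close()
  }

  def bulkLoad(path: String): Boolean = {
    try {
      // Renamed from `timer` so it no longer shadows the field above.
      val stopwatch = new Stopwatch().start
      // bulkLoadAsync returns once streaming has been started, so this times only the call.
      val result = storageBean.bulkLoadAsync(path)
      stopwatch.stop
      Logger.info("Async result of bulk load " + result)
      Logger.info("Bulk load took " + stopwatch.getElapsedTime + " millisecs.")
      true
    } catch {
      case e: Exception =>
        // printStackTrace() returns Unit, so concatenating it only logged "()".
        Logger.error("Error in bulk loading " + e.getMessage)
        e.printStackTrace()
        false
    }
  }
}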

2 Answers


It almost seems as if it expects to be running local to the Cassandra cluster (i.e. localhost to the Cassandra cluster).

Not quite. But think about it: you're calling a Cassandra node's MBean operation with a string parameter. That call is executed by the Cassandra process you're connected to, so the parameter names a path on that node's file system, not on the machine making the call.

You have to make sure the path exists on the target node and holds the data you expect (e.g. via shared storage, or by copying the files there beforehand).
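For the shared-storage option, a minimal sketch, assuming the SSTable directory sits on a volume that is mounted at one point on the client and at a different point on the Cassandra node. `NodePaths`, `toNodePath`, and the mount points are hypothetical; the result is the string you would pass to `bulkLoadAsync`, since the node resolves it against its own file system.

```scala
import java.nio.file.{Path, Paths}

// Hypothetical helper: translate a path under the client's mount of a shared
// volume into the equivalent path under the Cassandra node's mount of that volume.
object NodePaths {
  def toNodePath(localPath: String, clientMount: String, nodeMount: String): String = {
    val local: Path  = Paths.get(localPath).normalize()
    val client: Path = Paths.get(clientMount).normalize()
    // Path.startsWith compares whole name elements, so "/mnt/sharedX" is not under "/mnt/shared".
    require(local.startsWith(client), s"$localPath is not under the shared mount $clientMount")
    // Keep the part below the shared mount and re-root it at the node's mount point.
    val relative = client.relativize(local)
    Paths.get(nodeMount).resolve(relative).toString
  }
}
```

Usage would look like `loader.bulkLoad(NodePaths.toNodePath("/mnt/shared/staging/ks/tbl", "/mnt/shared", "/data/shared"))`. If you copy the files instead, skip the mapping and pass the path the files were copied to on the node.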

answered 2014-11-29T18:29:14.010
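  1. The table must already exist in Cassandra.
  2. The directory must be accessible (locally) to the Cassandra node.
  3. The directory must end with the keyspace name and the target table name:
    /some_path/$KeySpaceName/$TableName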
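A minimal client-side sanity check for rule 3, assuming you know the keyspace and table names up front. `SSTableDir`, `forTable`, and `endsWithKeyspaceAndTable` are hypothetical names; the check only inspects the path string, so it cannot confirm that the table exists (rule 1) or that the directory is present on the node (rule 2).

```scala
import java.nio.file.Paths

// Hypothetical helper: build or validate the directory passed to bulkLoadAsync,
// which must end with <keyspace>/<table>.
object SSTableDir {
  // e.g. forTable("/some_path", "ks", "tbl") builds "/some_path/ks/tbl"
  def forTable(base: String, keyspace: String, table: String): String =
    Paths.get(base, keyspace, table).toString

  // True when the last two path elements are exactly the keyspace and the table.
  def endsWithKeyspaceAndTable(dir: String, keyspace: String, table: String): Boolean = {
    val p = Paths.get(dir).normalize()
    val n = p.getNameCount
    n >= 2 &&
      p.getName(n - 2).toString == keyspace &&
      p.getName(n - 1).toString == table
  }
}
```

Running this check before the JMX call gives a clearer error than the node's "Invalid directory" exception when the path shape is wrong.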
answered 2015-04-10T11:40:58.073