我正在尝试使用 JMX BulkLoader 将数据从远程节点 ETL 到 Cassandra 集群上。
但是,在成功建立 JMX 连接后,似乎无法批量加载。
请注意,批量加载是从远程节点发送到 cassandra 集群的。
它似乎希望在 Cassandra 集群的本地运行(即从 localhost 加载到 Cassandra 集群)。
我是不是遗漏了什么?有人能给些建议吗?
下面的异常
java.lang.IllegalArgumentException: Invalid directory /XXXXXXXXX
    at org.apache.cassandra.service.StorageService.bulkLoadInternal(StorageService.java:3970)
    at org.apache.cassandra.service.StorageService.bulkLoadAsync(StorageService.java:3962)
    at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at sun.reflect.misc.Trampoline.invoke(MethodUtil.java:75)
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at sun.reflect.misc.MethodUtil.invoke(MethodUtil.java:279)
    at com.sun.jmx.mbeanserver.StandardMBeanIntrospector.invokeM2(StandardMBeanIntrospector.java:112)
    at com.sun.jmx.mbeanserver.StandardMBeanIntrospector.invokeM2(StandardMBeanIntrospector.java:46)
    at com.sun.jmx.mbeanserver.MBeanIntrospector.invokeM(MBeanIntrospector.java:237)
    at com.sun.jmx.mbeanserver.PerInterface.invoke(PerInterface.java:138)
    at com.sun.jmx.mbeanserver.MBeanSupport.invoke(MBeanSupport.java:252)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java…)
    …(堆栈跟踪在此处被截断)
/**
 * Thin JMX client that asks a Cassandra node to bulk-load a directory.
 *
 * On construction it opens an RMI-based JMX connection to `host:port` and creates a proxy for
 * the `org.apache.cassandra.db:type=StorageService` MBean. Call [[close]] when finished.
 *
 * NOTE(review): `bulkLoadAsync` is invoked through the MBean proxy, i.e. it executes inside the
 * JVM this client is connected to. The directory handed to [[bulkLoad]] is therefore presumably
 * resolved on that node's filesystem, not on the machine running this client; an
 * "Invalid directory" IllegalArgumentException from the server side would be consistent with
 * passing a client-local path. Confirm against the Cassandra version in use.
 *
 * @param host hostname or IP address of the JMX endpoint; it is substituted verbatim into
 *             `rmi://host:port`, so it must not carry a scheme prefix such as `http://`
 * @param port JMX port of the endpoint
 */
class JmxBulkLoader(host: String, port: Int) {
  private var connector: JMXConnector = _
  private var storageBean: StorageServiceMBean = _

  // Connect eagerly with the constructor arguments. This must stay below the field
  // declarations above, otherwise their initializers would reset the assigned values.
  connect(host, port)

  /** Opens the JMX connection and binds the StorageService MBean proxy. */
  private def connect(host: String, port: Int): Unit = {
    val jmxUrl =
      new JMXServiceURL("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi".format(host, port))
    val env = new HashMap[String, Any]()
    connector = JMXConnectorFactory.connect(jmxUrl, env)
    // Only report success once the connection has actually been established.
    Logger.info(" Connected to JMX Entity " + jmxUrl)
    try {
      val mbeanServerConn = connector.getMBeanServerConnection
      val name = new ObjectName("org.apache.cassandra.db:type=StorageService")
      storageBean = JMX.newMBeanProxy(mbeanServerConn, name, classOf[StorageServiceMBean])
    } catch {
      case e: Exception =>
        // Best-effort cleanup so a half-initialised loader does not leak the connection;
        // the original failure is what the caller needs to see.
        try connector.close() catch { case _: Exception => () }
        throw e
    }
  }

  /** Closes the underlying JMX connection, if one was opened. */
  def close(): Unit = {
    Option(connector).foreach(_.close())
  }

  /**
   * Requests a bulk load of `path` via the StorageService MBean.
   *
   * The elapsed time that is logged covers only the `bulkLoadAsync` call itself.
   *
   * @param path directory to load (see the class-level note on where it is resolved)
   * @return `true` if the MBean call returned normally, `false` if it threw an exception
   *         (the failure is logged together with its stack trace)
   */
  def bulkLoad(path: String): Boolean = {
    try {
      val stopwatch = new Stopwatch().start
      val result = storageBean.bulkLoadAsync(path)
      stopwatch.stop
      Logger.info("Async Result of Bulk Load " + result)
      Logger.info("Bulk load took " + stopwatch.getElapsedTime + "millsecs.")
      true
    } catch {
      case e: Exception =>
        Logger.error("Error in Bulk Loading " + stackTraceOf(e))
        false
    }
  }

  /** Renders the full stack trace of `e` as a string so it can be embedded in a log message. */
  private def stackTraceOf(e: Throwable): String = {
    val buffer = new java.io.StringWriter()
    e.printStackTrace(new java.io.PrintWriter(buffer, true))
    buffer.toString
  }
}