我尝试通过智能连接器样式连接到 SnappyData 存储,如http://snappydatainc.github.io/snappydata/howto/#how-to-access-snappydata-store-from-an-existing-spark-installation-using中的描述-smart-connector,但得到了由 java.nio.BufferUnderflowException 引起的 com.gemstone.gemfire.SerializationException 根。
这是堆栈跟踪:
com.gemstone.gemfire.SerializationException: Could not create an instance of com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor$UpdateAttributesMessage .
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.InternalDataSerializer.invokeFromData(InternalDataSerializer.java:2900)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.DSFIDFactory.create(DSFIDFactory.java:819)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.InternalDataSerializer.readDSFID(InternalDataSerializer.java:3311)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.tcp.Connection.processNIOBuffer(Connection.java:3573)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.tcp.Connection.runNioReader(Connection.java:1840)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.tcp.Connection.run(Connection.java:1715)
at Remote Member 'host-07(41099)<v7>:32671' in java.lang.Thread.run(Thread.java:745)
at com.gemstone.gemfire.distributed.internal.ReplyException.fixUpRemoteEx(ReplyException.java:109)
at com.gemstone.gemfire.distributed.internal.ReplyException.handleAsUnexpected(ReplyException.java:87)
at com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor.waitForProfileResponse(UpdateAttributesProcessor.java:94)
at com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor.distribute(UpdateAttributesProcessor.java:79)
at com.gemstone.gemfire.distributed.internal.DistributionAdvisor.exchangeProfiles(DistributionAdvisor.java:1231)
at com.gemstone.gemfire.distributed.internal.DistributionAdvisor.initializationGate(DistributionAdvisor.java:485)
at com.pivotal.gemfirexd.internal.engine.distributed.GfxdDistributionAdvisor.handshake(GfxdDistributionAdvisor.java:235)
at com.pivotal.gemfirexd.internal.engine.store.GemFireStore$StoreAdvisee.start(GemFireStore.java:2565)
at com.pivotal.gemfirexd.internal.engine.store.GemFireStore$StoreAdvisee.access$000(GemFireStore.java:2501)
at com.pivotal.gemfirexd.internal.engine.store.GemFireStore.boot(GemFireStore.java:1208)
at com.pivotal.gemfirexd.internal.impl.services.monitor.BaseMonitor.boot(BaseMonitor.java:2245)
at com.pivotal.gemfirexd.internal.impl.services.monitor.TopService.bootModule(TopService.java:314)
at com.pivotal.gemfirexd.internal.impl.services.monitor.BaseMonitor.startModule(BaseMonitor.java:720)
at com.pivotal.gemfirexd.internal.impl.services.monitor.FileMonitor.startModule(FileMonitor.java:65)
at com.pivotal.gemfirexd.internal.iapi.services.monitor.Monitor.bootServiceModule(Monitor.java:497)
at com.pivotal.gemfirexd.internal.engine.db.FabricDatabase.bootStore(FabricDatabase.java:2358)
at com.pivotal.gemfirexd.internal.engine.db.FabricDatabase.boot(FabricDatabase.java:354)
at com.pivotal.gemfirexd.internal.impl.services.monitor.BaseMonitor.boot(BaseMonitor.java:2245)
at com.pivotal.gemfirexd.internal.impl.services.monitor.TopService.bootModule(TopService.java:314)
at com.pivotal.gemfirexd.internal.impl.services.monitor.BaseMonitor.bootService(BaseMonitor.java:2043)
at com.pivotal.gemfirexd.internal.impl.services.monitor.BaseMonitor.startProviderService(BaseMonitor.java:1906)
at com.pivotal.gemfirexd.internal.impl.services.monitor.BaseMonitor.findProviderAndStartService(BaseMonitor.java:1786)
at com.pivotal.gemfirexd.internal.impl.services.monitor.BaseMonitor.startPersistentService(BaseMonitor.java:1196)
at com.pivotal.gemfirexd.internal.iapi.services.monitor.Monitor.startPersistentService(Monitor.java:620)
at com.pivotal.gemfirexd.internal.impl.jdbc.EmbedConnection.bootDatabase(EmbedConnection.java:3379)
at com.pivotal.gemfirexd.internal.impl.jdbc.EmbedConnection.<init>(EmbedConnection.java:452)
at com.pivotal.gemfirexd.internal.impl.jdbc.EmbedConnection30.<init>(EmbedConnection30.java:94)
at com.pivotal.gemfirexd.internal.impl.jdbc.EmbedConnection40.<init>(EmbedConnection40.java:75)
at com.pivotal.gemfirexd.internal.jdbc.Driver40.getNewEmbedConnection(Driver40.java:95)
at com.pivotal.gemfirexd.internal.jdbc.InternalDriver.connect(InternalDriver.java:351)
at com.pivotal.gemfirexd.internal.jdbc.InternalDriver.connect(InternalDriver.java:219)
at com.pivotal.gemfirexd.internal.jdbc.InternalDriver.connect(InternalDriver.java:195)
at io.snappydata.jdbc.AutoloadedDriver.connect(AutoloadedDriver.java:153)
at com.pivotal.gemfirexd.internal.engine.fabricservice.FabricServiceImpl.startImpl(FabricServiceImpl.java:279)
at com.pivotal.gemfirexd.internal.engine.fabricservice.FabricServerImpl.start(FabricServerImpl.java:60)
at io.snappydata.impl.LeadImpl.internalStart(LeadImpl.scala:188)
at io.snappydata.impl.LeadImpl$.invokeLeadStart(LeadImpl.scala:489)
at org.apache.spark.scheduler.cluster.SnappyEmbeddedModeClusterManager.initialize(SnappyEmbeddedModeClusterManager.scala:88)
at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2506)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:489)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2258)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$8.apply(SparkSession.scala:831)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$8.apply(SparkSession.scala:823)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:823)
at com.meituan.hotel.pegasuscollect.spark.SmartConnectorMain.main(SmartConnectorMain.java:11)
at com.meituan.hotel.pegasuscollect.controller.SparkController.getCompActualPriceModelFromCache(SparkController.java:17)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.web.method.support.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:215)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:132)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:104)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandleMethod(RequestMappingHandlerAdapter.java:781)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:721)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:83)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:943)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:877)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:966)
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:857)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:735)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:842)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:848)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:669)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1448)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:88)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1419)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:455)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:137)
at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:557)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:231)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1075)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:384)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:193)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1009)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:154)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
at org.eclipse.jetty.server.Server.handle(Server.java:368)
at org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:489)
at org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:942)
at org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1004)
at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:640)
at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
at org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
at org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:628)
at org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.gemstone.gemfire.SerializationException: Could not create an instance of com.pivotal.gemfirexd.internal.engine.distributed.GfxdDistributionAdvisor$GfxdProfile .
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.InternalDataSerializer.invokeFromData(InternalDataSerializer.java:2900)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.DSFIDFactory.readGfxdMessage(DSFIDFactory.java:986)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.DSFIDFactory.create(DSFIDFactory.java:775)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.InternalDataSerializer.readObject(InternalDataSerializer.java:3424)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.InternalDataSerializer.basicReadObject(InternalDataSerializer.java:3417)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.DataSerializer.readObject(DataSerializer.java:3342)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor$UpdateAttributesMessage.fromData(UpdateAttributesProcessor.java:356)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.InternalDataSerializer.invokeFromData(InternalDataSerializer.java:2887)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.DSFIDFactory.create(DSFIDFactory.java:819)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.InternalDataSerializer.readDSFID(InternalDataSerializer.java:3311)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.tcp.Connection.processNIOBuffer(Connection.java:3573)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.tcp.Connection.runNioReader(Connection.java:1840)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.tcp.Connection.run(Connection.java:1715)
at Remote Member 'host-07(41099)<v7>:32671' in java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.BufferUnderflowException
at Remote Member 'host-07(41099)<v7>:32671' in java.nio.Buffer.nextGetIndex(Buffer.java:506)
at Remote Member 'host-07(41099)<v7>:32671' in java.nio.DirectByteBuffer.getInt(DirectByteBuffer.java:681)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.tcp.ByteBufferInputStream.readInt(ByteBufferInputStream.java:190)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.DataSerializer.readInteger(DataSerializer.java:881)
at Remote Member 'host-07(41099)<v7>:32671' in com.pivotal.gemfirexd.internal.engine.distributed.GfxdDistributionAdvisor$GfxdProfile.fromData(GfxdDistributionAdvisor.java:1574)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.InternalDataSerializer.invokeFromData(InternalDataSerializer.java:2887)
... 13 more
Caused by:
com.gemstone.gemfire.SerializationException: Could not create an instance of com.pivotal.gemfirexd.internal.engine.distributed.GfxdDistributionAdvisor$GfxdProfile .
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.InternalDataSerializer.invokeFromData(InternalDataSerializer.java:2900)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.DSFIDFactory.readGfxdMessage(DSFIDFactory.java:986)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.DSFIDFactory.create(DSFIDFactory.java:775)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.InternalDataSerializer.readObject(InternalDataSerializer.java:3424)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.InternalDataSerializer.basicReadObject(InternalDataSerializer.java:3417)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.DataSerializer.readObject(DataSerializer.java:3342)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor$UpdateAttributesMessage.fromData(UpdateAttributesProcessor.java:356)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.InternalDataSerializer.invokeFromData(InternalDataSerializer.java:2887)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.DSFIDFactory.create(DSFIDFactory.java:819)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.InternalDataSerializer.readDSFID(InternalDataSerializer.java:3311)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.tcp.Connection.processNIOBuffer(Connection.java:3573)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.tcp.Connection.runNioReader(Connection.java:1840)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.tcp.Connection.run(Connection.java:1715)
at Remote Member 'host-07(41099)<v7>:32671' in java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.BufferUnderflowException
at Remote Member 'host-07(41099)<v7>:32671' in java.nio.Buffer.nextGetIndex(Buffer.java:506)
at Remote Member 'host-07(41099)<v7>:32671' in java.nio.DirectByteBuffer.getInt(DirectByteBuffer.java:681)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.tcp.ByteBufferInputStream.readInt(ByteBufferInputStream.java:190)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.DataSerializer.readInteger(DataSerializer.java:881)
at Remote Member 'host-07(41099)<v7>:32671' in com.pivotal.gemfirexd.internal.engine.distributed.GfxdDistributionAdvisor$GfxdProfile.fromData(GfxdDistributionAdvisor.java:1574)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.InternalDataSerializer.invokeFromData(InternalDataSerializer.java:2887)
... 13 more
Caused by:
java.nio.BufferUnderflowException
at Remote Member 'host-07(41099)<v7>:32671' in java.nio.Buffer.nextGetIndex(Buffer.java:506)
at Remote Member 'host-07(41099)<v7>:32671' in java.nio.DirectByteBuffer.getInt(DirectByteBuffer.java:681)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.tcp.ByteBufferInputStream.readInt(ByteBufferInputStream.java:190)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.DataSerializer.readInteger(DataSerializer.java:881)
at Remote Member 'host-07(41099)<v7>:32671' in com.pivotal.gemfirexd.internal.engine.distributed.GfxdDistributionAdvisor$GfxdProfile.fromData(GfxdDistributionAdvisor.java:1574)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.InternalDataSerializer.invokeFromData(InternalDataSerializer.java:2887)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.DSFIDFactory.readGfxdMessage(DSFIDFactory.java:986)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.DSFIDFactory.create(DSFIDFactory.java:775)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.InternalDataSerializer.readObject(InternalDataSerializer.java:3424)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.InternalDataSerializer.basicReadObject(InternalDataSerializer.java:3417)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.DataSerializer.readObject(DataSerializer.java:3342)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor$UpdateAttributesMessage.fromData(UpdateAttributesProcessor.java:356)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.InternalDataSerializer.invokeFromData(InternalDataSerializer.java:2887)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.DSFIDFactory.create(DSFIDFactory.java:819)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.InternalDataSerializer.readDSFID(InternalDataSerializer.java:3311)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.tcp.Connection.processNIOBuffer(Connection.java:3573)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.tcp.Connection.runNioReader(Connection.java:1840)
at Remote Member 'host-07(41099)<v7>:32671' in com.gemstone.gemfire.internal.tcp.Connection.run(Connection.java:1715)
at Remote Member 'host-07(41099)<v7>:32671' in java.lang.Thread.run(Thread.java:745)
这是代码:
public class SmartConnectorMain{
public static void main(String [] args){
SparkSession spark = SparkSession
.builder()
.appName("SmartConnectorMainJava")
.master("snappydata://host-01:9999,host05-:9999")
.config("snappydata.store.locators", "host-01:9999,host-05:9999")
.getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
SnappySession snappy = new SnappySession(spark.sparkContext());
println(" #### Creating a table TestColumnTable #### \n");
snappy.dropTable("TestColumnTable", true);
DataSet<Row> dataFrame = snappy.range(1000).selectExpr("id", "floor(rand() * 10000) as k");
snappy.sql("create table TestColumnTable (id bigint not null, k bigint not null) using column");
println(" #### Write to table completed. ### \n\n" +
"Now you can query table TestColumnTable using $SNAPPY_HOME/bin/snappy-shell");
}
private static void println(String s){
System.out.println(s);
}
这是集群中的配置:
Name Value
jobserver.enabled true
snappydata.embedded true
snappydata.store.critical-heap-percentage 90.0
snappydata.store.eviction-heap-percentage 81.0
snappydata.store.host-data true
snappydata.store.locators host-01:9999,host-05:9999
snappydata.store.log-file snappyleader.log
snappydata.store.mcast-port 0
snappydata.store.statistic-archive-file snappyleader.gfs
spark.app.id snappy-app-1494920649221
spark.app.name leaderLauncher
spark.closure.serializer org.apache.spark.serializer.PooledKryoSerializer
spark.driver.host 10.16.118.36
spark.driver.port 25490
spark.executor.cores 10
spark.executor.id driver
spark.master snappydata://host-01:9999,host-05:9999
spark.scheduler.mode FAIR
spark.serializer org.apache.spark.serializer.PooledKryoSerializer
spark.ui.port 9411