我们有一个运行在 JBoss 上的服务,它从远程检索文件。由于过去的情况,我们无法使用 Hadoop FS API,因此我们使用系统调用和实际命令(我们使用:hadoop fs -cat)。
现在,我们将这些系统调用转移到其等效的 API Hadoop FS API。
我需要从 HDFS 获取文件作为 InputStream(或 FSDataInputStream)并将其传递给 http 响应。我们希望避免将文件保存在本地,然后将其写入流中。
基本上这可行,但我面临两个问题。
我不知道何时可以调用 FileSystem.close() 因为我不知道流式传输是否完成。
流结束后,我会收到 EOFException:
19:09:41,446 INFO [STDOUT] [DEBUG] [http-127.0.0.1-8888-1] Error making BlockReader. Closing stale NioInetPeer(Socket[addr=/192.168.20.198,port=50010,localport=56503]) [org.apache.hadoop.hdfs.DFSClient] java.io.EOFException: Premature EOF: no length prefix available at org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed(HdfsProtoUtil.java:171) at org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:392) at org.apache.hadoop.hdfs.BlockReaderFactory.newBlockReader(BlockReaderFactory.java:137) at org.apache.hadoop.hdfs.DFSInputStream.getBlockReader(DFSInputStream.java:1084) at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:538) at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:750) at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:794) at java.io.DataInputStream.read(DataInputStream.java:83) at com.sun.jersey.core.util.ReaderWriter.writeTo(ReaderWriter.java:114) at com.sun.jersey.core.provider.AbstractMessageReaderWriterProvider.writeTo(AbstractMessageReaderWriterProvider.java:76) at com.sun.jersey.core.impl.provider.entity.InputStreamProvider.writeTo(InputStreamProvider.java:98) at com.sun.jersey.core.impl.provider.entity.InputStreamProvider.writeTo(InputStreamProvider.java:59) at com.sun.jersey.spi.container.ContainerResponse.write(ContainerResponse.java:306) at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1437) at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1349) at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1339) at com.sun.jersey.spi.container.servlet.WebComponent.service(WebComponent.java:416) at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:537) at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:708) at javax.servlet.http.HttpServlet.service(HttpServlet.java:717) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:290) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206) at com.liveperson.dataaccess.management.filters.DevFilter.doFilter(DevFilter.java:60) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:235) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206) at com.liveperson.dataaccess.management.filters.ValidationFilter.doFilter(ValidationFilter.java:74) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:235) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206) at com.liveperson.gatekeeper.oauth.filter.GateKeeperFilter.doFilter(GateKeeperFilter.java:117) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:235) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206) at com.liveperson.dataaccess.management.filters.CountersFilter.doFilter(CountersFilter.java:32) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:235) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206) at org.jboss.web.tomcat.filters.ReplyHeaderFilter.doFilter(ReplyHeaderFilter.java:96) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:235) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206) at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:235) at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:191) at org.jboss.web.tomcat.security.SecurityAssociationValve.invoke(SecurityAssociationValve.java:190) at org.jboss.web.tomcat.security.JaccContextValve.invoke(JaccContextValve.java:92) at org.jboss.web.tomcat.security.SecurityContextEstablishmentValve.process(SecurityContextEstablishmentValve.java:126) at org.jboss.web.tomcat.security.SecurityContextEstablishmentValve.invoke(SecurityContextEstablishmentValve.java:70) at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:127) at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:102) at org.jboss.web.tomcat.service.jca.CachedConnectionValve.invoke(CachedConnectionValve.java:158) at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109) at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:330) at org.apache.coyote.http11.Http11Processor.process(Http11Processor.java:829) at org.apache.coyote.http11.Http11Protocol$Http11ConnectionHandler.process(Http11Protocol.java:601) at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:447) at java.lang.Thread.run(Thread.java:662)
提前感谢您的帮助!