0

我正在尝试使用 map reduce 将数据写入alluxio。我在 hdfs 上有大约 11 gig 的数据正在写入 alluxio。它在 MUST_CACHE 写入类型(alluxio.user.file.writetype.default 的默认值)下工作正常。

但是当我尝试使用 CACHE_THROUGH 编写它时,它失败并出现以下异常:

   Error: alluxio.exception.status.UnavailableException: Channel to <hostname of one of the  worker>:29999: <underfs path to file> (No such file or directory)
            at alluxio.client.block.stream.NettyPacketWriter.close(NettyPacketWriter.java:263)
            at com.google.common.io.Closer.close(Closer.java:206)
            at alluxio.client.block.stream.BlockOutStream.close(BlockOutStream.java:166)
            at alluxio.client.file.FileOutStream.close(FileOutStream.java:137)
            at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
            at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
            at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat$LineRecordWriter.close(TextOutputFormat.java:111)
            at org.apache.hadoop.mapred.MapTask$NewDirectOutputCollector.close(MapTask.java:679)
            at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:802)
            at org.apache.hadoop.mapred.MapTask.run(MapTask.java:346)
            at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
            at java.security.AccessController.doPrivileged(Native Method)
            at javax.security.auth.Subject.doAs(Subject.java:422)
            at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1595)
            at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
    Caused by: alluxio.exception.status.NotFoundException: Channel to <hostname of one of the  worker>29999: <underfs path to file> (No such file or directory)
            at alluxio.exception.status.AlluxioStatusException.from(AlluxioStatusException.java:153)
            at alluxio.util.CommonUtils.unwrapResponseFrom(CommonUtils.java:548)
            at alluxio.client.block.stream.NettyPacketWriter$PacketWriteHandler.channelRead(NettyPacketWriter.java:367)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
            at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
            at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
            at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
            at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
            at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
            at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
            at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
            at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
            at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
            at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
            at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
            at java.lang.Thread.run(Thread.java:748)

我也尝试使用以下命令,得到相同的错误:

./alluxio fs -Dalluxio.user.file.writetype.default=CACHE_THROUGH copyFromLocal <hdfs_input_path> <alluxio_output_path>

任何帮助/指针将不胜感激。谢谢

4

1 回答 1

1

copyFromLocalshell 命令只能复制本地文件系统上可用的文件。要将文件从 HDFS 复制到 Alluxio,您可以先将文件复制到本地计算机,然后将文件写入 Alluxio。

hdfs dfs -get <hdfs_input_path> /tmp/tmp_file
alluxio fs copyFromLocal /tmp/tmp_file <alluxio_output_path>

要直接从 mapreduce 写入 Alluxio,请将您的更新core-site.xml为包含

<property>
  <name>fs.alluxio.impl</name>
  <value>alluxio.hadoop.FileSystem</value>
  <description>The Alluxio FileSystem (Hadoop 1.x and 2.x)</description>
</property>
<property>
  <name>fs.AbstractFileSystem.alluxio.impl</name>
  <value>alluxio.hadoop.AlluxioFileSystem</value>
  <description>The Alluxio AbstractFileSystem (Hadoop 2.x)</description>
</property>

,将Alluxio客户端jar添加到你的应用程序类路径中-libjars /path/to/client,并写入一个alluxio://master_hostname:19998/alluxio_output_pathURI。有关更多详细信息,请参阅文档

于 2018-01-25T22:11:41.227 回答