1

我需要一种在内存和时间方面有效处理大型二进制数据文件的方法。

我正在开发一个多线程 Java 应用程序,该应用程序处理由许多多维数据点组成的大型二进制数据文件。数据点具有相同的维度,每个点约为 100 kb。数据点的数量大约为 10,000 到 100,000。数据文件在测试阶段是几千兆字节,但将来会是几千兆字节。

客户在运行应用程序时遇到内存问题,因此我正在处理数据点列表,以减少处理所需的内存,同时仍提供良好的性能。

Java 是项目的要求,我们受限于客户端系统上的当前内存。客户端系统有很多核心,但它是一个共享系统,内存是目前的限制因素。

这组数据点在我们的应用程序中重复使用。有时这些点是按顺序处理的。在其他时候,处理点的子集,包括两个不同点的所有组合。在一个子集中,点可以按任何顺序处理,但子集中的点可以任意相距很远。数据文件是我通过解析文本数据文件并将值写入二进制文件而生成的简单二进制文件。目前,数据是双精度的,所以我将数据点作为一系列双精度连续写入二进制数据文件。(我解析每个文本文件数据点并立即将其写入二进制文件,而不是将它们全部保存在内存中。)将来我们可能会处理浮点、整数等数据。

我搜索了 SO 和其他 Internet 站点。到目前为止,我已经尝试了几种方法,包括根据需要从二进制文件中读取点,但与同时将所有数据点的列表保存在内存中相比,性能很差。这适用于具有较小尺寸点的较少数量的测试,但这些测试的数据集比实际数据集小几个数量级。到目前为止,我尝试过的方法比将所有点都保存在内存中要慢数百或数千倍。

我已经尝试过直接 ByteBuffers 和 MappedByteBuffers。最好的方法是我提取了下面相关部分的类。它将二进制数据读入 MappedByteBuffers 数组。然后当通过下面的get(int index)方法请求一个数据点时,该方法加载相关缓冲区,将相关字节读入字节数组,将字节转换为双精度数组,并创建一个DataPoint对象。我使用了一个 MappedByteBuffers 数组,因为整个数据文件无法放入物理内存中。我使用了一个字节数组的数组,以便线程将有单独的字节数组来读取数据。然后我只对 MappedByteBuffer 的实际访问进行同步,以尽量减少阻塞。据我了解 Java 类库,缓冲区不是线程安全的,

欢迎任何反馈。特别是,我对 MappedByteBuffers 的同步感到好奇。

final static private int DOUBLE_BYTE_SIZE = Double.SIZE / Byte.SIZE;

public enum DataType {
  CHAR,
  DOUBLE,
  FLOAT,
  INT,
  LONG,
  SHORT;
}

final static private int numberOfBuffers = 8;
private MappedByteBuffer[] buffers = null;
private int bufferSize = -1;
private byte[][] readArray = null;
private DataType dataType;
private int sizeOfVector;
private int byteSizeOfVector;
private File binFile;
private int size = -1;

private int makeList(File binaryFile, DataType argDataType, int numberOfComponents) {
  FileInputStream fis = null;
  FileChannel fc = null;
  try {
    dataType = argDataType;
    sizeOfVector = numberOfComponents;
    fis = new FileInputStream(binaryFile);
    fc = fis.getChannel();
    long fileSize = fc.size();

    switch (dataType) {
    case DOUBLE:
      byteSizeOfVector = DOUBLE_BYTE_SIZE * sizeOfVector;
      break;
    default:
      break;
    }

    size = (int) fileSize / byteSizeOfVector;
    bufferSize = size / numberOfBuffers;
    buffers = new MappedByteBuffer[numberOfBuffers];
    long remaining = fileSize;
    long position = 0;
    int bufferNumber = 0;
    while(remaining > 0) {
      long length = Math.min(remaining, bufferSize * byteSizeOfVector);
      buffers[bufferNumber] = fc.map(MapMode.READ_ONLY, position, length);
      position += length;   
      remaining -= length;
      bufferNumber++;
    }
    readArray = new byte[numberOfBuffers][byteSizeOfVector];

  } catch (IOException ex) {
    return -1;
  } finally {
    try {
      if(fis != null) {
        fis.close();
      }

      if(fc != null) {
        fc.close();
      }
    } catch (IOException exClose) {
      return -1;
    }
  }

  return 0;
}

private static long makeLong(byte[] data) {
  if (data == null || data.length != 8) return 0x0;

  return (long)(
      (long) (0xFF & data[0]) << 56  |
      (long) (0xFF & data[1]) << 48  |
      (long) (0xFF & data[2]) << 40  |
      (long) (0xFF & data[3]) << 32  |
      (long) (0xFF & data[4]) << 24  |
      (long) (0xFF & data[5]) << 16  |
      (long) (0xFF & data[6]) << 8   |
      (long) (0xFF & data[7]) << 0
      );
}  

private static double makeDouble(byte[] data) {
  if (data == null || data.length != 8) return 0x0;
  return Double.longBitsToDouble(makeLong(data));
}

private static double[] makeDoubleArray(byte[] data) {
  if (data == null) return null;
  if (data.length % 8 != 0) return null;

  double[] doubleArray = new double[data.length / 8];

  for (int index = 0; index < dbls.length; index++) {
    doubleArray[index] = makeDouble(new byte[] {
        data[(index*8)],
        data[(index*8)+1],
        data[(index*8)+2],
        data[(index*8)+3],
        data[(index*8)+4],
        data[(index*8)+5],
        data[(index*8)+6],
        data[(index*8)+7],
    }
        );
  }
  return doubleArray;
}

@Override
public DataPoint get(int index) {
  if(index > size() - 1) {
    throw new IndexOutOfBoundsException("Index exceeds length of list.");
  } else if(index < 0) {
    throw new IndexOutOfBoundsException("Index is less than zero.");
  }

  int bufferNumber = index / bufferSize;
  int bufferPosition = index % bufferSize;
  MappedByteBuffer buffer = buffers[bufferNumber];
  synchronized (buffer) {
    buffer.load();
    buffer.position(bufferPosition * sizeOfVector);
    buffer.get(readArray[bufferNumber]);
  }

  switch(dataType) {
  case DOUBLE:
    return new DoublePoint(makeDoubleArray(readArray[bufferNumber]));
  default:
    return null;
  }
}
4

0 回答 0