4

如果我接近这个错误,请纠正我,但我有一个队列服务器和一堆我在集群中运行的 java 工作者。我的队列中的工作单元非常小,但数量很多。到目前为止,我的基准测试和对工人的审查表明,我的速度约为 200mb/秒。

所以我试图弄清楚如何通过我的带宽获得更多的工作单位。目前我的 CPU 使用率不是很高(40-50%),因为它处理数据的速度比网络发送它的速度要快。我想通过队列获得更多的工作,并愿意通过昂贵的压缩/解压缩来支付它(因为每个核心的一半现在是空闲的)。

我已经尝试过 java LZO 和 gzip,但想知道是否有更好的(即使 cpu 更贵)?

更新:数据是一个字节[]。基本上队列只接受这种格式,所以我ByteArrayOutputStream用来将两个 int 和一个 int[] 写入 byte[] 格式。int[] 中的值都是 0 到 100(或 1000 但绝大多数数字为零)之间的整数。列表非常大,从 1000 到 10,000 个项目(同样,多数为零.. int[] 中的非零数字不要超过 100 个)

4

5 回答 5

6

听起来使用利用数据结构的自定义压缩机制可能非常有效。

首先,使用short[](16 位数据类型)而不是int[]将发送的数据量减半(!),您可以这样做,因为数字很容易介于-2^15(-32768) 和2^15-1(32767) 之间。这非常容易实现。

其次,您可以使用类似于游程编码的方案:正数从字面上表示该数字,而负数表示那么多零(在取绝对值之后)。例如

[10, 40, 0, 0, 0, 30, 0, 100, 0, 0, 0, 0] <=> [10, 40, -3, 30, -1, 100, -4]

这更难实现,只是替换shortint但在最坏的情况下会提供约 80% 的压缩(1000 个数字,100 个非零,没有一个是连续的)。

我只是做了一些模拟来计算压缩比。我测试了上面描述的方法,以及 Louis Wasserman 和 sbridges 建议的方法。两人的表现都非常好。

假设数组的长度和非零数字的数量都在它们的界限之间,这两种方法平均节省了大约 5400int秒(或short秒),压缩大小约为原始的 2.5%!游程编码方法似乎额外节省了大约 1 个int(或平均压缩大小小 0.03%),即基本上没有区别,因此您应该使用最容易实现的一种。以下是 50000 个随机样本的压缩率直方图(它们非常相似!)。

直方图

摘要:使用shorts 代替ints 和其中一种压缩方法,您将能够将数据压缩到其原始大小的 1% 左右!

对于模拟,我使用了以下 R 脚本:

SIZE <- 50000

lengths <- sample(1000:10000, SIZE, replace=T)
nonzeros <- sample(1:100, SIZE, replace=T)

f.rle <- function(len, nonzero) {
  indexes <- sort(c(0,sample(1:len, nonzero, F)))
  steps <- diff(indexes)
  sum(steps > 1) + nonzero # one short per run of zeros, and one per zero
}

f.index <- function(len, nonzero) {
  nonzero * 2
}

# using the [value, -1 * number of zeros,...] method
rle.comprs <- mapply(f.rle, lengths, nonzeros)
print(mean(lengths - rle.comprs)) # average number of shorts saved

rle.ratios <- rle.comprs / lengths * 100
print(mean(rle.ratios)) # average compression ratio

# using the [(index, value),...] method
index.comprs <- mapply(f.index, lengths, nonzeros)
print(mean(lengths - index.comprs)) # average number of shorts saved

index.ratios <- index.comprs / lengths * 100
print(mean(index.ratios)) # average compression ratio


par(mfrow=c(2,1))
hist(rle.ratios, breaks=100, freq=F, xlab="Compression ratio (%)", main="Run length encoding")
hist(index.ratios, breaks=100, freq=F, xlab="Compression ratio (%)", main="Store indices")
于 2012-04-29T04:47:35.470 回答
2

尝试将您的数据编码为两个varint,第一个 varint 是序列中数字的索引,第二个是数字本身。对于为 0 的条目,什么也不写。

于 2012-04-29T04:56:17.190 回答
2

我写了一个 RLE 算法的实现。这对字节数组进行操作,因此可以用作现有代码的内联过滤器。如果您的数据将来发生变化,它应该可以安全地处理大值或负值。

它将零序列编码为 {0}{qty},其中 {qty} 在 1..255 范围内。所有其他字节都存储为字节本身。您在发送之前压缩字节数组,并在接收时将其膨胀回完整大小。

public static byte[] squish(byte[] bloated) {
    int size = bloated.length;
    ByteBuffer bb = ByteBuffer.allocate(2 * size);
    bb.putInt(size);
    int zeros = 0;
    for (int i = 0; i < size; i++) {
        if (bloated[i] == 0) {
            if (++zeros == 255) {
                bb.putShort((short) zeros);
                zeros = 0;
            }
        } else {
            if (zeros > 0) {
                bb.putShort((short) zeros);
                zeros = 0;
            }
            bb.put(bloated[i]);
        }
    }
    if (zeros > 0) {
        bb.putShort((short) zeros);
        zeros = 0;
    }
    size = bb.position();
    byte[] buf = new byte[size];
    bb.rewind();
    bb.get(buf, 0, size).array();
    return buf;
}

public static byte[] bloat(byte[] squished) {
    ByteBuffer bb = ByteBuffer.wrap(squished);
    byte[] bloated = new byte[bb.getInt()];
    int pos = 0;
    while (bb.position() < bb.capacity()) {
        byte value = bb.get();
        if (value == 0) {
            bb.position(bb.position() - 1);
            pos += bb.getShort();
        } else {
            bloated[pos++] = value;
        }
    }
    return bloated;
}
于 2012-04-30T00:00:39.967 回答
1

与 7z 和 gzip 相比,我对 BZIP2 印象深刻。我没有亲自尝试过这个 Java 实现,但看起来很容易用你的 GZIP 调用代替这个并验证结果。

http://www.kohsuke.org/bzip2

于 2012-04-29T04:49:12.947 回答
1

您可能应该在数据流上尝试所有主要的,看看哪个效果最好。您还应该考虑到某些算法将需要更长的时间来运行,从而增加队列的延迟。这可能是也可能不是问题,具体取决于您的应用程序。

如果您对数据有所了解,有时可以获得更好的压缩。(dbaupp 的回答很好地涵盖了这种方法)

这种压缩算法的比较可能很有用。来自文章:

流行压缩算法的压缩比

于 2012-04-29T04:50:21.500 回答