5

有两个非常大的系列元素,第二个比第一个大 100 倍。对于第一个系列的每个元素,第二个系列中有 0 个或多个元素。这可以通过 2 个嵌套循环进行遍历和处理。但是第一个数组的每个成员的匹配元素数量的不可预测性使得事情变得非常非常缓慢。

第二系列元素的实际处理涉及逻辑与 (&) 和人口计数。

我找不到使用 C 的良好优化,但我正在考虑对第一个系列的每个元素进行内联 asm、rep* mov* 或类似操作,然后对第二个系列的匹配字节进行批处理,可能在缓冲区中1MB什么的。但是代码会变得非常混乱。

有人知道更好的方法吗?首选 C,但 x86 ASM 也可以。非常感谢!

为了清楚起见,带有简化问题的示例/演示代码,第一个系列是“人”,第二个系列是“事件”。(原来的问题其实是 100m 和 10,000m 条目!)

#include <stdio.h>
#include <stdint.h>

#define PEOPLE 1000000    //   1m
struct Person {
    uint8_t age;   // Filtering condition
    uint8_t cnt;   // Number of events for this person in E
} P[PEOPLE]; // Each has 0 or more bytes with bit flags

#define EVENTS 100000000  // 100m
uint8_t P1[EVENTS]; // Property 1 flags
uint8_t P2[EVENTS]; // Property 2 flags

void init_arrays() {
    for (int i = 0; i < PEOPLE; i++) { // just some stuff
        P[i].age = i & 0x07;
        P[i].cnt = i % 220; // assert( sum < EVENTS );
    }
    for (int i = 0; i < EVENTS; i++) {
        P1[i]    = i % 7;  // just some stuff
        P2[i]    = i % 9;  // just some other stuff
    }
}

int main(int argc, char *argv[])
{
    uint64_t   sum = 0, fcur = 0;

    int age_filter = 7; // just some

    init_arrays();      // Init P, P1, P2

    for (int64_t p = 0; p < PEOPLE ; p++)
        if (P[p].age < age_filter)
            for (int64_t e = 0; e < P[p].cnt ; e++, fcur++)
                sum += __builtin_popcount( P1[fcur] & P2[fcur] );
        else
            fcur += P[p].cnt; // skip this person's events

    printf("(dummy %ld %ld)\n", sum, fcur );

    return 0;
}

gcc -O5 -march=native -std=c99 test.c -o test
4

7 回答 7

4

由于平均每人获得 100 件物品,因此您可以通过一次处理多个字节来加快处理速度。为了使用指针而不是索引,我稍微重新安排了代码,并将一个循环替换为两个循环:

uint8_t *p1 = P1, *p2 = P2;
for (int64_t p = 0; p < PEOPLE ; p++) {
    if (P[p].age < age_filter) {
        int64_t e = P[p].cnt;
        for ( ; e >= 8 ; e -= 8) {
            sum += __builtin_popcountll( *((long long*)p1) & *((long long*)p2) );
            p1 += 8;
            p2 += 8;
        }
        for ( ; e ; e--) {
            sum += __builtin_popcount( *p1++ & *p2++ );
        }
    } else {
        p1 += P[p].cnt;
        p2 += P[p].cnt;
    }
}

在我的测试中,这将您的代码从 1.515s 加速到 0.855s。

于 2012-11-14T04:56:30.280 回答
2

尼尔的答案不需要按年龄排序,顺便说一句,这可能是个好主意——

cumsum[n+1]=cumsum[n]+__popcount(P[n]&P2[n]);
如果第二个循环有漏洞(请更正原始源代码以支持该想法),一个常见的解决方案是为每个人 做Thensum+=cumsum[fcur + P[p].cnt] - cumsum[fcur];

无论如何,计算负担似乎只是为了事件,而不是事件*人。无论如何,都可以通过为满足条件的所有连续人调用内部循环来进行一些优化。


sums (_popcounts(predicate[0..255]))如果真的有最多 8 个谓词,则将每个人的所有谓词预先计算到单独的数组 C[256][PEOPLE] 中可能是有意义的。这几乎使内存需求翻了一番(在磁盘上?),但将搜索从 10GB+10GB+...+10GB(8 个谓词)本地化到一个 200MB 的流(假设 16 位条目)。

根据 p(P[i].age < condition && P[i].height < cond2) 的概率,计算累积和可能不再有意义。也许,也许不是。更有可能一次只有 8 或 16 人的 SSE 并行性。

于 2012-11-13T19:15:02.030 回答
2

一种全新的方法可能是使用ROBDD对每个人/每个事件的真值表进行编码。首先,如果事件表不是很随机,或者不是由病态函数组成,比如大数乘法的真值表,那么第一个可以实现函数的压缩,第二个真值表的算术运算可以压缩形式计算. 每个子树可以在用户之间共享,并且两个相同子树的每个算术运算只需要计算一次。

于 2012-11-14T06:44:03.533 回答
1

我不知道您的示例代码是否准确反映了您的问题,但可以这样重写:

for (int64_t p = 0; p < PEOPLE ; p++)
    if (P[p].age < age_filter)
        fcur += P[p].cnt;

for (int64_t e = 0; e < fcur ; e++)
    sum += __builtin_popcount( P1[e] & P2[e] );
于 2012-11-11T00:08:49.197 回答
0

我不知道 gcc -O5 (这里似乎没有记录)并且似乎使用我的 gcc 4.5.4 产生与 gcc -O3 完全相同的代码(虽然,只在相对较小的代码示例上进行了测试)

根据您想要实现的目标,-O3 可能比 -O2 慢

与您的问题一样,我建议您更多地考虑您的数据结构而不是实际算法。只要您的数据没有以方便的方式表示,您就不应专注于通过适当的算法/代码优化来解决问题。

如果您想根据单个标准(此处为您的示例中的年龄)快速剪切大量数据,我建议您使用排序树的变体。

于 2012-11-13T16:59:42.370 回答
0

如果您的实际数据(年龄、计数等)确实是 8 位,则计算中可能存在很多冗余。在这种情况下,您可以通过查找表替换处理 - 对于每个 8 位值,您将有 256 个可能的输出,并且可以从表中读取计算数据而不是计算。

于 2012-11-14T05:15:56.510 回答
0

为了解决分支错误预测(在其他答案中缺失),代码可以执行以下操作:

#ifdef MISPREDICTIONS
if (cond)
    sum += value
#else
mask = - (cond == 0);  // cond: 0 then -0, binary 00..; cond: 1 then -1, binary 11..
sum += (value & mask); // if mask is 0 sum value, else sums 0
#endif

它不是完全免费的,因为存在数据依赖性(想想超标量 cpu)。但对于大多数不可预测的情况,它通常会获得 10 倍的提升。

于 2012-11-14T14:05:23.860 回答