5

好的,所以我正在尝试编写一个非常快速地计算素数的 Haskell 程序。大概我不是第一个尝试这样做的人。(特别是,我该死的肯定我看到了一些现有技术,但我现在找不到它......)

最初,我想计算小于 10^11 的素数。目前我已经让我的程序运行了大约 15 分钟,而且还不到一半。一些狂热的 C++ 程序员声称他的程序只需要 8 分钟。很明显,我做错了可怕的事情。

(如果重要的话,我当前的实现使用IOUArray Integer Bool多个线程来处理搜索空间的独立子范围。目前需要几秒钟才能从 10MB 数组块中删除所有 2 的倍数......)

请注意,10^11 对于 32 位算术来说太大了。此外,10^11 位 = 12.5 GB,太多的数据无法放入 Haskell 的 32 位地址空间。所以你不能一次将整个位图放在内存中。最后,请注意,小于 10^11 的素数只是小于 2^32 的阴影,因此您也无法一次存储所有实际整数。


编辑:显然我误读了时间信息。C++ 家伙实际上声称的是:

  • 计数 < 10^11 的素数仅使用一个内核需要 8 分钟,使用 4 个内核需要 56 秒。(未指定 CPU 类型。)

  • 计算 < 10^10 的素数需要 5 秒。(不确定正在使用多少个内核。)

对错误感到抱歉...

编辑:我的源代码可以在这里找到:http: //hpaste.org/72898

4

3 回答 3

13

使用arithmoi优秀的 StackOverflow 老师 Daniel Fischer 的包:

import Math.NumberTheory.Primes.Counting

main = print $ primeCount (10^11)

-- $ time ./arith
-- 4118054813
-- 
-- real 0m0.209s
-- user 0m0.198s
-- sys  0m0.008s

这比你“狂热”的 C++ 朋友写的任何东西都要快 40 倍;也许他可以从 Haskell 源代码中学到一两件事……这里是黑线鳕

于 2012-08-09T16:23:33.697 回答
12

一些狂热的 C++ 程序员声称他的程序只需要 8 秒。

那是挂钟时间还是CPU时间?

如果挂钟,并且任务被分成 100 个 CPU,比如说,它不是很令人印象深刻(它很不错),如果分成 1000 个,那就太可怜了。

如果是 CPU 时间:

我很确定实际上筛分到 10 11并没有达到时间。

在那之前有几个超过 4×10 9 个素数,假设一个稍微正常的 2-3GHz CPU,每个素数有 4-6 个周期。

用 Eratosthenes 的筛子和阿特金的筛子都无法做到这一点。每个素数都必须被检查和计数,每个复合材料都必须这样标记和检查。这给出了筛子中每个数字两个循环的理论下限,不包括例如数组初始化、循环边界检查、循环变量更新、冗余标记。你不会接近那个理论界限。

几个数据点:

Daniel Bernstein 的素数(阿特金筛),调整筛分块以充分利用我的 32KB L1 缓存,需要 90 秒将素数筛分到 10 11并计数它们(234 秒,默认筛块大小为 8K字)在我的 Core i5 2410M (2.3GHz) 上。(它针对高达 2 32的范围进行了优化,但在此之上,它变得明显更慢,对于限制 10 9时间分别为 0.49 和 0.64 秒。)

我的 Eratosthenes 分段筛,使用一些未暴露的内部结构来避免创建列表,筛分并在 340 秒内计数到 10 11(嗅:-/,但是嘿,对于 10 9,它花了 2.92 秒 - 它越来越近了,在 10 之间12和 10 13它超过了 primegen :) 使用公开的接口创建一个素数列表大约会使所用时间加倍,就像用 32 位 GHC 编译它一样。

所以我敢打赌,如果是正确的,那么报告的 8 秒时间(如果 CPU 时间)是用于计算素数数量的算法,而无需实际筛选整个过程。正如应用程序的回答所示,这可以做得更快

dafis@schwartz:~/Haskell/Repos/arithmoi> time tests/primeCount 100000000000
4118054813

real    0m0.145s
user    0m0.139s
sys     0m0.006s

请注意,10^11 对于 32 位算术来说太大了。此外,10^11 位 = 12.5 GB,太多的数据无法放入 Haskell 的 32 位地址空间。所以你不能一次将整个位图放在内存中。

要筛选该范围,您必须使用分段筛。即使您不受 32 位地址空间的限制,使用如此大的数组也会由于频繁的缓存未命中而产生糟糕的性能。您的程序将花费大部分时间来等待从主内存传输的数据。筛选适合您的 L2 缓存的块(我没有成功尝试通过使筛子适合 L1 来使其更快,我猜 GHC 运行时的开销太大而无法使其工作)。

此外,从筛子中消除一些小素数的倍数,这减少了所需的工作,并通过使筛子更小来进一步提高性能。消除偶数是微不足道的,3 的倍数容易,5 的倍数并不难。

最后,请注意,小于 10^11 的素数只是小于 2^32 的阴影,因此您也无法一次存储所有实际整数。

如果您将筛子存储为位数组列表,并删除 2、3 和 5 的倍数,则需要大约 3.3GB 来存储块,因此如果您真的可以拥有多达 4GB 的空间,它会很合适。但是您应该让不再需要的块立即被垃圾收集。

(以防万一,我当前的实现使用 IOUArray Integer Bool 和多个线程来处理搜索空间的独立子范围。目前从 10MB 数组块中删除所有 2 的倍数需要几秒钟...)

这很重要。

  • 用于Int索引和unsafeRead/unsafeWrite来读取和修改数组。Integer计算比Int计算慢得多,并且你得到的边界检查readArray/writeArray 真的很痛
  • 10MB 的块太大了,你会失去缓存局部性。最多使用几百 KB 的块(L2 缓存减去一些空间用于其他需要的东西)。
  • 尽管如此,即使使用Integer索引、边界检查和 10MB 块,删除 2 的倍数也不应该花费几秒钟。我们可以看看你的代码吗?

假期后更新:

无需深奥的魔法,八分钟即可将质数筛选到 10 11 。我看不出从一核到四核如何能产生八倍的加速,因为这里不应该有缓存效应,但无论如何,它可能是,没有看到代码,我无法调查。

因此,让我们看一下您的代码。

首先,一个错误:

vs <-
  mapM
    (\ start -> do
      let block = (start, start + block_size)
      v <- newEmptyMVar
      writeChan queue $ do
        say $ "New chunk " ++ show block
        chunk <- chunk_new block
        sieve_top base chunk
        c <- chunk_count chunk
        putMVar v c
      return v
    )
    (takeWhile (< target) $ iterate (+block_size) base_max)

每个数字base_max + k*block_size出现在两个块中,如果其中任何一个是素数,则该素数被计算两次,您也应该将上限限制在target.

现在到性能方面:

跳出来的一件事是它真的很健谈,如此健谈,一旦你调整了缓存(我为 512KB L2 缓存占用了 256KB 块),它就可以测量了——然后线程因为争夺消息block_size而减慢了速度。stdoutif prime < 100 then say $ "Sieve " ++ show prime else return ()

让我们看看您的(静音)筛分循环:

chunk_sieve :: Chunk -> Integer -> IO ()
chunk_sieve array prime = do
  (min, max) <- getBounds array
  let n0 = min `mod` prime
  let n1 = if n0 == 0 then min else min - n0 + prime
  mapM_
    (\ n -> writeArray array n (n == prime))
    (takeWhile (<= max) $ iterate (+prime) n1)

花费时间的一件事是,将每个指数与标出倍数的素数进行比较。每个单一的比较都很便宜(虽然比比较贵得多Int),但是大量的比较,其中只有一个可能产生True,加起来。在循环之后无条件地写入False并且如果需要True在素数的索引处写入会产生相当大的加速。

出于计时目的,我将目标减少到 10 9并在两个内核上运行它。原始代码花费了 155 秒(经过,292 秒用户),减少了block_size148 秒,静音了 143 秒。省略比较,

mapM_
  (\ n -> writeArray array n False)
  (takeWhile (<= max) $ iterate (+prime) n1)
when (min <= prime && prime <= max) $ writeArray array prime True

它以 131 秒的速度运行。

现在是时候进行一些更大的加速了。我是否已经提到边界检查会花费大量时间?由于循环条件保证不会尝试越界访问(并且素数足够小以至于不会Int发生 -overflow),我们应该真正使用未经检查的访问:

chunk_sieve :: Chunk -> Integer -> IO ()
chunk_sieve array prime = do
  (min, max) <- getBounds array
  let n0 = min `mod` prime
      n1 = if n0 == 0 then min else min - n0 + prime
      n2 = fromInteger (n1 - min)
      mx = fromInteger (max - min)
      pr = fromInteger prime
  mapM_
    (\ n -> unsafeWrite array n False)
    (takeWhile (<= mx) $ iterate (+pr) n2)
  when (min <= prime && prime <= max) $ writeArray array prime True

这将运行时间减少到 96 秒。好多了,但仍然很糟糕。罪魁祸首是

takeWhile (<= mx) $ iterate (+pr) n2

GHC 不能很好地融合那个组合,你会得到一个Int被遍历的 boxed 的列表。将其替换为算术序列,[n2, n2+pr .. mx]GHC 会愉快地使用 unboxed Int#s 创建一个循环,时间为 37 秒。

好多了,但仍然很糟糕。现在最大的时间消耗是

chunk_count :: Chunk -> IO Integer
chunk_count array = do
    (min, max) <- getBounds array
    work min max 0
  where
    work i max count = do
      b <- readArray array i
      let count' = count + if b then 1 else 0
      evaluate count'
      let i' = i+1
      if i' > max
        then return count'
        else work i' max count'

同样,边界检查需要花费大量时间。和

chunk_count :: Chunk -> IO Integer
chunk_count array = do
    (min, max) <- getBounds array
    work 0 (fromInteger (max-min)) 0
  where
    work i max count = do
      b <- unsafeRead array i
      let count' = count + if b then 1 else 0
      evaluate count'
      let i' = i+1
      if i' > max
        then return count'
        else work i' max count'

我们减少到 15 秒。现在,evaluate count'在. 在最后一行使用而不是将运行时间减少到 13 秒。以更适合(至少对于 GHC)的方式定义,workcountelse work i' max $! count'evaluatework

chunk_count :: Chunk -> IO Integer
chunk_count array = do
    (min, max) <- getBounds array
    let mx = fromInteger (max-min)
        work i !ct
            | mx < i    = return ct
            | otherwise = do
                b <- unsafeRead array i
                work (i+1) (if b then ct+1 else ct)
    work 0 0

将时间缩短到 6.55 秒。现在我们处于say $ "New chunk " ++ show block产生可测量差异的情况下,禁用它会使我们降低到 6.18 秒。

但是,通过从数组中读取一个字节来计算设置位,屏蔽掉不需要的位并将每个单独的位与 0 进行比较并不是最有效的方法。从数组中读取整个Words (通过castIOUArray)并使用popCount 会更快,如果“你知道你在做什么......”,这会让我们缩短到 4.25 秒;当素数的平方变得大于块的上限时停止标记

sieve_top :: Chunk -> Chunk -> IO ()
sieve_top base chunk = work 2
  where
    work prime = do
      chunk_sieve chunk prime
      mp <- chunk_next_prime base prime
      case mp of
        Nothing -> return ()
        Just p' -> do
            (_,mx) <- getBounds chunk
            when (p'*p' <= mx) $ work p'

到 3.9 秒。仍然不壮观,但考虑到我们从哪里开始,还不错。只是为了说明在减少其他不良行为后缓存局部性的重要性:具有原始 10MB 块大小的相同代码需要 8.5 秒。

代码中的另一个小问题是所有线程都使用相同的可变小素数数组进行筛选。由于它是可变的,因此必须同步对其的访问,这会增加一些开销。只有两个线程,开销不会太大,使用不可变副本进行筛选只会将时间减少到 3.75 秒,但我希望更多线程的效果会更大。(我只有两个物理内核 - 带有超线程 - 所以使用两个以上的线程来做同样的工作会导致减速,这可能会使从中得出的结论无效,但是使用四个线程,我用不可变数组得到 4.55 秒而不是 5.3 秒使用可变数组。这似乎证实了不断增长的同步开销。)

通过消除更多的计算和为 GHC 的优化器编写代码(更多的工作者/包装器转换,一些静态参数转换),仍然可以获得一些Integer收益,但不是很多,可能是 10-15%。

下一个重大改进是通过从筛子中消除偶数来获得。这将工作、分配和运行时间减少了一半以上。任何素筛都不应该考虑偶数,真的,那只是无意义的浪费工作。

于 2012-08-09T20:45:42.870 回答
1

这是一个奇怪的问题/答案,因为接受的答案与问题不匹配:问题要求帮助提高筛子的速度(正确选择 Eratosthenes 实现的页面分段筛子)但接受的答案不使用筛子,而是一种数值分析技术,只是一个库。虽然这对于非常快速地找到最大范围内的素数总数是很好的(并且在其他语言中有更快和更广泛的版本可以做到这一点,例如 Kim Walisch 在 C++ 中的 primecount,也可以快速计算素数之和范围,筛子可用于进行特定类型的分析,例如寻找素数间隙,素数双倍,三元组等的存在。(通常是 K-Tuple primes ) 等。 事实上,一般数值分析技术,如Meissel-Lehmer 算法,其中大多数都是基于的,需要一个“种子素数”源来启动,这最好由优化的埃拉托色尼筛。

实际上,根据上述链接,Kim Walisch 的 primecount 已经为其构建了 GHC/Haskell API,并且可以通过外部函数接口 (FFI) 轻松调用,因此比 arithmoi 库更好,因为它更快。它是如此之快,以至于它是目前计算素数高达 1e28 的记录保持者!如果必须为 Haskell 程序提供这样的值并且不在乎他们是否理解它是如何获取它的,它会在几十毫秒内计算出 1e11 的素数。

以类似的方式,如果筛子是真正需要的,那么Kim Walisch 的素筛也有 GHC Haskell FFI,也可以直接调用。

虽然使用库可以完成工作,但仅仅通过使用它们就不会学到任何关于如何实现它们的知识。因此,Daniel Fischer(DF's)非常好的教程答案的原因以及他开始的后续系列。DF 的回答显示了如何改进问题的代码,但是没有一个总结可以显示在完成他的所有建议后代码应该是什么样子;这一点尤其重要,因为 OP 的 hpaste 中的原始问题代码已经消失(幸运的是,这只是一个很好的例子,说明如何不这样做,但也许代码应该嵌入问题中以供参考),我们只能通过 DF 在他的回答中的评论重建它所做的事情。这一系列答案旨在纠正如果有人需要纯 GHC Haskell 中的这种筛子,首先是对 DF 教学所引导的代码的摘要,然后是进一步的阶段性改进。

TLDR;跳到我发布的 Haskell 代码的最后一个答案的末尾,它实际上几乎与 Kim Walisch 的素数筛一样快或一样快,到目前为止,这可能是世界上最快的,至少在 10 到 10 的较小范围内千亿左右,除了非常优化的 YAFU 版本之外,其他任何东西都比不上任何东西,它对于大范围的速度可能快 5% 左右。 DF 在车轮分解之前的最终代码据说比原始问题代码快 40 倍,我将其扩展到我的代码再次快 30 到 32 倍,总共比原始问题代码快 1200 到 1280 倍!.

原始问题代码

这将是我唯一一次引用它,因为它不再可用(而且我认为无论如何都不值得修改):我唯一喜欢该代码的地方是线程池的实现,但即使这样也有缺陷它用于mapM将要由线程处理的整个作业队列提供给通道,这是一个非延迟函数,因此可能会将大量工作推送到消耗大量内存的作业通道上,而不仅仅是推送足够多的内存努力让所有线程保持忙碌,然后再为从结果返回的每一个人提供一份工作Channel。我将在此答案底部的代码和后续代码中更正这一点。实际上,只需要一个结果 MVar 池,因为 GHC 运行时分叉新线程的速度比将新工作通过管道传输到等待池所需的速度更快。

原始代码和 DF 改进代码的一个问题是它们都没有使用“-fllvm”编译器标志来使用 LLVM 后端代码生成器。对于我们在这里尝试编写的紧密循环,LLVM 可以将每个循环的时间减少大约两倍。原始代码的循环非常不紧密,LLVM 无能为力,但这并不重要,但 DF 的代码确实有紧密的循环,并且可以通过将循环时间减少到大约 60% 来受益。

使用MVar's 的另一个问题(因此Chan's) 是它们不是特别快,每次激活的开销约为 3 毫秒。我们在 DF 在他的最终分析中的回答中得到了这个问题的证据,他说“使用四个线程,不可变数组得到 4.55 秒,而可变数组得到 5.3 秒”,而使用两个线程则得到 3.75 秒。现在他的机器只有两个内核,额外的两个线程是超线程的,与另外两个共享大部分相同的资源,所以使用它们的人并不期望有更好的性能,但也不期望性能更差,因为这里。原因是开销太大,以至于添加效率低下的内核实际上会增加额外的工作并减慢最终结果。我在我的四个“真实”中也看到了这一点 使用 LLVM 后端提高效率后的线程/核心机器。在使用所有四个代码时,我只将时间减少到大约 55%,这与我只使用两个内核时总执行时间实际上增加的情况是一致的。由于 `MVar's 是我们可以使用 GHC 实现“等待结果”的唯一方法,因此解决方案是使工作切片更大(更粗粒度的多线程),因此这种开销变成可以忽略不计的一部分,这是我在第二个答案中的算法改进。

一旦重载问题得到解决,也不需要像在无限深度的地方那样使用通道来接收工作并返回结果,所以我消除了它们,只支持一个“循环”数组,它的元素数量为池中的进程。

测试环境

我不确定 DF 是否还有他的 Sandy Bridge 笔记本电脑,虽然我有一个 Sandy Bridge CPU,但它目前正在停机进行维护。然而,在线 IDE 网站 Wandbox使用的 Broadwell CPU 与 DF 的机器在 2.3 GHz 的回答中使用的额定值大致相同,涡轮增压到 2.9 GHz,用于两核/四线程(超线程)的单线程使用。这与 DF 的机器具有相同的性能,正如我采用他引用的“arithmoi”库的内部结构并强制它运行十亿的范围所证明的那样。 这个魔杖盒链接显示它的运行时间与他在回答中提到的几乎完全相同的 2.92 秒。我没有费心去计算结果(使用弹出计数只有大约 0.01 毫秒),因为这不会改变比较,而只是强制它在默认的 128 KB 缓冲区大小的范围内运行。

因此,在 Wandbox 中,我们有一个易于引用的可比机器;但是,它有一个限制,它不支持使用 LLVM 后端,它的使用对于优化我们将使用的紧密剔除循环很重要。因此,我将在我自己的机器上进行有无 LLVM 的比较,这是一个 3.2 GHz 的英特尔 Skylake i5-2500,单线程使用提升到 3.6 GHz。这有一个轻微的限制,因为 Skylake 在架构上进行了进一步的改进,可以更好地预测分支,并且在正确预测分支时将分支省略至零时间,因此结果不会直接按使用的时钟速度进行缩放;由于我们正在开发的循环几乎将所有时间都花在了紧密的循环中,

快速 Sieve 算法背后的原理

这些原则只有两个,如下:

  1. 对于给定的筛分范围,保持较低的操作总数很重要。
  2. 每个操作平均必须占用少量的 CPU 时钟周期。

最终的执行效率就是这两者的乘积。

绩效目标

DF 似乎认为 Atkin 和 Bernstein 的“primegen”是筛分性能的“黄金标准”。这不是主要原因,它不会也不能平均每个操作占用少量 CPU 时钟周期(原则 2),并且它消耗的周期数随着范围的增加而增加,速度快于我认为的“黄金”标准”- TLDR 中引用的 Kim Walisch 的主筛。虽然阿特金筛 (SoA) 的这种实现完全正确,但它通过了上述原则 1),因为它快速收敛到恒定的操作数 0。正如网页上该点上方的公式所估计的那样(1e11 为 0.2984,随着范围的增加而更高),它在效率方面没有达到预期。SoA 文档进行的比较与 Eratosthenese (SoE) 的比较存在缺陷:在与“primegen”代码相同的下载中是“eratspeed”的代码,它是 SoE 的参考版本,筛分超过十亿。然而,他们削弱了该参考版本,因为他们将其限制为与烘焙到 SoA 中相同的 2/3/5 轮分解(无法增加),而不是使用最大轮分解组合筛(有他们知道的文件中的证据)。这使得该范围内的操作数量略高于 4 亿次,而 SoA 的操作数约为 258 次。700万。接下来,他们似乎进一步削弱了参考 SoE,使筛子缓冲区比用于 SoA 的缓冲区更小,从而增加了 SoE 的每次操作时间,因此与 SoA 的时间差不多,尽管这些操作比那些操作更简单的 SoA。通过这种方式,他们声称 SoA 比 SoA 快了大约 40%。

Berstein 以类似的方式对这两者的紧密内部操作循环进行了一些手动优化,这可能是由于当时的 C 编译器无法完全优化这些循环,并且为了让那些编译器不撤消这些手优化他在注释中指出,编译器应该只在第一级优化下运行。这对于今天的“gcc”版本不再适用,因为两者的性能都随着“-O3”高级优化而提高。如果两者都设置为 8192 32-bit words (32 Kilobytes) 并使用上述优化编译,它们在 DF 型机器上都在大约 0.49/0.50 秒内筛选到十亿,表明 CPU 的数量对于“eratspeed”,每次剔除的周期减少了约 36%。如果应用了最大轮分解组合原则,那么它应该再快 40%,这是在对紧密的内部剔除循环进行优化之前;SoA 无法对剔除循环进行这些优化,因为与 SoE 的每个循环的固定跨度相比,它必须使用每个操作的可变跨度循环。这个参考“eratspeed”SoE 实现将在答案后面进一步讨论,因为这是导致我改进算法答案的方法。

作为关于 SoA“primegen”的最后一点,Bernstein 似乎认为筛缓冲区需要限制为小于 CPU L1 缓存大小。虽然这对于他开发这项工作的 CPU 来说可能是正确的,但对于 L2 高速缓存性能比 SoA 快得多并且接近参考 SoE 紧凑内循环时间的现代 CPU 来说不再正确。因此,如果使筛选缓冲区等于 CPU L2 缓存大小(在这些 CPU 的情况下为 256 Kiloytes),“primegen”的筛选到 10 亿的时间几乎不会发生变化,大约为 0.50 秒,但时间筛分到 1000 亿个几乎线性缩放到大约 52 秒(应该如此)。这是有效的,因为缓冲区已增加,因此 SoA 不会很快受到操作跨度溢出的困扰,但它不会

作为参考,当单线程使用 Wandbox 上的两个线程时,“primesieve”类型的算法在大约 0.18 秒内筛选到十亿个,大约 25 秒到 1000 亿个范围内,这两个时间都减少了大约两倍/DF CPU 范围。

DF 对他的回答的最终结果和提出的进一步工作的状态

DF 表示,当单线程/在两个线程上分别运行时,他的最终答案代码将在大约 7.5/3.75 秒内筛选到十亿。这代表了大约 25.514 亿次操作,在 2.9 GHz 的 CPU 时钟下,每个剔除 (CpC) 代表大约 9 个 CPU 时钟。这不好,因为基本的剔除循环需要大约 3.5 CpC。这是不使用上述 LLVM 的结果。

他建议第一个进一步的改进是通过赔率进行车轮分解——仅将速度提高大约 2.5 倍,并且使用更多扩展的车轮分解进行进一步改进是相当容易的。这是真的。然而,他在“arithmoi”库的素数函数中进行扩展轮因式分解的尝试非常失败:在 2.9 GHz 下进行 4.048 亿次剔除需要 2.92 秒,大约是 21 CpC,就像 460.7 亿次剔除需要 340 秒一样筛选到 1000 亿个的筛选也非常糟糕,每次筛选的时钟都差不多。这太慢了,没有理由进行这种扩展的 2/3/5 轮子分解,因为结果将与仅使用赔率相同或慢,即使在 9 CpC 时也是如此。这种糟糕的效率的原因是他使用了一些复杂的,因此很慢,数学来减少车轮分解的剔除,但这些计算需要大量的机器时间。有一些查找表方法可以做到这一点,每次剔除大约 12 个 CPU 时钟周期,速度大约是前者的两倍,但在如此小的范围内使用它们仍然太慢;它们的使用应仅限于为非常大的范围增加有效范围,其中它们所花费的时间百分比仅占总时间的一小部分。

为了说明这些结果有多糟糕,这里有一个参考 Wandbox Javascript 仅赔率版本,在大约 2.14 秒或大约 6.15 CpC 内筛选到十亿;这在我的 Skylake 机器上以 1.54 秒运行,由于上面提到的 5.4 CpC 架构的改进,时间减少到时钟速率比率之上。

此外,在 Haskell 中,这是我提交给 RosettaCode 的仅赔率版本,这是第二个更快的版本,它在 2.26 秒内在参考 Wandbox CPU 上运行,在没有 LLVM 的情况下编译成十亿,大约 6.4 CpC。在我的 Skylake 机器上,这在没有/有 LLVM 的情况下分别以 1.83 秒和 1.023 秒运行(分别为 6.4/3.6 CpC)并且在没有 LLVM/LLVM 的情况下分别在大约 210/127 秒内筛分到 1000 亿(分别为 6.7/4.0 CpC )。请注意,这些比“arithmoi”库轮分解版本更快。这将成为我第二个答案中进一步算法改进的基础。

因此,以下代码是一个仅赔率算法,按照 DF 提到的作为他的最终答案执行:

-- Multi-threaded Page-Segmented Bit-Packed Odds-Only Sieve of Eratosthenes... 
-- "Running a modern CPU single threaded is like
--  running a race car on one cylinder" me ...

-- compile with "-threaded" to use maximum available cores and threads...
-- compile with "-fllvm" for highest speed by a factor of up to two times.

{-# LANGUAGE FlexibleContexts, ScopedTypeVariables #-} -- , BangPatterns, MagicHash, UnboxedTuples, Strict
{-# OPTIONS_GHC -O2 -fllvm #-} -- or -O3 -keep-s-files -fno-cse -rtsopts

import Data.Int ( Int32, Int64 )
import Data.Word ( Word32, Word64 )
import Data.Bits ( (.&.), (.|.), shiftL, shiftR, popCount )
import Data.Array.Base (
         UArray(..), listArray, assocs, unsafeAt, elems,
         STUArray(..), newArray,
         unsafeRead, unsafeWrite,
         unsafeThaw, unsafeFreezeSTUArray, castSTUArray )
import Data.Array.ST ( runSTUArray )
import Control.Monad.ST ( ST, runST )
import Data.Time.Clock.POSIX ( getPOSIXTime )

-- imports to do with multi-threading...
import Data.Array (Array)
import Control.Monad ( forever, when )
import GHC.Conc ( getNumProcessors )
import Control.Monad.Cont ( join )
import Control.Concurrent
    ( ThreadId,
      forkIO,
      getNumCapabilities,
      myThreadId,
      setNumCapabilities )
import Control.Concurrent.MVar ( MVar, newEmptyMVar, putMVar, takeMVar )
import System.IO.Unsafe ( unsafePerformIO )

type Prime = Word64
type PrimeNdx = Int64
type StartAddr = Int32
type StartAddrArr = UArray Int StartAddr
type BasePrimeRep = Word32
type BasePrimeRepArr = UArray Int BasePrimeRep
type SieveBuffer = UArray Int Bool -- no point to artificial index!
 
-- constants related to odds-only...
cWHLPRMS :: [Prime]
cWHLPRMS = [2] -- excludes even numbers other than 2
cFRSTSVPRM :: Prime
cFRSTSVPRM = 3 -- start at first prime past the wheel prime(s)
 
makeSieveBuffer :: Int -> SieveBuffer
{-# INLINE makeSieveBuffer #-}
makeSieveBuffer szbts = runSTUArray $ do
  newArray (0, szbts - 1) False

-- count the remaining un-marked composite bits using very fast popcount...
{-# INLINE countSieveBuffer #-}
countSieveBuffer :: Int -> SieveBuffer -> Int
countSieveBuffer lstndx sb = runST $ do
  cmpsts <- unsafeThaw sb -- :: ST s (STUArray s PrimeNdx Bool)
  wrdcmpsts <-
    (castSTUArray :: STUArray s Int Bool ->
                      ST s (STUArray s Int Word64)) cmpsts
  let lstwrd = lstndx `shiftR` 6
  let lstmsk = 0xFFFFFFFFFFFFFFFE `shiftL` (lstndx .&. 63)
  let loopwi wi cnt =
        if wi < lstwrd then do
          v <- unsafeRead wrdcmpsts wi
          case cnt - popCount v of
            ncnt -> ncnt `seq` loopwi (wi + 1) ncnt
        else do
          v <- unsafeRead wrdcmpsts lstwrd
          return $ fromIntegral (cnt - popCount (v .|. lstmsk))
  loopwi 0 (lstwrd * 64 + 64)

cWHLPTRNLEN64 :: Int
cWHLPTRNLEN64 = 2048

cWHLPTRN :: SieveBuffer -- twice as big to allow for overflow...
cWHLPTRN = makeSieveBuffer (131072 + 131072)

-- could be faster using primitive copyByteArray#...
-- in preparation for filling with pre-cull pattern...
fillSieveBuffer :: PrimeNdx -> SieveBuffer -> SieveBuffer
fillSieveBuffer lwi sb@(UArray _ _ rng _) = runSTUArray $ do
  ptrn <- unsafeThaw cWHLPTRN :: ST s (STUArray s Int Bool)
  ptrnu64 <- (castSTUArray :: STUArray s Int Bool ->
                                  ST s (STUArray s Int Word64)) ptrn
  cmpsts <- unsafeThaw sb :: ST s (STUArray s Int Bool)
  cmpstsu64 <- (castSTUArray :: STUArray s Int Bool ->
                                  ST s (STUArray s Int Word64)) cmpsts
  let lmt = rng `shiftR` 6
      lwi64 = lwi `shiftR` 6
      loop i | i >= lmt = return cmpsts
             | otherwise =
                 let mdlo = fromIntegral $ lwi64 `mod` fromIntegral cWHLPTRNLEN64
                     sloop j
                       | j >= cWHLPTRNLEN64 = loop  (i + cWHLPTRNLEN64)
                       | otherwise = do
                          v <- unsafeRead ptrnu64 (mdlo + j)
                          unsafeWrite cmpstsu64 (i + j) v; sloop (j + 1) in sloop 0
  loop 0

cullSieveBuffer :: PrimeNdx -> [BasePrimeRepArr] -> SieveBuffer -> SieveBuffer
cullSieveBuffer lwi bpras sb@(UArray _ _ rng _) = runSTUArray $ do
  cmpsts <- unsafeThaw sb :: ST s (STUArray s Int Bool)
  let limi = lwi + fromIntegral rng - 1
      loopbpras [] = return cmpsts -- stop warning incomplete pattern match!
      loopbpras (bpra@(UArray _ _ bprrng _) : bprastl) =
        let loopbpi bpi
              | bpi >= bprrng = loopbpras bprastl
              | otherwise =
                let bp = unsafeAt bpra bpi
                    bpndx = (fromIntegral bp - cFRSTSVPRM) `shiftR` 1
                    rsqri = fromIntegral ((bpndx + bpndx) * (bpndx + cFRSTSVPRM)
                                             + cFRSTSVPRM) - lwi in
                if rsqri >= fromIntegral rng then return cmpsts else
                let bpint = fromIntegral bp
                    bppn = fromIntegral bp
                    cullbits c | c >= rng = loopbpi (bpi + 1)
                               | otherwise = do unsafeWrite cmpsts c True
                                                cullbits (c + bpint)
                    s = if rsqri >= 0 then fromIntegral rsqri else
                        let r = fromIntegral (-rsqri `rem` bppn)
                        in if r == 0 then 0 else fromIntegral (bppn - r)
                in cullbits s in loopbpi 0
  loopbpras bpras

-- multithreading goes here...

{-# NOINLINE cNUMPROCS #-}
cNUMPROCS :: Int -- force to the maximum number of threads available
cNUMPROCS = -- 1
-- {-
  unsafePerformIO $ do -- no side effects because global!
  np <- getNumProcessors; setNumCapabilities np
  getNumCapabilities
--}

-- list of culled soeve buffers from index with give bit size...
makePrimePagesFrom :: forall r. PrimeNdx -> Int ->
                                (PrimeNdx -> SieveBuffer -> r) -> Bool -> [r]
makePrimePagesFrom stwi szbts cnvrtrf thrdd =
  -- great, we can make an extra thread pool whenever we might need more, and
  -- it should die and be collected whenever this goes out of scope!
  let bpras = makeBasePrimeRepArrs thrdd
      jbparms() =
        let loop lwi szb =
              (lwi, szb) : loop (lwi + fromIntegral szb) szb
        in loop stwi szbts in
  if thrdd then
    let

      {-# NOINLINE strttsk #-}
      strttsk lwi szbts bpras mvr = -- do some strict work but define it non-strictly,
        forkIO $ do -- else it will run in forground before threading!
          -- and return it using a MVar; force strict execution in thread...
          putMVar mvr $! cnvrtrf lwi $ cullSieveBuffer lwi bpras
                           $ fillSieveBuffer lwi $ makeSieveBuffer szbts

      -- start a result pool, initialized to start with the first tasks...
      {-# NOINLINE rsltpool #-}
      rsltpool :: Array Int (MVar r) = unsafePerformIO $! do
        mvlst <- mapM (const newEmptyMVar) [ 1 .. cNUMPROCS ] -- unique copies
        mapM_ (\ (mvr, (lwi, szb)) -> strttsk lwi szb bpras mvr)
                                          $ zip mvlst $ jbparms()
        return $! listArray (0, cNUMPROCS - 1) mvlst

      -- lazily loop over the entire job list...
      loop (fdhd : fdtl) =
        let {-# NOINLINE getnxt #-}
            getnxt ((lwi, szb), i) = unsafePerformIO $! do -- wait for and get result of next page
              let mvr = unsafeAt rsltpool i
              r <- takeMVar mvr -- recycle mvr for next
              strttsk lwi szb bpras mvr; return $! r
        in getnxt fdhd : loop fdtl

  -- lazily cycle over the rest of the jobs forever...
  in rsltpool `seq` loop $ zip (drop cNUMPROCS $ jbparms())
                               (cycle [ 0 .. cNUMPROCS - 1 ]) else 

-- back to non multi-threaded functions...

  let loop ((lwi, szb) : jbpmstl) =
        (cnvrtrf lwi . cullSieveBuffer lwi bpras . fillSieveBuffer lwi .
           makeSieveBuffer) szb : loop jbpmstl
  in loop $ jbparms()

makeBasePrimeRepArrs :: Bool -> [BasePrimeRepArr]
makeBasePrimeRepArrs thrdd = 
  let sb2bpra :: PrimeNdx -> SieveBuffer -> BasePrimeRepArr
      sb2bpra lwi sb@(UArray _ _ rng _) =
        let len = countSieveBuffer (rng - 1) sb
            bpbs = fromIntegral cFRSTSVPRM + fromIntegral (lwi + lwi) in
        listArray (0, len - 1) [ bpbs + fromIntegral (i + i) |
                                          (i, False) <- assocs sb ]                
      fkbpras = [ sb2bpra 0 $ makeSieveBuffer 512 ]
      bpra0 = sb2bpra 0 $ cullSieveBuffer 0 fkbpras $ makeSieveBuffer 131072
  in bpra0 : makePrimePagesFrom 131072 131072 sb2bpra thrdd

-- result functions are here...

-- prepends the wheel factorized initial primes to the sieved primes output...
-- some faster not useing higher-order-functions, but still slow so who cares?
primes :: Int -> Bool -> [Prime]
primes szbts thrdd = cWHLPRMS ++ concat prmslsts where
  -- convert a list of sieve buffers to a UArray of primes...
  sb2prmsa :: PrimeNdx -> SieveBuffer -> UArray Int Prime
  sb2prmsa lwi sb@(UArray _ _ rng _) = -- bsprm `seq` loop 0 where
    let bsprm = cFRSTSVPRM + fromIntegral (lwi + lwi)
        len = countSieveBuffer (rng - 1) sb in
    bsprm `seq` len `seq`
      listArray (0, len - 1)
                [ bsprm + fromIntegral (i + i) | (i, False) <- assocs sb ]
  prmslsts = map elems $ makePrimePagesFrom 0 szbts sb2prmsa thrdd

-- count the primes from the sieved page list to the limit...
countPrimesTo :: Prime -> Int -> Bool -> Int64
countPrimesTo limit szbts thrdd =
  let lmtndx = fromIntegral $ (limit - cFRSTSVPRM) `shiftR` 1 :: PrimeNdx
      sb2cnt lwi sb@(UArray _ _ rng _) =
        let nlwi = lwi + fromIntegral rng in
        if nlwi < lmtndx then (countSieveBuffer (rng - 1) sb, nlwi)
        else (countSieveBuffer (fromIntegral (lmtndx - lwi)) sb, nlwi)
      loop [] cnt = cnt
      loop ((cnt, nxtlwi) : cntstl) ocnt =
        if nxtlwi > lmtndx then ocnt + fromIntegral cnt
        else loop cntstl $ ocnt + fromIntegral cnt
  in if limit < cFRSTSVPRM then
       if limit < 2 then 0 else 1
     else loop (makePrimePagesFrom 0 szbts sb2cnt thrdd) 1
 
-- test it...
main :: IO ()
main = do
  let limit = 10^9 :: Prime
  -- page segmentation sized for most efficiency;
  -- fastest with CPU L1 cache size but more address calculation overhead;
  -- a little slower with CPU L2 cache size but just about enough to
  -- cancell out the gain from reduced page start address calculations...
  let cSIEVEPGSZ = (2^18) * 8 :: Int -- CPU L2 cache size in bits
  let threaded = True

  putStrLn $ "There are " ++ show cNUMPROCS ++ " threads available."
 
  strt <- getPOSIXTime
--  let answr = length $ takeWhile (<= limit) $ primes cSIEVEPGSZ threaded -- slow way
  let answr = countPrimesTo limit cSIEVEPGSZ threaded -- fast way
  stop <- answr `seq` getPOSIXTime -- force evaluation of answr b4 stop time!
  let elpsd = round $ 1e3 * (stop - strt) :: Int64
 
  putStr $ "Found " ++ show answr
  putStr $ " primes up to " ++ show limit
  putStrLn $ " in " ++ show elpsd ++ " milliseconds."

这已经从我上面提到的 RosettaCode 提交中进行了重构,通过使主筛循环和辅助基础主要进料循环可以具有不同的筛缓冲区大小,以及添加多线程(如上所述在 DF 之上进行了改进)成为可能。它的运行速度与 DF 在 CpC 中提到的最终答案大致相同,相当于他的没有 LLVM 的机器(大约 3/1.5 秒,9 CpC 到 10 亿)并且以 1 秒到 10 亿/125 秒到 1000 亿的速度运行(分别为 3.7/4.1 CpC)在我的带有 LLVM 单线程的 Skylake 机器上,由于如上所述的不够“粗粒度”的问题,大约有一半是多线程的。

这个答案只比 DF 的代码快不到两倍,主要是由于推荐使用 LLVM 后端。

于 2021-02-08T06:58:55.780 回答