这是一个奇怪的问题/答案,因为接受的答案与问题不匹配:问题要求帮助提高筛子的速度(正确选择 Eratosthenes 实现的页面分段筛子)但接受的答案不使用筛子,而是一种数值分析技术,只是一个库。虽然这对于非常快速地找到最大范围内的素数总数是很好的(并且在其他语言中有更快和更广泛的版本可以做到这一点,例如 Kim Walisch 在 C++ 中的 primecount,也可以快速计算素数之和范围,筛子可用于进行特定类型的分析,例如寻找素数间隙,素数双倍,三元组等的存在。(通常是 K-Tuple primes ) 等。 事实上,一般数值分析技术,如Meissel-Lehmer 算法,其中大多数都是基于的,需要一个“种子素数”源来启动,这最好由优化的埃拉托色尼筛。
实际上,根据上述链接,Kim Walisch 的 primecount 已经为其构建了 GHC/Haskell API,并且可以通过外部函数接口 (FFI) 轻松调用,因此比 arithmoi 库更好,因为它更快。它是如此之快,以至于它是目前计算素数高达 1e28 的记录保持者!如果必须为 Haskell 程序提供这样的值并且不在乎他们是否理解它是如何获取它的,它会在几十毫秒内计算出 1e11 的素数。
以类似的方式,如果筛子是真正需要的,那么Kim Walisch 的素筛也有 GHC Haskell FFI,也可以直接调用。
虽然使用库可以完成工作,但仅仅通过使用它们就不会学到任何关于如何实现它们的知识。因此,Daniel Fischer(DF's)非常好的教程答案的原因以及他开始的后续系列。DF 的回答显示了如何改进问题的代码,但是没有一个总结可以显示在完成他的所有建议后代码应该是什么样子;这一点尤其重要,因为 OP 的 hpaste 中的原始问题代码已经消失(幸运的是,这只是一个很好的例子,说明如何不这样做,但也许代码应该嵌入问题中以供参考),我们只能通过 DF 在他的回答中的评论重建它所做的事情。这一系列答案旨在纠正如果有人需要纯 GHC Haskell 中的这种筛子,首先是对 DF 教学所引导的代码的摘要,然后是进一步的阶段性改进。
TLDR;跳到我发布的 Haskell 代码的最后一个答案的末尾,它实际上几乎与 Kim Walisch 的素数筛一样快或一样快,到目前为止,这可能是世界上最快的,至少在 10 到 10 的较小范围内千亿左右,除了非常优化的 YAFU 版本之外,其他任何东西都比不上任何东西,它对于大范围的速度可能快 5% 左右。 DF 在车轮分解之前的最终代码据说比原始问题代码快 40 倍,我将其扩展到我的代码再次快 30 到 32 倍,总共比原始问题代码快 1200 到 1280 倍!.
原始问题代码
这将是我唯一一次引用它,因为它不再可用(而且我认为无论如何都不值得修改):我唯一喜欢该代码的地方是线程池的实现,但即使这样也有缺陷它用于mapM
将要由线程处理的整个作业队列提供给通道,这是一个非延迟函数,因此可能会将大量工作推送到消耗大量内存的作业通道上,而不仅仅是推送足够多的内存努力让所有线程保持忙碌,然后再为从结果返回的每一个人提供一份工作Channel
。我将在此答案底部的代码和后续代码中更正这一点。实际上,只需要一个结果 MVar 池,因为 GHC 运行时分叉新线程的速度比将新工作通过管道传输到等待池所需的速度更快。
原始代码和 DF 改进代码的一个问题是它们都没有使用“-fllvm”编译器标志来使用 LLVM 后端代码生成器。对于我们在这里尝试编写的紧密循环,LLVM 可以将每个循环的时间减少大约两倍。原始代码的循环非常不紧密,LLVM 无能为力,但这并不重要,但 DF 的代码确实有紧密的循环,并且可以通过将循环时间减少到大约 60% 来受益。
使用MVar
's 的另一个问题(因此Chan
's) 是它们不是特别快,每次激活的开销约为 3 毫秒。我们在 DF 在他的最终分析中的回答中得到了这个问题的证据,他说“使用四个线程,不可变数组得到 4.55 秒,而可变数组得到 5.3 秒”,而使用两个线程则得到 3.75 秒。现在他的机器只有两个内核,额外的两个线程是超线程的,与另外两个共享大部分相同的资源,所以使用它们的人并不期望有更好的性能,但也不期望性能更差,因为这里。原因是开销太大,以至于添加效率低下的内核实际上会增加额外的工作并减慢最终结果。我在我的四个“真实”中也看到了这一点 使用 LLVM 后端提高效率后的线程/核心机器。在使用所有四个代码时,我只将时间减少到大约 55%,这与我只使用两个内核时总执行时间实际上增加的情况是一致的。由于 `MVar's 是我们可以使用 GHC 实现“等待结果”的唯一方法,因此解决方案是使工作切片更大(更粗粒度的多线程),因此这种开销变成可以忽略不计的一部分,这是我在第二个答案中的算法改进。
一旦重载问题得到解决,也不需要像在无限深度的地方那样使用通道来接收工作并返回结果,所以我消除了它们,只支持一个“循环”数组,它的元素数量为池中的进程。
测试环境
我不确定 DF 是否还有他的 Sandy Bridge 笔记本电脑,虽然我有一个 Sandy Bridge CPU,但它目前正在停机进行维护。然而,在线 IDE 网站 Wandbox使用的 Broadwell CPU 与 DF 的机器在 2.3 GHz 的回答中使用的额定值大致相同,涡轮增压到 2.9 GHz,用于两核/四线程(超线程)的单线程使用。这与 DF 的机器具有相同的性能,正如我采用他引用的“arithmoi”库的内部结构并强制它运行十亿的范围所证明的那样。 这个魔杖盒链接显示它的运行时间与他在回答中提到的几乎完全相同的 2.92 秒。我没有费心去计算结果(使用弹出计数只有大约 0.01 毫秒),因为这不会改变比较,而只是强制它在默认的 128 KB 缓冲区大小的范围内运行。
因此,在 Wandbox 中,我们有一个易于引用的可比机器;但是,它有一个限制,它不支持使用 LLVM 后端,它的使用对于优化我们将使用的紧密剔除循环很重要。因此,我将在我自己的机器上进行有无 LLVM 的比较,这是一个 3.2 GHz 的英特尔 Skylake i5-2500,单线程使用提升到 3.6 GHz。这有一个轻微的限制,因为 Skylake 在架构上进行了进一步的改进,可以更好地预测分支,并且在正确预测分支时将分支省略至零时间,因此结果不会直接按使用的时钟速度进行缩放;由于我们正在开发的循环几乎将所有时间都花在了紧密的循环中,
快速 Sieve 算法背后的原理
这些原则只有两个,如下:
- 对于给定的筛分范围,保持较低的操作总数很重要。
- 每个操作平均必须占用少量的 CPU 时钟周期。
最终的执行效率就是这两者的乘积。
绩效目标
DF 似乎认为 Atkin 和 Bernstein 的“primegen”是筛分性能的“黄金标准”。这不是主要原因,它不会也不能平均每个操作占用少量 CPU 时钟周期(原则 2),并且它消耗的周期数随着范围的增加而增加,速度快于我认为的“黄金”标准”- TLDR 中引用的 Kim Walisch 的主筛。虽然阿特金筛 (SoA) 的这种实现完全正确,但它通过了上述原则 1),因为它快速收敛到恒定的操作数 0。正如网页上该点上方的公式所估计的那样(1e11 为 0.2984,随着范围的增加而更高),它在效率方面没有达到预期。SoA 文档进行的比较与 Eratosthenese (SoE) 的比较存在缺陷:在与“primegen”代码相同的下载中是“eratspeed”的代码,它是 SoE 的参考版本,筛分超过十亿。然而,他们削弱了该参考版本,因为他们将其限制为与烘焙到 SoA 中相同的 2/3/5 轮分解(无法增加),而不是使用最大轮分解组合筛(有他们知道的文件中的证据)。这使得该范围内的操作数量略高于 4 亿次,而 SoA 的操作数约为 258 次。700万。接下来,他们似乎进一步削弱了参考 SoE,使筛子缓冲区比用于 SoA 的缓冲区更小,从而增加了 SoE 的每次操作时间,因此与 SoA 的时间差不多,尽管这些操作比那些操作更简单的 SoA。通过这种方式,他们声称 SoA 比 SoA 快了大约 40%。
Berstein 以类似的方式对这两者的紧密内部操作循环进行了一些手动优化,这可能是由于当时的 C 编译器无法完全优化这些循环,并且为了让那些编译器不撤消这些手优化他在注释中指出,编译器应该只在第一级优化下运行。这对于今天的“gcc”版本不再适用,因为两者的性能都随着“-O3”高级优化而提高。如果两者都设置为 8192 32-bit words (32 Kilobytes) 并使用上述优化编译,它们在 DF 型机器上都在大约 0.49/0.50 秒内筛选到十亿,表明 CPU 的数量对于“eratspeed”,每次剔除的周期减少了约 36%。如果应用了最大轮分解组合原则,那么它应该再快 40%,这是在对紧密的内部剔除循环进行优化之前;SoA 无法对剔除循环进行这些优化,因为与 SoE 的每个循环的固定跨度相比,它必须使用每个操作的可变跨度循环。这个参考“eratspeed”SoE 实现将在答案后面进一步讨论,因为这是导致我改进算法答案的方法。
作为关于 SoA“primegen”的最后一点,Bernstein 似乎认为筛缓冲区需要限制为小于 CPU L1 缓存大小。虽然这对于他开发这项工作的 CPU 来说可能是正确的,但对于 L2 高速缓存性能比 SoA 快得多并且接近参考 SoE 紧凑内循环时间的现代 CPU 来说不再正确。因此,如果使筛选缓冲区等于 CPU L2 缓存大小(在这些 CPU 的情况下为 256 Kiloytes),“primegen”的筛选到 10 亿的时间几乎不会发生变化,大约为 0.50 秒,但时间筛分到 1000 亿个几乎线性缩放到大约 52 秒(应该如此)。这是有效的,因为缓冲区已增加,因此 SoA 不会很快受到操作跨度溢出的困扰,但它不会
作为参考,当单线程使用 Wandbox 上的两个线程时,“primesieve”类型的算法在大约 0.18 秒内筛选到十亿个,大约 25 秒到 1000 亿个范围内,这两个时间都减少了大约两倍/DF CPU 范围。
DF 对他的回答的最终结果和提出的进一步工作的状态
DF 表示,当单线程/在两个线程上分别运行时,他的最终答案代码将在大约 7.5/3.75 秒内筛选到十亿。这代表了大约 25.514 亿次操作,在 2.9 GHz 的 CPU 时钟下,每个剔除 (CpC) 代表大约 9 个 CPU 时钟。这不好,因为基本的剔除循环需要大约 3.5 CpC。这是不使用上述 LLVM 的结果。
他建议第一个进一步的改进是通过赔率进行车轮分解——仅将速度提高大约 2.5 倍,并且使用更多扩展的车轮分解进行进一步改进是相当容易的。这是真的。然而,他在“arithmoi”库的素数函数中进行扩展轮因式分解的尝试非常失败:在 2.9 GHz 下进行 4.048 亿次剔除需要 2.92 秒,大约是 21 CpC,就像 460.7 亿次剔除需要 340 秒一样筛选到 1000 亿个的筛选也非常糟糕,每次筛选的时钟都差不多。这太慢了,没有理由进行这种扩展的 2/3/5 轮子分解,因为结果将与仅使用赔率相同或慢,即使在 9 CpC 时也是如此。这种糟糕的效率的原因是他使用了一些复杂的,因此很慢,数学来减少车轮分解的剔除,但这些计算需要大量的机器时间。有一些查找表方法可以做到这一点,每次剔除大约 12 个 CPU 时钟周期,速度大约是前者的两倍,但在如此小的范围内使用它们仍然太慢;它们的使用应仅限于为非常大的范围增加有效范围,其中它们所花费的时间百分比仅占总时间的一小部分。
为了说明这些结果有多糟糕,这里有一个参考 Wandbox Javascript 仅赔率版本,在大约 2.14 秒或大约 6.15 CpC 内筛选到十亿;这在我的 Skylake 机器上以 1.54 秒运行,由于上面提到的 5.4 CpC 架构的改进,时间减少到时钟速率比率之上。
此外,在 Haskell 中,这是我提交给 RosettaCode 的仅赔率版本,这是第二个更快的版本,它在 2.26 秒内在参考 Wandbox CPU 上运行,在没有 LLVM 的情况下编译成十亿,大约 6.4 CpC。在我的 Skylake 机器上,这在没有/有 LLVM 的情况下分别以 1.83 秒和 1.023 秒运行(分别为 6.4/3.6 CpC)并且在没有 LLVM/LLVM 的情况下分别在大约 210/127 秒内筛分到 1000 亿(分别为 6.7/4.0 CpC )。请注意,这些比“arithmoi”库轮分解版本更快。这将成为我第二个答案中进一步算法改进的基础。
因此,以下代码是一个仅赔率算法,按照 DF 提到的作为他的最终答案执行:
-- Multi-threaded Page-Segmented Bit-Packed Odds-Only Sieve of Eratosthenes...
-- "Running a modern CPU single threaded is like
-- running a race car on one cylinder" me ...
-- compile with "-threaded" to use maximum available cores and threads...
-- compile with "-fllvm" for highest speed by a factor of up to two times.
{-# LANGUAGE FlexibleContexts, ScopedTypeVariables #-} -- , BangPatterns, MagicHash, UnboxedTuples, Strict
{-# OPTIONS_GHC -O2 -fllvm #-} -- or -O3 -keep-s-files -fno-cse -rtsopts
import Data.Int ( Int32, Int64 )
import Data.Word ( Word32, Word64 )
import Data.Bits ( (.&.), (.|.), shiftL, shiftR, popCount )
import Data.Array.Base (
UArray(..), listArray, assocs, unsafeAt, elems,
STUArray(..), newArray,
unsafeRead, unsafeWrite,
unsafeThaw, unsafeFreezeSTUArray, castSTUArray )
import Data.Array.ST ( runSTUArray )
import Control.Monad.ST ( ST, runST )
import Data.Time.Clock.POSIX ( getPOSIXTime )
-- imports to do with multi-threading...
import Data.Array (Array)
import Control.Monad ( forever, when )
import GHC.Conc ( getNumProcessors )
import Control.Monad.Cont ( join )
import Control.Concurrent
( ThreadId,
forkIO,
getNumCapabilities,
myThreadId,
setNumCapabilities )
import Control.Concurrent.MVar ( MVar, newEmptyMVar, putMVar, takeMVar )
import System.IO.Unsafe ( unsafePerformIO )
type Prime = Word64
type PrimeNdx = Int64
type StartAddr = Int32
type StartAddrArr = UArray Int StartAddr
type BasePrimeRep = Word32
type BasePrimeRepArr = UArray Int BasePrimeRep
type SieveBuffer = UArray Int Bool -- no point to artificial index!
-- constants related to odds-only...
cWHLPRMS :: [Prime]
cWHLPRMS = [2] -- excludes even numbers other than 2
cFRSTSVPRM :: Prime
cFRSTSVPRM = 3 -- start at first prime past the wheel prime(s)
makeSieveBuffer :: Int -> SieveBuffer
{-# INLINE makeSieveBuffer #-}
makeSieveBuffer szbts = runSTUArray $ do
newArray (0, szbts - 1) False
-- count the remaining un-marked composite bits using very fast popcount...
{-# INLINE countSieveBuffer #-}
countSieveBuffer :: Int -> SieveBuffer -> Int
countSieveBuffer lstndx sb = runST $ do
cmpsts <- unsafeThaw sb -- :: ST s (STUArray s PrimeNdx Bool)
wrdcmpsts <-
(castSTUArray :: STUArray s Int Bool ->
ST s (STUArray s Int Word64)) cmpsts
let lstwrd = lstndx `shiftR` 6
let lstmsk = 0xFFFFFFFFFFFFFFFE `shiftL` (lstndx .&. 63)
let loopwi wi cnt =
if wi < lstwrd then do
v <- unsafeRead wrdcmpsts wi
case cnt - popCount v of
ncnt -> ncnt `seq` loopwi (wi + 1) ncnt
else do
v <- unsafeRead wrdcmpsts lstwrd
return $ fromIntegral (cnt - popCount (v .|. lstmsk))
loopwi 0 (lstwrd * 64 + 64)
cWHLPTRNLEN64 :: Int
cWHLPTRNLEN64 = 2048
cWHLPTRN :: SieveBuffer -- twice as big to allow for overflow...
cWHLPTRN = makeSieveBuffer (131072 + 131072)
-- could be faster using primitive copyByteArray#...
-- in preparation for filling with pre-cull pattern...
fillSieveBuffer :: PrimeNdx -> SieveBuffer -> SieveBuffer
fillSieveBuffer lwi sb@(UArray _ _ rng _) = runSTUArray $ do
ptrn <- unsafeThaw cWHLPTRN :: ST s (STUArray s Int Bool)
ptrnu64 <- (castSTUArray :: STUArray s Int Bool ->
ST s (STUArray s Int Word64)) ptrn
cmpsts <- unsafeThaw sb :: ST s (STUArray s Int Bool)
cmpstsu64 <- (castSTUArray :: STUArray s Int Bool ->
ST s (STUArray s Int Word64)) cmpsts
let lmt = rng `shiftR` 6
lwi64 = lwi `shiftR` 6
loop i | i >= lmt = return cmpsts
| otherwise =
let mdlo = fromIntegral $ lwi64 `mod` fromIntegral cWHLPTRNLEN64
sloop j
| j >= cWHLPTRNLEN64 = loop (i + cWHLPTRNLEN64)
| otherwise = do
v <- unsafeRead ptrnu64 (mdlo + j)
unsafeWrite cmpstsu64 (i + j) v; sloop (j + 1) in sloop 0
loop 0
cullSieveBuffer :: PrimeNdx -> [BasePrimeRepArr] -> SieveBuffer -> SieveBuffer
cullSieveBuffer lwi bpras sb@(UArray _ _ rng _) = runSTUArray $ do
cmpsts <- unsafeThaw sb :: ST s (STUArray s Int Bool)
let limi = lwi + fromIntegral rng - 1
loopbpras [] = return cmpsts -- stop warning incomplete pattern match!
loopbpras (bpra@(UArray _ _ bprrng _) : bprastl) =
let loopbpi bpi
| bpi >= bprrng = loopbpras bprastl
| otherwise =
let bp = unsafeAt bpra bpi
bpndx = (fromIntegral bp - cFRSTSVPRM) `shiftR` 1
rsqri = fromIntegral ((bpndx + bpndx) * (bpndx + cFRSTSVPRM)
+ cFRSTSVPRM) - lwi in
if rsqri >= fromIntegral rng then return cmpsts else
let bpint = fromIntegral bp
bppn = fromIntegral bp
cullbits c | c >= rng = loopbpi (bpi + 1)
| otherwise = do unsafeWrite cmpsts c True
cullbits (c + bpint)
s = if rsqri >= 0 then fromIntegral rsqri else
let r = fromIntegral (-rsqri `rem` bppn)
in if r == 0 then 0 else fromIntegral (bppn - r)
in cullbits s in loopbpi 0
loopbpras bpras
-- multithreading goes here...
{-# NOINLINE cNUMPROCS #-}
cNUMPROCS :: Int -- force to the maximum number of threads available
cNUMPROCS = -- 1
-- {-
unsafePerformIO $ do -- no side effects because global!
np <- getNumProcessors; setNumCapabilities np
getNumCapabilities
--}
-- list of culled soeve buffers from index with give bit size...
makePrimePagesFrom :: forall r. PrimeNdx -> Int ->
(PrimeNdx -> SieveBuffer -> r) -> Bool -> [r]
makePrimePagesFrom stwi szbts cnvrtrf thrdd =
-- great, we can make an extra thread pool whenever we might need more, and
-- it should die and be collected whenever this goes out of scope!
let bpras = makeBasePrimeRepArrs thrdd
jbparms() =
let loop lwi szb =
(lwi, szb) : loop (lwi + fromIntegral szb) szb
in loop stwi szbts in
if thrdd then
let
{-# NOINLINE strttsk #-}
strttsk lwi szbts bpras mvr = -- do some strict work but define it non-strictly,
forkIO $ do -- else it will run in forground before threading!
-- and return it using a MVar; force strict execution in thread...
putMVar mvr $! cnvrtrf lwi $ cullSieveBuffer lwi bpras
$ fillSieveBuffer lwi $ makeSieveBuffer szbts
-- start a result pool, initialized to start with the first tasks...
{-# NOINLINE rsltpool #-}
rsltpool :: Array Int (MVar r) = unsafePerformIO $! do
mvlst <- mapM (const newEmptyMVar) [ 1 .. cNUMPROCS ] -- unique copies
mapM_ (\ (mvr, (lwi, szb)) -> strttsk lwi szb bpras mvr)
$ zip mvlst $ jbparms()
return $! listArray (0, cNUMPROCS - 1) mvlst
-- lazily loop over the entire job list...
loop (fdhd : fdtl) =
let {-# NOINLINE getnxt #-}
getnxt ((lwi, szb), i) = unsafePerformIO $! do -- wait for and get result of next page
let mvr = unsafeAt rsltpool i
r <- takeMVar mvr -- recycle mvr for next
strttsk lwi szb bpras mvr; return $! r
in getnxt fdhd : loop fdtl
-- lazily cycle over the rest of the jobs forever...
in rsltpool `seq` loop $ zip (drop cNUMPROCS $ jbparms())
(cycle [ 0 .. cNUMPROCS - 1 ]) else
-- back to non multi-threaded functions...
let loop ((lwi, szb) : jbpmstl) =
(cnvrtrf lwi . cullSieveBuffer lwi bpras . fillSieveBuffer lwi .
makeSieveBuffer) szb : loop jbpmstl
in loop $ jbparms()
makeBasePrimeRepArrs :: Bool -> [BasePrimeRepArr]
makeBasePrimeRepArrs thrdd =
let sb2bpra :: PrimeNdx -> SieveBuffer -> BasePrimeRepArr
sb2bpra lwi sb@(UArray _ _ rng _) =
let len = countSieveBuffer (rng - 1) sb
bpbs = fromIntegral cFRSTSVPRM + fromIntegral (lwi + lwi) in
listArray (0, len - 1) [ bpbs + fromIntegral (i + i) |
(i, False) <- assocs sb ]
fkbpras = [ sb2bpra 0 $ makeSieveBuffer 512 ]
bpra0 = sb2bpra 0 $ cullSieveBuffer 0 fkbpras $ makeSieveBuffer 131072
in bpra0 : makePrimePagesFrom 131072 131072 sb2bpra thrdd
-- result functions are here...
-- prepends the wheel factorized initial primes to the sieved primes output...
-- some faster not useing higher-order-functions, but still slow so who cares?
primes :: Int -> Bool -> [Prime]
primes szbts thrdd = cWHLPRMS ++ concat prmslsts where
-- convert a list of sieve buffers to a UArray of primes...
sb2prmsa :: PrimeNdx -> SieveBuffer -> UArray Int Prime
sb2prmsa lwi sb@(UArray _ _ rng _) = -- bsprm `seq` loop 0 where
let bsprm = cFRSTSVPRM + fromIntegral (lwi + lwi)
len = countSieveBuffer (rng - 1) sb in
bsprm `seq` len `seq`
listArray (0, len - 1)
[ bsprm + fromIntegral (i + i) | (i, False) <- assocs sb ]
prmslsts = map elems $ makePrimePagesFrom 0 szbts sb2prmsa thrdd
-- count the primes from the sieved page list to the limit...
countPrimesTo :: Prime -> Int -> Bool -> Int64
countPrimesTo limit szbts thrdd =
let lmtndx = fromIntegral $ (limit - cFRSTSVPRM) `shiftR` 1 :: PrimeNdx
sb2cnt lwi sb@(UArray _ _ rng _) =
let nlwi = lwi + fromIntegral rng in
if nlwi < lmtndx then (countSieveBuffer (rng - 1) sb, nlwi)
else (countSieveBuffer (fromIntegral (lmtndx - lwi)) sb, nlwi)
loop [] cnt = cnt
loop ((cnt, nxtlwi) : cntstl) ocnt =
if nxtlwi > lmtndx then ocnt + fromIntegral cnt
else loop cntstl $ ocnt + fromIntegral cnt
in if limit < cFRSTSVPRM then
if limit < 2 then 0 else 1
else loop (makePrimePagesFrom 0 szbts sb2cnt thrdd) 1
-- test it...
main :: IO ()
main = do
let limit = 10^9 :: Prime
-- page segmentation sized for most efficiency;
-- fastest with CPU L1 cache size but more address calculation overhead;
-- a little slower with CPU L2 cache size but just about enough to
-- cancell out the gain from reduced page start address calculations...
let cSIEVEPGSZ = (2^18) * 8 :: Int -- CPU L2 cache size in bits
let threaded = True
putStrLn $ "There are " ++ show cNUMPROCS ++ " threads available."
strt <- getPOSIXTime
-- let answr = length $ takeWhile (<= limit) $ primes cSIEVEPGSZ threaded -- slow way
let answr = countPrimesTo limit cSIEVEPGSZ threaded -- fast way
stop <- answr `seq` getPOSIXTime -- force evaluation of answr b4 stop time!
let elpsd = round $ 1e3 * (stop - strt) :: Int64
putStr $ "Found " ++ show answr
putStr $ " primes up to " ++ show limit
putStrLn $ " in " ++ show elpsd ++ " milliseconds."
这已经从我上面提到的 RosettaCode 提交中进行了重构,通过使主筛循环和辅助基础主要进料循环可以具有不同的筛缓冲区大小,以及添加多线程(如上所述在 DF 之上进行了改进)成为可能。它的运行速度与 DF 在 CpC 中提到的最终答案大致相同,相当于他的没有 LLVM 的机器(大约 3/1.5 秒,9 CpC 到 10 亿)并且以 1 秒到 10 亿/125 秒到 1000 亿的速度运行(分别为 3.7/4.1 CpC)在我的带有 LLVM 单线程的 Skylake 机器上,由于如上所述的不够“粗粒度”的问题,大约有一半是多线程的。
这个答案只比 DF 的代码快不到两倍,主要是由于推荐使用 LLVM 后端。