36

我对在纯功能 F# 中实现埃拉托色尼筛感兴趣。我对实际筛子的实现感兴趣,而不是不是真正筛子的天真的功能实现,所以不是这样的:

let rec PseudoSieve list =
    match list with
    | hd::tl -> hd :: (PseudoSieve <| List.filter (fun x -> x % hd <> 0) tl)
    | [] -> []

上面的第二个链接简要描述了一种需要使用多重映射的算法,据我所知,该算法在 F# 中不可用。给出的 Haskell 实现使用了一个支持方法的映射,我在F# 功能映射insertWith中没有看到该方法。

有谁知道将给定的 Haskell 映射代码转换为 F# 的方法,或者可能知道替代实现方法或筛选算法同样有效且更适合功能实现或 F#?

4

16 回答 16

18

阅读那篇文章,我想出了一个不需要多图的想法。它通过一次又一次地将冲突键向前移动其质数来处理冲突的映射键,直到它到达一个不在映射中的键。下面primes是一个映射,其中包含下一个迭代器值的键和素数值。

let primes = 
    let rec nextPrime n p primes =
        if primes |> Map.containsKey n then
            nextPrime (n + p) p primes
        else
            primes.Add(n, p)

    let rec prime n primes =
        seq {
            if primes |> Map.containsKey n then
                let p = primes.Item n
                yield! prime (n + 1) (nextPrime (n + p) p (primes.Remove n))
            else
                yield n
                yield! prime (n + 1) (primes.Add(n * n, n))
        }

    prime 2 Map.empty

这是该论文中基于优先级队列的算法,没有平方优化。我将通用优先级队列函数放在顶部。我使用一个元组来表示惰性列表迭代器。

let primes() = 
    // the priority queue functions
    let insert = Heap.Insert
    let findMin = Heap.Min
    let insertDeleteMin = Heap.DeleteInsert

    // skips primes 2, 3, 5, 7
    let wheelData = [|2L;4L;2L;4L;6L;2L;6L;4L;2L;4L;6L;6L;2L;6L;4L;2L;6L;4L;6L;8L;4L;2L;4L;2L;4L;8L;6L;4L;6L;2L;4L;6L;2L;6L;6L;4L;2L;4L;6L;2L;6L;4L;2L;4L;2L;10L;2L;10L|]

    // increments iterator
    let wheel (composite, n, prime) =
        composite + wheelData.[n % 48] * prime, n + 1, prime

    let insertPrime prime n table =
        insert (prime * prime, n, prime) table

    let rec adjust x (table : Heap) =
        let composite, n, prime = findMin table

        if composite <= x then 
            table 
            |> insertDeleteMin (wheel (composite, n, prime))
            |> adjust x
        else
            table

    let rec sieve iterator table =
        seq {
            let x, n, _ = iterator
            let composite, _, _ = findMin table

            if composite <= x then
                yield! sieve (wheel iterator) (adjust x table)
            else
                if x = 13L then
                    yield! [2L; 3L; 5L; 7L; 11L]

                yield x
                yield! sieve (wheel iterator) (insertPrime x n table)
        }

    sieve (13L, 1, 1L) (insertPrime 11L 0 (Heap(0L, 0, 0L)))

这是具有平方优化的基于优先级队列的算法。为了便于将素数添加到查找表中,必须将轮偏移与素数一起返回。这个版本的算法有 O(sqrt(n)) 内存使用,其中没有优化的是 O(n)。

let rec primes2() : seq<int64 * int> = 
    // the priority queue functions
    let insert = Heap.Insert
    let findMin = Heap.Min
    let insertDeleteMin = Heap.DeleteInsert

    // increments iterator
    let wheel (composite, n, prime) =
        composite + wheelData.[n % 48] * prime, n + 1, prime

    let insertPrime enumerator composite table =
        // lazy initialize the enumerator
        let enumerator =
            if enumerator = null then
                let enumerator = primes2().GetEnumerator()
                enumerator.MoveNext() |> ignore
                // skip primes that are a part of the wheel
                while fst enumerator.Current < 11L do
                    enumerator.MoveNext() |> ignore
                enumerator
            else
                enumerator

        let prime = fst enumerator.Current
        // Wait to insert primes until their square is less than the tables current min
        if prime * prime < composite then
            enumerator.MoveNext() |> ignore
            let prime, n = enumerator.Current
            enumerator, insert (prime * prime, n, prime) table
        else
            enumerator, table

    let rec adjust x table =
        let composite, n, prime = findMin table

        if composite <= x then 
            table 
            |> insertDeleteMin (wheel (composite, n, prime))
            |> adjust x
        else
            table

    let rec sieve iterator (enumerator, table) = 
        seq {
            let x, n, _ = iterator
            let composite, _, _ = findMin table

            if composite <= x then
                yield! sieve (wheel iterator) (enumerator, adjust x table)
            else
                if x = 13L then
                    yield! [2L, 0; 3L, 0; 5L, 0; 7L, 0; 11L, 0]

                yield x, n
                yield! sieve (wheel iterator) (insertPrime enumerator composite table)
        }

    sieve (13L, 1, 1L) (null, insert (11L * 11L, 0, 11L) (Heap(0L, 0, 0L)))

这是我的测试程序。

type GenericHeap<'T when 'T : comparison>(defaultValue : 'T) =
    let mutable capacity = 1
    let mutable values = Array.create capacity defaultValue
    let mutable size = 0

    let swap i n =
        let temp = values.[i]
        values.[i] <- values.[n]
        values.[n] <- temp

    let rec rollUp i =
        if i > 0 then
            let parent = (i - 1) / 2
            if values.[i] < values.[parent] then
                swap i parent
                rollUp parent

    let rec rollDown i =
        let left, right = 2 * i + 1, 2 * i + 2

        if right < size then
            if values.[left] < values.[i] then
                if values.[left] < values.[right] then
                    swap left i
                    rollDown left
                else
                    swap right i
                    rollDown right
            elif values.[right] < values.[i] then
                swap right i
                rollDown right
        elif left < size then
            if values.[left] < values.[i] then
                swap left i

    member this.insert (value : 'T) =
        if size = capacity then
            capacity <- capacity * 2
            let newValues = Array.zeroCreate capacity
            for i in 0 .. size - 1 do
                newValues.[i] <- values.[i]
            values <- newValues

        values.[size] <- value
        size <- size + 1
        rollUp (size - 1)

    member this.delete () =
        values.[0] <- values.[size]
        size <- size - 1
        rollDown 0

    member this.deleteInsert (value : 'T) =
        values.[0] <- value
        rollDown 0

    member this.min () =
        values.[0]

    static member Insert (value : 'T) (heap : GenericHeap<'T>) =
        heap.insert value
        heap    

    static member DeleteInsert (value : 'T) (heap : GenericHeap<'T>) =
        heap.deleteInsert value
        heap    

    static member Min (heap : GenericHeap<'T>) =
        heap.min()

type Heap = GenericHeap<int64 * int * int64>

let wheelData = [|2L;4L;2L;4L;6L;2L;6L;4L;2L;4L;6L;6L;2L;6L;4L;2L;6L;4L;6L;8L;4L;2L;4L;2L;4L;8L;6L;4L;6L;2L;4L;6L;2L;6L;6L;4L;2L;4L;6L;2L;6L;4L;2L;4L;2L;10L;2L;10L|]

let primes() = 
    // the priority queue functions
    let insert = Heap.Insert
    let findMin = Heap.Min
    let insertDeleteMin = Heap.DeleteInsert

    // increments iterator
    let wheel (composite, n, prime) =
        composite + wheelData.[n % 48] * prime, n + 1, prime

    let insertPrime prime n table =
        insert (prime * prime, n, prime) table

    let rec adjust x (table : Heap) =
        let composite, n, prime = findMin table

        if composite <= x then 
            table 
            |> insertDeleteMin (wheel (composite, n, prime))
            |> adjust x
        else
            table

    let rec sieve iterator table =
        seq {
            let x, n, _ = iterator
            let composite, _, _ = findMin table

            if composite <= x then
                yield! sieve (wheel iterator) (adjust x table)
            else
                if x = 13L then
                    yield! [2L; 3L; 5L; 7L; 11L]

                yield x
                yield! sieve (wheel iterator) (insertPrime x n table)
        }

    sieve (13L, 1, 1L) (insertPrime 11L 0 (Heap(0L, 0, 0L)))

let rec primes2() : seq<int64 * int> = 
    // the priority queue functions
    let insert = Heap.Insert
    let findMin = Heap.Min
    let insertDeleteMin = Heap.DeleteInsert

    // increments iterator
    let wheel (composite, n, prime) =
        composite + wheelData.[n % 48] * prime, n + 1, prime

    let insertPrime enumerator composite table =
        // lazy initialize the enumerator
        let enumerator =
            if enumerator = null then
                let enumerator = primes2().GetEnumerator()
                enumerator.MoveNext() |> ignore
                // skip primes that are a part of the wheel
                while fst enumerator.Current < 11L do
                    enumerator.MoveNext() |> ignore
                enumerator
            else
                enumerator

        let prime = fst enumerator.Current
        // Wait to insert primes until their square is less than the tables current min
        if prime * prime < composite then
            enumerator.MoveNext() |> ignore
            let prime, n = enumerator.Current
            enumerator, insert (prime * prime, n, prime) table
        else
            enumerator, table

    let rec adjust x table =
        let composite, n, prime = findMin table

        if composite <= x then 
            table 
            |> insertDeleteMin (wheel (composite, n, prime))
            |> adjust x
        else
            table

    let rec sieve iterator (enumerator, table) = 
        seq {
            let x, n, _ = iterator
            let composite, _, _ = findMin table

            if composite <= x then
                yield! sieve (wheel iterator) (enumerator, adjust x table)
            else
                if x = 13L then
                    yield! [2L, 0; 3L, 0; 5L, 0; 7L, 0; 11L, 0]

                yield x, n
                yield! sieve (wheel iterator) (insertPrime enumerator composite table)
        }

    sieve (13L, 1, 1L) (null, insert (11L * 11L, 0, 11L) (Heap(0L, 0, 0L)))


let mutable i = 0

let compare a b =
    i <- i + 1
    if a = b then
        true
    else
        printfn "%A %A %A" a b i
        false

Seq.forall2 compare (Seq.take 50000 (primes())) (Seq.take 50000 (primes2() |> Seq.map fst))
|> printfn "%A"

primes2()
|> Seq.map fst
|> Seq.take 10
|> Seq.toArray
|> printfn "%A"

primes2()
|> Seq.map fst
|> Seq.skip 999999
|> Seq.take 10
|> Seq.toArray
|> printfn "%A"

System.Console.ReadLine() |> ignore
于 2011-01-08T01:12:53.790 回答
10

尽管有一个答案给出了使用优先级队列(PQ)的算法,如在SkewBinomialHeap中,它可能不是适合这项工作的 PQ。Eratosthenes (iEoS) 的增量筛选器需要的是一个 PQ,它具有出色的性能,可以获取最小值并重新插入值,但在添加新值时不需要最终性能,因为 iSoE 仅添加为新值将质数的总和值取到范围的平方根(这是每次减少发生一次的重新插入次数的一小部分)。SkewBinomialHeap PQ 并没有提供比使用平衡二叉搜索树的内置 Map 更多的东西- 所有 O(log n) 操作 - 除了它稍微改变操作的权重以有利于 SoE 的要求。但是,SkewBinaryHeap 每次归约仍需要许多 O(log n) 操作。

PQ 实现为,更具体地作为二进制堆,甚至更具体地作为 MinHeap,几乎满足 iSoE 的要求,具有 O(1) 性能以获取最小值和 O(log n) 性能以用于重新插入和添加新条目,尽管性能实际上是 O(log n) 的一小部分,因为大多数重新插入发生在队列顶部附近,并且大多数新值的添加(无关紧要,因为它们不常见)发生在附近这些操作最有效的队列末尾。此外,MinHeap PQ 可以在一次 O(log n) 次(通常是一小部分)中有效地实现删除最小值和插入函数。然后,而不是 Map(实现为AVL 树) 其中有一个 O(log n) 操作,通常具有完整的“log n”范围,因为我们需要的最小值位于树的最左端最后一个叶子处,我们通常在根处添加和删除最小值,并且平均一次插入几个级别。因此,每次剔除减少时,MinHeap PQ 只能用于 O(log n) 操作的一小部分,而不是多个较大部分的 O(log n) 操作。

MinHeap PQ 可以用纯函数代码实现(没有实现“removeMin”,因为 iSoE 不需要它,但有一个“调整”函数用于分段),如下所示:

[<RequireQualifiedAccess>]
module MinHeap =

  type MinHeapTreeEntry<'T> = class val k:uint32 val v:'T new(k,v) = { k=k;v=v } end
  [<CompilationRepresentation(CompilationRepresentationFlags.UseNullAsTrueValue)>]
  [<NoEquality; NoComparison>]
  type MinHeapTree<'T> = 
      | HeapEmpty 
      | HeapOne of MinHeapTreeEntry<'T>
      | HeapNode of MinHeapTreeEntry<'T> * MinHeapTree<'T> * MinHeapTree<'T> * uint32

  let empty = HeapEmpty

  let getMin pq = match pq with | HeapOne(kv) | HeapNode(kv,_,_,_) -> Some kv | _ -> None

  let insert k v pq =
    let kv = MinHeapTreeEntry(k,v)
    let rec insert' kv msk pq =
      match pq with
        | HeapEmpty -> HeapOne kv
        | HeapOne kv2 -> if k < kv2.k then HeapNode(kv,pq,HeapEmpty,2u)
                          else let nn = HeapOne kv in HeapNode(kv2,nn,HeapEmpty,2u)
        | HeapNode(kv2,l,r,cnt) ->
          let nc = cnt + 1u
          let nmsk = if msk <> 0u then msk <<< 1
                     else let s = int32 (System.Math.Log (float nc) / System.Math.Log(2.0))
                          (nc <<< (32 - s)) ||| 1u //never ever zero again with the or'ed 1
          if k <= kv2.k then if (nmsk &&& 0x80000000u) = 0u then HeapNode(kv,insert' kv2 nmsk l,r,nc)
                                                            else HeapNode(kv,l,insert' kv2 nmsk r,nc)
          else if (nmsk &&& 0x80000000u) = 0u then HeapNode(kv2,insert' kv nmsk l,r,nc)
                else HeapNode(kv2,l,insert' kv nmsk r,nc)
    insert' kv 0u pq

  let private reheapify kv k pq =
    let rec reheapify' pq =
      match pq with
        | HeapEmpty -> HeapEmpty //should never be taken
        | HeapOne kvn -> HeapOne kv
        | HeapNode(kvn,l,r,cnt) ->
            match r with
              | HeapOne kvr when k > kvr.k ->
                  match l with //never HeapEmpty
                    | HeapOne kvl when k > kvl.k -> //both qualify, choose least
                        if kvl.k > kvr.k then HeapNode(kvr,l,HeapOne kv,cnt)
                        else HeapNode(kvl,HeapOne kv,r,cnt)
                    | HeapNode(kvl,_,_,_) when k > kvl.k -> //both qualify, choose least
                        if kvl.k > kvr.k then HeapNode(kvr,l,HeapOne kv,cnt)
                        else HeapNode(kvl,reheapify' l,r,cnt)
                    | _ -> HeapNode(kvr,l,HeapOne kv,cnt) //only right qualifies
              | HeapNode(kvr,_,_,_) when k > kvr.k -> //need adjusting for left leaf or else left leaf
                  match l with //never HeapEmpty or HeapOne
                    | HeapNode(kvl,_,_,_) when k > kvl.k -> //both qualify, choose least
                        if kvl.k > kvr.k then HeapNode(kvr,l,reheapify' r,cnt)
                        else HeapNode(kvl,reheapify' l,r,cnt)
                    | _ -> HeapNode(kvr,l,reheapify' r,cnt) //only right qualifies
              | _ -> match l with //r could be HeapEmpty but l never HeapEmpty
                        | HeapOne(kvl) when k > kvl.k -> HeapNode(kvl,HeapOne kv,r,cnt)
                        | HeapNode(kvl,_,_,_) when k > kvl.k -> HeapNode(kvl,reheapify' l,r,cnt)
                        | _ -> HeapNode(kv,l,r,cnt) //just replace the contents of pq node with sub leaves the same
    reheapify' pq


  let reinsertMinAs k v pq =
    let kv = MinHeapTreeEntry(k,v)
    reheapify kv k pq

  let adjust f (pq:MinHeapTree<_>) = //adjust all the contents using the function, then rebuild by reheapify
    let rec adjust' pq =
      match pq with
        | HeapEmpty -> pq
        | HeapOne kv -> HeapOne(MinHeapTreeEntry(f kv.k kv.v))
        | HeapNode (kv,l,r,cnt) -> let nkv = MinHeapTreeEntry(f kv.k kv.v)
                                   reheapify nkv nkv.k (HeapNode(kv,adjust' l,adjust' r,cnt))
    adjust' pq

使用上述模块,iSoE 可以通过车轮分解优化和使用高效的共感应流 (CIS) 编写如下:

type CIS<'T> = class val v:'T val cont:unit->CIS<'T> new(v,cont) = { v=v;cont=cont } end
type cullstate = struct val p:uint32 val wi:int new(cnd,cndwi) = { p=cnd;wi=cndwi } end
let primesPQWSE() =
  let WHLPRMS = [| 2u;3u;5u;7u |] in let FSTPRM = 11u in let WHLCRC = int (WHLPRMS |> Seq.fold (*) 1u) >>> 1
  let WHLLMT = int (WHLPRMS |> Seq.fold (fun o n->o*(n-1u)) 1u) - 1
  let WHLPTRN =
    let wp = Array.zeroCreate (WHLLMT+1)
    let gaps (a:int[]) = let rec gap i acc = if a.[i]=0 then gap (i+1) (acc+1uy) else acc
                         {0..WHLCRC-1} |> Seq.fold (fun s i->
                           let ns = if a.[i]<>0 then wp.[s]<-2uy*gap (i+1) 1uy;(s+1) else s in ns) 0 |> ignore
    Array.init (WHLCRC+1) (fun i->if WHLPRMS |> Seq.forall (fun p->(FSTPRM+uint32(i<<<1))%p<>0u)
                                  then 1 else 0) |> gaps;wp
  let inline whladv i = if i < WHLLMT then i + 1 else 0 in let advcnd c i = c + uint32 WHLPTRN.[i]
  let inline culladv c p i = let n = c + uint32 WHLPTRN.[i] * p in if n < c then 0xFFFFFFFFu else n
  let rec mkprm (n,wi,pq,(bps:CIS<_>),q) =
    let nxt = advcnd n wi in let nxti = whladv wi
    if nxt < n then (0u,0,(0xFFFFFFFFu,0,MinHeap.empty,bps,q))
    elif n>=q then let bp,bpi = bps.v in let nc,nci = culladv n bp bpi,whladv bpi
                    let nsd = bps.cont() in let np,_ = nsd.v in let sqr = if np>65535u then 0xFFFFFFFFu else np*np
                    mkprm (nxt,nxti,(MinHeap.insert nc (cullstate(bp,nci)) pq),nsd,sqr)
    else match MinHeap.getMin pq with | None -> (n,wi,(nxt,nxti,pq,bps,q))
                                      | Some kv -> let ca,cs = culladv kv.k kv.v.p kv.v.wi,cullstate(kv.v.p,whladv kv.v.wi)
                                                   if n>kv.k then mkprm (n,wi,(MinHeap.reinsertMinAs ca cs pq),bps,q)
                                                   elif n=kv.k then mkprm (nxt,nxti,(MinHeap.reinsertMinAs ca cs pq),bps,q)
                                                   else (n,wi,(nxt,nxti,pq,bps,q))
  let rec pCID p pi pq bps q = CIS((p,pi),fun()->let (np,npi,(nxt,nxti,npq,nbps,nq))=mkprm (advcnd p pi,whladv pi,pq,bps,q)
                                                 pCID np npi npq nbps nq)
  let rec baseprimes() = CIS((FSTPRM,0),fun()->let np=FSTPRM+uint32 WHLPTRN.[0]
                                               pCID np (whladv 0) MinHeap.empty (baseprimes()) (FSTPRM*FSTPRM))
  let genseq sd = Seq.unfold (fun (p,pi,pcc) ->if p=0u then None else Some(p,mkprm pcc)) sd
  seq { yield! WHLPRMS; yield! mkprm (FSTPRM,0,MinHeap.empty,baseprimes(),(FSTPRM*FSTPRM)) |> genseq }

上面的代码在大约 0.077 秒内计算前 100,000 个素数,在 0.977 秒内计算前 1,000,000 个素数,在大约 14.33 秒内计算前 10,000,000 个素数,在大约 221.87 秒内计算前 100,000,000 个素数,所有这些都在 i7-2700K (3.5GHz) 上作为 64 位代码。这个纯函数式代码比Dustin Cambell 的基于可变字典的代码稍快,增加了轮分解的常见优化,延迟添加基本素数,并使用更有效的 CID 全部添加(tryfsharpideone,但仍然是纯函数式的他不使用 Dictionary 类的代码. 然而,对于大约 20 亿(约 1 亿个素数)的较大素数范围,使用基于哈希表的 Dictionary 的代码将更快,因为 Dictionary 操作没有 O(log n) 因子,并且这种增益克服了计算使用字典哈希表的复杂性。

上述程序具有进一步的特征,即分解轮是参数化的,例如,可以通过将 WHLPRMS 设置为 [| 来使用非常大的轮。2u;3u;5u;7u;11u;13u;17u;19u |] 和 FSTPRM 到 23u 以获得大约三分之二的运行时间,对于一千万个素数大约为 9.34 秒,但请注意,这需要几秒钟在程序开始运行之前计算 WHLPTRN,无论素数范围如何,这都是一个恒定的开销。

比较分析:与纯函数式增量树折叠实现相比,该算法稍微快一些,因为 MinHeap 树的平均使用高度比折叠树的深度小两倍,但被一个等价物抵消遍历 PQ 树级别的能力中的常数因子损失,因为它基于二叉堆,需要处理每个堆级别的右叶和左叶以及任一方式的分支,而不是树的每个级别的单个比较折叠通常用较浅的树枝折叠。与其他基于 PQ 和 Map 的功能算法相比,改进通常是通过减少遍历各个树结构的每个级别的 O(log n) 操作数量的常数因素。

在 400 多年前Michael Eytzinger发明的基于谱系树的模型之后,MinHeap 通常实现为可变数组 二进制堆。我知道这个问题说对非功能性可变代码没有兴趣,但是如果必须避免使用所有使用可变性的子代码,那么出于性能原因,我们不能使用“在幕后”使用可变性的列表或 LazyList。因此,想象一下 MinHeap PQ 的以下备用可变版本由库提供,并在更大的主要性能范围内享受另一个超过 2 的因子:

[<RequireQualifiedAccess>]
module MinHeap =

  type MinHeapTreeEntry<'T> = class val k:uint32 val v:'T new(k,v) = { k=k;v=v } end
  type MinHeapTree<'T> = ResizeArray<MinHeapTreeEntry<'T>>

  let empty<'T> = MinHeapTree<MinHeapTreeEntry<'T>>()

  let getMin (pq:MinHeapTree<_>) = if pq.Count > 0 then Some pq.[0] else None

  let insert k v (pq:MinHeapTree<_>) =
    if pq.Count = 0 then pq.Add(MinHeapTreeEntry(0xFFFFFFFFu,v)) //add an extra entry so there's always a right max node
    let mutable nxtlvl = pq.Count in let mutable lvl = nxtlvl <<< 1 //1 past index of value added times 2
    pq.Add(pq.[nxtlvl - 1]) //copy bottom entry then do bubble up while less than next level up
    while ((lvl <- lvl >>> 1); nxtlvl <- nxtlvl >>> 1; nxtlvl <> 0) do
      let t = pq.[nxtlvl - 1] in if t.k > k then pq.[lvl - 1] <- t else lvl <- lvl <<< 1; nxtlvl <- 0 //causes loop break
    pq.[lvl - 1] <-  MinHeapTreeEntry(k,v); pq

  let reinsertMinAs k v (pq:MinHeapTree<_>) = //do minify down for value to insert
    let mutable nxtlvl = 1 in let mutable lvl = nxtlvl in let cnt = pq.Count
    while (nxtlvl <- nxtlvl <<< 1; nxtlvl < cnt) do
      let lk = pq.[nxtlvl - 1].k in let rk = pq.[nxtlvl].k in let oldlvl = lvl
      let k = if k > lk then lvl <- nxtlvl; lk else k in if k > rk then nxtlvl <- nxtlvl + 1; lvl <- nxtlvl
      if lvl <> oldlvl then pq.[oldlvl - 1] <- pq.[lvl - 1] else nxtlvl <- cnt //causes loop break
    pq.[lvl - 1] <- MinHeapTreeEntry(k,v); pq

  let adjust f (pq:MinHeapTree<_>) = //adjust all the contents using the function, then re-heapify
    if pq <> null then 
      let cnt = pq.Count
      if cnt > 1 then
        for i = 0 to cnt - 2 do //change contents using function
          let e = pq.[i] in let k,v = e.k,e.v in pq.[i] <- MinHeapTreeEntry (f k v)
        for i = cnt/2 downto 1 do //rebuild by reheapify
          let kv = pq.[i - 1] in let k = kv.k
          let mutable nxtlvl = i in let mutable lvl = nxtlvl
          while (nxtlvl <- nxtlvl <<< 1; nxtlvl < cnt) do
            let lk = pq.[nxtlvl - 1].k in let rk = pq.[nxtlvl].k in let oldlvl = lvl
            let k = if k > lk then lvl <- nxtlvl; lk else k in if k > rk then nxtlvl <- nxtlvl + 1; lvl <- nxtlvl
            if lvl <> oldlvl then pq.[oldlvl - 1] <- pq.[lvl - 1] else nxtlvl <- cnt //causes loop break
          pq.[lvl - 1] <- kv
    pq

极客注意:我实际上期望可变版本提供更好的改进性能比,但由于嵌套的 if-then-else 代码结构和主要剔除值的随机行为,它在重新插入时陷入困境,这意味着大部分分支的 CPU 分支预测失败,导致每次剔除减少许多额外的 10 个 CPU 时钟周期以重建指令预取缓存。

该算法唯一的其他恒定因素性能增益是分段和使用多任务处理以获得与 CPU 内核数量成正比的性能增益;然而,就目前而言,这是迄今为止最快的纯函数式 SoE 算法,甚至使用函数式 MinHeap 的纯函数形式也击败了简单的命令式实现,例如Jon Harrop 的代码Johan Kullbom 的阿特金筛子(他的时间安排有误因为他只计算了素数到 1000 万而不是第 1000 万个素数),但如果使用更好的优化,这些算法将快五倍。当我们添加更大轮子分解的多线程时,函数式代码和命令式代码之间大约 5 的比例将有所降低,因为命令式代码的计算复杂度比函数式代码增加得更快,并且多线程比函数式代码更能帮助较慢的函数式代码更快的命令式代码,因为后者越来越接近枚举找到的素数所需的时间的基本限制。

EDIT_ADD: 即使可以选择继续使用 MinHeap 的纯功能版本,增加效率填充剔除复合数组的最简单方法是将其归零,使其具有正确的大小,然后运行一个初始化函数,该函数可以写入每个可变数组索引不超过一次。虽然这不是严格的“功能性”,但它很接近,因为数组被初始化,然后再也不会被修改。

添加分段、多线程、可编程轮因子圆周和许多性能调整的代码如下(除了一些添加的新常量外,用于实现分段和多线程的额外调整代码大约是代码的下半部分从“prmspg”函数开始):

type prmsCIS = class val pg:uint16 val bg:uint16 val pi:int val cont:unit->prmsCIS
                     new(pg,bg,pi,nxtprmf) = { pg=pg;bg=bg;pi=pi;cont=nxtprmf } end
type cullstate = struct val p:uint32 val wi:int new(cnd,cndwi) = { p=cnd;wi=cndwi } end
let primesPQOWSE() =
  let WHLPRMS = [| 2u;3u;5u;7u;11u;13u;17u |] in let FSTPRM = 19u in let WHLCRC = int(WHLPRMS |> Seq.fold (*) 1u)
  let MXSTP = uint64(FSTPRM-1u) in let BFSZ = 1<<<11 in let NUMPRCS = System.Environment.ProcessorCount
  let WHLLMT = int (WHLPRMS |> Seq.fold (fun o n->o*(n-1u)) 1u) - 1 in let WHLPTRN = Array.zeroCreate (WHLLMT+1)
  let WHLRNDUP = let gaps (a:int[]) = let rec gap i acc = if a.[i]=0 then gap (i+1) (acc+1)
                                                          else acc in let b = a |> Array.scan (+) 0
                                      Array.init (WHLCRC>>>1) (fun i->
                                        if a.[i]=0 then 0 else let g=2*gap (i+1) 1 in WHLPTRN.[b.[i]]<-byte g;1)
                 Array.init WHLCRC (fun i->if WHLPRMS |> Seq.forall (fun p->(FSTPRM+uint32(i<<<1))%p<>0u) then 1 else 0)
                 |> gaps |> Array.scan (+) 0
  let WHLPOS = WHLPTRN |> Array.map (uint32) |> Array.scan (+) 0u in let advcnd cnd cndi = cnd + uint32 WHLPTRN.[cndi]
  let MINRNGSTP = if WHLLMT<=31 then uint32(32/(WHLLMT+1)*WHLCRC) else if WHLLMT=47 then uint32 WHLCRC<<<1 else uint32 WHLCRC
  let MINBFRNG = uint32((BFSZ<<<3)/(WHLLMT+1)*WHLCRC)/MINRNGSTP*MINRNGSTP
  let MINBFRNG = if MINBFRNG=0u then MINRNGSTP else MINBFRNG
  let inline whladv i = if i < WHLLMT then i+1 else 0 in let inline culladv c p i = c+uint32 WHLPTRN.[i]*p
  let rec mkprm (n,wi,pq,(bps:prmsCIS),q,lstp,bgap) =
    let nxt,nxti = advcnd n wi,whladv wi
    if n>=q then let p = (uint32 bps.bg<<<16)+uint32 bps.pg
                 let nbps,nxtcmpst,npi = bps.cont(),culladv n p bps.pi,whladv bps.pi
                 let pg = uint32 nbps.pg in let np = p+pg in let sqr = q+pg*((p<<<1)+pg) //only works to p < about 13 million
                 let nbps = prmsCIS(uint16 np,uint16(np>>>16),nbps.pi,nbps.cont) //therefore, algorithm only works to p^2 or about
                 mkprm (nxt,nxti,(MinHeap.insert nxtcmpst (cullstate(p,npi)) pq),nbps,sqr,lstp,(bgap+1us)) //1.7 * 10^14
    else match MinHeap.getMin pq with 
           | None -> (uint16(n-uint32 lstp),bgap,wi,(nxt,nxti,pq,bps,q,n,1us)) //fix with q is uint64
           | Some kv -> let ca,cs = culladv kv.k kv.v.p kv.v.wi,cullstate(kv.v.p,whladv kv.v.wi)
                        if n>kv.k then mkprm (n,wi,(MinHeap.reinsertMinAs ca cs pq),bps,q,lstp,bgap)
                        elif n=kv.k then mkprm (nxt,nxti,(MinHeap.reinsertMinAs ca cs pq),bps,q,lstp,(bgap+1us))
                        else (uint16(n-uint32 lstp),bgap,wi,(nxt,nxti,pq,bps,q,n,1us))
  let rec pCIS p pg bg pi pq bps q = prmsCIS(pg,bg,pi,fun()->
    let (npg,nbg,npi,(nxt,nxti,npq,nbps,nq,nl,ng))=mkprm (p+uint32 WHLPTRN.[pi],whladv pi,pq,bps,q,p,0us)
    pCIS (p+uint32 npg) npg nbg npi npq nbps nq)
  let rec baseprimes() = prmsCIS(uint16 FSTPRM,0us,0,fun()->
                           let np,npi=advcnd FSTPRM 0,whladv 0
                           pCIS np (uint16 WHLPTRN.[0]) 1us npi MinHeap.empty (baseprimes()) (FSTPRM*FSTPRM))
  let prmspg nxt adj pq bp q =
    //compute next buffer size rounded up to next even wheel circle so at least one each base prime hits the page
    let rng = max (((uint32(MXSTP+uint64(sqrt (float (MXSTP*(MXSTP+4UL*nxt))))+1UL)>>>1)+MINRNGSTP)/MINRNGSTP*MINRNGSTP) MINBFRNG
    let nxtp() = async {
      let rec addprms pqx (bpx:prmsCIS) qx = 
        if qx>=adj then pqx,bpx,qx //add primes to queue for new lower limit
        else let p = (uint32 bpx.bg<<<16)+uint32 bpx.pg in let nbps = bpx.cont()
             let pg = uint32 nbps.pg in let np = p+pg in let sqr = qx+pg*((p<<<1)+pg)
             let nbps = prmsCIS(uint16 np,uint16(np>>>16),nbps.pi,nbps.cont)
             addprms (MinHeap.insert qx (cullstate(p,bpx.pi)) pqx) nbps sqr
      let adjcinpg low k (v:cullstate) = //adjust the cull states for the new page low value
        let p = v.p in let WHLSPN = int64 WHLCRC*int64 p in let db = int64 p*int64 WHLPOS.[v.wi]
        let db = if k<low then let nk = int64(low-k)+db in nk-((nk/WHLSPN)*WHLSPN)
                 else let nk = int64(k-low) in if db<nk then db+WHLSPN-nk else db-nk
        let r = WHLRNDUP.[int((((db>>>1)%(WHLSPN>>>1))+int64 p-1L)/int64 p)] in let x = int64 WHLPOS.[r]*int64 p
        let r = if r>WHLLMT then 0 else r in let x = if x<db then x+WHLSPN-db else x-db in uint32 x,cullstate(p,r)
      let bfbtsz = int rng/WHLCRC*(WHLLMT+1) in let nbuf = Array.zeroCreate (bfbtsz>>>5)
      let rec nxtp' wi cnt = let _,nbg,_,ncnt = mkprm cnt in let nwi = wi + int nbg
                             if nwi < bfbtsz then nbuf.[nwi>>>5] <- nbuf.[nwi>>>5] ||| (1u<<<(nwi&&&0x1F)); nxtp' nwi ncnt
                             else let _,_,pq,bp,q,_,_ = ncnt in nbuf,pq,bp,q //results incl buf and cont parms for next page
      let npq,nbp,nq = addprms pq bp q
      return nxtp' 0 (0u,0,MinHeap.adjust (adjcinpg adj) npq,nbp,nq-adj,0u,0us) }
    rng,nxtp() |> Async.StartAsTask
  let nxtpg nxt (cont:(_*System.Threading.Tasks.Task<_>)[]) = //(len,pq,bp,q) =
    let adj = (cont |> Seq.fold (fun s (r,_)  -> s+r) 0u)
    let _,tsk = cont.[0] in let _,pq,bp,q = tsk.Result
    let ncont = Array.init (NUMPRCS+1) (fun i -> if i<NUMPRCS then cont.[i+1]
                                                 else prmspg (nxt+uint64 adj) adj pq bp q)
    let _,tsk = ncont.[0] in let nbuf,_,_,_ = tsk.Result in nbuf,ncont
  //init cond buf[0], no queue, frst bp sqr offset
  let initcond = 0u,System.Threading.Tasks.Task.Factory.StartNew (fun()->
                   (Array.empty,MinHeap.empty,baseprimes(),FSTPRM*FSTPRM-FSTPRM))
  let nxtcond n = prmspg (uint64 n) (n-FSTPRM) MinHeap.empty (baseprimes()) (FSTPRM*FSTPRM-FSTPRM)
  let initcont = Seq.unfold (fun (n,((r,_)as v))->Some(v,(n+r,nxtcond (n+r)))) (FSTPRM,initcond)
                 |> Seq.take (NUMPRCS+1) |> Seq.toArray
  let rec nxtprm (c,ci,i,buf:uint32[],cont) =
    let rec nxtprm' c ci i =
      let nc = c + uint64 WHLPTRN.[ci] in let nci = whladv ci in let ni = i + 1 in let nw = ni>>>5
      if nw >= buf.Length then let (npg,ncont)=nxtpg nc cont in nxtprm (c,ci,-1,npg,ncont)
      elif (buf.[nw] &&& (1u <<< (ni &&& 0x1F))) = 0u then nxtprm' nc nci ni
      else nc,nci,ni,buf,cont
    nxtprm' c ci i
  seq { yield! WHLPRMS |> Seq.map (uint64);
        yield! Seq.unfold (fun ((c,_,_,_,_) as cont)->Some(c,nxtprm cont))
                 (nxtprm (uint64 FSTPRM-uint64 WHLPTRN.[WHLLMT],WHLLMT,-1,Array.empty,initcont)) }

请注意,MinHeap 模块,无论是基于函数的还是基于数组的,都添加了一个“调整”函数,以允许在每个新段页面的开头调整每个线程版本的 PQ 的剔除状态。另请注意,可以调整代码,以便大多数计算使用 32 位范围完成,最终序列输出为 uint64,计算时间成本很低,因此目前的理论范围超过 100 万亿(十提高到十四次方)如果愿意等待大约三到四个月来计算该范围。数字范围检查已被删除,因为任何人都不太可能使用此算法来计算该范围,更不用说超过它了。

使用纯函数 MinHeap 和 2,3,5,7 轮分解,上述程序分别在 0.062、0.629、10.53 和 195.62 秒内计算前十万、一百万、一千万和一亿个素数。使用基于数组的 MinHeap 可以分别加快 0.097、0.276、3.48 和 51.60 秒。通过将 WHLPRMS 更改为“[| 2u;3u;5u;7u;11u;13u;17u |]”并将 FSTPRM 更改为 19u 来使用 2、3、5、7、11、13、17 轮,速度会更快一些分别为 0.181、0.308、2.49 和 36.58 秒(对于具有恒定开销的恒定因子改进)。这个最快的调整在大约 88.37 秒内计算出 32 位数字范围内的 203,280,221 个素数。“BFSZ”常数可以通过在较小范围的较慢时间和较大范围的较快时间之间进行权衡来调整,建议在较大范围内尝试使用“1<<<14”的值。此常量仅设置最小缓冲区大小,程序会自动将缓冲区大小调整为高于该大小以适应更大的范围,以便缓冲区足够,以便页面范围所需的最大基本素数将始终“打击”每个页面至少一次; 这意味着不需要额外的“桶筛”的复杂性和开销。使用“primesPQOWSE() |> Seq.takeWhile ((>=)100000000000UL) 测试,最后一个完全优化的版本可以在大约 256.8 和 3617.4 秒(对于 1000 亿只需要一个多小时)内计算高达 10 和 1000 亿的素数|> Seq.fold (fun sp -> s + 1UL) 0UL" 用于输出。

我不认为使用增量 SoE 算法来制作功能性或几乎功能性的代码运行得比这快得多。从代码中可以看出,优化基本增量算法大大增加了代码复杂性,因此它可能比基于直接数组剔除的等效优化代码稍微复杂一点,该代码的运行速度大约比这并且没有额外的性能指数意味着这个功能性增量代码具有不断增加的额外百分比开销。

那么除了从有趣的理论和知识观点来看,这是否有用?可能不是。对于最多约一千万的较小范围的素数,最好的基本未完全优化的增量功能 SoE 可能已经足够,并且与最简单的命令式 SoE 相比,编写起来非常简单,或者 RAM 内存使用量更少。但是,它们比使用数组的命令式代码要慢得多,因此对于高于该范围的范围,它们“失去动力”。虽然这里已经证明可以通过优化来加速代码,但它仍然比更命令式的纯基于数组的版本慢 10 倍,但增加了复杂性,至少与具有等效优化的代码一样复杂, 甚至在 DotNet 上的 F# 下的代码也比使用直接编译为本机代码的 C++ 等语言慢四倍;如果真的想研究大范围的素数,可能会使用其他语言和技术中的一种primesieve可以在四个小时内计算出百万亿范围内的素数,而不是这段代码所需的大约三个月。 END_EDIT_ADD

于 2013-09-26T07:53:30.577 回答
6

这是一个关于使用序列的基于 Eratosthenes 的算法增量(和递归)映射的最大优化,因为不需要记忆先前的序列值(除了使用 Seq.缓存),主要的优化是它对输入序列使用轮分解,并且它使用多个(递归)流来维护小于被筛选的最新数字的平方根的基本素数,如下所示:

  let primesMPWSE =
    let whlptrn = [| 2;4;2;4;6;2;6;4;2;4;6;6;2;6;4;2;6;4;6;8;4;2;4;2;
                     4;8;6;4;6;2;4;6;2;6;6;4;2;4;6;2;6;4;2;4;2;10;2;10 |]
    let adv i = if i < 47 then i + 1 else 0
    let reinsert oldcmpst mp (prime,pi) =
      let cmpst = oldcmpst + whlptrn.[pi] * prime
      match Map.tryFind cmpst mp with
        | None -> mp |> Map.add cmpst [(prime,adv pi)]
        | Some(facts) -> mp |> Map.add cmpst ((prime,adv pi)::facts)
    let rec mkprimes (n,i) m ps q =
      let nxt = n + whlptrn.[i]
      match Map.tryFind n m with
        | None -> if n < q then seq { yield (n,i); yield! mkprimes (nxt,adv i) m ps q }
                  else let (np,npi),nlst = Seq.head ps,ps |> Seq.skip 1
                       let (nhd,ni),nxtcmpst = Seq.head nlst,n + whlptrn.[npi] * np
                       mkprimes (nxt,adv i) (Map.add nxtcmpst [(np,adv npi)] m) nlst (nhd * nhd)
        | Some(skips) -> let adjmap = skips |> List.fold (reinsert n) (m |> Map.remove n)
                         mkprimes (nxt,adv i) adjmap ps q
    let rec prs = seq {yield (11,0); yield! mkprimes (13,1) Map.empty prs 121 } |> Seq.cache
    seq { yield 2; yield 3; yield 5; yield 7; yield! mkprimes (11,0) Map.empty prs 121 |> Seq.map (fun (p,i) -> p) }

它在大约 0.445 秒内找到第 100,000 个素数,直到 1,299,721,但它不是一个适当的命令式 EoS 算法,它不会随着素数数量的增加而接近线性地扩展,需要 7.775 秒才能找到高达 15,485,867 的第 1,000,000 个素数以获得性能在这个大约 O(n^1.2) 的范围内,其中 n 是找到的最大素数。

可以做更多的调整,但它可能不会对更好的性能产生很大的影响,如下所示:

  1. 由于 F# 序列库非常慢,因此可以使用实现 IEnumerable 的自定义类型来减少在内部序列中花费的时间,但由于序列操作只需要大约 20% 的总时间,即使这些时间减少到零时间结果只会减少到 80% 的时间。

  2. 可以尝试其他形式的地图存储,例如 O'Neil 提到的优先级队列或 @gradbot 使用的 SkewBinomialHeap,但至少对于 SkewBinomialHeap,性能的提升只有几个百分点。似乎在选择不同的地图实现时,一个人只是在寻找和删除靠近列表开头的项目时做出更好的反应,而不是花费时间插入新条目以享受这些好处,因此净收益几乎是一种洗涤随着地图中条目的增加,仍然具有 O(log n) 性能。上述使用多个条目流的优化只是平方根减少了映射中的条目数量,因此这些改进并不重要。

EDIT_ADD: 我确实做了一些额外的优化,性能确实比预期的有所提高,这可能是由于改进了消除 Seq.skip 作为通过基本素数序列前进的方式。此优化使用替换内部序列生成作为整数值的元组和用于前进到序列中的下一个值的延续函数,最终 F# 序列由整体展开操作生成。代码如下:

type SeqDesc<'a> = SeqDesc of 'a * (unit -> SeqDesc<'a>) //a self referring tuple type
let primesMPWSE =
  let whlptrn = [| 2;4;2;4;6;2;6;4;2;4;6;6;2;6;4;2;6;4;6;8;4;2;4;2;
                   4;8;6;4;6;2;4;6;2;6;6;4;2;4;6;2;6;4;2;4;2;10;2;10 |]
  let inline adv i = if i < 47 then i + 1 else 0
  let reinsert oldcmpst mp (prime,pi) =
    let cmpst = oldcmpst + whlptrn.[pi] * prime
    match Map.tryFind cmpst mp with
      | None -> mp |> Map.add cmpst [(prime,adv pi)]
      | Some(facts) -> mp |> Map.add cmpst ((prime,adv pi)::facts)
  let rec mkprimes (n,i) m (SeqDesc((np,npi),nsdf) as psd) q =
    let nxt = n + whlptrn.[i]
    match Map.tryFind n m with
      | None -> if n < q then SeqDesc((n,i),fun() -> mkprimes (nxt,adv i) m psd q)
                else let (SeqDesc((nhd,x),ntl) as nsd),nxtcmpst = nsdf(),n + whlptrn.[npi] * np
                     mkprimes (nxt,adv i) (Map.add nxtcmpst [(np,adv npi)] m) nsd (nhd * nhd)
      | Some(skips) -> let adjdmap = skips |> List.fold (reinsert n) (m |> Map.remove n)
                       mkprimes (nxt,adv i) adjdmap psd q
  let rec prs = SeqDesc((11,0),fun() -> mkprimes (13,1) Map.empty prs 121 )
  let genseq sd = Seq.unfold (fun (SeqDesc((n,i),tailfunc)) -> Some(n,tailfunc())) sd
  seq { yield 2; yield 3; yield 5; yield 7; yield! mkprimes (11,0) Map.empty prs 121 |> genseq }

找到第 100,000 个和第 1,000,000 个素数所需的时间分别约为 0.31 和 5.1 秒,因此这个微小的变化有相当大的百分比增益。我确实尝试了我自己的 IEnumerable/IEnumerator 接口实现,它们是序列的基础,虽然它们比 Seq 模块使用的版本更快,但它们对这个算法几乎没有任何进一步的区别,因为大部分时间都花在了地图功能。 END_EDIT_ADD

除了基于地图的增量 EoS 实现之外,还有另一个使用树折叠的“纯功能”实现,据说速度稍快,但由于它在树折叠中仍然有一个 O(log n) 项,我怀疑它主要是更快(如果是的话),因为与使用地图相比,算法在计算机操作数量方面的实现方式。如果人们有兴趣,我也会开发那个版本。

最后,人们必须接受增量 EoS 的纯函数式实现永远不会接近较大数值范围的良好命令式实现的原始处理速度。但是,可以提出一种方法,其中所有代码都是纯函数式的,除了使用(可变)数组在一个范围内对复合数进行分段筛选,这将接近 O(n) 性能,在实际使用中将是 50对于大范围(例如前 200,000,000 个素数)来说,它比函数算法快一百倍。@Jon Harrop 在他的博客中已经完成了这项工作,但只需很少的额外代码就可以进一步调整。

于 2013-07-01T10:02:30.493 回答
5

这是我将 Haskell 代码合理忠实地翻译为 F# 的尝试:

#r "FSharp.PowerPack"

module Map =
  let insertWith f k v m =
    let v = if Map.containsKey k m then f m.[k] v else v
    Map.add k v m

let sieve =
  let rec sieve' map = function
  | LazyList.Nil -> Seq.empty
  | LazyList.Cons(x,xs) -> 
      if Map.containsKey x map then
        let facts = map.[x]
        let map = Map.remove x map
        let reinsert m p = Map.insertWith (@) (x+p) [p] m
        sieve' (List.fold reinsert map facts) xs
      else
        seq {
          yield x
          yield! sieve' (Map.add (x*x) [x] map) xs
        }
  fun s -> sieve' Map.empty (LazyList.ofSeq s)

let rec upFrom i =
  seq {
    yield i
    yield! upFrom (i+1)
  }

let primes = sieve (upFrom 2)
于 2011-01-07T21:27:19.633 回答
5

使用邮箱处理器实现的 Prime sieve:

let (<--) (mb : MailboxProcessor<'a>) (message : 'a) = mb.Post(message)
let (<-->) (mb : MailboxProcessor<'a>) (f : AsyncReplyChannel<'b> -> 'a) = mb.PostAndAsyncReply f

type 'a seqMsg =  
    | Next of AsyncReplyChannel<'a>   

type PrimeSieve() =   
    let counter(init) =   
        MailboxProcessor.Start(fun inbox ->   
            let rec loop n =   
                async { let! msg = inbox.Receive()   
                        match msg with
                        | Next(reply) ->   
                            reply.Reply(n)   
                            return! loop(n + 1) }   
            loop init)   

    let filter(c : MailboxProcessor<'a seqMsg>, pred) =   
        MailboxProcessor.Start(fun inbox ->   
            let rec loop() =   
                async {   
                    let! msg = inbox.Receive()   
                    match msg with
                    | Next(reply) ->
                        let rec filter prime =
                            if pred prime then async { return prime }
                            else async {
                                let! next = c <--> Next
                                return! filter next }
                        let! next = c <--> Next
                        let! prime = filter next
                        reply.Reply(prime)
                        return! loop()   
                }   
            loop()   
        )   

    let processor = MailboxProcessor.Start(fun inbox ->   
        let rec loop (oldFilter : MailboxProcessor<int seqMsg>) prime =   
            async {   
                let! msg = inbox.Receive()   
                match msg with
                | Next(reply) ->   
                    reply.Reply(prime)   
                    let newFilter = filter(oldFilter, (fun x -> x % prime <> 0))   
                    let! newPrime = oldFilter <--> Next
                    return! loop newFilter newPrime   
            }   
        loop (counter(3)) 2)   

    member this.Next() = processor.PostAndReply( (fun reply -> Next(reply)), timeout = 2000)

    static member upto max =
        let p = PrimeSieve()
        Seq.initInfinite (fun _ -> p.Next())
        |> Seq.takeWhile (fun prime -> prime <= max)
        |> Seq.toList
于 2011-01-09T05:29:45.173 回答
3

这是我的两分钱,尽管我不确定它是否符合 OP 真正成为 Eratosthenes 筛子的标准。它不利用模块化划分,并实现了 OP 引用的论文中的优化。它仅适用于有限列表,但在我看来,这似乎符合最初描述筛子的精神。顺便说一句,该论文讨论了标记数量和划分数量方面的复杂性。似乎,由于我们必须遍历一个链表,这可能忽略了各种算法在性能方面的一些关键方面。一般来说,尽管使用计算机进行模块化划分是一项昂贵的操作。

open System

let rec sieve list =
    let rec helper list2 prime next =
        match list2 with
            | number::tail -> 
                if number< next then
                    number::helper tail prime next
                else
                    if number = next then 
                        helper tail prime (next+prime)
                    else
                        helper (number::tail) prime (next+prime)

            | []->[]
    match list with
        | head::tail->
            head::sieve (helper tail head (head*head))
        | []->[]

let step1=sieve [2..100]

编辑:修复了我原始帖子中的代码错误。我尝试按照筛子的原始逻辑进行一些修改。即从第一个项目开始,并从集合中划掉该项目的倍数。这个算法从字面上寻找下一个是素数的倍数的项目,而不是对集合中的每个数字进行模除。论文的一个优化是它开始寻找大于 p^2 的素数的倍数。

具有多级的辅助函数部分处理可能已经从列表中删除下一个素数的倍数的可能性。因此,例如使用素数 5,它会尝试删除数字 30,但它永远不会找到它,因为它已经被素数 3 删除了。希望能澄清算法的逻辑。

于 2011-01-07T23:32:20.877 回答
3

就其价值而言,这不是 Erathothenes 的筛子,但它的速度非常快:

let is_prime n =
    let maxFactor = int64(sqrt(float n))
    let rec loop testPrime tog =
        if testPrime > maxFactor then true
        elif n % testPrime = 0L then false
        else loop (testPrime + tog) (6L - tog)
    if n = 2L || n = 3L || n = 5L then true
    elif n <= 1L || n % 2L = 0L || n % 3L = 0L || n % 5L = 0L then false
    else loop 7L 4L
let primes =
    seq {
        yield 2L;
        yield 3L;
        yield 5L;
        yield! (7L, 4L) |> Seq.unfold (fun (p, tog) -> Some(p, (p + tog, 6L - tog)))
    }
    |> Seq.filter is_prime

它在我的机器(AMD Phenom II,3.2GHZ 四核)上在 1.25 秒内找到第 100,000 个素数。

于 2011-01-09T05:45:37.200 回答
3

我知道您明确表示您对纯功能筛子的实现感兴趣,所以直到现在我才展示我的筛子。但是在重新阅读您引用的论文后,我发现那里提出的增量筛算法与我自己的基本相同,唯一的区别是使用纯函数技术与绝对命令技术的实现细节。所以我认为我至少有一半资格满足你的好奇心。此外,我认为,当可以实现显着的性能提升但被函数式接口隐藏时,使用命令式技术是 F# 编程中鼓励的最强大的技术之一,这与纯粹的 Haskell 文化相反。我首先在我的Project Euler for F#un 博客上发布了这个实现但在此处重新发布,将先决条件代码替换回去并删除结构类型。primes可以在我的计算机上在 0.248 秒内计算前 100,000 个素数,在 4.8 秒内计算前 1,000,000 个素数(请注意,primes它会缓存其结果,因此每次执行基准测试时都需要重新评估它)。

let inline infiniteRange start skip = 
    seq {
        let n = ref start
        while true do
            yield n.contents
            n.contents <- n.contents + skip
    }

///p is "prime", s=p*p, c is "multiplier", m=c*p
type SievePrime<'a> = {mutable c:'a ; p:'a ; mutable m:'a ; s:'a}

///A cached, infinite sequence of primes
let primes =
    let primeList = ResizeArray<_>()
    primeList.Add({c=3 ; p=3 ; m=9 ; s=9})

    //test whether n is composite, if not add it to the primeList and return false
    let isComposite n = 
        let rec loop i = 
            let sp = primeList.[i]
            while sp.m < n do
                sp.c <- sp.c+1
                sp.m <- sp.c*sp.p

            if sp.m = n then true
            elif i = (primeList.Count-1) || sp.s > n then
                primeList.Add({c=n ; p=n ; m=n*n ; s=n*n})
                false
            else loop (i+1)
        loop 0

    seq { 
        yield 2 ; yield 3

        //yield the cached results
        for i in 1..primeList.Count-1 do
            yield primeList.[i].p

        yield! infiniteRange (primeList.[primeList.Count-1].p + 2) 2 
               |> Seq.filter (isComposite>>not)
    }
于 2011-01-09T15:46:17.043 回答
3

这是另一种仅使用纯函数式 F# 代码来完成埃拉托色尼 (SoE) 增量筛分的方法。它改编自 Haskell 代码,开发为“这个想法是由于 Dave Bayer,虽然他使用了复杂的公式和平衡的三叉树结构,以统一的方式逐步加深(简化的公式和一个倾斜的、加深到正确的二叉树结构)由 Heinrich Apfelmus 编写,由 Will Ness 进一步简化)。由于 M. O'Neill 的分阶段生产理念”,根据以下链接:在 Haskell 中使用阶乘轮优化树折叠代码

以下代码有几处优化,使其更适合在 F# 中执行,如下所示:

  1. 该代码使用协导流而不是 LazyList 的,因为该算法不需要(或很少)需要 LazyList 的记忆,而且我的协导流比 LazyLists(来自 FSharp.PowerPack)或内置序列更有效。另一个优点是我的代码可以在tryFSharp.orgideone.com上运行,而无需复制和粘贴 LazyList 类型和模块的 Microsoft.FSharp.PowerPack Core 源代码(连同版权声明)

  2. 发现 F# 在函数参数上的模式匹配有相当多的开销,因此之前使用元组的可读性更强的可区分联合类型被牺牲了,取而代之的是按值结构(或在某些平台上运行得更快的类)类型大约两倍或更多的加速。

  3. Will Ness 的优化从线性树折叠到双边折叠再到多向折叠以及使用轮子分解的改进对于 F# 和 Haskell 一样有效,两种语言之间的主要区别在于 Haskell 可以编译为本机代码并具有更高度优化的编译器,而 F# 在 DotNet 框架系统下运行的开销更大。

    type prmstate = struct val p:uint32 val pi:byte new (prm,pndx) = { p = prm; pi = pndx } end
    type prmsSeqDesc = struct val v:prmstate val cont:unit->prmsSeqDesc new(ps,np) = { v = ps; cont = np } end
    type cmpststate = struct val cv:uint32 val ci:byte val cp:uint32 new (strt,ndx,prm) = {cv = strt;ci = ndx;cp = prm} end
    type cmpstsSeqDesc = struct val v:cmpststate val cont:unit->cmpstsSeqDesc new (cs,nc) = { v = cs; cont = nc } end
    type allcmpsts = struct val v:cmpstsSeqDesc val cont:unit->allcmpsts new (csd,ncsdf) = { v=csd;cont=ncsdf } end
    
    let primesTFWSE =
      let whlptrn = [| 2uy;4uy;2uy;4uy;6uy;2uy;6uy;4uy;2uy;4uy;6uy;6uy;2uy;6uy;4uy;2uy;6uy;4uy;6uy;8uy;4uy;2uy;4uy;2uy;
                       4uy;8uy;6uy;4uy;6uy;2uy;4uy;6uy;2uy;6uy;6uy;4uy;2uy;4uy;6uy;2uy;6uy;4uy;2uy;4uy;2uy;10uy;2uy;10uy |]
      let inline whladv i = if i < 47uy then i + 1uy else 0uy
      let inline advmltpl c ci p = cmpststate (c + uint32 whlptrn.[int ci] * p,whladv ci,p)
      let rec pmltpls cs = cmpstsSeqDesc(cs,fun() -> pmltpls (advmltpl cs.cv cs.ci cs.cp))
      let rec allmltpls (psd:prmsSeqDesc) =
        allcmpsts(pmltpls (cmpststate(psd.v.p*psd.v.p,psd.v.pi,psd.v.p)),fun() -> allmltpls (psd.cont()))
      let rec (^) (xs:cmpstsSeqDesc) (ys:cmpstsSeqDesc) = //union op for SeqDesc's
        match compare xs.v.cv ys.v.cv with
          | -1 -> cmpstsSeqDesc (xs.v,fun() -> xs.cont() ^ ys)
          | 0 -> cmpstsSeqDesc (xs.v,fun() -> xs.cont() ^ ys.cont())
          | _ -> cmpstsSeqDesc(ys.v,fun() -> xs ^ ys.cont()) //must be greater than
      let rec pairs (csdsd:allcmpsts) =
        let ys = csdsd.cont in
        allcmpsts(cmpstsSeqDesc(csdsd.v.v,fun()->csdsd.v.cont()^ys().v),fun()->pairs (ys().cont()))
      let rec joinT3 (csdsd:allcmpsts) = cmpstsSeqDesc(csdsd.v.v,fun()->
        let ys = csdsd.cont() in let zs = ys.cont() in (csdsd.v.cont()^(ys.v^zs.v))^joinT3 (pairs (zs.cont())))
      let rec mkprimes (ps:prmstate) (csd:cmpstsSeqDesc) =
        let nxt = ps.p + uint32 whlptrn.[int ps.pi]
        if ps.p >= csd.v.cv then mkprimes (prmstate(nxt,whladv ps.pi)) (csd.cont()) //minus function
        else prmsSeqDesc(prmstate(ps.p,ps.pi),fun() -> mkprimes (prmstate(nxt,whladv ps.pi)) csd)
      let rec baseprimes = prmsSeqDesc(prmstate(11u,0uy),fun() -> mkprimes (prmstate(13u,1uy)) initcmpsts)
      and initcmpsts = joinT3 (allmltpls baseprimes)
      let genseq sd = Seq.unfold (fun (psd:prmsSeqDesc) -> Some(psd.v.p,psd.cont())) sd
      seq { yield 2u; yield 3u; yield 5u; yield 7u; yield! mkprimes (prmstate(11u,0uy)) initcmpsts |> genseq }
    
    primesLMWSE |> Seq.nth 100000
    

EDIT_ADD: 这已得到纠正,因为原始代码没有正确处理流的尾部并将参数流的尾部传递给pairs函数到joinT3函数,而不是zs流之后的尾部。下面的时间也相应地得到了​​修正,速度提高了大约 30%。tryFSharp 和 ideone 链接代码也已更正。 END_EDIT_ADD

上面的程序以大约 O(n^1.1) 的性能工作,n 是计算的最大素数,或者当 n 是计算的素数时大约 O(n^1.18),计算前一百万个素数大约需要 2.16 秒(大约 0.14第二个用于前 100,000 个素数)在使用结构类型而不是类运行 64 位代码的快速计算机上(似乎某些实现在形成闭包时对按值结构进行装箱和拆箱)。我认为这大约是任何这些纯函数素数算法的最大实用范围。这可能是运行纯函数式 SoE 算法的最快速度,而不是进行一些小的调整以减少常数因子。

除了将分段和多线程结合起来在多个 CPU 内核之间共享计算之外,可以对该算法进行的大多数“调整”是增加车轮分解的周长,以获得高达约 40% 的性能增益由于在函数之间传递状态时对结构、类、元组或更直接的单个参数的使用进行了调整,因此获得了少量收益。

EDIT_ADD2: 我已经完成了上述优化,结果由于结构优化,代码现在几乎快了两倍,另外还可以选择使用更大的车轮分解圆周来增加更小的减少。请注意,下面的代码避免在主序列生成循环中使用延续,并且仅在必要时将它们用于基本素数流和从这些基本素数派生的后续复合数剔除流。新代码如下:

type CIS<'T> = struct val v:'T val cont:unit->CIS<'T> new(v,cont) = { v=v;cont=cont } end //Co-Inductive Steam
let primesTFOWSE =
  let WHLPRMS = [| 2u;3u;5u;7u |] in let FSTPRM = 11u in let WHLCRC = int (WHLPRMS |> Seq.fold (*) 1u) >>> 1
  let WHLLMT = int (WHLPRMS |> Seq.fold (fun o n->o*(n-1u)) 1u) - 1
  let WHLPTRN =
    let wp = Array.zeroCreate (WHLLMT+1)
    let gaps (a:int[]) = let rec gap i acc = if a.[i]=0 then gap (i+1) (acc+1uy) else acc
                         {0..WHLCRC-1} |> Seq.fold (fun s i->
                           let ns = if a.[i]<>0 then wp.[s]<-2uy*gap (i+1) 1uy;(s+1) else s in ns) 0 |> ignore
    Array.init (WHLCRC+1) (fun i->if WHLPRMS |> Seq.forall (fun p->(FSTPRM+uint32(i<<<1))%p<>0u)
                                  then 1 else 0) |> gaps;wp
  let inline whladv i = if i < WHLLMT then i+1 else 0 in let inline advcnd c ci = c + uint32 WHLPTRN.[ci]
  let inline advmltpl p (c,ci) = (c + uint32 WHLPTRN.[ci] * p,whladv ci)
  let rec pmltpls p cs = CIS(cs,fun() -> pmltpls p (advmltpl p cs))
  let rec allmltpls k wi (ps:CIS<_>) =
    let nxt = advcnd k wi in let nxti = whladv wi
    if k < ps.v then allmltpls nxt nxti ps
    else CIS(pmltpls ps.v (ps.v*ps.v,wi),fun() -> allmltpls nxt nxti (ps.cont()))
  let rec (^) (xs:CIS<uint32*_>) (ys:CIS<uint32*_>) = 
    match compare (fst xs.v) (fst ys.v) with //union op for composite CIS's (tuple of cmpst and wheel ndx)
      | -1 -> CIS(xs.v,fun() -> xs.cont() ^ ys)
      | 0 -> CIS(xs.v,fun() -> xs.cont() ^ ys.cont())
      | _ -> CIS(ys.v,fun() -> xs ^ ys.cont()) //must be greater than
  let rec pairs (xs:CIS<CIS<_>>) =
    let ys = xs.cont() in CIS(CIS(xs.v.v,fun()->xs.v.cont()^ys.v),fun()->pairs (ys.cont()))
  let rec joinT3 (xs:CIS<CIS<_>>) = CIS(xs.v.v,fun()->
    let ys = xs.cont() in let zs = ys.cont() in (xs.v.cont()^(ys.v^zs.v))^joinT3 (pairs (zs.cont())))
  let rec mkprm (cnd,cndi,(csd:CIS<uint32*_>)) =
    let nxt = advcnd cnd cndi in let nxti = whladv cndi
    if cnd >= fst csd.v then mkprm (nxt,nxti,csd.cont()) //minus function
    else (cnd,cndi,(nxt,nxti,csd))
  let rec pCIS p pi cont = CIS(p,fun()->let (np,npi,ncont)=mkprm cont in pCIS np npi ncont)
  let rec baseprimes() = CIS(FSTPRM,fun()->let np,npi = advcnd FSTPRM 0,whladv 0
                                           pCIS np npi (advcnd np npi,whladv npi,initcmpsts))
  and initcmpsts = joinT3 (allmltpls FSTPRM 0 (baseprimes()))
  let inline genseq sd = Seq.unfold (fun (p,pi,cont) -> Some(p,mkprm cont)) sd
  seq { yield! WHLPRMS; yield! mkprm (FSTPRM,0,initcmpsts) |> genseq }

上述代码分别在 64 位模式下的参考 Intel i7-2700K (3.5 GHz) 机器上枚举前十万、百万和一千万素数大约需要 0.07、1.02 和 14.58 秒。这并不比派生此代码的参考 Haskell 实现慢多少,尽管由于在 Silverlight 下 tryfsharp 处于 32 位模式(大约再慢一半)并且在较慢的机器上运行,它在tryfsharpideone上稍慢在 ideone 上的 Mono 2.0 下(对于 F# 来说,这本来就慢得多),因此比参考机器慢五倍左右。请注意,ideone 报告的运行时间包括嵌入式查找表数组的初始化时间,该时间需要考虑在内。

上述程序具有进一步的特征,即分解轮是参数化的,例如,可以通过将 WHLPRMS 设置为 [| 来使用非常大的轮。2u;3u;5u;7u;11u;13u;17u;19u |] 和 FSTPRM 到 23u 以获得大约三分之二的运行时间,对于一千万个素数大约为 10.02 秒,但请注意,这需要几秒钟在程序开始运行之前计算 WHLPTRN。

极客注意:我没有按照参考 Haskell 代码实现“用于伸缩多级素数生产的非共享固定点组合器”,尽管我尝试这样做,因为需要有类似 Haskell 的惰性列表这样的东西才能在不运行的情况下工作进入无限循环和堆栈溢出。尽管我的共同感应流(CIS)具有一些惰性,但它们不是正式的惰性列表或缓存序列(它们成为非缓存序列并且可以在传递时被缓存,因此我提供了诸如“genseq”之类的功能最终输出序列)。我不想使用 LazyList 的 PowerPack 实现,因为它效率不高,并且需要我将该源代码复制到不提供导入模块的 tryfsharp 和 ideone 中。当一个人想要使用该算法所需的头/尾操作时,使用内置序列(甚至缓存)是非常低效的,因为获得序列尾部的唯一方法是使用“Seq.skip 1”多次使用会在递归跳过多次的原始序列的基础上产生一个新序列。我可以基于 CIS 实现我自己的高效 LazyList 类,但是当递归“initcmpsts”和“baseprimes”对象只需要很少的代码时,证明这一点似乎不值得。此外,将 LazyList 传递给函数以生成对该 LazyList 的扩展,该函数仅使用 Lazylist 开头附近的值,这需要记忆几乎整个 LazyList 以降低内存效率:前 1000 万个素数的通过将需要内存中包含近 1.8 亿个元素的 LazyList。所以我就通过了这个。

请注意,对于更大的范围(1000 万或更多),这个纯函数式代码的速度与埃拉托色尼筛或阿特金斯筛的许多简单命令式实现的速度大致相同,尽管这是由于这些命令式算法缺乏优化;根据我的“几乎功能性”的答案,使用等效优化和分段筛分数组的比这更重要的实现仍将比这快十倍。

另请注意,虽然可以使用树折叠来实现分段筛选,但由于提前剔除算法隐藏在用于“union - ^”运算符的延续中,因此解决此问题将意味着不断推进的剔除范围需要使用;这与其他算法不同,其中可以为每个新页面重置剔除变量的状态,包括减小它们的范围,因此如果使用大于 32 位的范围,内部剔除范围仍然可以重置为在 32 - 位范围,即使在以很少的每个素数执行时间成本确定 64 位素数范围时。 END_EDIT_ADD2

于 2013-07-16T05:34:46.483 回答
2

实际上我也尝试过这样做,我首先尝试了与问题相同的幼稚实现,但它太慢了。然后我找到了这个页面YAPES: Problem Seven, Part 2,在那里他使用了基于 Melissa E. O'Neill 的真正的 Eratosthenes 筛。我从那里获取代码,只是对其进行了一些修改,因为 F# 自发布以来发生了一些变化。

let reinsert x table prime = 
   let comp = x+prime 
   match Map.tryFind comp table with 
   | None        -> table |> Map.add comp [prime] 
   | Some(facts) -> table |> Map.add comp (prime::facts) 

let rec sieve x table = 
  seq { 
    match Map.tryFind x table with 
    | None -> 
        yield x 
        yield! sieve (x+1I) (table |> Map.add (x*x) [x]) 
    | Some(factors) -> 
        yield! sieve (x+1I) (factors |> List.fold (reinsert x) (table |> Map.remove x)) } 

let primes = 
  sieve 2I Map.empty

primes |> Seq.takeWhile (fun elem -> elem < 2000000I) |> Seq.sum
于 2012-07-14T02:35:32.220 回答
2

我不认为这个问题是完整的,只考虑纯粹的功能算法,在少数答案的情况下隐藏在地图或优先队列中的状态,或者在我的其他答案之一的情况下隐藏在折叠合并树中的任何一个由于它们的近似 O(n^1.2) 性能('^' 意味着提高到 n 是序列中的最高数字的幂)以及它们的计算开销,因此这些对于大范围素数的可用性非常有限剔除操作。这意味着即使对于 32 位数字范围,这些算法也将花费至少几分钟的时间来生成高达 40 亿以上的素数,这不是非常有用的东西。

有几个答案提出了使用不同程度的可变性的解决方案,但它们要么没有充分利用它们的可变性并且效率低下,要么只是对命令式代码的非常简单的翻译和丑陋的功能。在我看来,F# 可变数组只是在数据结构中隐藏可变状态的另一种形式,并且可以开发一种有效的算法,除了用于有效剔除复合数的可变缓冲区数组之外没有其他可变性使用分页缓冲区段,其余代码以纯函数样式编写。

以下代码是在看到Jon Harrop 的代码后开发的,并对这些想法进行了如下改进:

  1. Jon 的代码在高内存使用方面失败(将所有生成的素数而不是仅将素数保存到最高候选素数的平方根,并不断重新生成越来越大的缓冲区数组(等于找到的最后一个素数的大小)与 CPU 缓存大小无关。

  2. 同样,他提供的代码不包括生成序列。

  3. 此外,所呈现的代码没有进行优化,至少只处理奇数,更不用说不考虑使用车轮分解了。

如果使用 Jon 的代码生成素数范围到 40 亿以上的 32 位数字范围,则列表结构中保存的素数将需要千兆字节的内存,筛缓冲区还有数千兆字节的内存需求,尽管没有真正的理由认为后者不能具有固定的较小尺寸。一旦筛缓冲区超过 CPU 缓存大小的大小,“缓存抖动”中的性能将迅速恶化,随着首先超过 L1,然后是 L2,最后是 L3(如果存在)大小的性能损失越来越大。

这就是为什么 Jon 的代码即使在我的 8 GB 内存的 64 位机器上也只能计算大约 2500 万左右的素数,然后才会生成内存不足异常,同时也解释了为什么相对的下降幅度越来越大随着范围的增加,性能随着范围的增加而变得更高,大约 O(n^1.4) 性能,并且只是稍微节省了一些,因为它一开始就具有如此低的计算复杂度。

以下代码解决了所有这些限制,因为它只记住了根据需要计算的范围内最大数的平方根的基本素数(在 32 位数字范围的情况下只有几千字节)并且只为每个基本素数生成器和主页分段筛过滤器(小于大多数现代 CPU 的 L1 缓存大小)使用大约 16 KB 的非常小的缓冲区,以及包括生成序列代码和(当前)被对仅筛选奇数进行了一些优化,这意味着内存的使用效率更高。此外,采用打包位数组,进一步提高内存效率;它的计算成本主要由扫描缓冲区所需的较少计算来弥补。

let primesAPF32() =
  let rec oddprimes() =
    let BUFSZ = 1<<<17 in let buf = Array.zeroCreate (BUFSZ>>>5) in let BUFRNG = uint32 BUFSZ<<<1
    let inline testbit i = (buf.[i >>> 5] &&& (1u <<< (i &&& 0x1F))) = 0u
    let inline cullbit i = let w = i >>> 5 in buf.[w] <- buf.[w] ||| (1u <<< (i &&& 0x1F))
    let inline cullp p s low = let rec cull' i = if i < BUFSZ then cullbit i; cull' (i + int p)
                               cull' (if s >= low then int((s - low) >>> 1)
                                      else let r = ((low - s) >>> 1) % p in if r = 0u then 0 else int(p - r))
    let inline cullpg low = //cull composites from whole buffer page for efficiency
      let max = low + BUFRNG - 1u in let max = if max < low then uint32(-1) else max
      let sqrtlm = uint32(sqrt(float max)) in let sqrtlmndx = int((sqrtlm - 3u) >>> 1)
      if low <= 3u then for i = 0 to sqrtlmndx do if testbit i then let p = uint32(i + i + 3) in cullp p (p * p) 3u
      else baseprimes |> Seq.skipWhile (fun p -> //force side effect of culling to limit of buffer
          let s = p * p in if p > 0xFFFFu || s > max then false else cullp p s low; true) |> Seq.nth 0 |> ignore
    let rec mkpi i low =
      if i >= BUFSZ then let nlow = low + BUFRNG in Array.fill buf 0 buf.Length 0u; cullpg nlow; mkpi 0 nlow
      else (if testbit i then i,low else mkpi (i + 1) low)
    cullpg 3u; Seq.unfold (fun (i,lw) -> //force cull the first buffer page then doit
        let ni,nlw = mkpi i lw in let p = nlw + (uint32 ni <<< 1)
        if p < lw then None else Some(p,(ni+1,nlw))) (0,3u)
  and baseprimes = oddprimes() |> Seq.cache
  seq { yield 2u; yield! oddprimes() }

primesAPF32() |> Seq.nth 203280220 |> printfn "%A"

这段新代码计算 32 位数字范围内的 203,280,221 个素数,大约为ADDED/CORRECTED:25.4 秒,前 100000、100 万、1000 万和 1 亿测试的运行时间分别为 0.01、0.088、0.94 和 11.25 秒,分别在快速台式计算机(i7-2700K @ 3.5 GHz)上,并且可以在tryfsharp.orgideone.com上运行,尽管由于执行时间限制,后者的范围较小。由于它增加了计算复杂性,它的性能比 Jon Harrop 的代码更差,因为它增加了计算复杂性,但由于其更好的性能算法弥补了这种复杂性,因此它很快通过了更大的范围,使得它大约是五倍第 1000 万个素数的速度更快,而在 Jon 的代码在第 2500 万个素数爆炸之前快了大约 7 倍。

在总执行时间中,有一半以上花费在基本序列枚举上,因此将复合数剔除操作作为后台操作在很大程度上没有帮助,尽管车轮分解优化组合会有所帮助(尽管计算量更大密集,这种复杂性将在后台运行),因为它们减少了枚举中所需的缓冲区测试操作的数量。如果不需要保留序列的顺序(例如仅计算素数或对素数求和),则可以进行进一步优化,因为可以编写序列以支持并行枚举接口或代码可以是写成一个类,以便成员方法可以在没有枚举的情况下进行计算。PrimeSieve主要针对仅计算一个范围内的素数数量的任务进行了优化,并且可以计算 32 位数字范围内的素数数量是一秒的一小部分(在 i7-2700K 上为 0.25 秒)。

因此,进一步的优化将集中在使用轮分解对筛选阵列进行进一步位打包以最小化在剔除复合数时所做的工作,尝试提高结果素数的枚举效率,并将所有复合剔除委托给后台线程,其中四到八核处理器可以隐藏所需的额外计算复杂性。

而且它主要是纯函数代码,只是它使用可变数组来合并复合剔除......

于 2013-07-23T20:16:09.293 回答
2

由于这个问题特别要求其他算法,我提供以下实现:

或者可能知道替代实现方法或筛选算法

任何提交的各种埃拉托色尼筛 (SoE) 算法都是不完整的,而没有提到阿特金筛(SoA),它实际上是 SoE 的一种变体,它使用一组二次方程的解来实现复合剔除以及消除基本素数平方的所有倍数(素数小于或等于测试素数的最高数的平方根)。从理论上讲,SoA 比 SoE 更有效,因为在该范围内的操作略少,因此对于大约 10 到 1 亿的范围,它的复杂性应该降低约 20%,但实际上它通常较慢,因为求解多个二次方程的复杂性的计算开销。虽然,高度优化Daniel J. Bernstein 的 C 实现比他针对特定测试数字范围测试它的 SoE 实现要快,他测试的 SoE 实现并不是最优化的,并且更高度优化的直接 SoE 版本仍然更快。这里似乎就是这种情况,尽管我承认我可能错过了进一步的优化。

由于 O'Neill 在她关于使用增量无界 Sieves 的 SoE 的论文中主要表明 Turner Sieve 在算法和性能方面都不是 SoE,因此她没有考虑 SoE 的许多其他变体,例如 SoA。快速搜索文献,我发现没有将 SoA 应用于我们在这里讨论的无界增量序列,因此我自己将其修改为以下代码。

正如纯 SoE 无界情况可以被认为具有一个无界序列的素数倍数序列作为合数一样,SoA 认为具有作为潜在素数的所有二次方程表达式的无界序列的无界序列与一个在两个自由变量中,'x' 或 'y' 固定为一个起始值,并具有一个单独的“消除”序列,该序列是所有基素数的倍数的序列,最后一个非常类似于的复合消除序列SoE 的序列,除了序列以素数的平方而不是素数的(较小的)倍数更快地前进。我试图减少表示的二次方程序列的数量,认识到对于增量筛来说,“

以下代码符合 SoA 的公式,使用 CoInductive Stream 类型而不是 LazyList 或序列,因为不需要记忆并且性能更好,也没有使用可区分联合并出于性能原因避免模式匹配:

#nowarn "40"
type cndstate = class val c:uint32 val wi:byte val md12:byte new(cnd,cndwi,mod12) = { c=cnd;wi=cndwi;md12=mod12 } end
type prmsCIS = class val p:uint32 val cont:unit->prmsCIS new(prm,nxtprmf) = { p=prm;cont=nxtprmf } end
type stateCIS<'b> = class val v:uint32 val a:'b val cont:unit->stateCIS<'b> new(curr,aux,cont)= { v=curr;a=aux;cont=cont } end
type allstateCIS<'b> = class val ss:stateCIS<'b> val cont:unit->allstateCIS<'b> new(sbstrm,cont) = { ss=sbstrm;cont=cont } end

let primesTFWSA() =
  let WHLPTRN = [| 2uy;4uy;2uy;4uy;6uy;2uy;6uy;4uy;2uy;4uy;6uy;6uy;2uy;6uy;4uy;2uy;6uy;4uy;6uy;8uy;4uy;2uy;4uy;2uy;
                   4uy;8uy;6uy;4uy;6uy;2uy;4uy;6uy;2uy;6uy;6uy;4uy;2uy;4uy;6uy;2uy;6uy;4uy;2uy;4uy;2uy;10uy;2uy;10uy |]
  let rec prmsqrs v sqr = stateCIS(v,sqr,fun() -> let n=v+sqr+sqr in let n=if n<v then 0xFFFFFFFFu else n in prmsqrs n sqr)
  let rec allsqrs (prms:prmsCIS) = let s = prms.p*prms.p in allstateCIS(prmsqrs s s,fun() -> allsqrs (prms.cont()))
  let rec qdrtc v y = stateCIS(v,y,fun() -> let a=(y+1)<<<2 in let a=if a<=0 then (if a<0 then -a else 2) else a
                                            let vn=v+uint32 a in let vn=if vn<v then 0xFFFFFFFFu else vn in qdrtc vn (y+2))
  let rec allqdrtcsX4 x = allstateCIS(qdrtc (((x*x)<<<2)+1u) 1,fun()->allqdrtcsX4 (x+1u))
  let rec allqdrtcsX3 x = allstateCIS(qdrtc (((x*(x+1u))<<<1)-1u) (1 - int x),fun() -> allqdrtcsX3 (x+1u))
  let rec joinT3 (ass:allstateCIS<'b>) = stateCIS<'b>(ass.ss.v,ass.ss.a,fun()->
    let rec (^) (xs:stateCIS<'b>) (ys:stateCIS<'b>) = //union op for CoInductiveStreams
      match compare xs.v ys.v with
        | 1 -> stateCIS(ys.v,ys.a,fun() -> xs ^ ys.cont())
        | _ -> stateCIS(xs.v,xs.a,fun() -> xs.cont() ^ ys) //<= then keep all the values without combining
    let rec pairs (ass:allstateCIS<'b>) =
      let ys = ass.cont
      allstateCIS(stateCIS(ass.ss.v,ass.ss.a,fun()->ass.ss.cont()^ys().ss),fun()->pairs (ys().cont()))
    let ys = ass.cont() in let zs = ys.cont() in (ass.ss.cont()^(ys.ss^zs.ss))^joinT3 (pairs (zs.cont())))
  let rec mkprm (cs:cndstate) (sqrs:stateCIS<_>) (qX4:stateCIS<_>) (qX3:stateCIS<_>) tgl =
    let inline advcnd (cs:cndstate) =
      let inline whladv i = if i < 47uy then i + 1uy else 0uy
      let inline modadv m a = let md = m + a in if md >= 12uy then md - 12uy else md
      let a = WHLPTRN.[int cs.wi] in let nc = cs.c+uint32 a
      if nc<cs.c then failwith "Tried to enumerate primes past the numeric range!!!"
      else cndstate(nc,whladv cs.wi,modadv cs.md12 a)
    if cs.c>=sqrs.v then mkprm (if cs.c=sqrs.v then advcnd cs else cs) (sqrs.cont()) qX4 qX3 false //squarefree function
    elif cs.c>qX4.v then mkprm cs sqrs (qX4.cont()) qX3 false
    elif cs.c>qX3.v then mkprm cs sqrs qX4 (qX3.cont()) false
    else match cs.md12 with
            | 7uy -> if cs.c=qX3.v then mkprm cs sqrs qX4 (qX3.cont()) (if qX3.a>0 then not tgl else tgl) //only for a's are positive
                     elif tgl then prmsCIS(cs.c,fun() -> mkprm (advcnd cs) sqrs qX4 qX3 false)
                     else mkprm (advcnd cs) sqrs qX4 qX3 false
            | 11uy -> if cs.c=qX3.v then mkprm cs sqrs qX4 (qX3.cont()) (if qX3.a<0 then not tgl else tgl) //only for a's are negatve
                      elif tgl then prmsCIS(cs.c,fun() -> mkprm (advcnd cs) sqrs qX4 qX3 false)
                      else mkprm (advcnd cs) sqrs qX4 qX3 false
            | _ -> if cs.c=qX4.v then mkprm cs sqrs (qX4.cont()) qX3 (not tgl) //always must be 1uy or 5uy
                   elif tgl then prmsCIS(cs.c,fun() -> mkprm (advcnd cs) sqrs qX4 qX3 false)
                   else mkprm (advcnd cs) sqrs qX4 qX3 false
  let qX4s = joinT3 (allqdrtcsX4 1u) in let qX3s = joinT3 (allqdrtcsX3 1u)
  let rec baseprimes = prmsCIS(11u,fun() -> mkprm (cndstate(13u,1uy,1uy)) initsqrs qX4s qX3s false)
  and initsqrs = joinT3 (allsqrs baseprimes)
  let genseq ps = Seq.unfold (fun (psd:prmsCIS) -> Some(psd.p,psd.cont())) ps
  seq { yield 2u; yield 3u; yield 5u; yield 7u;
        yield! mkprm (cndstate(11u,0uy,11uy)) initsqrs qX4s qX3s false |> genseq }

如前所述,对于前 100,000 个素数,该代码比在另一个答案中发布的 Tree Folding Wheel Optimized SoE 慢,大约半秒,并且对于素数发现的性能,其经验 O(n^1.2) 大致相同其他纯功能解决方案。可以尝试的一些进一步优化是素数平方序列不使用轮分解来消除平方的 357 个倍数,或者甚至只使用素数平方的素数倍数来减少平方序列流中的值的数量,也许其他与二次方程表达式序列流相关的优化。

EDIT_ADD: 我花了一点时间研究 SoA 模优化,发现除了上述“squarefree”优化(可能不会有太大区别)之外,二次序列在每 15 个元素上都有一个模模式将允许预先筛选许多通过的切换复合测试值,并将消除对每个复合数的特定模 12 运算的需要。所有这些优化都可能导致提交给树折叠的计算工作减少多达约 50%,以使稍微优化的 SoA 版本接近或略好于最佳树折叠合并 SoE。我不知道我什么时候才能找到时间做这几天的调查以确定结果。

EDIT_ADD2: 在进行上述优化时,确实将性能提高了大约两倍,我明白了为什么当前随着 n 增加的经验性能不如 SoE:而 SoE 特别适合树折叠操作,因为第一个序列更密集,重复频率更高,后面的序列更不密集,然后再次变得不那么密集;这意味着调用/返回序列不会像 SoE 那样保持在最小深度,而是该深度增加超出与数字范围成正比。使用折叠的解决方案是' t 很漂亮,因为可以对随时间增加密度的序列实施左折叠,但这仍然使“3X”序列的负部分优化不佳,将“3X”序列分解为正部分和负部分也是如此。最简单的解决方案可能是将所有序列保存到 Map 中,这意味着访问时间将增加类似于范围平方根的对数,但对于更大的数字范围,这将比当前树折叠更好。END_EDIT_ADD2

虽然速度较慢,但​​我在此展示此解决方案是为了展示如何将代码演变为将最初以命令式方式构想的想法表达为 F# 中的纯函数式代码。它提供了在 CoInductive Streams 中使用延续来实现惰性而不使用 Lazy 类型的示例,实现(尾)递归循环以避免对可变性的任何要求,通过递归调用线程化累加器(tgl)以获得结果(将二次方程“击中”测试数字),将方程的解表示为(惰性)序列(或在这种情况下为流)等。

对于那些即使没有基于 Windows 的开发系统也想进一步使用此代码的人,我也已将其发布到tryfsharp.orgIdeone.com,尽管它在这两个平台上运行速度较慢,而 tryfsharp 都与速度成正比由于在 Silverlight 下运行而导致本地客户端计算机运行速度较慢,而 Ideone 在 Mono-project 2.0 下的 Linux 服务器 CPU 上运行,这在实现方面尤其是垃圾收集方面都非常缓慢。

于 2013-08-01T17:30:16.140 回答
2

在这个线程上有一些真正有趣和有启发性的讨论,我知道这个线程很老,但我想解决 OP 的原始问题。回想一下,它想要一个纯功能版本的 Eratosthenes 筛子。

let rec PseudoSieve list =
match list with
| hd::tl -> hd :: (PseudoSieve <| List.filter (fun x -> x % hd <> 0) tl)
| [] -> []

这有已经讨论过的缺陷。当然,不使用突变、模算术的最简单的纯函数解决方案 - 有太多检查以划掉候选者 - 会是这样的吗?

let rec sieve primes = function
    | []        -> primes |> List.rev
    | p :: rest -> 
         sieve  (p :: primes) (rest |> List.except [p*p..p..n])

这显然不是为了最终的性能和内存使用,检查如何List.except- 以某种方式进行交叉以使其只完成一次会很有趣(这可能使其成为 Eratosthenes 筛子的替代方案而不是实现但与 OP 中链接的论文中所述的幼稚解决方案具有相同的好处 - 已实施并且 Big O 成本在那里。

我仍然认为这是对原始 OP 最简洁的答案。你怎么看?

更新:在 List.except 中使用 p*p 使其成为合适的筛子!

编辑_添加:

我是@GordonBGood,我正在直接编辑您的答案(当您征求意见时),而不是发表一系列广泛的评论,如下所示:

  1. 首先,您的代码将无法编译,因为它是n未知的,并且必须在定义列表的封闭代码中给出,该列表[ 2 .. n ]定义了开始的起始列表。

  2. 您的代码基本上是欧拉筛,而不是要求的埃拉托色尼筛(SoE),尽管您是正确的,复合数的“交叉”只发生一次,List.except因为该复合数将不再存在于候选列表中,使用刚刚定义了List.except“幕后”人们可以用折叠和过滤功能做什么:想想List.except正在做什么 - 对于要筛选的候选列表中的每个元素,它正在扫描以查看该元素是否与基本素因子列表,如果是,则将其过滤掉。这是非常低效的,因为当使用列表处理而不是可变数组来实现这些扫描时,每个元素都会复合。以下是您充实的代码,作为一个完整的答案uint32产生相同类型素数序列的参数:

let sieveTo n =
  let rec sieve primes = function
    | []        -> primes |> List.rev
    | p :: rest -> sieve  (p :: primes) (rest |> List.except [p*p..p..n])
  sieve [] [ 2u .. n ] |> List.toSeq```

这具有极高的对数复杂度,因为对于平方定律关系,大约需要 2.3 秒才能筛选到 10 万和 227 秒才能筛选到 100 万 - 基本上,由于数量快速增加,它是为列表实现的无用功能筛选范围的工作(每个剩余元素的所有扫描)。

  1. OP中提到的O'Neill文章中的“Richard Bird”筛子是一个真正的基于列表的SoE,因为她正确识别了它,并且通过考虑要筛选的候选列表是有序的和组合的,它避免了重复处理要消除/排除的素数列表(复合数字列表)按顺序排序,以便只需要比较每个列表的头部。使用惰性,这些列表也是无限的,因此不需要n并产生一个“无限”的素数列表。Richard Bird 筛子对所有数字(不仅仅是赔率)的表达式,包括通过 Co Inductive Stream's (CIS's) 的惰性,在 F# 中如下所示:
type 'a CIS = CIS of 'a * (unit -> 'a CIS) //'Co Inductive Stream for laziness
 
let primesBird() =
  let rec (^^) (CIS(x, xtlf) as xs) (CIS(y, ytlf) as ys) = // stream merge function
    if x < y then CIS(x, fun() -> xtlf() ^^ ys)
    elif y < x then CIS(y, fun() -> xs ^^ ytlf())
    else CIS(x, fun() -> xtlf() ^^ ytlf()) // eliminate duplicate!
  let pmltpls p = let rec nxt c = CIS(c, fun() -> nxt (c + p)) in nxt (p * p)
  let rec allmltps (CIS(p, ptlf)) = CIS(pmltpls p, fun() -> allmltps (ptlf()))
  let rec cmpsts (CIS(CIS(c, ctlf), amstlf)) =
    CIS(c, fun() -> (ctlf()) ^^ (cmpsts (amstlf())))
  let rec minusat n (CIS(c, ctlf) as cs) =
    if n < c then CIS(n, fun() -> minusat (n + 1u) cs)
    else minusat (n + 1u) (ctlf())
  let rec baseprms() = CIS(2u, fun() -> baseprms() |> allmltps |> cmpsts |> minusat 3u)
  Seq.unfold (fun (CIS(p, ptlf)) -> Some(p, ptlf())) (baseprms())

以上在我的机器上大约需要 2.3 秒才能将质数数到一百万。上面的筛子已经有了改进,它使用baseprms小素数的二次流来引入复合剔除流。

  1. 这个问题除了它没有非常小的变化来使它只有赔率或更高程度的车轮分解是,虽然它没有上面的平方律复杂性那么糟糕,它仍然需要大约将执行时间的线性量加倍(经验增长顺序约为 1.3)或可能与平方成正比(log n)。对此的解决方案是使用无限树状结构而不是线性排序来对合数进行排序,以降低到 log n 复杂度,如下面的代码所示(也有一些小的改动,使其仅是赔率):
type 'a CIS = CIS of 'a * (unit -> 'a CIS) //'Co Inductive Stream for laziness
 
let primesTreeFold() =
  let rec (^^) (CIS(x, xtlf) as xs) (CIS(y, ytlf) as ys) = // stream merge function
    if x < y then CIS(x, fun() -> xtlf() ^^ ys)
    elif y < x then CIS(y, fun() -> xs ^^ ytlf())
    else CIS(x, fun() -> xtlf() ^^ ytlf()) // no duplication
  let pmltpls p = let rec nxt c = CIS(c, fun() -> nxt (c + p)) in nxt (p * p)
  let rec allmltps (CIS(p, ptlf)) = CIS(pmltpls p, fun() -> allmltps (ptlf()))
  let rec pairs (CIS(cs0, cs0tlf)) = // implements infinite tree-folding
    let (CIS(cs1, cs1tlf)) = cs0tlf() in CIS(cs0 ^^ cs1, fun() -> pairs (cs1tlf()))
  let rec cmpsts (CIS(CIS(c, ctlf), amstlf)) = // pairs is used below...
    CIS(c, fun() -> ctlf() ^^ (cmpsts << pairs << amstlf)())
  let rec minusat n (CIS(c, ctlf) as cs) =
    if n < c then CIS(n, fun() -> minusat (n + 2u) cs)
    else minusat (n + 1u) (ctlf())
  let rec oddprms() = CIS(3u, fun() -> oddprms() |> allmltps |> cmpsts |> minusat 5u)
  Seq.unfold (fun (CIS(p, ptlf)) -> Some(p, ptlf())) (CIS(2u, fun() -> oddprms()))

请注意使用无限树折叠而不是线性排序的非常小的变化;它还需要递归辅助馈送,以便将初始化级别增加到 2/3/5 而不是仅 2/3,以防止失控。这个微小的变化将素数的计数速度提高到一百万到 0.437 秒,在 4.91 秒内增加到一千万,在 62.4 秒内增加到一亿,增长率趋于与 log n 成正比。

  1. 结论:您的实现几乎无法在人们实际上可以手动使用 SoE 计算出素数的范围内使用,而这些“功能”筛中最好的一个在一分钟内达到一亿左右的范围内是适度有用的。然而,尽管相对简单,但它们无法实现多线程,因为每个新状态都依赖于之前状态的连续性,并且计算的开销使它们比基于数组突变的筛子慢数百倍,其中我们可以在几分之一秒内找到十亿的素数。
于 2020-12-04T17:50:50.683 回答
1

我对 Haskell 多映射不是很熟悉,但是F# Power Pack有一个 HashMultiMap 类,其 xmldoc 总结是:“哈希表,默认基于 F# 结构的“哈希”和 (=) 函数。该表可能映射单个多个绑定的关键。” 也许这可以帮助你?

于 2011-01-07T20:44:20.337 回答
1

我已经提交了一个“几乎功能性”的答案,并且不想通过进一步的添加/改进来混淆它,所以我提交了这个答案,其中包括最大轮子分解和多线程 - 在我看来,购买一台带有多线程(甚至智能手机也是多核的)并且运行单线​​程就像购买一辆带有多缸发动机的汽车,然后只用一个发动机开火。

同样,除了剔除缓冲区内容的突变和与枚举相关的优化(如果使用)之外,以下代码大部分都是功能性的,这总是需要当前状态的概念(尽管这些细节被一些较慢的方法隐藏了,例如就像使用 F# 的内置 seq's - 但它们很慢);代码如下:

/// F# version of the Wheel Factorized  Sieve of Eratosthenes...

/// This is a "combo" sieve where
///   it is fully wheel factorized by the primes of 2, 3, 5, and 7; then
///   pre-sieved by the pattern of the 11, 13, 17, and 19 primes...

/// This version is almost fully functional with no mutation used except for
/// the contents of the sieve buffer arrays on composite number culling/sieving.

module SoE =

  type private Prime = uint64 // JavaScript doesn't have anything bigger!
  type private PrimeNdx = int64
  type private BasePrimeRep = uint32

  let inline public prime n = uint64 n // match these convenience conversions
  let inline private primendx n = int64 n // with the types above!
  let inline private bprep n = uint32 n // with the types above!

  let private cPGSZBTS = (1 <<< 14) * 8 // sieve buffer size in bits = CPUL1CACHE - THIS SHOULD REALLY BE AN ARGUMENT!!!!

  type private BasePrimeRepArr = BasePrimeRep[]
  type private SieveBuffer = uint8[][] // multiple levels by residue index, by segment, by byte

  /// a Co-Inductive Stream (CIS) of an "infinite" non-memoized series...
  type private CIS<'T> = CIS of 'T * (unit -> CIS<'T>) //' apostrophe formatting adjustment

  /// lazy list (memoized) series of base prime page arrays...
  type private BasePrime = uint32
  type private BasePrimeRepArrs =
    BasePrimeRepArrs of BasePrimeRepArr * Option<Lazy<BasePrimeRepArrs>>

// constants and Look Up Tables to do with culling start address calculation...
  let private FRSTSVPRM = prime 23 // past the precull primes!
  let private WHLNDXCNST = primendx (FRSTSVPRM * (FRSTSVPRM - prime 1) >>> 1)
  let private WHLPRMS = [| prime 2; prime 3; prime 5; prime 7;
                           prime 11; prime 13; prime 17; prime 19 |]
  let private WHLHITS = 48 // (3 - 1) * (5 - 1) * (7 - 1)!
  let private WHLODDCRC = 105 // 3 * 5 * 7; no evens!
  let private WHLPTRNLEN = 11 * 13 * 17 * 19 // repeating pattern of pre-cull primes
  let private NUMPCULLPRMS = 4
  let private PCULLPRMREPS: BasePrimeRepArrs =
    BasePrimeRepArrs( [| uint32 (-1 <<< 6) + 44u; uint32 (-1 <<< 6) + 45u;
                         uint32 (-1 <<< 6) + 46u; uint32 (-1 <<< 6) + 47u |], None)
  // number of primes to a million minus number wheel prims; go sieving to 10^12
  let private NUMSTRTSBASEPRMS = 78498 + WHLPRMS.Length + 1 // +1 for end 0xFFFFFFFFu
  let private NUMSTRTSPRMS = (6542 - WHLPRMS.Length + 1) // enough for  65536 squared
  let private RESIDUES = [|
    23; 29; 31; 37; 41; 43; 47; 53; 59; 61; 67; 71;
    73; 79; 83; 89; 97; 101; 103; 107; 109; 113; 121; 127;
    131; 137; 139; 143; 149; 151; 157; 163; 167; 169; 173; 179;
    181; 187; 191; 193; 197; 199; 209; 211; 221; 223; 227; 229; 233 |]
  let private WHLNDXS = [|
    0; 0; 0; 1; 2; 2; 2; 3; 3; 4; 5; 5; 6; 6; 6;
    7; 7; 7; 8; 9; 9; 9; 10; 10; 11; 12; 12; 12; 13; 13;
    14; 14; 14; 15; 15; 15; 15; 16; 16; 17; 18; 18; 19; 20; 20;
    21; 21; 21; 21; 22; 22; 22; 23; 23; 24; 24; 24; 25; 26; 26;
    27; 27; 27; 28; 29; 29; 29; 30; 30; 30; 31; 31; 32; 33; 33;
    34; 34; 34; 35; 36; 36; 36; 37; 37; 38; 39; 39; 40; 41; 41;
    41; 41; 41; 42; 43; 43; 43; 43; 43; 44; 45; 45; 46; 47; 47; 48 |]
  let private WHLRNDUPS = [| // two rounds to avoid overflow; used in start address calcs...
      0; 3; 3; 3; 4; 7; 7; 7; 9; 9; 10; 12; 12; 15; 15;
      15; 18; 18; 18; 19; 22; 22; 22; 24; 24; 25; 28; 28; 28; 30;
      30; 33; 33; 33; 37; 37; 37; 37; 39; 39; 40; 42; 42; 43; 45;
      45; 49; 49; 49; 49; 52; 52; 52; 54; 54; 57; 57; 57; 58; 60;
      60; 63; 63; 63; 64; 67; 67; 67; 70; 70; 70; 72; 72; 73; 75;
      75; 78; 78; 78; 79; 82; 82; 82; 84; 84; 85; 87; 87; 88; 93;
      93; 93; 93; 93; 94; 99; 99; 99; 99; 99; 100; 102; 102; 103; 105;
      105; 108; 108; 108; 109; 112; 112; 112; 114; 114; 115; 117; 117; 120; 120;
      120; 123; 123; 123; 124; 127; 127; 127; 129; 129; 130; 133; 133; 133; 135;
      135; 138; 138; 138; 142; 142; 142; 142; 144; 144; 145; 147; 147; 148; 150;
      150; 154; 154; 154; 154; 157; 157; 157; 159; 159; 162; 162; 162; 163; 165;
      165; 168; 168; 168; 169; 172; 172; 172; 175; 175; 175; 177; 177; 178; 180;
      180; 183; 183; 183; 184; 187; 187; 187; 189; 189; 190; 192; 192; 193; 198;
      198; 198; 198; 198; 199; 204; 204; 204; 204; 204; 205; 207; 207; 208; 210; 210 |]
  /// LUT of relative cull start points given the residual bit plane index (outer index),
  /// and the combination of the base prime residual index and the bit plane index of
  /// the first cull position for the page (multiply combined for the inner index), giving
  /// a 16-bit value which contains the multipier (the upper byte) and the extra
  /// cull index offset (the lower byte) used to multiply by the base prime wheel index
  /// and add the offset with the result added with the start wheel index to obtain
  /// the residual bit plane start wheel index...
  /// for "PG11", these arrays get huge (quarter meg elements with elements of 4 bytes for
  /// a megabyte size), which will partially (or entirely) cancell out the benefit for
  /// smaller prime ranges; may help for the huge prime ranges...
  let private WHLSTRTS: uint16[][] =
    let arr = Array.init WHLHITS <| fun _ -> Array.zeroCreate (WHLHITS * WHLHITS)
    for pi = 0 to WHLHITS - 1 do
      let mltsarr = Array.zeroCreate WHLHITS
      let p = RESIDUES.[pi] in let s = (p * p - int FRSTSVPRM) >>> 1 
      // build array of relative mults and offsets to `s`...
      { 0 .. WHLHITS - 1 } |> Seq.iter (fun ci ->
        let rmlt0 = (RESIDUES.[(pi + ci) % WHLHITS] - RESIDUES.[pi]) >>> 1
        let rmlt = rmlt0 + if rmlt0 < 0 then WHLODDCRC else 0 in let sn = s + p * rmlt
        let snd = sn / WHLODDCRC in let snm = sn - snd * WHLODDCRC
        mltsarr.[WHLNDXS.[snm]] <- rmlt) // new rmlts 0..209!
      let ondx = pi * WHLHITS
      { 0 .. WHLHITS - 1 } |> Seq.iter (fun si ->
        let s0 = (RESIDUES.[si] - int FRSTSVPRM) >>> 1 in let sm0 = mltsarr.[si]
        { 0 .. WHLHITS - 1 } |> Seq.iter (fun ci ->
          let smr = mltsarr.[ci]
          let rmlt = if smr < sm0 then smr + WHLODDCRC - sm0 else smr - sm0
          let sn = s0 + p * rmlt in let rofs = sn / WHLODDCRC
          // we take the multiplier times 2 so it multiplies by the odd wheel index...
          arr.[ci].[ondx + si] <- (rmlt <<< 9) ||| rofs |> uint16))
    arr

  let private makeSieveBuffer btsz: SieveBuffer =
    let sz = ((btsz + 31) >>> 5) <<< 2 // rounded up to nearest 32 bit boundary
    { 1 .. WHLHITS } |> Seq.map (fun _ -> Array.zeroCreate sz) |> Array.ofSeq

  // a dedicated BITMSK LUT may be faster than bit twiddling...
  let private BITMSK = [| 1uy; 2uy; 4uy; 8uy; 16uy; 32uy; 64uy; 128uy |]

  /// all the heavy lifting work is done here...
  let private cullSieveBuffer (lwi: PrimeNdx) (bpras: BasePrimeRepArrs)
                              (strtsa: uint32[]) (sb: SieveBuffer) =
    let sz = sb.[0].Length in let szbits = sz <<< 3 in let bplmt = sz >>> 4
    let lowndx = lwi * primendx WHLODDCRC
    let nxti = (lwi + primendx szbits) * primendx WHLODDCRC
    // set up strtsa for use by each modulo residue bit plane...
    let rec looppi ((BasePrimeRepArrs(bpra, bprastl)) as obpras) pi j =
      if pi < bpra.Length then
        let ndxdprm = bpra.[pi] in let rsd = RESIDUES.[int ndxdprm &&& 0x3F]
        let bp = (int ndxdprm >>> 6) * (WHLODDCRC <<< 1) + rsd
        let i = (bp - int FRSTSVPRM) >>> 1 |> primendx
        let s = (i + i) * (i + primendx FRSTSVPRM) + WHLNDXCNST
        if s >= nxti then strtsa.[j] <- 0xFFFFFFFFu else // enough base primes!
          let si = if s >= lowndx then int (s - lowndx) else
                    let wp = (rsd - int FRSTSVPRM) >>> 1
                    let r = (lowndx - s) %
                              (primendx bp * primendx WHLODDCRC) |> int
                    if r = 0 then 0 else
                    bp * (WHLRNDUPS.[wp + (int r + bp - 1) / bp] - wp) - r
          let sd = si / WHLODDCRC in let sn = WHLNDXS.[si - sd * WHLODDCRC]
          strtsa.[j] <- (uint32 sn <<< 26) ||| uint32 sd
          looppi obpras (pi + 1) (j + 1)
      else match bprastl with
           | None -> ()
           | Some lv -> looppi lv.Value 0 j       
    looppi bpras 0 0
    // do the culling based on the preparation...
    let rec loopri ri =
      if ri < WHLHITS then
        let pln = sb.[ri] in let plnstrts = WHLSTRTS.[ri]
        let rec looppi (BasePrimeRepArrs(bpra, bprastl) as obpras) pi =
          if pi < bpra.Length then
            let prmstrt = strtsa.[pi]
            if prmstrt < 0xFFFFFFFFu then
              let ndxdprm = bpra.[pi]
              let pd = int ndxdprm >>> 6 in let prmndx = int ndxdprm &&& 0x3F
              let bp = pd * (WHLODDCRC <<< 1) + RESIDUES.[prmndx]
              let sd = int prmstrt &&& 0x3FFFFFF in let sn = int (prmstrt >>> 26)
              let adji = prmndx * WHLHITS + sn in let adj = plnstrts.[adji]
              let s0 = sd + int (adj >>> 8) * pd + (int adj &&& 0xFF)
              if bp < bplmt then
                let slmt = min szbits (s0 + (bp <<< 3))
                let rec loops s8 =
                  if s8 < slmt then
                    let msk = BITMSK.[s8 &&& 7]
                    let rec loopc c =
                      if c < pln.Length then pln.[c] <- pln.[c] ||| msk; loopc (c + bp)
                    loopc (s8 >>> 3); loops (s8 + bp) in loops s0
              else
                let rec loopsi si =
                  if si < szbits then
                    let w = si >>> 3 in pln.[w] <- pln.[w] ||| BITMSK.[si &&& 7]
                    loopsi (si + bp) in loopsi s0
              looppi obpras (pi + 1)
          else match bprastl with
               | None -> ()
               | Some lv -> looppi lv.Value 0        
        looppi bpras 0; loopri (ri + 1) in loopri 0

  /// pre-culled wheel pattern with a 131072 extra size to avoid overflow...
  /// (copy by 16 Kilobytes per time!)
  let private WHLPTRN: SieveBuffer = // rounded up to next 32-bit alignmenet!
    let sb = makeSieveBuffer ((WHLPTRNLEN <<< 3) + 131072 + 31)
    let strtsa = Array.zeroCreate NUMPCULLPRMS
    cullSieveBuffer (primendx 0) PCULLPRMREPS strtsa sb; sb

  /// fill the SieveBuffer from the WHLPTRN according to the modulo of the low wheel index...
  let private fillSieveBuffer (lwi: PrimeNdx) (sb: SieveBuffer) =
    let len = sb.[0].Length in let cpysz = min len 16384 in let mdlo0 = lwi / (primendx 8)
    { 0 .. WHLHITS - 1 } |> Seq.iter (fun i ->
      { 0 .. 16384 .. len - 1 } |> Seq.iter (fun j ->
        let mdlo = (mdlo0 + primendx j) % (primendx WHLPTRNLEN) |> int
        Array.blit WHLPTRN.[i] mdlo sb.[i] j cpysz))

  /// fast value set bit count Look Up Table (CLUT) for 16-bit input...
  let private CLUT: uint8[] =
    let arr = Array.zeroCreate 65536
    let rec cntem i cnt = if i <= 0 then cnt else cntem (i &&& (i - 1)) (cnt + 1)
    for i = 0 to 65535 do arr.[i] <- cntem i 0 |> uint8
    arr

  /// count the zero (prime) bits in the SieveBuffer up to the "lsti" odd index...
  let private countSieveBuffer (bitlmt: int) (sb: SieveBuffer): int =
    let lstwi = bitlmt / WHLODDCRC
    let lstri = WHLNDXS.[bitlmt - lstwi * WHLODDCRC]
    let lst = (lstwi >>> 5) <<< 2 in let lstm = lstwi &&& 31
    let rec loopr ri cr =
      if ri >= WHLHITS then cr else
      let pln = sb.[ri]
      let rec cntem i cnt =
        if i >= lst then
          let msk = (0xFFFFFFFFu <<< lstm) <<< if ri <= lstri then 1 else 0
          let v = (uint32 pln.[lst] + (uint32 pln.[lst + 1] <<< 8) +
                   (uint32 pln.[lst + 2] <<< 16) + (uint32 pln.[lst + 3] <<< 24)) ||| msk
          cnt - int CLUT.[int v &&& 0xFFFF] - int CLUT.[int (v >>> 16)] else
        let v = uint32 pln.[i] + (uint32 pln.[i + 1] <<< 8) +
                (uint32 pln.[i + 2] <<< 16) + (uint32 pln.[i + 3] <<< 24)
        cntem (i + 4) (cnt - int CLUT.[int v &&& 0xFFFF] - int CLUT.[int (v >>> 16)])
      let cnti = cntem 0 cr in loopr (ri + 1) cnti
    loopr 0 ((lst * 8 + 32) * WHLHITS)

  /// it's rediculously easy to make this multi-threaded with the following change...
// (*
  /// a CIS series of pages from the given start index with the given SieveBuffer size,
  /// and provided with a polymorphic converter function to produce
  /// and type of result from the culled page parameters...
  let cNUMPROCS = System.Environment.ProcessorCount
  let rec private makePrimePages strtwi btsz strtsasz
                                 (cnvrtrf: PrimeNdx -> SieveBuffer -> 'T): CIS<'T> =
    let bpas = makeBasePrimes() in let tsks = Array.zeroCreate cNUMPROCS
    let sbs = Array.init cNUMPROCS (fun _ -> Array.zeroCreate (btsz >>> 3))
    let mktsk lwi i = System.Threading.Tasks.Task.Run(fun() ->
      let sb = makeSieveBuffer btsz in let strtsa = Array.zeroCreate strtsasz
      fillSieveBuffer lwi sb; cullSieveBuffer lwi bpas strtsa sb
      cnvrtrf lwi sb)
    let rec jobfeed lwi i =
      CIS(lwi, fun() -> let ni = i + 1
                        jobfeed (lwi + primendx btsz)
                                (if ni >= cNUMPROCS then 0 else ni))
    let rec strttsks (CIS(lwi, jbfdtlf) as jbfd) i =
      if i >= cNUMPROCS then jbfd else
      tsks.[i] <- mktsk lwi i; strttsks (jbfdtlf()) (i + 1)
    let rec mkpgrslt (CIS(lwi, jbfdtlf)) i =
      let rslt = tsks.[i].Result in tsks.[i] <- mktsk lwi i
      CIS(rslt,
          fun() -> mkpgrslt (jbfdtlf())
                            (if i >= cNUMPROCS - 1 then 0 else i + 1))
    mkpgrslt <| strttsks (jobfeed strtwi 0) 0 <| 0
// *)

  // the below is single threaded...
(*
  /// a CIS series of pages from the given start index with the given SieveBuffer size,
  /// and provided with a polymorphic converter function to produce
  /// and type of result from the culled page parameters...
  let rec private makePrimePages strtwi btsz strtsasz
                                 (cnvrtrf: PrimeNdx -> SieveBuffer -> 'T): CIS<'T> =
    let bpas = makeBasePrimes() in let sb = makeSieveBuffer btsz
    let strtsa = Array.zeroCreate strtsasz
    let rec nxtpg lwi =
      fillSieveBuffer lwi sb; cullSieveBuffer lwi bpas strtsa sb
      CIS(cnvrtrf lwi sb, fun() -> nxtpg (lwi + primendx btsz))
    nxtpg strtwi
// *)

  /// secondary feed of lazy list of memoized pages of base primes...
  and private makeBasePrimes(): BasePrimeRepArrs =
    let sb2bpa lwi (sb: SieveBuffer) =
      let btsz = sb.[0].Length <<< 3
      let arr =
        Array.zeroCreate <| countSieveBuffer ((btsz * WHLODDCRC) - 1) sb
      let rec loop ri i j =
        if i < btsz then
          if ri < WHLHITS then
            if sb.[ri].[i >>> 3] &&& BITMSK.[i &&& 7] <> 0uy then
              loop (ri + 1) i j
            else arr.[j] <- ((bprep lwi + bprep i) <<< 6) ||| bprep ri
                 loop (ri + 1) i (j + 1)
          else loop 0 (i + 1) j in loop 0 0 0; arr
    // finding the first page as not part of the loop and making succeeding
    // pages lazy breaks the recursive data race!
    let fksb = makeSieveBuffer 64 in fillSieveBuffer (primendx 0) fksb
    let fkbpra = sb2bpa (primendx 0) fksb
    let fkbpas = BasePrimeRepArrs(fkbpra, None)
    let strtsa = Array.zeroCreate (fkbpra.Length + 1)
    let frstsb = makeSieveBuffer 512 in fillSieveBuffer (primendx 0) frstsb
    cullSieveBuffer (primendx 0) fkbpas strtsa frstsb
    let rec nxtbpas (CIS(bpa, tlf)) =
      BasePrimeRepArrs(bpa, Some(lazy (nxtbpas (tlf()))))
    let restbpras =
      Some(lazy (nxtbpas <|
                   makePrimePages (primendx 512) 512 NUMSTRTSPRMS sb2bpa))
    let frstbpa = sb2bpa (primendx 0) frstsb
    BasePrimeRepArrs(frstbpa, restbpras)                    

  /// produces a generator of primes; uses mutability for better speed...
  let primes(): unit -> Prime =
    let sb2prmsarr lwi (sb: SieveBuffer) =
      let btsz = sb.[0].Length <<< 3
      let arr = Array.zeroCreate <| countSieveBuffer (btsz * WHLODDCRC - 1) sb
      let baseprm = prime (lwi + lwi) * prime WHLODDCRC
      let inline notprm ri i = sb.[ri].[i >>> 3] &&& BITMSK.[i &&& 7] <> 0uy
      let rec loop ri i j =
        if ri >= WHLHITS then loop 0 (i + 1) j else
        if i < btsz then
          if notprm ri i then loop (ri + 1) i j
          else arr.[j] <- baseprm + prime (i + i) * prime WHLODDCRC
                            + prime RESIDUES.[ri]
               loop (ri + 1) i (j + 1) in loop 0 0 0
      arr
    let mutable i = -WHLPRMS.Length
    let (CIS(nprms, npgtlf)) = // use page generator function above!
      makePrimePages (primendx 0) cPGSZBTS NUMSTRTSPRMS sb2prmsarr
    let mutable prmarr = nprms in let mutable pgtlf = npgtlf
    fun() -> 
      if i >= 0 && i < prmarr.Length then
        let oi = i in i <- i + 1; prmarr.[oi] else // ready next call!      
        if i < 0 then i <- i + 1; WHLPRMS.[7 + i] else
        let (CIS(nprms, npgtlf)) = pgtlf() // use page generator function above!
        i <- 1; prmarr <- nprms; pgtlf <- npgtlf; prmarr.[0]

  let countPrimesTo (limit: Prime): int64 = // much faster!
    let precnt = WHLPRMS |> Seq.takeWhile ((>=) limit) |> Seq.length |> int64
    if limit < FRSTSVPRM then precnt else
    let topndx = (limit - FRSTSVPRM) >>> 1 |> primendx
    let lmtlwi = topndx / primendx WHLODDCRC
    let sb2cnt lwi (sb: SieveBuffer) =
      let btsz = sb.[0].Length <<< 3 in let lmti = lwi + primendx (btsz - 1)
      countSieveBuffer
        (if lmti < lmtlwi then btsz * WHLODDCRC - 1
         else int (topndx - lwi * primendx WHLODDCRC)) sb |> int64, lmti
    let rec loop (CIS((cnt, nxti), tlf)) count =
      if nxti < lmtlwi then loop (tlf()) (count + cnt)
      else count + cnt
    loop <| makePrimePages (primendx 0) cPGSZBTS NUMSTRTSBASEPRMS sb2cnt <| precnt

open System
open SoE

[<EntryPoint>]
let main argv =
  let limit = prime 2000000000

  let frstprms = primes()
  printf "The first 23 primes are:  "
  for _ in 1 .. 25 do printf "%d " (frstprms())
  printfn ""

  let numprms = primes() in let mutable cnt = 0
  printf "Number of primes up to a million:  "
  while numprms() <= prime 1000000 do cnt <- cnt + 1
  printfn "%d" cnt

  let strt = DateTime.Now.Ticks

(*  // the slow way of enumerating and counting...
  let primegen = primes() in let mutable answr = 0
  while primegen() <= limit do answr <- answr + 1
// *)

  // the fast way of counting...
  let answr = countPrimesTo (prime 2000000000)

  let elpsd = (DateTime.Now.Ticks - strt) / 10000L

  printfn "Found %d primes up to %d in %d milliseconds" answr limit elpsd

  0 // return an integer exit code

在具有两个内核/四个线程的 3.1 GHz 旧 Intel I3-2100 上运行的输出是:

前 23 个素数是: 2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97
最多一百万个素数:78498
在 468 毫秒内找到 98222287 个素数到 2000000000

每次剔除操作大约需要 5.8 个 CPU 时钟周期(在此范围内进行十亿次剔除操作)。考虑到更多的真实(非超线程)线程、更高的 CPU 时钟速率和更新的 CPU 代以及改进的每时钟指令 (IPC),它将成比例地更快。

这大约是 DotNet 代码在此范围内的终极速度,但对于超过约 170 亿的更大范围,将剔除缓冲区大小调整为与被筛选的最大数量的平方根成比例的进一步细化将有助于保持如果整个范围都被筛分,而不仅仅是整个范围的较窄跨度,则范围会随着范围增加到巨大范围而增加速度,需要几天...几周...几个月才能完成。

于 2020-04-06T10:22:49.770 回答
0

Corei5 上 1 秒内 2 * 10^6

let n = 2 * (pown 10 6)
let sieve = Array.append [|0;0|] [|2..n|]

let rec filterPrime p = 
    seq {for mul in (p*2)..p..n do 
            yield mul}
        |> Seq.iter (fun mul -> sieve.[mul] <- 0)

    let nextPrime = 
        seq { 
            for i in p+1..n do 
                if sieve.[i] <> 0 then 
                    yield sieve.[i]
        }
        |> Seq.tryHead

    match nextPrime with
        | None -> ()
        | Some np -> filterPrime np

filterPrime 2

let primes = sieve |> Seq.filter (fun x -> x <> 0)
于 2015-11-07T20:59:38.543 回答