1

我想在 Racket 中实现一个并行映射功能。地方似乎是建立在正确基础上的东西,但它们对我来说是未知的领域。我认为代码应该如下所示。

#lang racket

; return xs split into n sublists
(define (chunk-into n xs)
  (define N (length xs))
  (cond [(= 1 n) (list xs)]
        [(> n N) 
         (cons empty 
               (chunk-into (sub1 n) xs))]
        [else
         (define m (ceiling (/ N n)))
         (cons (take xs m) 
               (chunk-into (sub1 n) (drop xs m)))]))

(module+ test
  (check-equal? (length (chunk-into 4 (range 5))) 4)
  (check-equal? (length (chunk-into 2 (range 5))) 2))

(define (parallel-map f xs)
  (define n-cores (processor-count))
  (define xs* (chunk-into n-cores xs))
  (define ps 
    (for/list ([i n-cores])
      (place ch
             (place-channel-put 
              ch
              (map f
               (place-channel-get ch))))))
  (apply append (map place-channel-put ps xs*)))

这给出了错误:

f:在以下情况下使用的标识符:f

我见过的所有示例都显示了一种设计模式,即提供一个不带参数的主函数,以某种方式习惯于实例化其他位置,但这确实很麻烦,所以我正在积极尝试避免它。这可能吗?

注意:我还尝试使用期货制作并行地图。不幸的是,对于我所有的测试,它实际上都比 map 慢(我尝试使用 fib 的递归过程版本进行测试),但如果你有任何建议可以让它更快。

(define (parallel-map f xs)
  (define xs** (chunk-into (processor-count) xs))
  (define fs (map (λ (xs*) (future (thunk (map f xs*)))) xs**))
  (apply append (map touch fs)))
4

1 回答 1

1

我以前使用过场所,但从来没有将函数作为参数传递给场所。我能够想出以下使用 eval 的相当粗略的代码:

#!/usr/bin/env racket
#lang racket

(define (worker pch) 
  (define my-id (place-channel-get pch)) ; get worker id
  (define wch-w (place-channel-get pch)) ; get work channel (shared between controller and all workers) - worker side
  (define f     (place-channel-get pch)) ; get function
  (define ns    (make-base-namespace))   ; for eval
  (let loop ()
    (define n (place-channel-get wch-w)) ; get work order
    (let ((res (eval `(,f ,n) ns)))      ; need to use eval here !!
      (eprintf "~a says ~a\n" my-id res)
      (place-channel-put wch-w  res)     ; put response
      (loop))))                          ; loop forever

(define (parallel-map f xs)  
  (define l (length xs))
  (define-values (wch-c wch-w) (place-channel))    ; create channel (2 endpoints) for work dispatch (a.k.a. shared queue)
  (for ((i (in-range (processor-count))))
    (define p (place pch (worker pch)))            ; create place
    (place-channel-put p (format "worker_~a" i))   ; give worker id
    (place-channel-put p wch-w)                    ; give response channel
    (place-channel-put p f))                       ; give function
  (for ((n xs))
    (place-channel-put wch-c n))                   ; create work orders
  (let loop ((i 0) (res '()))                      ; response loop
    (if (= i l)
        (reverse res)
        (let ((response (sync/timeout 10 wch-c)))  ; get answer with timeout (place-channel-get blocks!)
          (loop 
           (+ i 1) 
           (if response (cons response res) res))))))

(module+ main 
  (displayln (parallel-map 'add1 (range 10))))

在控制台中运行会给出,例如:

worker_1 says 1
worker_1 says 3
worker_1 says 4
worker_1 says 5
worker_1 says 6
worker_1 says 7
worker_1 says 8
worker_1 says 9
worker_1 says 10
worker_0 says 2
(1 3 4 5 6 7 8 9 10 2)

正如我所说,粗鲁。欢迎所有建议!

于 2013-09-30T13:34:41.953 回答