8

我正在尝试制作一个具有推送和弹出功能的队列结构。

我需要使用 10 个线程推送和另外 10 个线程弹出数据,就像我在下面的代码中所做的那样。

问题:

  1. 我需要打印出我推/弹出了多少,但我不知道该怎么做。
  2. 反正有没有加快我的代码?代码对我来说太慢了。
package main

import (
    "runtime"
    "time"
)

const (
    DATA_SIZE_PER_THREAD = 10000000
)

type Queue struct {
    records string
}


func (self Queue) push(record chan interface{}) {
    // need push counter
    record <- time.Now()
}

func (self Queue) pop(record chan interface{}) {
    // need pop counter
    <- record
}

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())
    //record chan
    record := make(chan interface{},1000000)
    //finish flag chan
    finish := make(chan bool)
    queue := new(Queue)
    for i:=0; i<10; i++ {
        go func() {
            for j:=0; j<DATA_SIZE_PER_THREAD; j++ {
                queue.push(record)
            }
            finish<-true
        }()
    }
    for i:=0; i<10; i++ {
        go func() {
            for j:=0; j<DATA_SIZE_PER_THREAD; j++ {
                queue.pop(record)
            }
            finish<-true
        }()
    }
    for i:=0; i<20; i++ {
        <-finish
    }
}
4

2 回答 2

15

您应该解决一些问题。

  • Queue 类型的方法应该有指针接收器。否则,每个方法调用都将创建当前队列类型的副本,并且对队列字段的任何更改都不会持续到方法调用本身之外。

  • 等待所有例程完成,可以使用sync.WaitGroup. 这正是它的设计目的。

  • 在队列类型中维护一个线程安全的推送/弹出计数器可以通过使用sync/atomic包来完成。

就速度而言,从您的示例来看,我不太确定您要达到的目标。如果您稍微详细说明一下,可能会出现任何优化。

这是我从您的代码中修改的示例:

package main

import (
    "log"
    "runtime"
    "sync"
    "sync/atomic"
    "time"
)

const SizePerThread = 10000000

type Queue struct {
    records string
    count   int64
}

func (q *Queue) push(record chan interface{}) {
    record <- time.Now()

    newcount := atomic.AddInt64(&q.count, 1)
    log.Printf("Push: %d", newcount)
}

func (q *Queue) pop(record chan interface{}) {
    <-record

    newcount := atomic.AddInt64(&q.count, -1)
    log.Printf("Pop: %d", newcount)
}

func main() {
    var wg sync.WaitGroup

    runtime.GOMAXPROCS(runtime.NumCPU())

    record := make(chan interface{}, 1000000)
    queue := new(Queue)

    // We are launching 20 goroutines.
    // Let the waitgroup know it should wait for as many
    // of them to finish.
    wg.Add(20)

    for i := 0; i < 10; i++ {
        go func() {
            defer wg.Done()

            for j := 0; j < SizePerThread; j++ {
                queue.push(record)
            }
        }()

        go func() {
            defer wg.Done()

            for j := 0; j < SizePerThread; j++ {
                queue.pop(record)
            }
        }()
    }

    // Wait for all goroutines to finish.
    wg.Wait()
}
于 2012-09-04T15:37:20.453 回答
-5

对问题 1 的回答:正如 jimt 所建议的,sync/atomic 具有自动更新计数器的功能,这可能对您有用。

问题 2 的答案:减小 DATA_SIZE_PER_THREAD 的值,或者更好的是,使用程序

package main
func main() {}

它以更有效的方式产生与您的程序相同的输出。

不过说真的,我知道您编写了一个小程序来探索一些概念。但是,您的程序包含许多问题。现在不是担心速度的时候,而是学习一些基本概念的时候。

于 2012-09-09T20:18:46.567 回答