0

嗨,我正在尝试在 Python 中开发一个具有无限存储桶容量的leakybucket。我希望它是线程安全的和 CPU 高效的,具有最少的线程数。它现在通常可以工作。但是有一些小错误。

我将带宽限制为 500 kbps。但第三行似乎打破了这一点。另外,谁能告诉我这是否是实现leakybucket的正确方法?谢谢。

费率:500.00 费率:500.00 费率:550.00 费率:500.00 费率:500.00 费率:500.00 费率:500.00 费率:500.00

代码在这里:

from collections import deque
import threading, time
class LeakyBucket:
    '''the leaky bucket throttling the bit rate'''

    def __init__(self, node, bitsPerSec, measIntv, LBtype):
        self.node = node
        self.bitsPerSec = bitsPerSec  #the rate limit
        self.measIntv = measIntv      #the measure interval, tokens will become full at the beginning of each interval
        self.LBtype = LBtype          #the type of the bucket  
        self.lastTime = 0             #the start time of the last measure interval
        self.bitsDone = 0             #the bits that have been transmitted
        self.BDLock = threading.Lock() #the lock for the bits sent
        self.packDQ = deque()          #the packet Q 
        self.maxToken = bitsPerSec*float(measIntv)  #the max token (bits)
        self.token = self.maxToken      #the current token
        self.condition = threading.Condition()  #sync lock


    def packIn(self, msg):
        '''Insert a packet'''
        self.condition.acquire()
        self.packDQ.append(msg)
        self.condition.notify()
        self.condition.release()

    def keepPoping(self):
        '''keep poping new pack'''
        self.lastTime = time.time()   #record the start time
        while True:
            timeNow = time.time() 
            if  timeNow - self.lastTime > self.measIntv:     
                #new intv, need to reset token
                self.token = self.maxToken
                self.lastTime = timeNow
            self.condition.acquire()
            if self.packDQ: # the queue is not empty
                pack = list(self.packDQ)[0]
                packLen = len(pack[2])*8
                if  packLen > self.token:   #no enough token?
                    #self.packDQ.popleft()
                    self.condition.release()
                    time.sleep(max(self.lastTime+self.measIntv-time.time(),0)) #wait for enough token
                    else:              #enough token, can send out the packet
                    self.packDQ.popleft()
                    self.condition.release()
                    self.changeBitsDone(packLen)
                    self.token = self.token - packLen #consume token
            else:
                self.condition.wait()
                self.condition.release()

    def begin(self):
        '''begin the leakybucket'''
        aThread = threading.Thread(target = self.keepPoping, args = [])
        aThread.start()


    def getBitsDone(self):
        '''get and reset bitsDone, for testing'''
        self.BDLock.acquire()
        reV = self.bitsDone
        self.bitsDone = 0
        self.BDLock.release()
        return reV

    def changeBitsDone(self,length):
        '''change bitsDone, for testing'''
        self.BDLock.acquire()
        self.bitsDone += length
        self.BDLock.release()

    def measure(self, intv):
        '''measure the throughput of the leaky bucket'''
        while True:
            bitsDone = self.getBitsDone()
            rate = bitsDone / float(intv*1024)
            print 'rate: %.2f' % rate
            time.sleep(intv)

    def startMeasure(self, intv):
        '''start measure the rate'''
        #print 'here'
        aThread = threading.Thread(target = self.measure, args = [intv])
        aThread.start()        

#===============================
def main():
    pack = 1000*'a'
    msg = ('192.168.1.1', 16000, pack)
    print 'here'
    LB = LeakyBucket(None, 500*1024, 1, 'reg')
    LB.begin()
    LB.startMeasure(10)
    numMsg = 0

    while numMsg < 10000:
        LB.packIn(msg)
        #print 'pack in'
        numMsg += 1

if __name__ == '__main__':
    main()
4

0 回答 0