
How can I speed up this Python 2.7 test code that loads data into Redis on WinXP? Would multiprocessing be better? I am seeing a load rate of about 6,000/s versus a rate of 100,000/s. I chose 100,000 keys, but it could be lower for testing. The process takes 15 seconds.

Would changing settings on the server help?

import time
from time import strftime
import redis
import threading, Queue

start_time = time.time()
cxn = redis.StrictRedis('127.0.0.1',6379,1)


class WorkerMain(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        while 1:
            try: # take a job from the queue            
                row  = self.queue.get_nowait()           
            except Queue.Empty: raise SystemExit

            try:
                cxn.set(row, "Row")
                #print (row, "Row")
            except: print 'Setup Error'

if __name__ == '__main__':
    connections =  5

    sml = range(1,100000)    
    queue = Queue.Queue() 
    for row in sml:
        queue.put(str(row))

    threads = []
    for dummy in range(connections):
        t = WorkerMain(queue)
        t.start()
        threads.append(t)

    # wait for all threads to finish
    for thread in threads:
        thread.join()


print 
end_time = time.time()
duration = end_time - start_time
print "Duration: %s" % duration

Using the code below with multiprocessing and watching the data with MONITOR in the CLI... not all of the data makes it to the server.

from multiprocessing import Pool
import time
import redis

start_time = time.time()
cxn = redis.Redis('127.0.0.1',6379,1)

def rset(var):
    cxn.set(var,"value")

if __name__ =='__main__':   
    sml = range(1,10000)
    #for x in sml:print x

    pool = Pool(processes=5)
    for row in sml:
        pool.apply_async(rset, [(row,)])
        #print result.get(), 


end_time = time.time()
duration = end_time - start_time
print "Duration: %s" % duration

Here is the pipelined code... I just commented out the threading stuff.

import time
from time import strftime
import redis
import threading, Queue

start_time = time.time()
cxn = redis.StrictRedis('127.0.0.1',6379,0)
pipe = cxn.pipeline(transaction=False)

class WorkerMain(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        while 1:
            try: # take a job from the queue            
                row  = self.queue.get_nowait()           
            except Queue.Empty: raise SystemExit

            try:
                cxn.set(row, "Row")
                #print (row, "ROw")
            except: print 'Setup Error'

if __name__ == '__main__':
    #connections =  5

    sml = range(1,100000)    
    #queue = Queue.Queue() 
    for row in sml:
        #queue.put(str(row))
        pipe.set(str(row),"value").execute()# key, value

   # threads = []
   # for dummy in range(connections):
   #     t = WorkerMain(queue)
   #     t.start()
   #     threads.append(t)
   #     
   # # wait for all threads to finish
   # for thread in threads:
   #    thread.join()



print 
end_time = time.time()
duration = end_time - start_time
print "Duration: %s" % duration

2 Answers


Use pipelining. Pipelining batches commands, so you don't pay the network overhead for each one (a short sketch follows the links below).

See:

  1. The section on pipelining here: https://github.com/andymccurdy/redis-py
  2. Pipelining on redis.io - http://redis.io/topics/pipelining
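
For example, a minimal sketch (the batch size of 1000 is my own choice, not from the docs): queue the SET commands on the pipeline and call execute() once per batch, instead of once per key as in the third snippet of the question.

import time
import redis

cxn = redis.StrictRedis('127.0.0.1', 6379, 0)
pipe = cxn.pipeline(transaction=False)

start_time = time.time()
for row in range(1, 100000):
    pipe.set(str(row), "value")
    if row % 1000 == 0:   # one round trip per 1000 commands instead of one per key
        pipe.execute()
pipe.execute()            # flush whatever is left

print "Duration: %s" % (time.time() - start_time)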
Answered 2012-04-17T17:37:05.620

If you are using CPython (the standard Python interpreter), threads are not a good way to get better performance because of the GIL.

http://wiki.python.org/moin/GlobalInterpreterLock

Multiprocessing should work better.
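
A rough sketch of how that could look here (chunk size and worker count are guesses, not a tuned recommendation): give each worker process its own connection and a chunk of the keys, and let it send the whole chunk in one pipeline.

from multiprocessing import Pool
import time
import redis

def load_chunk(chunk):
    # each worker builds its own connection and pipelines its chunk in one round trip
    cxn = redis.StrictRedis('127.0.0.1', 6379, 0)
    pipe = cxn.pipeline(transaction=False)
    for key in chunk:
        pipe.set(key, "value")
    pipe.execute()
    return len(chunk)

if __name__ == '__main__':
    start_time = time.time()
    keys = [str(i) for i in range(1, 100000)]
    chunks = [keys[i:i + 10000] for i in range(0, len(keys), 10000)]

    pool = Pool(processes=5)
    loaded = pool.map(load_chunk, chunks)   # blocks until every chunk is done
    pool.close()
    pool.join()

    print "Loaded %d keys in %s seconds" % (sum(loaded), time.time() - start_time)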

Answered 2012-04-17T17:21:03.853