如何加速下面这段用 Python 2.7 向 WinXP 上的 Redis 写入数据的测试代码?改用多进程(multiprocessing)会更好吗?目前的加载速率约为 6000 次/秒,而对比的速率是 100,000 次/秒。我选择了 100,000,但测试时可以用更小的值。整个过程需要 15 秒。
更改服务器上的设置会有帮助吗?
import time
from time import strftime
import redis
import threading, Queue
# Reference point for the elapsed-time report printed at the end of the run.
start_time = time.time()
# Client shared by all worker threads: local server, database index 1.
cxn = redis.StrictRedis(host='127.0.0.1', port=6379, db=1)
class WorkerMain(threading.Thread):
    """Worker thread that drains a queue of keys into Redis.

    Every key taken from the queue is stored through the module-level
    ``cxn`` client with the fixed value ``"Row"``.
    """

    def __init__(self, queue):
        """Keep a reference to the job queue this worker reads keys from."""
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        """Issue one SET per queued key and stop once the queue is empty."""
        while True:
            try:
                # Non-blocking take: Queue.Empty signals that no work is left.
                row = self.queue.get_nowait()
            except Queue.Empty:
                # A plain return ends the thread; unlike SystemExit it cannot
                # terminate the process if run() is ever called directly.
                return
            try:
                cxn.set(row, "Row")
            except redis.RedisError as exc:
                # Best effort for Redis failures only: report the cause and
                # move on to the next key. Other exceptions (programming
                # errors) are no longer hidden by a bare except.
                print('Setup Error: %s' % exc)
if __name__ == '__main__':
    # Number of worker threads that share the single job queue.
    worker_count = 5

    # Queue every key up front: the decimal strings "1" through "99999".
    jobs = Queue.Queue()
    for number in range(1, 100000):
        jobs.put(str(number))

    # Each worker is started as soon as it has been created.
    workers = []
    for _ in range(worker_count):
        worker = WorkerMain(jobs)
        worker.start()
        workers.append(worker)

    # Block until every worker thread has finished.
    for worker in workers:
        worker.join()

    # Blank separator line, then the elapsed wall-clock time in seconds.
    print("")
    print("Duration: %s" % (time.time() - start_time))
改用下面的多进程(multiprocessing)代码,并在 CLI 中用 “monitor” 命令观察数据……结果发现并非所有数据都写入了服务器。
from multiprocessing import Pool
import time
import redis
# Reference point for the elapsed-time report printed by the main block.
start_time = time.time()
# Module-level client used by rset(): local server, database index 1.
cxn = redis.Redis(host='127.0.0.1', port=6379, db=1)
def rset(var):
    """Store the fixed string "value" under key ``var`` via the ``cxn`` client."""
    cxn.set(name=var, value="value")
if __name__ == '__main__':
    # Keys to write: the integers 1 through 9999.
    sml = range(1, 10000)
    pool = Pool(processes=5)
    for row in sml:
        # The args tuple is (row,), so rset() receives the number itself as
        # the key. The previous [(row,)] handed it a one-element tuple.
        pool.apply_async(rset, (row,))
    # apply_async only queues the work. Without close() + join() the timer
    # stopped and the script ended while tasks were still pending, so some
    # keys never reached the server.
    pool.close()
    pool.join()
    end_time = time.time()
    duration = end_time - start_time
    print("Duration: %s" % duration)
下面是使用流水线(pipeline)的代码……我只是把线程相关的部分注释掉了。
import time
from time import strftime
import redis
import threading, Queue
# Reference point for the elapsed-time report printed by the main block.
start_time = time.time()
# Client for the local server, database index 0.
cxn = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
# Pipeline created with transaction=False, used by the main block below.
pipe = cxn.pipeline(transaction=False)
class WorkerMain(threading.Thread):
    """Worker thread that drains a queue of keys into Redis.

    Every key taken from the queue is stored through the module-level
    ``cxn`` client with the fixed value ``"Row"``.
    """

    def __init__(self, queue):
        """Keep a reference to the job queue this worker reads keys from."""
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        """Issue one SET per queued key and stop once the queue is empty."""
        while True:
            try:
                # Non-blocking take: Queue.Empty signals that no work is left.
                row = self.queue.get_nowait()
            except Queue.Empty:
                # A plain return ends the thread; unlike SystemExit it cannot
                # terminate the process if run() is ever called directly.
                return
            try:
                cxn.set(row, "Row")
            except redis.RedisError as exc:
                # Best effort for Redis failures only: report the cause and
                # move on to the next key. Other exceptions (programming
                # errors) are no longer hidden by a bare except.
                print('Setup Error: %s' % exc)
if __name__ == '__main__':
    # Commands sent per round trip. Calling execute() after every single
    # set() (as before) costs one network round trip per key, which defeats
    # the purpose of the pipeline.
    batch_size = 10000
    sml = range(1, 100000)

    # The WorkerMain threads are not used in this variant; every command
    # goes through the module-level pipeline instead.
    pending = 0
    for row in sml:
        pipe.set(str(row), "value")  # key, value; queued until execute()
        pending += 1
        if pending == batch_size:
            pipe.execute()
            pending = 0
    if pending:
        # Flush the final, partially filled batch.
        pipe.execute()

    print("")
    end_time = time.time()
    duration = end_time - start_time
    print("Duration: %s" % duration)