发布.py
import redis
import datetime
import time
import json
import sys
import threading
import gevent
from gevent import monkey
monkey.patch_all()
def main(chan):
redis_host = '10.235.13.29'
r = redis.client.StrictRedis(host=redis_host, port=6379)
while True:
def getpkg():
package = {'time': time.time(),
'signature' : 'content'
}
return package
#test 2: complex data
now = json.dumps(getpkg())
# send it
r.publish(chan, now)
print 'Sending {0}'.format(now)
print 'data type is %s' % type(now)
time.sleep(1)
def zerg_rush(n):
for x in range(n):
t = threading.Thread(target=main, args=(x,))
t.setDaemon(True)
t.start()
if __name__ == '__main__':
num_of_chan = 10
zerg_rush(num_of_chan)
cnt = 0
stop_cnt = 21
while True:
print 'Waiting'
cnt += 1
if cnt == stop_cnt:
sys.exit(0)
time.sleep(30)
子.py
import redis
import threading
import time
import json
import gevent
from gevent import monkey
monkey.patch_all()
def callback(ind):
redis_host = '10.235.13.29'
r = redis.client.StrictRedis(host=redis_host, port=6379)
sub = r.pubsub()
sub.subscribe(str(ind))
start = False
avg = 0
tot = 0
sum = 0
while True:
for m in sub.listen():
if not start:
start = True
continue
got_time = time.time()
decoded = json.loads(m['data'])
sent_time = float(decoded['time'])
dur = got_time - sent_time
tot += 1
sum += dur
avg = sum / tot
print decoded #'Recieved: {0}'.format(m['data'])
file_name = 'logs/sub_%s' % ind
f = open(file_name, 'a')
f.write('processing no. %s' % tot)
f.write('it took %s' % dur)
f.write('current avg: %s\n' % avg)
f.close()
def zerg_rush(n):
for x in range(n):
t = threading.Thread(target=callback, args=(x,))
t.setDaemon(True)
t.start()
def main():
num_of_chan = 10
zerg_rush(num_of_chan)
while True:
print 'Waiting'
time.sleep(30)
if __name__ == '__main__':
main()
我正在测试 redis pubsub 以替换使用 rsh 与远程框进行通信。
我测试过的一件事是影响发布和 pubsub.listen() 延迟的通道数量。
测试:每个频道一个发布者和一个订阅者(发布者每隔一秒发布一次)。增加通道数量并观察延迟(从发布者发布消息到订阅者通过监听获得消息的时间)
陈数--------------以秒为单位的平均延迟
10:---------------------------- -------0.004453
50:----------------------------------0.005246
100:--- ------------------------------0.0155
200:------ ----------------0.0221
300:-------------------------------------------- --0.0621
注意:在 2 CPU + 4GB RAM + 1 NICs RHEL6.4 VM 上测试。
我可以做些什么来保持大量通道的低延迟?
Redis 是单线程的,因此增加更多的 cpu 将无济于事。也许更多的内存?如果有,还要多少?
我能做的任何代码或瓶颈都在 Redis 本身?
也许限制来自我的测试代码使用线程编写的方式?
编辑: Redis Cluster vs ZeroMQ in Pub/Sub,用于水平扩展的分布式系统
接受的答案是“我猜你想最小化延迟。通道的数量是无关紧要的。关键因素是发布者的数量和订阅者的数量、消息大小、每个发布者每秒的消息数量、每个发布者接收到的消息数量"
根据我的测试,我不知道这是不是真的。(声称通道数量无关紧要)
例如,我做了一个测试。
1) 一个通道。100 个发布者发布到一个有 1 个订阅者收听的频道。Publisher 一次发布一秒钟。延迟为0.00965秒
2) 除 1000 个发布者外,相同的测试。延迟为0.00808秒
现在在我的频道测试期间:
300 个频道和 1 个 pub - 1 个 sub 导致0.0621,这只有 600 个连接,比上述测试少,但延迟显着减慢