1

发布.py

import redis
import datetime
import time
import json
import sys

import threading
import gevent
from gevent import monkey
monkey.patch_all()

def main(chan):
    redis_host = '10.235.13.29'
    r = redis.client.StrictRedis(host=redis_host, port=6379)
    while True:
        def getpkg():
            package = {'time': time.time(),
                        'signature' : 'content'
                      }

            return package

        #test 2: complex data
        now = json.dumps(getpkg())

        # send it
        r.publish(chan, now)
        print 'Sending {0}'.format(now)
        print 'data type is %s' % type(now)
        time.sleep(1)

def zerg_rush(n):
    for x in range(n):
        t = threading.Thread(target=main, args=(x,))
        t.setDaemon(True)
        t.start()

if __name__ == '__main__':
    num_of_chan = 10
    zerg_rush(num_of_chan)
    cnt = 0
    stop_cnt = 21
    while True:
        print 'Waiting'
        cnt += 1
        if cnt == stop_cnt:
            sys.exit(0)
        time.sleep(30)

子.py

import redis
import threading
import time
import json
import gevent
from gevent import monkey
monkey.patch_all()

def callback(ind):
    redis_host = '10.235.13.29'
    r = redis.client.StrictRedis(host=redis_host, port=6379)
    sub = r.pubsub()
    sub.subscribe(str(ind))
    start = False
    avg = 0
    tot = 0
    sum = 0
    while True:
        for m in sub.listen():
            if not start:
                start = True
                continue
            got_time = time.time()
            decoded = json.loads(m['data'])
            sent_time = float(decoded['time'])
            dur = got_time - sent_time
            tot += 1
            sum += dur
            avg = sum / tot

            print decoded #'Recieved: {0}'.format(m['data'])
            file_name = 'logs/sub_%s' % ind
            f = open(file_name, 'a')
            f.write('processing no. %s' % tot)
            f.write('it took %s' % dur)
            f.write('current avg: %s\n' % avg)
            f.close()

def zerg_rush(n):
    for x in range(n):
        t = threading.Thread(target=callback, args=(x,))
        t.setDaemon(True)
        t.start()

def main():
    num_of_chan = 10
    zerg_rush(num_of_chan)
    while True:
        print 'Waiting'
        time.sleep(30)

if __name__ == '__main__':
    main()

我正在测试 redis pubsub 以替换使用 rsh 与远程框进行通信。
我测试过的一件事是影响发布和 pubsub.listen() 延迟的通道数量。

测试:每个频道一个发布者和一个订阅者(发布者每隔一秒发布一次)。增加通道数量并观察延迟(从发布者发布消息到订阅者通过监听获得消息的时间)

陈数--------------以秒为单位的平均延迟
10:---------------------------- -------0.004453
50:----------------------------------0.005246
100:--- ------------------------------0.0155
200:------ ----------------0.0221
300:-------------------------------------------- --0.0621

注意:在 2 CPU + 4GB RAM + 1 NICs RHEL6.4 VM 上测试。

  1. 我可以做些什么来保持大量通道的低延迟?

  2. Redis 是单线程的,因此增加更多的 cpu 将无济于事。也许更多的内存?如果有,还要多少?

  3. 我能做的任何代码或瓶颈都在 Redis 本身?

  4. 也许限制来自我的测试代码使用线程编写的方式?

编辑: Redis Cluster vs ZeroMQ in Pub/Sub,用于水平扩展的分布式系统

接受的答案是“我猜你想最小化延迟。通道的数量是无关紧要的。关键因素是发布者的数量和订阅者的数量、消息大小、每个发布者每秒的消息数量、每个发布者接收到的消息数量"

根据我的测试,我不知道这是不是真的。(声称通道数量无关紧要)
例如,我做了一个测试。

1) 一个通道。100 个发布者发布到一个有 1 个订阅者收听的频道。Publisher 一次发布一秒钟。延迟为0.00965
2) 除 1000 个发布者外,相同的测试。延迟为0.00808

现在在我的频道测试期间:
300 个频道和 1 个 pub - 1 个 sub 导致0.0621,这只有 600 个连接,比上述测试少,但延迟显着减慢

4

0 回答 0