34

我需要对系统进行压力测试,而http://locust.io似乎是解决此问题的最佳方法。但是,看起来它设置为每次都使用同一个用户。我需要每个 spawn 以不同的用户身份登录。我该如何设置呢?或者,是否有另一个系统可以很好地使用?

4

4 回答 4

61

蝗虫作者在这里。

默认情况下,每个 HttpLocust 用户实例都有一个 HTTP 客户端,它有自己的单独会话

Locust 没有提供用户凭据列表或类似内容的任何功能。但是,您的负载测试脚本只是 python 代码,幸运的是,您自己实现它很简单。

这是一个简短的示例:

# locustfile.py

from locust import HttpLocust, TaskSet, task

USER_CREDENTIALS = [
    ("user1", "password"),
    ("user2", "password"),
    ("user3", "password"),
]

class UserBehaviour(TaskSet):
    def on_start(self):
        if len(USER_CREDENTIALS) > 0:
            user, passw = USER_CREDENTIALS.pop()
            self.client.post("/login", {"username":user, "password":passw})

    @task
    def some_task(self):
        # user should be logged in here (unless the USER_CREDENTIALS ran out)
        self.client.get("/protected/resource")

class User(HttpLocust):
    task_set = UserBehaviour
    min_wait = 5000
    max_wait = 60000

上面的代码在运行 Locust 分布式时不起作用,因为相同的代码在每个从节点上运行,并且它们不共享任何状态。因此,您必须引入一些外部数据存储,从属节点可以用来共享状态(例如 PostgreSQL、redis、memcached 或其他)。

于 2014-04-12T19:49:09.677 回答
14

或者,您可以创建users.py模块来保存您在测试用例中需要的用户信息,在我的示例中,它包含emailcookies. 然后你可以在你的任务中随机调用它们。见下文:

# locustfile.py
from locust import HttpLocust, TaskSet, task
from user_agent import *
from users import users_info


class UserBehaviour(TaskSet):
    def get_user(self):
        user = random.choice(users_info)
        return user

    @task(10)
    def get_siparislerim(self):
        user = self.get_user()
        user_agent = self.get_user_agent()
        r = self.client.get("/orders", headers = {"Cookie": user[1], 'User-Agent': user_agent})

class User(HttpLocust):
    task_set = UserBehaviour
    min_wait = 5000
    max_wait = 60000

用户和用户代理可以由函数调用。通过这种方式,我们可以将测试分发给许多用户和不同的用户代理。

# users.py

users_info = [
['performancetest.1441926507@gmail.com', 'cookies_created_by_each_user'], 
['performancetest.1441926506@gmail.com', 'cookies_created_by_each_user'], 
['performancetest.1441926501@gmail.com', 'cookies_created_by_each_user'], 
['performancetest.1441926499@gmail.com', 'cookies_created_by_each_user'], 
['performancetest.1441926494@gmail.com', 'cookies_created_by_each_user'], 
['performancetest.1441926493@gmail.com', 'cookies_created_by_each_user'], 
['performancetest.1441926492@gmail.com', 'cookies_created_by_each_user'], 
['performancetest.1441926491@gmail.com', 'cookies_created_by_each_user'], 
['performancetest.1441926490@gmail.com', 'cookies_created_by_each_user'], 
['performancetest.1441926489@gmail.com', 'cookies_created_by_each_user'], 
['performancetest.1441926487@gmail.com', 'cookies_created_by_each_user']] 
于 2015-10-07T10:13:35.243 回答
4

在这里支持@heyman 的回答。示例代码可以工作,但继续启动/停止测试最终会清除USER_CREDENTIALS列表,并开始抛出错误。

我最终添加了以下内容:

from locust import events # in addition to the other locust modules needed

def hatch_complete_handler(**kw):
    global USER_CREDENTIALS
    USER_CREDENTIALS = generate_users() # some function here to regenerate your list

events.hatch_complete += hatch_complete_handler

一旦您的群体完成孵化,这将刷新您的用户列表。

另请记住,您需要的列表比您希望生成的用户数更长。

于 2017-06-09T00:20:29.607 回答
4

在为分布式系统实现此功能时,我采用了稍微不同的方法。我使用了一个非常简单的烧瓶服务器,我在 TaskSet 的 on_start 部分调用了它。

from flask import Flask, jsonify
app = Flask(__name__)

count = 0    #Shared Variable

@app.route("/")
def counter():
    global count

    count = count+1
    tenant = count // 5 + 1
    user = count % 5 + 1

    return jsonify({'count':count,'tenant':"load_tenant_{}".format(str(tenant)),'admin':"admin",'user':"load_user_{}".format(str(user))})

if __name__ == "__main__":
    app.run()

通过这种方式,我现在有一个端点,我可以在我运行它的任何主机上访问http://localhost:5000/ 。我只需要让从属系统可以访问这个端点,我就不必担心重复用户或由有限的 user_info 集引起的某种类型的循环效应。

于 2016-03-22T14:25:26.127 回答