我需要对系统进行压力测试,而http://locust.io似乎是解决此问题的最佳方法。但是,看起来它设置为每次都使用同一个用户。我需要每个 spawn 以不同的用户身份登录。我该如何设置呢?或者,是否有另一个系统可以很好地使用?
4 回答
蝗虫作者在这里。
默认情况下,每个 HttpLocust 用户实例都有一个 HTTP 客户端,它有自己的单独会话。
Locust 没有提供用户凭据列表或类似内容的任何功能。但是,您的负载测试脚本只是 python 代码,幸运的是,您自己实现它很简单。
这是一个简短的示例:
# locustfile.py
from locust import HttpLocust, TaskSet, task
USER_CREDENTIALS = [
("user1", "password"),
("user2", "password"),
("user3", "password"),
]
class UserBehaviour(TaskSet):
def on_start(self):
if len(USER_CREDENTIALS) > 0:
user, passw = USER_CREDENTIALS.pop()
self.client.post("/login", {"username":user, "password":passw})
@task
def some_task(self):
# user should be logged in here (unless the USER_CREDENTIALS ran out)
self.client.get("/protected/resource")
class User(HttpLocust):
task_set = UserBehaviour
min_wait = 5000
max_wait = 60000
上面的代码在运行 Locust 分布式时不起作用,因为相同的代码在每个从节点上运行,并且它们不共享任何状态。因此,您必须引入一些外部数据存储,从属节点可以用来共享状态(例如 PostgreSQL、redis、memcached 或其他)。
或者,您可以创建users.py
模块来保存您在测试用例中需要的用户信息,在我的示例中,它包含email
和cookies
. 然后你可以在你的任务中随机调用它们。见下文:
# locustfile.py
from locust import HttpLocust, TaskSet, task
from user_agent import *
from users import users_info
class UserBehaviour(TaskSet):
def get_user(self):
user = random.choice(users_info)
return user
@task(10)
def get_siparislerim(self):
user = self.get_user()
user_agent = self.get_user_agent()
r = self.client.get("/orders", headers = {"Cookie": user[1], 'User-Agent': user_agent})
class User(HttpLocust):
task_set = UserBehaviour
min_wait = 5000
max_wait = 60000
用户和用户代理可以由函数调用。通过这种方式,我们可以将测试分发给许多用户和不同的用户代理。
# users.py
users_info = [
['performancetest.1441926507@gmail.com', 'cookies_created_by_each_user'],
['performancetest.1441926506@gmail.com', 'cookies_created_by_each_user'],
['performancetest.1441926501@gmail.com', 'cookies_created_by_each_user'],
['performancetest.1441926499@gmail.com', 'cookies_created_by_each_user'],
['performancetest.1441926494@gmail.com', 'cookies_created_by_each_user'],
['performancetest.1441926493@gmail.com', 'cookies_created_by_each_user'],
['performancetest.1441926492@gmail.com', 'cookies_created_by_each_user'],
['performancetest.1441926491@gmail.com', 'cookies_created_by_each_user'],
['performancetest.1441926490@gmail.com', 'cookies_created_by_each_user'],
['performancetest.1441926489@gmail.com', 'cookies_created_by_each_user'],
['performancetest.1441926487@gmail.com', 'cookies_created_by_each_user']]
在这里支持@heyman 的回答。示例代码可以工作,但继续启动/停止测试最终会清除USER_CREDENTIALS
列表,并开始抛出错误。
我最终添加了以下内容:
from locust import events # in addition to the other locust modules needed
def hatch_complete_handler(**kw):
global USER_CREDENTIALS
USER_CREDENTIALS = generate_users() # some function here to regenerate your list
events.hatch_complete += hatch_complete_handler
一旦您的群体完成孵化,这将刷新您的用户列表。
另请记住,您需要的列表比您希望生成的用户数更长。
在为分布式系统实现此功能时,我采用了稍微不同的方法。我使用了一个非常简单的烧瓶服务器,我在 TaskSet 的 on_start 部分调用了它。
from flask import Flask, jsonify
app = Flask(__name__)
count = 0 #Shared Variable
@app.route("/")
def counter():
global count
count = count+1
tenant = count // 5 + 1
user = count % 5 + 1
return jsonify({'count':count,'tenant':"load_tenant_{}".format(str(tenant)),'admin':"admin",'user':"load_user_{}".format(str(user))})
if __name__ == "__main__":
app.run()
通过这种方式,我现在有一个端点,我可以在我运行它的任何主机上访问http://localhost:5000/ 。我只需要让从属系统可以访问这个端点,我就不必担心重复用户或由有限的 user_info 集引起的某种类型的循环效应。