I am trying to test POST requests to a Flask server using a queue created with the rq module. The rq queue keeps raising this strange error:
Traceback (most recent call last):
  File "/home/dor/Documents/workspace/NErlNet/src_py/apiServerNew/TotallyNew/test2.py", line 14, in <module>
    result2 = trans.testQueue(baseReceiverAddress + '/testQueue')
  File "/home/dor/Documents/workspace/NErlNet/src_py/apiServerNew/TotallyNew/transmitter.py", line 23, in testQueue
    result = globe.queue.enqueue(testPost(address, 0), args=[])
  File "/home/dor/anaconda3/lib/python3.8/site-packages/rq/queue.py", line 500, in enqueue
    on_failure, pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
  File "/home/dor/anaconda3/lib/python3.8/site-packages/rq/queue.py", line 466, in parse_args
    if not isinstance(f, string_types) and f.__module__ == '__main__':
AttributeError: 'tuple' object has no attribute '__module__'
I would be glad to get help with this error. These are my files:
transmitter.py:
import requests
import globalVars as globe
import queue
from threading import Thread
import redis
from rq import Queue
import time
#DEFAULT_PORT = 8095
def testPost(address, payloadNum):
    payload = {'test' : payloadNum}
    response = requests.post(address,data = payload)
    #Return true, if received: HTTP status code < 400
    #Return the HTTP status code for the response
    #Return the response in JSON format
    return(response.ok, response.status_code, response.json())
def testQueue(address):
    result = globe.queue.enqueue(testPost(address, 0), args=[])
def wait():
    while not globe.ackQueue.empty(): #While the queue is NOT empty
        pass
if __name__ == "__main__":
    print('transmitter')
receiverServer.py:
from flask import Flask
from flask_restful import Api, Resource
import globalVars as globe
import redis
from rq import Queue
import queue
receiver = Flask(__name__)
api = Api(receiver)
def runReceiver():
    receiver.run(debug=True, threaded=False)
class test(Resource):
    def post(self):
        return {'Test' : 'Passed!'} #Returns the response in JSON format
class testQueue(Resource):
    def post(self):
        return {'Test' : 'Passed!'}
#Listener Server list of resources:
api.add_resource(test, "/test")
api.add_resource(testQueue, "/testQueue")
if __name__ == "__main__":
    runReceiver()
globalVars.py:
import redis
from rq import Queue
r = redis.Redis()
queue = Queue(connection=r)
test2.py:
from flask import Flask
from flask_restful import Api, Resource
import transmitter as trans
import receiverServer as receiver
import globalVars as globe
defAddress = 'https://httpbin.org/post' #httpbin is a website aimed for testing HTTP requests
baseReceiverAddress = 'http://127.0.0.1:5000'
if __name__ == "__main__":
    result1 = trans.testPost(baseReceiverAddress + '/test', 0)
    print(result1)
    result2 = trans.testQueue(baseReceiverAddress + '/testQueue')
    print(result2)
I have tried changing the testPost function several times, but the error keeps appearing... Please help me solve this!
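For reference, here is a minimal sketch of what the traceback seems to point at, written so it runs without Redis or a server. Both `enqueue_check` and `fake_post` are hypothetical stand-ins introduced only for illustration (they are not rq's code or the real testPost). The assumption is that `testPost(address, 0)` is executed before `enqueue` is called, so `enqueue` receives the returned tuple instead of the function itself:

```python
# Hypothetical stand-in for the check shown in the traceback; NOT rq's actual code.
def enqueue_check(f, *args):
    """Read f.__module__, the same attribute access that fails in the traceback."""
    # Function objects carry a __module__ attribute; a tuple does not.
    return f.__module__, args


def fake_post(address, payload_num):
    """Stand-in for testPost: returns a tuple without making a network call."""
    return (True, 200, {"address": address, "test": payload_num})


address = "http://127.0.0.1:5000/testQueue"

# Calling the function first hands its return value (a tuple) to enqueue_check.
try:
    enqueue_check(fake_post(address, 0))
    error_message = None
except AttributeError as exc:
    error_message = str(exc)

# Passing the function object and its arguments separately avoids the error.
module_name, passed_args = enqueue_check(fake_post, address, 0)

print(error_message)
print(passed_args)
```

If this reading is right, the corresponding rq call would presumably be `globe.queue.enqueue(testPost, address, 0)`. Separately, `testQueue` has no `return` statement, so `print(result2)` in test2.py would print `None` even once the job is enqueued.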