我一直在尝试使用 rq 对来自 BigQuery 的 API 请求进行排队,因为它们需要很长时间,我得到了 H12(超时)错误。当将数据帧传递给下一个队列时,代码会不断崩溃。
这是我的worker.py
文件:
import os
import redis
from rq import Worker, Queue, Connection
listen = ['high', 'default', 'low']
redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
conn = redis.from_url(redis_url)
if __name__ == '__main__':
with Connection(conn):
worker = Worker(map(Queue, listen))
worker.work()
这是错误的来源:
data_full = q.enqueue(furnish_stops)
daily_count = q.enqueue(furnish_daily, data_full)
第一个函数所做的只是调用 api 将数据下载到 data_full 数据帧,然后将其传递给另一个函数以创建用于可视化目的的数组。
完整的错误报告:
Traceback (most recent call last):
File "app copy.py", line 29, in <module>
daily_count = q.enqueue(furnish_daily, data_full)
File "/home/alexis/.local/lib/python3.8/site-packages/rq/queue.py", line 502, in enqueue
return self.enqueue_call(
File "/home/alexis/.local/lib/python3.8/site-packages/rq/queue.py", line 400, in enqueue_call
return self.enqueue_job(job, pipeline=pipeline, at_front=at_front)
File "/home/alexis/.local/lib/python3.8/site-packages/rq/queue.py", line 560, in enqueue_job
job.save(pipeline=pipe)
File "/home/alexis/.local/lib/python3.8/site-packages/rq/job.py", line 648, in save
mapping = self.to_dict(include_meta=include_meta)
File "/home/alexis/.local/lib/python3.8/site-packages/rq/job.py", line 590, in to_dict
'data': zlib.compress(self.data),
File "/home/alexis/.local/lib/python3.8/site-packages/rq/job.py", line 270, in data
self._data = self.serializer.dumps(job_tuple)
TypeError: cannot pickle '_thread.lock' object
我已经用 python 版本 3.7.11 & 3.8.10 & 3.9.6 试过了,都得到了同样的错误
唯一提到类似问题的解决方案是在这个线程中,但降级到 3.7 的解决方案对我不起作用。