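I wrote some multiprocessing code. It connects to Cassandra, where I run 32 queries to fetch data. I am trying to parallelize the fetching with Python's multiprocessing library. The code looks like this: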

    import json
    import multiprocessing as mp
    from functools import reduce

    from cassandra.cluster import Cluster

    cluster = Cluster(['xyz'])
    session = cluster.connect()

    query = session.prepare('SELECT stuff')
    session.default_timeout = 600000
    session.default_fetch_size = 100
    queries = [
        session.execute_async(query, ['2021-10-19'] + [i])
        for i in range(32)
    ]
    pool = mp.Pool(32)
    inter_obj = pool.map_async(compute, queries)
    inter_obj.wait()
    res = inter_obj.get()

    pool.close()
    pool.join()
    final_response = reduce(aggregate, res)
    resp = json.dumps(final_response, sort_keys=True, indent=4).encode("utf-8")
    print("RESPONSE", resp)

When I run the program, it fails at wait():

    Traceback (most recent call last):
      File "/usr/local/bin/date-run", line 8, in <module>
        sys.exit(main())
      File "/usr/local/lib/python3.8/dist-packages/sc_eol/run_stuff.py", line 75, in main
        res = inter_obj.get()
      File "/usr/lib/python3.8/multiprocessing/pool.py", line 768, in get
        raise self._value
      File "/usr/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
        put(task)
      File "/usr/lib/python3.8/multiprocessing/connection.py", line 206, in send
        self._send_bytes(_ForkingPickler.dumps(obj))
      File "/usr/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
        cls(buf, protocol).dump(obj)
    TypeError: cannot pickle '_thread.RLock' object
1 Answer

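execute_async() returns a ResponseFuture object. multiprocessing has to pickle every argument it sends to a worker process, and as your traceback shows, these objects reach a thread lock that cannot be pickled. You are better off building a list of futures in the main process: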

    futures = []
    query = ...
    for ... :
        futures.append(session.execute_async(query, ...))

With this approach the queries run concurrently. You can then iterate over the results with:

    for future in futures:
        rows = future.result()
        # insert processing here

Calling result() blocks until the request returns either (a) a result or (b) an error.
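Because result() either returns rows or raises, you may want to handle both outcomes per query. Here is a minimal sketch of that loop. The names `collect_results` and `process` are hypothetical, and the futures are stand-ins from `concurrent.futures` so the snippet runs without a cluster; the assumption is only that a driver future exposes a `result()` that blocks and raises on failure, as described above.

```python
from concurrent.futures import Future


def collect_results(futures, process):
    """Wait on each future in order and return (results, errors)."""
    results, errors = [], []
    for index, future in enumerate(futures):
        try:
            # Blocks until this request has either rows or an error.
            rows = future.result()
        except Exception as exc:
            # Remember which query failed instead of aborting the whole batch.
            errors.append((index, exc))
        else:
            results.append(process(rows))
    return results, errors


# Stand-in futures (assumption): one that succeeded, one that failed.
ok = Future()
ok.set_result([1, 2, 3])
failed = Future()
failed.set_exception(RuntimeError("query timed out"))

results, errors = collect_results([ok, failed], process=sum)
print(results)  # [6]
print(errors)
```

All of this stays in the process that owns the session, so nothing has to be pickled. If the per-result processing is CPU-heavy, one option is to hand plain data extracted from the rows (lists, dicts) to a pool rather than the futures themselves.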

For details, see the Cassandra Python driver's Getting Started guide. Cheers!

answered 2021-10-21T00:13:35.097