1

我需要使用 Cassandra 的 Python DataStax 驱动程序插入大量数据。结果我不能使用 execute() 请求。execute_async( ) 要快得多。

但是我在调​​用 execute_async() 时遇到了丢失数据的问题。如果我使用 execute(),一切正常。但是,如果我使用 execute_async()(对于 SAME 插入查询),我的请求中只有大约 5-7% 正确执行(并且没有发生任何错误)。如果我在每个 1000 个插入请求之后添加 time.sleep(0.01)(通过使用 execute_async()),就可以了。

没有任何数据丢失(案例1):

for query in queries:
    session.execute( query )

没有任何数据丢失(案例2):

counter = 0
for query in queries:
    session.execute_async( query )
    counter += 1
    if counter % 1000 == 0:
        time.sleep( 0.01 )

数据丢失:

for query in queries:
    session.execute_async( query )

有什么理由可以吗?

集群有 2 个节点

[cqlsh 5.0.1 | 卡桑德拉 3.11.2 | CQL 规范 3.4.4 | 本机协议 v4]

DataStax Python 驱动程序版本 3.14.0

蟒蛇 3.6

4

1 回答 1

0

由于execute_async是非阻塞查询,因此您的代码在继续之前不会等待请求完成。在每次执行后添加 10 毫秒时,您可能没有观察到数据丢失的原因sleep是,在您读回数据之前,这为处理请求提供了足够的时间。

您需要在代码中等待请求完成,然后再读取数据,即:

futures = []
for query in queries:
    futures.push(session.execute(query))

for f in futures:
    f.result() # blocks until query is complete

您可能想要评估使用execute_concurrent提交许多查询并让驱动程序为您管理并发级别。

于 2018-09-02T18:31:59.280 回答