我正在从 ldap 读取大量数据,这些数据需要与数据库中的相应记录进行比较。为了尽量减少 SQL 查询的数量,我想将多个 ldap 记录批处理到一个查询中。
所有这一切都非常简单:一个产生 ldap 结果的线程,以及一个使用这些结果并运行 SQL 查询的线程。
ldap_results = Queue.Queue(10)
定义生产者():
对于 ldap_results() 中的结果:
ldap_results.put(结果)
定义消费者():
缓冲区 = []
缓冲区大小 = 5
而真:
记录 = ldap_results.get()
buffer.append(记录)
如果 len(buffer) >= buffer_size:
do_sql(缓冲区)
缓冲区 = []
问题是:如果 ldap 只返回 3 个结果并且buffer_size是 5 个结果,它将永远阻塞。我意识到我可以将一些特殊的标记放入缓冲区中,例如None, 或"EOF",但这似乎是一个糟糕的设计:“迭代直到你完成,哦,除非你看到这个特殊值,这意味着你也完成了”。
我想出了两个替代的想法。首先是有一个共享eof变量,但我不知道如何正确同步它。
定义生产者():
而数据:
缓冲区.put()
eof = 真
定义消费者():
虽然不是 eof:
缓冲区.get()
第二个是ProduceChunks(chunk_size)为生产者提供一个方法,它会处理结果的批处理,但我不喜欢这样,因为它假设生产者知道如何最好地缓冲结果,而实际上,我认为是消费者的责任。
有没有人有任何指导?