我正在从 ldap 读取大量数据,这些数据需要与数据库中的相应记录进行比较。为了尽量减少 SQL 查询的数量,我想将多个 ldap 记录批处理到一个查询中。
所有这一切都非常简单:一个产生 ldap 结果的线程,以及一个使用这些结果并运行 SQL 查询的线程。
ldap_results = Queue.Queue(10) 定义生产者(): 对于 ldap_results() 中的结果: ldap_results.put(结果) 定义消费者(): 缓冲区 = [] 缓冲区大小 = 5 而真: 记录 = ldap_results.get() buffer.append(记录) 如果 len(buffer) >= buffer_size: do_sql(缓冲区) 缓冲区 = []
问题是:如果 ldap 只返回 3 个结果并且buffer_size
是 5 个结果,它将永远阻塞。我意识到我可以将一些特殊的标记放入缓冲区中,例如None
, 或"EOF"
,但这似乎是一个糟糕的设计:“迭代直到你完成,哦,除非你看到这个特殊值,这意味着你也完成了”。
我想出了两个替代的想法。首先是有一个共享eof
变量,但我不知道如何正确同步它。
定义生产者(): 而数据: 缓冲区.put() eof = 真 定义消费者(): 虽然不是 eof: 缓冲区.get()
第二个是ProduceChunks(chunk_size)
为生产者提供一个方法,它会处理结果的批处理,但我不喜欢这样,因为它假设生产者知道如何最好地缓冲结果,而实际上,我认为是消费者的责任。
有没有人有任何指导?