0

我有一个大约 200,000 个实体的列表,我需要为每个实体查询一个特定的 RESTful API,并最终将所有 200,000 个实体以 JSON 格式保存在 txt 文件中。最简单的做法是遍历 200,000 个实体的列表并逐个查询,将返回的 JSON 添加到列表中,完成后,将所有内容添加到文本文件中。就像是:

from apiWrapper import api
from entities import listEntities #list of the 200,000 entities
a=api()
fullEntityList=[]
for entity in listEntities:
fullEntityList.append(a.getFullEntity(entity))

with open("fullEntities.txt","w") as f:
    simplejson.dump(fullEntityList,f)

显然这是不可靠的,因为对 API 的 200,000 次查询大约需要 10 个小时左右,所以我猜在将其写入文件之前会导致错误。我想正确的方法是分块编写,但不确定如何实现。有任何想法吗?另外,我不能用数据库做到这一点。

4

3 回答 3

2

我建议将它们写入 SQLite 数据库。这就是我为自己的小型网络蜘蛛应用程序做的方式。因为您可以很容易地查询密钥,并检查您已经检索到哪些密钥。这样,您的应用程序可以轻松地从中断的地方继续。特别是如果您下周添加了大约 1000 个新条目。

从一开始就将“恢复”设计到您的应用程序中。如果有一些意外的异常(比如,由于网络拥塞而超时),您不想从头开始重新启动,而只希望那些您尚未成功检索的查询。在 200.000 次查询时,99.9% 的正常运行时间意味着您必须预期 200 次失败!

为了空间效率和性能,使用压缩格式可能会有所回报,例如在将 json 转储到数据库 blob 之前使用 zlib 压缩 json。

SQLite 是一个不错的选择,除非您的爬虫同时在多个主机上运行。对于单个应用程序,sqlite 是完美的。

于 2013-01-14T07:00:56.067 回答
1

简单的方法是以'a'(附加)模式打开文件,并在它们进入时一一写入。

更好的方法是使用作业队列。这将允许您产生a.getFullEntity对工作线程的调用并在/如果它们返回时处理您想要的结果,或者安排重试失败等。请参阅Queue

于 2013-01-14T05:22:29.680 回答
0

我还将使用一个单独的线程来进行文件写入,并用于Queue记录所有实体。刚开始的时候,我以为这会在 5 分钟内完成,但后来发现有点困难。simplejson 和我知道的所有其他此类库不支持部分写入,因此您不能先编写列表的一个元素,然后再添加另一个等。因此,我尝试手动解决此问题,方法是[分别写入文件,]然后分别倾倒每个实体。

如果无法检查它(因为我没有你的 api),你可以尝试:

import threading
import Queue
import simplejson
from apiWrapper import api
from entities import listEntities #list of the 200,000 entities

CHUNK_SIZE = 1000

class EntityWriter(threading.Thread):
    lines_written = False
    _filename = "fullEntities.txt"

    def __init__(self, queue):
        super(EntityWriter, self).__init()
        self._q = queue
        self.running = False

    def run(self):
        self.running = True
        with open(self._filename,"a") as f:
            while True:
                try:
                    entity = self._q.get(block=False)
                    if not EntityWriter.lines_written:
                        EntityWriter.lines_written = True
                        f.write("[")
                        simplejson.dump(entity,f)
                    else:
                        f.write(",\n")
                        simplejson.dump(entity,f)
                except Queue.Empty:
                    break
        self.running = False

    def finish_file(self):
         with open(self._filename,"a") as f:
             f.write("]")


a=api()
fullEntityQueue=Queue.Queue(2*CHUNK_SIZE)
n_entities = len(listEntities)
writer = None
for i, entity in listEntities:
    fullEntityQueue.append(a.getFullEntity(entity))
    if (i+1) % CHUNK_SIZE == 0 or i == n_entities-1:
        if writer is None or not writer.running:
            writer = EntityWriter(fullEntityQueue)
            writer.start()
writer.join()
writer.finish_file()

这个脚本的作用

主循环仍然遍历您的实体列表,获取每个实体的完整信息。之后,每个实体现在都被放入一个队列中。每 1000 个实体(在列表的末尾),就会启动一个与主线程并行运行的 EntityWriter-Thread。此 EntityWriterget来自Queue并将其转储到所需的输出文件。

需要一些额外的逻辑来使 JSON 成为一个列表,如上所述,我是手动编写[的。原则上,当你重新加载它时,结果文件应该被理解。,]simplejson

于 2013-01-14T08:27:33.413 回答