1

根据此处的讨论，以下代码可以流式地从 S3 读取并写入 S3：

from smart_open import open
import os

# S3 prefix that holds both the input and the output TSV files.
bucket_dir = "s3://my-bucket/annotations/"

in_path = os.path.join(bucket_dir, "in.tsv.gz")
out_path = os.path.join(bucket_dir, "out.tsv.gz")

# Stream the input line by line and re-emit each line with every
# tab-separated field stripped of surrounding whitespace.
with open(in_path, "rb") as fin, open(out_path, "wb") as fout:
    for raw in fin:
        fields = [field.strip() for field in raw.decode().split("\t")]
        fout.write(("\t".join(fields) + "\n").encode())

问题是,在处理了数千行(几分钟)后,我收到“对等方重置连接”错误:

    raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ("Connection broken: ConnectionResetError(104, 'Connection reset by peer')", ConnectionResetError(104, 'Connection reset by peer'))

我能做些什么?我尝试过fout.flush()每一次fout.write(string.encode()),但效果不佳。是否有更好的解决方案来处理大约 2 亿行的 .tsv 文件?

4

1 回答 1

0

我在 smart_open 的读取和写入之间加入了一个生产者/消费者队列。这可以减轻 Connection broken 错误，但在某些情况下并不能完全解决它。

class Producer:
    """Read lines from an input file and feed them into a bounded queue.

    Terminates the stream by putting the sentinel string "DONE" on the
    queue once the input is exhausted.
    """

    def __init__(self, queue, bucket_dir, input_file):
        self.queue = queue            # bounded Queue shared with the consumer
        self.bucket_dir = bucket_dir  # directory / bucket prefix of the input
        self.input_file = input_file  # file name to open under bucket_dir

    def run(self):
        """Stream the input file line by line onto the queue, then signal DONE."""
        with open(os.path.join(self.bucket_dir, self.input_file), "rb") as fin:
            for line in tqdm(fin):
                # BUG FIX: original put an undefined name `line_to_write`
                # (NameError); the line just read is what must be enqueued.
                # put() blocks while the queue is full, so the original
                # busy-wait (while full: sleep) is unnecessary.
                self.queue.put(line)
        self.queue.put("DONE")  # sentinel telling the consumer to stop


class Consumer:
    """Drain lines from a queue and write them to an output file in batches.

    Stops when it receives the sentinel string "DONE". Batching (256 lines
    per write) keeps the number of small writes to the remote stream low.
    """

    def __init__(self, queue, bucket_dir, output_file):
        self.queue = queue              # Queue fed by the producer
        self.bucket_dir = bucket_dir    # directory / bucket prefix of the output
        self.output_file = output_file  # file name to create under bucket_dir

    def run(self):
        """Consume until "DONE", flushing batches of 256 lines to the output."""
        to_write = ""
        count = 0
        with open(os.path.join(self.bucket_dir, self.output_file), "wb") as fout:
            while True:
                # get() blocks until an item is available, so the original
                # empty()/sleep polling loop is unnecessary.
                item = self.queue.get()
                if item == "DONE":
                    # BUG FIX: the file is opened in binary mode, so the
                    # remaining buffer must be encoded before writing
                    # (the original wrote a str and raised TypeError).
                    fout.write(to_write.encode())
                    fout.flush()
                    self.queue.task_done()
                    return

                count += 1
                to_write += item
                if count % 256 == 0:  # batch write
                    fout.write(to_write.encode())
                    fout.flush()
                    # BUG FIX: reset the buffer; the original never cleared
                    # it, re-writing every earlier line on each batch and
                    # growing the string quadratically.
                    to_write = ""
                # BUG FIX: acknowledge every item, not just the sentinel,
                # so queue.join() in the caller can ever complete.
                self.queue.task_done()


def main(args):
    """Copy ``args.input_file`` to ``args.output_file`` via a bounded queue.

    ``args`` must provide ``bucket_dir``, ``input_file`` and ``output_file``.
    One producer thread reads lines, one consumer thread batch-writes them.
    """
    q = Queue(1024)  # bounded so the producer cannot outrun the consumer

    producer = Producer(q, args.bucket_dir, args.input_file)
    producer_thread = threading.Thread(target=producer.run)

    consumer = Consumer(q, args.bucket_dir, args.output_file)
    consumer_thread = threading.Thread(target=consumer.run)

    producer_thread.start()
    consumer_thread.start()

    producer_thread.join()
    consumer_thread.join()
    # BUG FIX: the original ended with q.join(), which deadlocked because
    # the consumer only called task_done() for the "DONE" sentinel, never
    # for regular items. Joining both threads above already guarantees the
    # producer finished and the consumer processed everything, so no
    # queue-level join is needed.
于 2020-12-08T00:21:37.537 回答