
I've heard that multithreading in Python is a bit tricky, and I'm not sure what the best way to implement what I need is. Say I have a function, IO_intensive_function, that makes some API calls and may take a while to get a response.

Say the process for queuing up jobs could look like this:

import _thread  # the low-level module was renamed from "thread" in Python 3
for job_args in jobs:
    _thread.start_new_thread(IO_intensive_function, (job_args,))  # args must be a tuple

Will IO_intensive_function now do its work in the background and let me queue up more jobs?
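For concreteness, a minimal sketch of the same idea with the higher-level threading module (the sleep stands in for the real API call, and the function and job list here are hypothetical):

```python
import threading
import time

def io_intensive_function(job_args):
    # Stand-in for the real API call.
    time.sleep(0.1)

jobs = [1, 2, 3]
threads = []
for job_args in jobs:
    t = threading.Thread(target=io_intensive_function, args=(job_args,))
    t.start()          # returns immediately; the call runs in the background
    threads.append(t)  # keep a handle so we can wait for completion later

for t in threads:
    t.join()           # block until every background call has finished
```

So yes, each started thread runs in the background while the loop keeps queuing more work; `join` is only needed when you want to wait for the results.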

I also looked at this question, and it seems the approach is to do the following:

from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(2)
results = pool.map(IO_intensive_function, jobs)

Since these tasks don't need to communicate with each other, and the only goal is to send my API requests as fast as possible, is this the most efficient approach? Thanks.
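For comparison, a minimal sketch of the same pool pattern using the standard library's concurrent.futures instead of multiprocessing.dummy (the doubling function is a hypothetical stand-in for the real request):

```python
from concurrent.futures import ThreadPoolExecutor

def IO_intensive_function(job_args):
    # Stand-in for the real API call.
    return job_args * 2

jobs = [1, 2, 3, 4]

# max_workers caps how many requests are in flight at once.
with ThreadPoolExecutor(max_workers=2) as executor:
    results = list(executor.map(IO_intensive_function, jobs))
```

executor.map returns results in the same order as the input jobs, just like pool.map.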

Edit: the way I'm making the API requests is through a Thrift service.


2 Answers


I recently had to write code to do something similar, and I've tried to make it generic below. Note that I'm new to coding, so please forgive my inelegance. What you may find valuable, though, is that I found I had to embed some error handling to catch disconnects and the like.

I also found it valuable to do the JSON processing inside the threads. You already have threads working for you, so why do the processing step "serially" afterwards when you can extract the information in parallel too?

It's possible I introduced mistakes while making it generic. Don't hesitate to ask follow-up questions and I'll clarify.

import time

import requests
from multiprocessing.dummy import Pool as ThreadPool
from src_code.config import Config

with open(Config.API_PATH + '/api_security_key.pem') as f:
    my_key = f.read().rstrip("\n")
base_url = "https://api.my_api_destination.com/v1"
headers = {"Authorization": "Bearer %s" % my_key}
itm = [base_url, headers]


def call_API(call_var):
    base_url = call_var[0]
    headers = call_var[1]
    call_specific_tag = call_var[2]

    endpoint = f'/api_path/{call_specific_tag}'

    # Retry up to three times, pausing between attempts to ride out disconnects.
    dat = None
    for i in range(3):
        try:
            dat = requests.get(base_url + endpoint, headers=headers).json()
        except requests.exceptions.RequestException:
            print(f'Call for {call_specific_tag} failed after {i + 1} attempt(s).  Pausing for 240 seconds.')
            time.sleep(240)
        else:
            break

    vars_to_capture_01 = list()
    vars_to_capture_02 = list()

    # JSON processing happens here, inside the worker thread.
    try:
        if 'record_id' in dat:
            vars_to_capture_01.append(dat['record_id'])
            vars_to_capture_02.append(dat['second_item_of_interest'])
        else:
            vars_to_capture_01.append(call_specific_tag)
            print(f'Call specific tag {call_specific_tag} is unavailable.  Successful pull.')
            vars_to_capture_02.append(-1)
    except (TypeError, KeyError):
        print(f'{call_specific_tag} is unavailable.  Unsuccessful pull.')
        vars_to_capture_01.append(call_specific_tag)
        vars_to_capture_02.append(-1)

    return [vars_to_capture_01, vars_to_capture_02]


vars_to_capture_01 = list()
vars_to_capture_02 = list()

i = 0
max_i = len(all_tags)  # all_tags: the list of call-specific tags, defined elsewhere
while i < max_i:
    # Work through all_tags in batches of 10, one thread per tag.
    ind_rng = range(i, min(i + 10, max_i))
    call_var = [itm + [all_tags[q]] for q in ind_rng]
    # packed = call_API(call_var[0])  # for testing the function without pooling
    pool = ThreadPool(len(call_var))
    packed = pool.map(call_API, call_var)
    pool.close()
    pool.join()
    for pack in packed:
        try:
            vars_to_capture_01.append(pack[0][0])
            vars_to_capture_02.append(pack[1][0])
        except IndexError:
            print(f'Unpacking error for {all_tags[i]}.')
    i += 10  # advance to the next batch; without this the loop never terminates
Answered 2020-12-31T11:44:48.730

For network API requests you can use asyncio. See this article https://realpython.com/python-concurrency/#asyncio-version for an example of how to implement it.
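A minimal sketch of that asyncio pattern, with asyncio.sleep standing in for a non-blocking HTTP call (a real version would use an async HTTP library such as aiohttp; the fetch function and tag list are hypothetical):

```python
import asyncio

async def fetch(tag):
    # Stand-in for a non-blocking API call made with an async HTTP client.
    await asyncio.sleep(0.1)
    return tag

async def main(tags):
    # gather runs all the coroutines concurrently on one event loop
    # and returns their results in input order.
    return await asyncio.gather(*(fetch(t) for t in tags))

results = asyncio.run(main([1, 2, 3]))
```

Because the calls overlap on a single thread, the whole batch takes roughly as long as one call rather than the sum of all of them.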

Answered 2020-12-31T07:13:57.147